diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp
index 6f2e6872df10..bbc018052362 100644
--- a/src/Databases/DataLake/DatabaseDataLake.cpp
+++ b/src/Databases/DataLake/DatabaseDataLake.cpp
@@ -672,6 +672,9 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
     const auto is_secondary_query = context_->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
 
+    const auto catalog_uuid = table_metadata.getTableUUID();
+    const UUID table_uuid = catalog_uuid ? parseFromString<UUID>(*catalog_uuid) : UUIDHelpers::Nil;
+
     std::string cluster_name = configuration->isClusterSupported() ? settings[DatabaseDataLakeSetting::object_storage_cluster].value : "";
 
     if (cluster_name.empty() && can_use_parallel_replicas && !is_secondary_query)
@@ -680,8 +683,8 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
         auto storage_cluster = std::make_shared<StorageObjectStorageCluster>(
             cluster_name,
             configuration,
-            configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name))),
-            StorageID(getDatabaseName(), name),
+            configuration->createObjectStorage(context_copy, /* is_readonly */ false, catalog->getCredentialsConfigurationCallback(StorageID(getDatabaseName(), name, table_uuid))),
+            StorageID(getDatabaseName(), name, table_uuid),
             /* columns */columns,
             /* constraints */ConstraintsDescription{},
             /* partition_by */nullptr,
diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h
index d49efa86211b..2c29b2d1836d 100644
--- a/src/Databases/DataLake/ICatalog.h
+++ b/src/Databases/DataLake/ICatalog.h
@@ -68,6 +68,9 @@ class TableMetadata
     void setDataLakeSpecificProperties(std::optional<DataLakeSpecificProperties> && metadata);
     std::optional<DataLakeSpecificProperties> getDataLakeSpecificProperties() const;
 
+    void setTableUUID(const std::string & uuid_) { table_uuid = uuid_; }
+    std::optional<std::string> getTableUUID() const { return table_uuid; }
+
     bool requiresLocation() const { return with_location; }
     bool requiresSchema() const { return with_schema; }
     bool requiresCredentials() const { return with_storage_credentials; }
@@ -126,6 +129,7 @@ class TableMetadata
     std::optional<DataLakeSpecificProperties> data_lake_specific_metadata;
 
     std::string reason_why_table_is_not_readable;
+    std::optional<std::string> table_uuid;
 
     bool is_default_readable_table = true;
diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp
index d7bf5ff9dd3c..7c8448982d66 100644
--- a/src/Databases/DataLake/RestCatalog.cpp
+++ b/src/Databases/DataLake/RestCatalog.cpp
@@ -968,6 +968,9 @@ bool RestCatalog::getTableMetadataImpl(
         }
     }
 
+    if (metadata_object->has("table-uuid"))
+        result.setTableUUID(metadata_object->get("table-uuid").extract<String>());
+
     return true;
 }
diff --git a/src/Databases/DataLake/UnityCatalog.cpp b/src/Databases/DataLake/UnityCatalog.cpp
index 107722dc001b..05709fd14c89 100644
--- a/src/Databases/DataLake/UnityCatalog.cpp
+++ b/src/Databases/DataLake/UnityCatalog.cpp
@@ -271,6 +271,9 @@ bool UnityCatalog::tryGetTableMetadata(
             LOG_DEBUG(log, "Doesn't require schema");
     }
 
+    if (hasValueAndItsNotNone("table_id", object))
+        result.setTableUUID(object->get("table_id").extract<String>());
+
     if (result.isDefaultReadableTable() && result.requiresCredentials())
         getCredentials(object->get("table_id"), result);
diff --git a/src/Formats/FormatFilterInfo.cpp b/src/Formats/FormatFilterInfo.cpp
index ec7bb5f4443b..45bc96269dba 100644
--- a/src/Formats/FormatFilterInfo.cpp
+++ b/src/Formats/FormatFilterInfo.cpp
@@ -1,6 +1,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -9,6 +10,8 @@
 #include
 #include
+#include
+
 namespace DB
 {
@@ -18,6 +21,11 @@ namespace ErrorCodes
     extern const int ICEBERG_SPECIFICATION_VIOLATION;
 }
 
+namespace Setting
+{
+    extern const SettingsBool use_query_condition_cache;
+}
+
 void ColumnMapper::setStorageColumnEncoding(std::unordered_map<String, Int64> && storage_encoding_)
 {
     chassert(storage_encoding.empty());
@@ -60,6 +68,13 @@ FormatFilterInfo::FormatFilterInfo(
     , prewhere_info(std::move(prewhere_info_))
     , column_mapper(column_mapper_)
 {
+    bool use_query_condition_cache = context_->getSettingsRef()[Setting::use_query_condition_cache];
+    if (use_query_condition_cache && filter_actions_dag)
+    {
+        const auto & outputs = filter_actions_dag->getOutputs();
+        if (outputs.size() == 1 && VirtualColumnUtils::isDeterministic(outputs[0]))
+            condition_hash = filter_actions_dag->getHash();
+    }
 }
 
 FormatFilterInfo::FormatFilterInfo() = default;
diff --git a/src/Formats/FormatFilterInfo.h b/src/Formats/FormatFilterInfo.h
index cd80aa4d306e..7f81d04c7a10 100644
--- a/src/Formats/FormatFilterInfo.h
+++ b/src/Formats/FormatFilterInfo.h
@@ -72,6 +72,7 @@ struct FormatFilterInfo
 
     ColumnMapperPtr column_mapper;
 
+    std::optional<UInt64> condition_hash;
 private:
     /// For lazily initializing the fields above.
     std::once_flag init_flag;
diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp
index 3ed63fdbbaa2..a4cde45cbd38 100644
--- a/src/Processors/Formats/IInputFormat.cpp
+++ b/src/Processors/Formats/IInputFormat.cpp
@@ -10,6 +10,7 @@
 namespace DB
 {
 
+
 ChunkInfoRowNumbers::ChunkInfoRowNumbers(size_t row_num_offset_, std::optional<IColumn::Filter> applied_filter_)
     : row_num_offset(row_num_offset_), applied_filter(std::move(applied_filter_))
 {
 }
diff --git a/src/Processors/Formats/IInputFormat.h b/src/Processors/Formats/IInputFormat.h
index ab693ba7a212..3ce1e6d1fbe1 100644
--- a/src/Processors/Formats/IInputFormat.h
+++ b/src/Processors/Formats/IInputFormat.h
@@ -54,6 +54,7 @@ struct FileBucketInfo
     virtual void deserialize(ReadBuffer & buffer) = 0;
     virtual String getIdentifier() const = 0;
     virtual String getFormatName() const = 0;
+    virtual std::shared_ptr<FileBucketInfo> filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const = 0;
 
     virtual ~FileBucketInfo() = default;
 };
@@ -128,6 +129,8 @@ class IInputFormat : public ISource
 
     void needOnlyCount() { need_only_count = true; }
 
+    virtual std::optional<std::pair<std::vector<size_t>, size_t>> getMatchedBuckets() const { return std::nullopt; }
+
 protected:
     ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
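Aside (not part of the patch; an illustrative sketch): the two virtuals added above define a small contract. `filterByMatchingRowGroups` intersects the row groups a bucket covers with the row groups the query condition cache still considers relevant: an empty `row_group_ids` means "the whole file", and a `nullptr` result means "nothing survives, skip this bucket". `getMatchedBuckets` reports the inverse direction: which row groups actually produced rows, out of how many. A self-contained toy model of the filtering contract, with names mirroring the patch:

    #include <cassert>
    #include <cstddef>
    #include <memory>
    #include <unordered_set>
    #include <vector>

    // Toy stand-in for FileBucketInfo::filterByMatchingRowGroups (illustrative only).
    struct ToyBucketInfo
    {
        std::vector<size_t> row_group_ids; // empty => the bucket covers the whole file

        std::shared_ptr<ToyBucketInfo> filterByMatchingRowGroups(const std::vector<size_t> & matching) const
        {
            if (matching.empty())
                return nullptr; // the cache says no row group can match: skip the file
            if (row_group_ids.empty())
                return std::make_shared<ToyBucketInfo>(ToyBucketInfo{matching}); // whole file -> keep only survivors
            std::unordered_set<size_t> keep(matching.begin(), matching.end());
            std::vector<size_t> filtered;
            for (size_t rg : row_group_ids)
                if (keep.contains(rg))
                    filtered.push_back(rg);
            if (filtered.empty())
                return nullptr; // every row group of this bucket was pruned
            return std::make_shared<ToyBucketInfo>(ToyBucketInfo{std::move(filtered)});
        }
    };

    int main()
    {
        ToyBucketInfo bucket{{1, 2}};                       // bucket covering row groups 1 and 2
        auto kept = bucket.filterByMatchingRowGroups({2, 3});
        assert(kept && kept->row_group_ids == std::vector<size_t>{2});
        assert(bucket.filterByMatchingRowGroups({0}) == nullptr);
    }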
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index 2a88c7f000c9..28c4221839c8 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -638,6 +638,22 @@ ParquetFileBucketInfo::ParquetFileBucketInfo(const std::vector<size_t> & row_group_ids_)
 {
 }
 
+std::shared_ptr<FileBucketInfo> ParquetFileBucketInfo::filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const
+{
+    if (matching_row_groups.empty())
+        return nullptr;
+    if (row_group_ids.empty())
+        return std::make_shared<ParquetFileBucketInfo>(matching_row_groups);
+    std::unordered_set<size_t> matching_set(matching_row_groups.begin(), matching_row_groups.end());
+    std::vector<size_t> filtered;
+    for (size_t rg : row_group_ids)
+        if (matching_set.contains(rg))
+            filtered.push_back(rg);
+    if (filtered.empty())
+        return nullptr;
+    return std::make_shared<ParquetFileBucketInfo>(std::move(filtered));
+}
+
 void registerParquetFileBucketInfo(std::unordered_map<String, FileBucketInfoPtr> & instances)
 {
     instances.emplace("Parquet", std::make_shared<ParquetFileBucketInfo>());
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index 1fea5f0609c1..5ec3e69a88f1 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -64,6 +64,7 @@ struct ParquetFileBucketInfo : public FileBucketInfo
     {
         return "Parquet";
     }
+    std::shared_ptr<FileBucketInfo> filterByMatchingRowGroups(const std::vector<size_t> & matching_row_groups) const override;
 };
 
 using ParquetFileBucketInfoPtr = std::shared_ptr<ParquetFileBucketInfo>;
diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp
index c9498e6dd8c0..7e41165b6f44 100644
--- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.cpp
@@ -141,6 +141,32 @@ Chunk ParquetV3BlockInputFormat::read()
     return std::move(res.chunk);
 }
 
+std::optional<std::pair<std::vector<size_t>, size_t>> ParquetV3BlockInputFormat::getMatchedBuckets() const
+{
+    if (!reader)
+        return std::nullopt;
+    std::vector<size_t> matched;
+    for (const auto & row_group : reader->reader.row_groups)
+    {
+        if (!row_group.need_to_process)
+            continue;
+
+        bool produced_rows = false;
+        for (const auto & subgroup : row_group.subgroups)
+        {
+            if (subgroup.filter.rows_pass > 0)
+            {
+                produced_rows = true;
+                break;
+            }
+        }
+
+        if (produced_rows)
+            matched.push_back(row_group.row_group_idx);
+    }
+    return std::make_pair(std::move(matched), reader->reader.file_metadata.row_groups.size());
+}
+
 void ParquetV3BlockInputFormat::setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_)
 {
     if (reader)
diff --git a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h
index dffd7c28605e..c93b5c3b3af0 100644
--- a/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetV3BlockInputFormat.h
@@ -38,6 +38,8 @@ class ParquetV3BlockInputFormat : public IInputFormat
 
     void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override;
 
+    std::optional<std::pair<std::vector<size_t>, size_t>> getMatchedBuckets() const override;
+
 private:
     Chunk read() override;
diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
index 827c53b3ed6b..aaf19b71e89f 100644
--- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
+++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.cpp
@@ -28,6 +28,7 @@ namespace Setting
 
 ReadFromObjectStorageStep::ReadFromObjectStorageStep(
+    const StorageID & storage_id_,
     ObjectStoragePtr object_storage_,
     StorageObjectStorageConfigurationPtr configuration_,
     const Names & columns_to_read,
@@ -42,6 +43,7 @@ ReadFromObjectStorageStep::ReadFromObjectStorageStep(
     size_t max_block_size_,
     size_t num_streams_)
     : SourceStepWithFilter(std::make_shared<const Block>(info_.source_header), columns_to_read, query_info_, storage_snapshot_, context_)
+    , storage_id(storage_id_)
     , object_storage(object_storage_)
     , configuration(configuration_)
     , info(std::move(info_))
@@ -110,6 +112,7 @@ void ReadFromObjectStorageStep::initializePipeline(QueryPipelineBuilder & pipeli
     for (size_t i = 0; i < num_streams; ++i)
     {
         auto source = std::make_shared<StorageObjectStorageSource>(
+            storage_id,
             getName(),
             object_storage,
             configuration,
diff --git a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h
index 2b45cb29b581..b3a2bdb0cce3 100644
--- a/src/Processors/QueryPlan/ReadFromObjectStorageStep.h
+++ b/src/Processors/QueryPlan/ReadFromObjectStorageStep.h
@@ -12,6 +12,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
 {
 public:
     ReadFromObjectStorageStep(
+        const StorageID & storage_id_,
         ObjectStoragePtr object_storage_,
         StorageObjectStorageConfigurationPtr configuration_,
         const Names & columns_to_read,
@@ -46,6 +47,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
     InputOrderInfoPtr getDataOrder() const;
 
 private:
+    StorageID storage_id;
     ObjectStoragePtr object_storage;
     StorageObjectStorageConfigurationPtr configuration;
     std::shared_ptr<IObjectIterator> iterator_wrapper;
diff --git a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp
index d02094697ef2..be48c77381db 100644
--- a/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp
+++ b/src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFileIterator.cpp
@@ -347,20 +347,19 @@ ProcessedManifestFileEntryPtr ManifestFileIterator::processRow(size_t row_index)
     const auto schema_id_opt = schema_processor_ptr->tryGetSchemaIdForSnapshot(resolved_snapshot_id);
     if (!schema_id_opt.has_value())
     {
-        /// Error logged but not thrown to avoid breaking whole query because of backward compatibility reasons.
-        /// That's actually an error because it can lead to incorrect query results, so we are creating an exception to put it to system.error_log.
-        try
-        {
-            throw Exception(
-                ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION,
-                "Cannot read Iceberg table: manifest file '{}' has entry with snapshot_id '{}' for which write file schema is unknown",
-                manifest_file_name,
-                resolved_snapshot_id);
-        }
-        catch (const Exception &)
-        {
-            tryLogCurrentException("ICEBERG_SPECIFICATION_VIOLATION", "", LogsLevel::error);
-        }
+        /// This is expected when the referenced snapshot was expired by the catalog (snapshot expiry is a
+        /// normal Iceberg housekeeping operation). For example, after a compaction ("replace" operation),
+        /// the new snapshot's manifest list inherits manifests from the now-expired parent snapshot, and
+        /// those manifests still carry the original snapshot_id. The manifest file's own Avro header
+        /// records the correct schema_id for the data files it describes, so falling back to
+        /// manifest_schema_id is safe and correct in this case.
+        LOG_DEBUG(
+            getLogger("ManifestFileIterator"),
+            "Manifest file '{}' has entry with snapshot_id '{}' whose snapshot metadata is not present "
+            "(snapshot may have been expired by the catalog). Falling back to manifest schema_id {}.",
+            path_to_manifest_file,
+            resolved_snapshot_id,
+            manifest_schema_id);
     }
     const auto resolved_schema_id = schema_id_opt.has_value() ? *schema_id_opt : manifest_schema_id;
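Aside (not part of the patch): the fallback that the new comment describes is the ternary on the last context line above. A minimal self-contained sketch of the behavior:

    #include <cassert>
    #include <optional>

    // When the snapshot that wrote a manifest entry has been expired by the
    // catalog, its schema id is unknown and the schema id recorded in the
    // manifest file's own Avro header is used instead.
    int resolveSchemaId(std::optional<int> schema_for_snapshot, int manifest_schema_id)
    {
        return schema_for_snapshot.value_or(manifest_schema_id);
    }

    int main()
    {
        assert(resolveSchemaId(7, 3) == 7);             // snapshot metadata still present
        assert(resolveSchemaId(std::nullopt, 3) == 3);  // snapshot expired: manifest header wins
    }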
diff --git a/src/Storages/ObjectStorage/IObjectIterator.cpp b/src/Storages/ObjectStorage/IObjectIterator.cpp
index 6ae0373c2baf..13c0ea82a934 100644
--- a/src/Storages/ObjectStorage/IObjectIterator.cpp
+++ b/src/Storages/ObjectStorage/IObjectIterator.cpp
@@ -1,7 +1,9 @@
 #include
 #include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
@@ -107,13 +109,19 @@ ObjectIteratorSplitByBuckets::ObjectIteratorSplitByBuckets(
     ObjectIterator iterator_,
     const String & format_,
     ObjectStoragePtr object_storage_,
-    const ContextPtr & context_)
+    const ContextPtr & context_,
+    const StorageID & storage_id_,
+    FormatFilterInfoPtr format_filter_info_)
     : WithContext(context_)
     , iterator(iterator_)
     , format(format_)
     , object_storage(object_storage_)
     , format_settings(getFormatSettings(context_))
+    , storage_id(storage_id_)
+    , format_filter_info(std::move(format_filter_info_))
 {
+    if (format_filter_info && format_filter_info->condition_hash)
+        query_condition_cache = context_->getQueryConditionCache();
 }
 
 ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
@@ -127,13 +135,43 @@ ObjectInfoPtr ObjectIteratorSplitByBuckets::next(size_t id)
         auto splitter = FormatFactory::instance().getSplitter(format);
         if (splitter)
         {
+            std::vector<size_t> matching_row_groups;
+            bool has_cache_entry = false;
+            if (query_condition_cache)
+            {
+                auto matching_marks = query_condition_cache->read(
+                    storage_id.uuid,
+                    last_object_info->getFileName(),
+                    *format_filter_info->condition_hash);
+                if (matching_marks.has_value())
+                {
+                    has_cache_entry = true;
+                    const auto & marks = *matching_marks;
+                    for (size_t i = 0; i < marks.size(); ++i)
+                        if (marks[i])
+                            matching_row_groups.push_back(i);
+                    if (matching_row_groups.empty())
+                        continue;
+                }
+            }
+
             auto buffer = createReadBuffer(last_object_info->relative_path_with_metadata, object_storage, getContext(), log);
             size_t bucket_size = getContext()->getSettingsRef()[Setting::cluster_table_function_buckets_batch_size];
-            auto file_bucket_info = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
-            for (const auto & file_bucket : file_bucket_info)
+            auto file_bucket_infos = splitter->splitToBuckets(bucket_size, *buffer, format_settings);
+            for (const auto & file_bucket : file_bucket_infos)
             {
                 auto copy_object_info = *last_object_info;
-                copy_object_info.file_bucket_info = file_bucket;
+                if (has_cache_entry)
+                {
+                    auto filtered = file_bucket->filterByMatchingRowGroups(matching_row_groups);
+                    if (!filtered)
+                        continue;
+                    copy_object_info.file_bucket_info = std::move(filtered);
+                }
+                else
+                {
+                    copy_object_info.file_bucket_info = file_bucket;
+                }
                 pending_objects_info.push(std::make_shared<ObjectInfo>(copy_object_info));
             }
         }
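Aside (not part of the patch): `QueryConditionCache::read` returns one boolean per row group, where `true` means the row group may still contain matching rows. The loop above turns that mark vector into a list of surviving row-group indices; an empty list lets the iterator skip the file before a read buffer is even created. A runnable sketch of that decision, under the assumed mark semantics:

    #include <cassert>
    #include <cstddef>
    #include <optional>
    #include <vector>

    // Sketch of the read-side decision in ObjectIteratorSplitByBuckets::next.
    // Returns std::nullopt when every row group was pruned, i.e. skip the file.
    std::optional<std::vector<size_t>> survivingRowGroups(const std::vector<bool> & marks)
    {
        std::vector<size_t> matching;
        for (size_t i = 0; i < marks.size(); ++i)
            if (marks[i])
                matching.push_back(i);
        if (matching.empty())
            return std::nullopt;
        return matching;
    }

    int main()
    {
        // Row groups 0 and 3 may match; 1 and 2 are known not to.
        assert(survivingRowGroups({true, false, false, true})
               == std::optional<std::vector<size_t>>(std::vector<size_t>{0, 3}));
        // All pruned: the whole file can be skipped.
        assert(!survivingRowGroups({false, false}).has_value());
    }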
diff --git a/src/Storages/ObjectStorage/IObjectIterator.h b/src/Storages/ObjectStorage/IObjectIterator.h
index 83c645e915fd..0c05d0831fd0 100644
--- a/src/Storages/ObjectStorage/IObjectIterator.h
+++ b/src/Storages/ObjectStorage/IObjectIterator.h
@@ -2,6 +2,9 @@
 #include
 #include
 #include
+#include
+#include
+#include
 #include
 #include
 #include
@@ -109,7 +112,9 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext
         ObjectIterator iterator_,
         const String & format_,
         ObjectStoragePtr object_storage_,
-        const ContextPtr & context_);
+        const ContextPtr & context_,
+        const StorageID & storage_id_ = StorageID::createEmpty(),
+        FormatFilterInfoPtr format_filter_info_ = nullptr);
 
     ObjectInfoPtr next(size_t) override;
     size_t estimatedKeysCount() override { return iterator->estimatedKeysCount(); }
@@ -120,6 +125,9 @@ class ObjectIteratorSplitByBuckets : public IObjectIterator, private WithContext
     String format;
     ObjectStoragePtr object_storage;
     FormatSettings format_settings;
+    StorageID storage_id;
+    FormatFilterInfoPtr format_filter_info;
+    QueryConditionCachePtr query_condition_cache;
 
     std::queue<ObjectInfoPtr> pending_objects_info;
     const LoggerPtr log = getLogger("GlobIterator");
diff --git a/src/Storages/ObjectStorage/StorageObjectStorage.cpp b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
index 18caf93c588f..446fe89076b8 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorage.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorage.cpp
@@ -507,6 +507,7 @@ void StorageObjectStorage::read(
         configuration->modifyFormatSettings(modified_format_settings.value(), *local_context);
 
     auto read_step = std::make_unique<ReadFromObjectStorageStep>(
+        storage_id,
         object_storage,
         configuration,
         column_names,
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
index 07ef358d0396..a18977a9756c 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
@@ -1,6 +1,7 @@
 #include
-#include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -38,6 +39,8 @@
 #include
 #include
 #include
+#include
+#include
 #include
 #include
 #include
@@ -51,6 +54,9 @@
 #include
 #include
 
+#include
+#include
+
 namespace fs = std::filesystem;
 
 namespace ProfileEvents
@@ -98,6 +104,7 @@ namespace ErrorCodes
 }
 
 StorageObjectStorageSource::StorageObjectStorageSource(
+    const StorageID & storage_id_,
     String name_,
     ObjectStoragePtr object_storage_,
     StorageObjectStorageConfigurationPtr configuration_,
@@ -111,6 +118,7 @@ StorageObjectStorageSource::StorageObjectStorageSource(
     FormatFilterInfoPtr format_filter_info_,
     bool need_only_count_)
     : ISource(std::make_shared<const Block>(info.source_header), false)
+    , storage_id(storage_id_)
     , name(std::move(name_))
     , object_storage(object_storage_)
     , configuration(configuration_)
@@ -524,6 +532,63 @@ Chunk StorageObjectStorageSource::generate()
 
             return chunk;
         }
+        else if (format_filter_info->condition_hash)
+        {
+            const auto & object_info = reader.getObjectInfo();
+            try
+            {
+                const auto * input_format = reader.getInputFormat();
+                if (input_format)
+                {
+                    auto buckets_opt = input_format->getMatchedBuckets();
+
+                    if (buckets_opt.has_value())
+                    {
+                        const auto & matched_groups = buckets_opt->first;
+                        size_t total_groups = buckets_opt->second;
+
+                        std::unordered_set<size_t> matched_set(matched_groups.begin(), matched_groups.end());
+                        MarkRanges unmatched_ranges;
+                        for (size_t i = 0; i < total_groups; ++i)
+                        {
+                            if (!matched_set.contains(i))
+                            {
+                                if (!unmatched_ranges.empty() && unmatched_ranges.back().end == i)
+                                    unmatched_ranges.back().end++;
+                                else
+                                    unmatched_ranges.push_back({UInt64(i), UInt64(i + 1)});
+                            }
+                        }
+
+                        size_t unmatched_count = total_groups - matched_groups.size();
+                        LOG_DEBUG(log,
+                            "Query condition cache: storing {}/{} unmatched row groups for condition {} in file {}.",
+                            unmatched_count,
+                            total_groups,
+                            format_filter_info->filter_actions_dag->dumpNames(),
+                            object_info->getFileName());
+
+                        if (!unmatched_ranges.empty())
+                        {
+                            auto query_condition_cache = Context::getGlobalContextInstance()->getQueryConditionCache();
+                            query_condition_cache->write(
+                                storage_id.uuid,
+                                object_info->getFileName(),
+                                *format_filter_info->condition_hash,
+                                format_filter_info->filter_actions_dag->dumpNames(),
+                                unmatched_ranges,
+                                total_groups,
+                                false
+                            );
+                        }
+                    }
+                }
+            }
+            catch (...)
+            {
+                tryLogCurrentException(getLogger("StorageObjectStorageSource"), "Failed to write to query condition cache");
+            }
+        }
 
         if (reader.getInputFormat() && read_context->getSettingsRef()[Setting::use_cache_for_count_from_files]
             && !format_filter_info->filter_actions_dag)
@@ -562,6 +627,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
 {
     return createReader(
         0,
+        storage_id,
         file_iterator,
         configuration,
         object_storage,
@@ -578,6 +644,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
 
 StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReader(
     size_t processor,
+    const StorageID & storage_id,
    const std::shared_ptr<IObjectIterator> & file_iterator,
     const StorageObjectStorageConfigurationPtr & configuration,
     const ObjectStoragePtr & object_storage,
@@ -594,11 +661,12 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
     ObjectInfoPtr object_info;
     auto query_settings = configuration->getQuerySettings(context_);
 
-    bool not_a_path = false;
+    QueryConditionCachePtr query_condition_cache;
+    if (format_filter_info && format_filter_info->condition_hash)
+        query_condition_cache = Context::getGlobalContextInstance()->getQueryConditionCache();
 
-    do
+    while (true)
     {
-        not_a_path = false;
         object_info = file_iterator->next(processor);
 
         if (!object_info)
@@ -609,7 +677,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
         auto retry_after_us = object_info->relative_path_with_metadata.getCommand().getRetryAfterUs();
         if (retry_after_us.has_value())
         {
-            not_a_path = true;
             /// TODO: Make asynchronous waiting without sleep in thread
             /// Now this sleep is on executor node in worker thread
             /// Does not block query initiator
@@ -623,7 +690,6 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
 
         if (object_info->getPath().empty())
             return {};
-
         if (!object_info->getObjectMetadata())
         {
             bool with_tags = read_from_format_info.requested_virtual_columns.contains("_tags");
@@ -640,11 +706,48 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
             else
                 object_info->setObjectMetadata(object_storage->getObjectMetadata(path, with_tags));
         }
+
+        if (query_settings.skip_empty_files && object_info->getObjectMetadata()->size_bytes == 0
+            && object_info->getObjectMetadata()->is_size_known)
+            continue;
+
+        if (query_condition_cache && !object_info->file_bucket_info)
+        {
+            auto matching_marks = query_condition_cache->read(
+                storage_id.uuid, object_info->getFileName(), *format_filter_info->condition_hash);
+            if (matching_marks.has_value())
+            {
+                const auto & marks = *matching_marks;
+                size_t total_row_groups = marks.size();
+                std::vector<size_t> matching_row_groups;
+                for (size_t i = 0; i < total_row_groups; ++i)
+                    if (marks[i])
+                        matching_row_groups.push_back(i);
+
+                size_t dropped_row_groups = total_row_groups - matching_row_groups.size();
+                LOG_DEBUG(log,
+                    "Query condition cache has dropped {}/{} row groups for condition {} in file {}.",
+                    dropped_row_groups,
+                    total_row_groups,
+                    format_filter_info->filter_actions_dag->dumpNames(),
+                    object_info->getFileName());
+
+                if (matching_row_groups.empty())
+                    continue;
+
+                auto file_bucket_info = FormatFactory::instance().getFileBucketInfo(
+                    object_info->getFileFormat().value_or(configuration->getFormat()));
+                if (file_bucket_info)
+                {
+                    auto filtered = file_bucket_info->filterByMatchingRowGroups(matching_row_groups);
+                    if (!filtered)
+                        continue;
+                    object_info->file_bucket_info = std::move(filtered);
+                }
+            }
+        }
+
+        break;
     }
-    while (not_a_path
-        || (query_settings.skip_empty_files
-            && object_info->getObjectMetadata()->size_bytes == 0
-            && object_info->getObjectMetadata()->is_size_known));
 
     ProfileEvents::increment(ProfileEvents::ObjectStorageClusterProcessedTasks);
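Aside (not part of the patch): the write path in `generate()` stores the complement of the matched set, encoding row groups that produced no rows as contiguous `MarkRanges` so the mark-based cache interface can be reused with one "mark" per row group. For example, if groups {0, 2, 3} matched out of 5, the unmatched ranges are [1, 2) and [4, 5). A self-contained sketch of that run-length encoding, with `MarkRange` modeled as a pair:

    #include <cassert>
    #include <cstddef>
    #include <cstdint>
    #include <unordered_set>
    #include <utility>
    #include <vector>

    // Stand-in for MarkRange: a half-open [begin, end) interval of row groups.
    using Range = std::pair<std::uint64_t, std::uint64_t>;

    // Sketch of the encoding loop in StorageObjectStorageSource::generate():
    // collapse unmatched row groups into contiguous half-open ranges.
    std::vector<Range> unmatchedRanges(const std::vector<size_t> & matched, size_t total_groups)
    {
        std::unordered_set<size_t> matched_set(matched.begin(), matched.end());
        std::vector<Range> ranges;
        for (size_t i = 0; i < total_groups; ++i)
        {
            if (matched_set.contains(i))
                continue;
            if (!ranges.empty() && ranges.back().second == i)
                ++ranges.back().second;        // extend the current run
            else
                ranges.push_back({i, i + 1});  // start a new run
        }
        return ranges;
    }

    int main()
    {
        // Groups 0, 2 and 3 matched out of 5 -> unmatched runs are [1,2) and [4,5).
        auto ranges = unmatchedRanges({0, 2, 3}, 5);
        assert((ranges == std::vector<Range>{{1, 2}, {4, 5}}));
    }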
diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.h b/src/Storages/ObjectStorage/StorageObjectStorageSource.h
index 40b1c0890088..6326fa6ee89f 100644
--- a/src/Storages/ObjectStorage/StorageObjectStorageSource.h
+++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.h
@@ -29,6 +29,7 @@ class StorageObjectStorageSource : public ISource
     class ArchiveIterator;
 
     StorageObjectStorageSource(
+        const StorageID & storage_id_,
         String name_,
         ObjectStoragePtr object_storage_,
         StorageObjectStorageConfigurationPtr configuration,
@@ -71,6 +72,7 @@ class StorageObjectStorageSource : public ISource
         const StorageObjectStorageConfiguration & configuration,
         const ObjectInfo & object_info,
         bool include_connection_info = true);
 protected:
+    StorageID storage_id;
     const String name;
     ObjectStoragePtr object_storage;
     const StorageObjectStorageConfigurationPtr configuration;
@@ -138,6 +140,7 @@ class StorageObjectStorageSource : public ISource
     /// Recreate ReadBuffer and Pipeline for each file.
     static ReaderHolder createReader(
         size_t processor,
+        const StorageID & storage_id,
         const std::shared_ptr<IObjectIterator> & file_iterator,
         const StorageObjectStorageConfigurationPtr & configuration,
         const ObjectStoragePtr & object_storage,
diff --git a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
index 94f67d150a05..71391b183567 100644
--- a/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
+++ b/src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp
@@ -1100,6 +1100,7 @@ Chunk ObjectStorageQueueSource::generateImpl()
                 const auto context = getContext();
                 reader = StorageObjectStorageSource::createReader(
                     processor_id,
+                    storage_id,
                     file_iterator,
                     configuration,
                     object_storage,
diff --git a/tests/integration/test_storage_iceberg_with_spark/test_query_condition_cache.py b/tests/integration/test_storage_iceberg_with_spark/test_query_condition_cache.py
new file mode 100644
index 000000000000..89e8ca2ac70d
--- /dev/null
+++ b/tests/integration/test_storage_iceberg_with_spark/test_query_condition_cache.py
@@ -0,0 +1,170 @@
+import pytest
+
+from helpers.iceberg_utils import (
+    execute_spark_query_general,
+    get_creation_expression,
+    get_uuid_str,
+)
+
+
+@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
+def test_query_condition_cache(started_cluster_iceberg_with_spark, storage_type):
+    instance = started_cluster_iceberg_with_spark.instances["node1"]
+    spark = started_cluster_iceberg_with_spark.spark_session
+    TABLE_NAME = "test_query_condition_cache_" + storage_type + "_" + get_uuid_str()
+
+    def execute_spark_query(query: str):
+        return execute_spark_query_general(
+            spark,
+            started_cluster_iceberg_with_spark,
+            storage_type,
+            TABLE_NAME,
+            query,
+        )
+
+    execute_spark_query(
+        f"""
+        CREATE TABLE {TABLE_NAME} (id INT, val STRING)
+        USING iceberg
+        OPTIONS('format-version'='2')
+        """
+    )
+
+    for i in range(1, 10):
+        execute_spark_query(
+            f"INSERT INTO {TABLE_NAME} VALUES ({i}, 'value_{i}')"
+        )
+
+    instance.query(
+        get_creation_expression(
+            storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=False
+        )
+    )
+    creation_expression = TABLE_NAME
+
+    instance.query("SYSTEM DROP QUERY CONDITION CACHE")
+
id % 7 = 0" + select_query = f"SELECT * FROM {creation_expression} {filter_condition} ORDER BY id" + settings = { + "use_query_condition_cache": 1, + "allow_experimental_analyzer": 1, + } + + query_id_first = f"{TABLE_NAME}_first" + result_first = instance.query(select_query, query_id=query_id_first, settings=settings) + + instance.query("SYSTEM FLUSH LOGS") + + misses_first = int( + instance.query( + f"SELECT ProfileEvents['QueryConditionCacheMisses'] " + f"FROM system.query_log " + f"WHERE query_id = '{query_id_first}' AND type = 'QueryFinish'" + ) + ) + hits_first = int( + instance.query( + f"SELECT ProfileEvents['QueryConditionCacheHits'] " + f"FROM system.query_log " + f"WHERE query_id = '{query_id_first}' AND type = 'QueryFinish'" + ) + ) + assert misses_first > 0, f"Expected cache misses on first run, got {misses_first}" + assert hits_first == 0, f"Expected no cache hits on first run, got {hits_first}" + + query_id_second = f"{TABLE_NAME}_second" + result_second = instance.query(select_query, query_id=query_id_second, settings=settings) + assert result_second == result_first + + instance.query("SYSTEM FLUSH LOGS") + + hits_second = int( + instance.query( + f"SELECT ProfileEvents['QueryConditionCacheHits'] " + f"FROM system.query_log " + f"WHERE query_id = '{query_id_second}' AND type = 'QueryFinish'" + ) + ) + assert hits_second > 0 + + instance.query("SYSTEM DROP QUERY CONDITION CACHE") + + query_id_after_drop = f"{TABLE_NAME}_after_drop" + instance.query(select_query, query_id=query_id_after_drop, settings=settings) + + instance.query("SYSTEM FLUSH LOGS") + + misses_after_drop = int( + instance.query( + f"SELECT ProfileEvents['QueryConditionCacheMisses'] " + f"FROM system.query_log " + f"WHERE query_id = '{query_id_after_drop}' AND type = 'QueryFinish'" + ) + ) + hits_after_drop = int( + instance.query( + f"SELECT ProfileEvents['QueryConditionCacheHits'] " + f"FROM system.query_log " + f"WHERE query_id = '{query_id_after_drop}' AND type = 'QueryFinish'" + ) + ) + assert misses_after_drop > 0 + assert hits_after_drop == 0 + + instance.query(f"DROP TABLE IF EXISTS {TABLE_NAME}") + + +@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"]) +def test_query_condition_cache_nondeterministic_functions( + started_cluster_iceberg_with_spark, storage_type +): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = ( + "test_qcc_nondeterministic_" + storage_type + "_" + get_uuid_str() + ) + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} (id INT, val STRING) + USING iceberg + OPTIONS('format-version'='2') + """ + ) + + for i in range(1, 10): + execute_spark_query(f"INSERT INTO {TABLE_NAME} VALUES ({i}, 'value_{i}')") + + instance.query( + get_creation_expression( + storage_type, + TABLE_NAME, + started_cluster_iceberg_with_spark, + table_function=False, + ) + ) + + instance.query("SYSTEM DROP QUERY CONDITION CACHE") + + select_query = f"SELECT * FROM {TABLE_NAME} WHERE id = rand() FORMAT Null" + settings = { + "use_query_condition_cache": 1, + "allow_experimental_analyzer": 1, + } + + instance.query(select_query, settings=settings) + + cache_size = int(instance.query("SELECT count() FROM system.query_condition_cache")) + assert cache_size == 0 + + instance.query(f"DROP TABLE IF EXISTS {TABLE_NAME}") diff --git 
diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index 2ab756654d7b..d245ce1feeb1 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -3257,3 +3257,97 @@ def test_gcs_decompressive_transcoding(started_cluster):
         "'http://resolver:8084/bucket/data.jsonl', NOSIGN, 'LineAsString', 'line String')"
     )
     assert result.strip() == '{"id":1}\n{"id":2}\n{"id":3}'
+
+def test_query_condition_cache(started_cluster):
+    instance = started_cluster.instances["dummy"]
+    bucket = started_cluster.minio_bucket
+    table_name = f"test_qcc_{generate_random_string()}"
+    url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{table_name}.parquet"
+
+    instance.query(
+        f"""
+        CREATE TABLE {table_name} (id Int64, val String)
+        ENGINE = S3('{url}', 'minio', '{minio_secret_key}', 'Parquet')
+        SETTINGS output_format_parquet_row_group_size = 1
+        """
+    )
+
+    instance.query(
+        f"""
+        INSERT INTO {table_name}
+        SELECT number AS id, toString(number) AS val
+        FROM numbers(1000)
+        """
+    )
+
+    instance.query("SYSTEM DROP QUERY CONDITION CACHE")
+
+    select_query = f"SELECT * FROM {table_name} WHERE id < 100 ORDER BY id"
+    settings = {
+        "use_query_condition_cache": 1,
+        "allow_experimental_analyzer": 1,
+    }
+
+    query_id_first = f"qcc_s3_first_{generate_random_string()}"
+    result_first = instance.query(select_query, query_id=query_id_first, settings=settings)
+
+    instance.query("SYSTEM FLUSH LOGS")
+
+    misses_first = int(
+        instance.query(
+            f"SELECT ProfileEvents['QueryConditionCacheMisses'] "
+            f"FROM system.query_log "
+            f"WHERE query_id = '{query_id_first}' AND type = 'QueryFinish'"
+        )
+    )
+    hits_first = int(
+        instance.query(
+            f"SELECT ProfileEvents['QueryConditionCacheHits'] "
+            f"FROM system.query_log "
+            f"WHERE query_id = '{query_id_first}' AND type = 'QueryFinish'"
+        )
+    )
+    assert misses_first > 0, f"Expected cache misses on first run, got {misses_first}"
+    assert hits_first == 0, f"Expected no cache hits on first run, got {hits_first}"
+
+    query_id_second = f"qcc_s3_second_{generate_random_string()}"
+    result_second = instance.query(select_query, query_id=query_id_second, settings=settings)
+    assert result_second == result_first
+
+    instance.query("SYSTEM FLUSH LOGS")
+
+    hits_second = int(
+        instance.query(
+            f"SELECT ProfileEvents['QueryConditionCacheHits'] "
+            f"FROM system.query_log "
+            f"WHERE query_id = '{query_id_second}' AND type = 'QueryFinish'"
+        )
+    )
+    assert hits_second > 0, f"Expected cache hits on second run, got {hits_second}"
+
+    instance.query("SYSTEM DROP QUERY CONDITION CACHE")
+
+    query_id_after_drop = f"qcc_s3_after_drop_{generate_random_string()}"
+    instance.query(select_query, query_id=query_id_after_drop, settings=settings)
+
+    instance.query("SYSTEM FLUSH LOGS")
+
+    misses_after_drop = int(
+        instance.query(
+            f"SELECT ProfileEvents['QueryConditionCacheMisses'] "
+            f"FROM system.query_log "
+            f"WHERE query_id = '{query_id_after_drop}' AND type = 'QueryFinish'"
+        )
+    )
+    hits_after_drop = int(
+        instance.query(
+            f"SELECT ProfileEvents['QueryConditionCacheHits'] "
+            f"FROM system.query_log "
+            f"WHERE query_id = '{query_id_after_drop}' AND type = 'QueryFinish'"
+        )
+    )
+    assert misses_after_drop > 0, f"Expected cache misses after drop, got {misses_after_drop}"
+    assert hits_after_drop == 0, f"Expected no hits after drop, got {hits_after_drop}"
+
+    instance.query(f"DROP TABLE {table_name}")