Skip to content

Commit a1c3292

Browse files
ksseniizvonand
authored andcommitted
Merge pull request ClickHouse#82131 from ClickHouse/fix-partiton-pruning-in-data-lake-cluster-functions
Fix partition pruning with data lake cluster functions
1 parent 8de897c commit a1c3292

15 files changed

+74
-19
lines changed

src/Planner/Planner.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include <Storages/StorageDistributed.h>
5353
#include <Storages/StorageDummy.h>
5454
#include <Storages/StorageMerge.h>
55+
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
5556

5657
#include <AggregateFunctions/IAggregateFunction.h>
5758

@@ -231,6 +232,11 @@ FiltersForTableExpressionMap collectFiltersForAnalysis(const QueryTreeNodePtr &
231232
collect_filters = true;
232233
break;
233234
}
235+
if (typeid_cast<const StorageObjectStorageCluster *>(storage.get()))
236+
{
237+
collect_filters = true;
238+
break;
239+
}
234240
}
235241

236242
if (!collect_filters)

src/Storages/IStorageCluster.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,11 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t
117117
if (extension)
118118
return;
119119

120-
extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
120+
extension = storage->getTaskIteratorExtension(
121+
predicate,
122+
filter_actions_dag.has_value() ? &filter_actions_dag.value() : query_info.filter_actions_dag.get(),
123+
context,
124+
number_of_replicas);
121125
}
122126

123127
/// The code executes on initiator

src/Storages/IStorageCluster.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,13 @@ class IStorageCluster : public IStorage
3434
size_t /*num_streams*/) override;
3535

3636
ClusterPtr getCluster(ContextPtr context) const;
37+
3738
/// Query is needed for pruning by virtual columns (_file, _path)
38-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
39+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
40+
const ActionsDAG::Node * predicate,
41+
const ActionsDAG * filter_actions_dag,
42+
const ContextPtr & context,
43+
size_t number_of_replicas) const = 0;
3944

4045
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4146

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ class ReadFromObjectStorageStep : public SourceStepWithFilter
451451
auto context = getContext();
452452
iterator_wrapper = StorageObjectStorageSource::createFileIterator(
453453
configuration, configuration->getQuerySettings(context), object_storage, distributed_processing,
454-
context, predicate, filter_actions_dag, virtual_columns, info.hive_partition_columns_to_read_from_file_path, nullptr, context->getFileProgressCallback());
454+
context, predicate, filter_actions_dag.has_value() ? &filter_actions_dag.value() : nullptr, virtual_columns, info.hive_partition_columns_to_read_from_file_path, nullptr, context->getFileProgressCallback());
455455
}
456456
};
457457
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,12 +213,27 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
213213
}
214214
}
215215

216+
216217
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
217-
const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
218+
const ActionsDAG::Node * predicate,
219+
const ActionsDAG * filter,
220+
const ContextPtr & local_context,
221+
const size_t number_of_replicas) const
218222
{
219223
auto iterator = StorageObjectStorageSource::createFileIterator(
220-
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
221-
local_context, predicate, {}, virtual_columns, hive_partition_columns_to_read_from_file_path, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
224+
configuration,
225+
configuration->getQuerySettings(local_context),
226+
object_storage,
227+
/* distributed_processing */false,
228+
local_context,
229+
predicate,
230+
filter,
231+
virtual_columns,
232+
hive_partition_columns_to_read_from_file_path,
233+
nullptr,
234+
local_context->getFileProgressCallback(),
235+
/*ignore_archive_globs=*/true,
236+
/*skip_object_metadata=*/true);
222237

223238
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
224239

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ class StorageObjectStorageCluster : public IStorageCluster
2525
std::string getName() const override;
2626

2727
RemoteQueryExecutor::Extension getTaskIteratorExtension(
28-
const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
28+
const ActionsDAG::Node * predicate,
29+
const ActionsDAG * filter,
30+
const ContextPtr & context,
31+
size_t number_of_replicas) const override;
2932

3033
String getPathSample(ContextPtr context);
3134

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
130130
bool distributed_processing,
131131
const ContextPtr & local_context,
132132
const ActionsDAG::Node * predicate,
133-
const std::optional<ActionsDAG> & filter_actions_dag,
133+
const ActionsDAG * filter_actions_dag,
134134
const NamesAndTypesList & virtual_columns,
135135
const NamesAndTypesList & hive_columns,
136136
ObjectInfos * read_keys,
@@ -171,7 +171,7 @@ std::shared_ptr<IObjectIterator> StorageObjectStorageSource::createFileIterator(
171171
else if (configuration->supportsFileIterator())
172172
{
173173
return configuration->iterate(
174-
filter_actions_dag.has_value() ? &filter_actions_dag.value() : nullptr,
174+
filter_actions_dag,
175175
file_progress_callback,
176176
query_settings.list_object_keys_size,
177177
local_context);

src/Storages/ObjectStorage/StorageObjectStorageSource.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class StorageObjectStorageSource : public SourceWithKeyCondition
5757
bool distributed_processing,
5858
const ContextPtr & local_context,
5959
const ActionsDAG::Node * predicate,
60-
const std::optional<ActionsDAG> & filter_actions_dag,
60+
const ActionsDAG * filter_actions_dag,
6161
const NamesAndTypesList & virtual_columns,
6262
const NamesAndTypesList & hive_columns,
6363
ObjectInfos * read_keys,

src/Storages/StorageDistributed.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1318,7 +1318,8 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStor
13181318

13191319
/// Select query is needed for pruining on virtual columns
13201320
auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
1321-
auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
1321+
auto extension = src_storage_cluster.getTaskIteratorExtension(
1322+
predicate, filter.has_value() ? &filter.value() : nullptr, local_context, number_of_replicas);
13221323

13231324
/// Here we take addresses from destination cluster and assume source table exists on these nodes
13241325
size_t replica_index = 0;

src/Storages/StorageFileCluster.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,11 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
100100
);
101101
}
102102

103-
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
103+
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
104+
const ActionsDAG::Node * predicate,
105+
const ActionsDAG * /* filter */,
106+
const ContextPtr & context,
107+
const size_t) const
104108
{
105109
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), hive_partition_columns_to_read_from_file_path, context);
106110
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });

0 commit comments

Comments
 (0)