25.6 Antalya port of #709, #760 - Rendezvous hashing #923

Open · wants to merge 2 commits into base: antalya-25.6
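
This PR replaces count-based consistent hashing with rendezvous (highest-random-weight) hashing when distributing files to replicas in object storage cluster reads. Instead of a bare replica count, IStorageCluster::getTaskIteratorExtension now receives the ClusterPtr, from which StorageObjectStorageCluster collects one identifier per replica (its address) to feed the task distributor; the File and URL cluster storages ignore the new parameter. The integration tests are also updated to take the MinIO secret from the shared minio_secret_key helper instead of hard-coding it.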
14 changes: 5 additions & 9 deletions src/Storages/IStorageCluster.cpp

@@ -93,7 +93,7 @@ class ReadFromCluster : public SourceStepWithFilter
 
     std::optional<RemoteQueryExecutor::Extension> extension;
 
-    void createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas);
+    void createExtension(const ActionsDAG::Node * predicate);
     ContextPtr updateSettings(const Settings & settings);
 };
 
@@ -105,19 +105,15 @@ void ReadFromCluster::applyFilters(ActionDAGNodes added_filter_nodes)
     if (filter_actions_dag)
         predicate = filter_actions_dag->getOutputs().at(0);
 
-    auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
-    if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
-        max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);
-
-    createExtension(predicate, max_replicas_to_use);
+    createExtension(predicate);
 }
 
-void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate, size_t number_of_replicas)
+void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
 {
     if (extension)
         return;
 
-    extension = storage->getTaskIteratorExtension(predicate, context, number_of_replicas);
+    extension = storage->getTaskIteratorExtension(predicate, context, cluster);
 }
 
 /// The code executes on initiator
@@ -196,7 +192,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
     if (current_settings[Setting::max_parallel_replicas] > 1)
         max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
 
-    createExtension(nullptr, max_replicas_to_use);
+    createExtension(nullptr);
 
     for (const auto & shard_info : cluster->getShardsInfo())
     {
5 changes: 4 additions & 1 deletion src/Storages/IStorageCluster.h

@@ -35,7 +35,10 @@ class IStorageCluster : public IStorage
 
     ClusterPtr getCluster(ContextPtr context) const;
     /// Query is needed for pruning by virtual columns (_file, _path)
-    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const = 0;
+    virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr cluster) const = 0;
 
     QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
20 changes: 17 additions & 3 deletions src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

@@ -16,7 +16,6 @@
 #include <Storages/extractTableFunctionFromSelectQuery.h>
 #include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
 
-
 namespace DB
 {
 namespace Setting
@@ -177,13 +176,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
 }
 
 RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
-    const ActionsDAG::Node * predicate, const ContextPtr & local_context, const size_t number_of_replicas) const
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & local_context,
+    ClusterPtr cluster) const
 {
     auto iterator = StorageObjectStorageSource::createFileIterator(
         configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
         local_context, predicate, {}, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true, /*skip_object_metadata=*/true);
 
-    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, number_of_replicas);
+    std::vector<std::string> ids_of_hosts;
+    for (const auto & shard : cluster->getShardsInfo())
+    {
+        if (shard.per_replica_pools.empty())
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
+        for (const auto & replica : shard.per_replica_pools)
+        {
+            if (!replica)
+                throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
+            ids_of_hosts.push_back(replica->getAddress());
+        }
+    }
+
+    auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
 
     auto callback = std::make_shared<TaskIterator>(
         [task_distributor](size_t number_of_current_replica) mutable -> String {
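
Collecting replica addresses here is the heart of the change: the old code passed only a replica count, so file assignment depended on each replica's index within one particular cluster definition, while hashing against the address ties a file to a concrete host. Any cluster containing the same host then tends to route the same files to it, which keeps per-node filesystem caches warm across clusters of different sizes — the property the test_s3_cache_locality test below exercises.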
4 changes: 3 additions & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageCluster.h

@@ -24,7 +24,9 @@ class StorageObjectStorageCluster : public IStorageCluster
     std::string getName() const override;
 
     RemoteQueryExecutor::Extension getTaskIteratorExtension(
-        const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr cluster) const override;
 
     String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

@@ -8,9 +8,10 @@ namespace DB
 
 StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
     std::shared_ptr<IObjectIterator> iterator_,
-    size_t number_of_replicas_)
+    std::vector<std::string> ids_of_nodes_)
     : iterator(std::move(iterator_))
-    , connection_to_files(number_of_replicas_)
+    , connection_to_files(ids_of_nodes_.size())
+    , ids_of_nodes(ids_of_nodes_)
     , iterator_exhausted(false)
 {
 }
@@ -37,13 +38,39 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
 
 size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
 {
-    return ConsistentHashing(sipHash64(file_path), connection_to_files.size());
+    size_t nodes_count = ids_of_nodes.size();
+
+    /// Trivial case
+    if (nodes_count < 2)
+        return 0;
+
+    /// Rendezvous hashing
+    size_t best_id = 0;
+    UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
+    for (size_t id = 1; id < nodes_count; ++id)
+    {
+        UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
+        if (weight > best_weight)
+        {
+            best_weight = weight;
+            best_id = id;
+        }
+    }
+    return best_id;
 }
 
 std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
 {
     std::lock_guard lock(mutex);
 
+    if (connection_to_files.size() <= number_of_current_replica)
+        throw Exception(
+            ErrorCodes::LOGICAL_ERROR,
+            "Replica number {} is out of range. Expected range: [0, {})",
+            number_of_current_replica,
+            connection_to_files.size()
+        );
+
     auto & files = connection_to_files[number_of_current_replica];
 
     while (!files.empty())
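
For intuition, here is a minimal self-contained sketch of the same highest-random-weight selection. The host names are made up for illustration, and FNV-1a merely stands in for ClickHouse's sipHash64; the loop mirrors getReplicaForFile above.

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Stand-in for sipHash64: any well-distributed 64-bit string hash works here.
static uint64_t fnv1a64(const std::string & s)
{
    uint64_t h = 14695981039346656037ULL;
    for (unsigned char c : s)
    {
        h ^= c;
        h *= 1099511628211ULL;
    }
    return h;
}

// Rendezvous hashing: score every node against the file key and pick the
// highest score. Adding or removing a node only remaps the files owned by
// the affected node; every other file keeps its assignment.
// Assumes `nodes` is non-empty (the production code returns 0 outright
// when there are fewer than two nodes).
static size_t pickNode(const std::vector<std::string> & nodes, const std::string & file)
{
    size_t best_id = 0;
    uint64_t best_weight = fnv1a64(nodes[0] + file);
    for (size_t id = 1; id < nodes.size(); ++id)
    {
        uint64_t weight = fnv1a64(nodes[id] + file);
        if (weight > best_weight)
        {
            best_weight = weight;
            best_id = id;
        }
    }
    return best_id;
}

int main()
{
    const std::vector<std::string> nodes = {"host-a:9000", "host-b:9000", "host-c:9000"};
    for (const auto & file : {"data/part-0.csv", "data/part-1.csv", "data/part-2.csv"})
        std::cout << file << " -> " << nodes[pickNode(nodes, file)] << '\n';
}

Dropping "host-b:9000" from the vector and rerunning would move only the files previously routed to host-b; files assigned to host-a and host-c keep their winners, since their scores do not change.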
src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

@@ -4,6 +4,7 @@
 #include <Common/Logger.h>
 #include <Interpreters/Cluster.h>
 #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
+#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
 #include <unordered_set>
 #include <vector>
 #include <mutex>
@@ -17,7 +18,7 @@ class StorageObjectStorageStableTaskDistributor
 public:
     StorageObjectStorageStableTaskDistributor(
         std::shared_ptr<IObjectIterator> iterator_,
-        size_t number_of_replicas_);
+        std::vector<std::string> ids_of_nodes_);
 
     std::optional<String> getNextTask(size_t number_of_current_replica);
 
@@ -32,6 +33,8 @@ class StorageObjectStorageStableTaskDistributor
     std::vector<std::vector<String>> connection_to_files;
     std::unordered_set<String> unprocessed_files;
 
+    std::vector<std::string> ids_of_nodes;
+
     std::mutex mutex;
     bool iterator_exhausted = false;
3 changes: 1 addition & 2 deletions src/Storages/StorageDistributed.cpp

@@ -1300,8 +1300,7 @@ std::optional<QueryPipeline> StorageDistributed::distributedWriteFromClusterStorage(
     const auto cluster = getCluster();
 
     /// Select query is needed for pruining on virtual columns
-    auto number_of_replicas = static_cast<UInt64>(cluster->getShardsInfo().size());
-    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, number_of_replicas);
+    auto extension = src_storage_cluster.getTaskIteratorExtension(predicate, local_context, cluster);
 
     /// Here we take addresses from destination cluster and assume source table exists on these nodes
     size_t replica_index = 0;
5 changes: 4 additions & 1 deletion src/Storages/StorageFileCluster.cpp

@@ -77,7 +77,10 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context)
     );
 }
 
-RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, const size_t) const
+RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    ClusterPtr) const
 {
     auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
     auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
5 changes: 4 additions & 1 deletion src/Storages/StorageFileCluster.h

@@ -27,7 +27,10 @@ class StorageFileCluster : public IStorageCluster
         const ConstraintsDescription & constraints_);
 
     std::string getName() const override { return "FileCluster"; }
-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr) const override;
 
 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
3 changes: 1 addition & 2 deletions src/Storages/StorageReplicatedMergeTree.cpp

@@ -6089,8 +6089,7 @@ std::optional<QueryPipeline> StorageReplicatedMergeTree::distributedWriteFromClusterStorage(
     ContextMutablePtr query_context = Context::createCopy(local_context);
     query_context->increaseDistributedDepth();
 
-    auto number_of_replicas = static_cast<UInt64>(src_cluster->getShardsAddresses().size());
-    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, number_of_replicas);
+    auto extension = src_storage_cluster->getTaskIteratorExtension(nullptr, local_context, src_cluster);
 
     size_t replica_index = 0;
     for (const auto & replicas : src_cluster->getShardsAddresses())
5 changes: 4 additions & 1 deletion src/Storages/StorageURLCluster.cpp

@@ -114,7 +114,10 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context)
     );
 }
 
-RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t) const
+RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
+    const ActionsDAG::Node * predicate,
+    const ContextPtr & context,
+    ClusterPtr) const
 {
     auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
         uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context);
5 changes: 4 additions & 1 deletion src/Storages/StorageURLCluster.h

@@ -30,7 +30,10 @@ class StorageURLCluster : public IStorageCluster
         const StorageURL::Configuration & configuration_);
 
     std::string getName() const override { return "URLCluster"; }
-    RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context, size_t number_of_replicas) const override;
+    RemoteQueryExecutor::Extension getTaskIteratorExtension(
+        const ActionsDAG::Node * predicate,
+        const ContextPtr & context,
+        ClusterPtr) const override;
 
 private:
     void updateQueryToSendIfNeeded(ASTPtr & query, const StorageSnapshotPtr & storage_snapshot, const ContextPtr & context) override;
@@ -3,7 +3,7 @@
     <test_s3>
         <url>http://minio1:9001/root/data/*</url>
         <access_key_id>minio</access_key_id>
-        <secret_access_key>minio123</secret_access_key>
+        <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
         <format>CSV</format>>
     </test_s3>
 </named_collections>
10 changes: 6 additions & 4 deletions tests/integration/test_s3_cache_locality/test.py

@@ -7,6 +7,8 @@
 import pytest
 
 from helpers.cluster import ClickHouseCluster
+from helpers.config_cluster import minio_secret_key
+
 
 logging.getLogger().setLevel(logging.INFO)
 logging.getLogger().addHandler(logging.StreamHandler())
@@ -81,7 +83,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_first = node.query(
         f"""
         SELECT count(*)
-        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_first}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         SETTINGS
             enable_filesystem_cache={enable_filesystem_cache},
@@ -95,7 +97,7 @@ def check_s3_gets(cluster, node, expected_result, cluster_first, cluster_second,
     result_second = node.query(
         f"""
         SELECT count(*)
-        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3Cluster('{cluster_second}', 'http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         SETTINGS
             enable_filesystem_cache={enable_filesystem_cache},
@@ -148,9 +150,9 @@ def test_cache_locality(started_cluster):
     node = started_cluster.instances["clickhouse0"]
 
     expected_result = node.query(
-        """
+        f"""
         SELECT count(*)
-        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', 'minio123', 'CSV', 'a String, b UInt64')
+        FROM s3('http://minio1:9001/root/data/generated/*', 'minio', '{minio_secret_key}', 'CSV', 'a String, b UInt64')
         WHERE b=42
         """
     )
@@ -14,7 +14,7 @@
     <s3_with_type>
         <url>http://minio1:9001/root/</url>
         <access_key_id>minio</access_key_id>
-        <secret_access_key>minio123</secret_access_key>
+        <secret_access_key>ClickHouse_Minio_P@ssw0rd</secret_access_key>
        <storage_type>s3</storage_type>
     </s3_with_type>
     <azure_with_type>