Skip to content

Commit f55dd06

Browse files
committed
process partition key and sorting key from IcebergS3
update
1 parent f9a9a99 commit f55dd06

File tree

5 files changed

+273
-9
lines changed

5 files changed

+273
-9
lines changed

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ class IDataLakeMetadata : boost::noncopyable
5757
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
5858
virtual std::optional<size_t> totalBytes(ContextPtr) const { return {}; }
5959

60+
virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
61+
virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
62+
6063
protected:
6164
ObjectIterator createKeysIterator(
6265
Strings && data_files_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 217 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,198 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
492492
return previous_snapshot_schema_id != relevant_snapshot_schema_id;
493493
}
494494

495+
namespace
496+
{
497+
498+
using IdToName = std::unordered_map<Int32, String>;
499+
500+
IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
501+
{
502+
IdToName map;
503+
if (!metadata_obj || !metadata_obj->has("current-schema-id") || !metadata_obj->has("schemas"))
504+
return map;
505+
506+
const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id");
507+
auto schemas = metadata_obj->getArray("schemas");
508+
if (!schemas)
509+
return map;
510+
511+
for (size_t i = 0; i < schemas->size(); ++i)
512+
{
513+
auto schema = schemas->getObject(i);
514+
if (!schema || !schema->has("schema-id"))
515+
continue;
516+
if (schema->getValue<Int32>("schema-id") != current_schema_id)
517+
continue;
518+
519+
if (auto fields = schema->getArray("fields"))
520+
{
521+
for (size_t j = 0; j < fields->size(); ++j)
522+
{
523+
auto f = fields->getObject(j);
524+
if (!f || !f->has("id") || !f->has("name"))
525+
continue;
526+
map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name"));
527+
}
528+
}
529+
break;
530+
}
531+
return map;
532+
}
533+
534+
String formatTransform(
535+
const String & transform,
536+
const Poco::JSON::Object::Ptr & field_obj,
537+
const IdToName & id_to_name)
538+
{
539+
Int32 source_id = (field_obj && field_obj->has("source-id"))
540+
? field_obj->getValue<Int32>("source-id")
541+
: -1;
542+
543+
const auto it = id_to_name.find(source_id);
544+
const String col = (it != id_to_name.end()) ? it->second : ("col_" + toString(source_id));
545+
546+
String base = transform;
547+
String param;
548+
if (const auto lpos = transform.find('['); lpos != String::npos && transform.back() == ']')
549+
{
550+
base = transform.substr(0, lpos);
551+
param = transform.substr(lpos + 1, transform.size() - lpos - 2); // strip [ and ]
552+
}
553+
554+
String result;
555+
if (base == "identity")
556+
result = col;
557+
else if (base == "year" || base == "month" || base == "day" || base == "hour")
558+
result = base + "(" + col + ")";
559+
else if (base != "void")
560+
{
561+
if (!param.empty())
562+
result = base + "(" + param + ", " + col + ")";
563+
else
564+
result = base + "(" + col + ")";
565+
}
566+
return result;
567+
}
568+
569+
Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj)
570+
{
571+
if (!metadata_obj)
572+
return nullptr;
573+
574+
if (metadata_obj->has("partition-spec"))
575+
return metadata_obj->getArray("partition-spec");
576+
577+
// If for some reason there is no partition-spec, try partition-specs + default-
578+
if (metadata_obj->has("partition-specs") && metadata_obj->has("default-spec-id"))
579+
{
580+
const auto default_spec_id = metadata_obj->getValue<Int32>("default-spec-id");
581+
if (auto specs = metadata_obj->getArray("partition-specs"))
582+
{
583+
for (size_t i = 0; i < specs->size(); ++i)
584+
{
585+
auto spec = specs->getObject(i);
586+
if (!spec || !spec->has("spec-id"))
587+
continue;
588+
if (spec->getValue<Int32>("spec-id") == default_spec_id)
589+
return spec->has("fields") ? spec->getArray("fields") : nullptr;
590+
}
591+
}
592+
}
593+
594+
return nullptr;
595+
}
596+
597+
Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj)
598+
{
599+
if (!metadata_obj || !metadata_obj->has("default-sort-order-id") || !metadata_obj->has("sort-orders"))
600+
return nullptr;
601+
602+
const auto default_sort_order_id = metadata_obj->getValue<Int32>("default-sort-order-id");
603+
auto orders = metadata_obj->getArray("sort-orders");
604+
if (!orders)
605+
return nullptr;
606+
607+
for (size_t i = 0; i < orders->size(); ++i)
608+
{
609+
auto order = orders->getObject(i);
610+
if (!order || !order->has("order-id"))
611+
continue;
612+
if (order->getValue<Int32>("order-id") == default_sort_order_id)
613+
return order->has("fields") ? order->getArray("fields") : nullptr;
614+
}
615+
return nullptr;
616+
}
617+
618+
String composeList(
619+
const Poco::JSON::Array::Ptr & fields,
620+
const IdToName & id_to_name,
621+
bool lookup_sort_modifiers)
622+
{
623+
if (!fields || fields->size() == 0)
624+
return {};
625+
626+
Strings parts;
627+
parts.reserve(fields->size());
628+
629+
for (size_t i = 0; i < fields->size(); ++i)
630+
{
631+
auto field = fields->getObject(i);
632+
if (!field)
633+
continue;
634+
635+
const String transform = field->has("transform") ? field->getValue<String>("transform") : "identity";
636+
String expr = formatTransform(transform, field, id_to_name);
637+
if (expr.empty())
638+
continue;
639+
640+
if (lookup_sort_modifiers)
641+
{
642+
if (field->has("direction"))
643+
{
644+
auto d = field->getValue<String>("direction");
645+
expr += (Poco::icompare(d, "desc") == 0) ? "DESC" : "ASC";
646+
}
647+
if (field->has("null-order"))
648+
{
649+
auto n = field->getValue<String>("null-order");
650+
expr += (Poco::icompare(n, "nulls-last") == 0) ? "NULLS LAST" : "NULLS FIRST";
651+
}
652+
}
653+
654+
parts.push_back(std::move(expr));
655+
}
656+
657+
if (parts.empty())
658+
return {};
659+
660+
String res;
661+
for (size_t i = 0; i < parts.size(); ++i)
662+
{
663+
if (i) res += ", ";
664+
res += parts[i];
665+
}
666+
return res;
667+
}
668+
669+
std::pair<std::optional<String>, std::optional<String>> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj)
670+
{
671+
std::optional<String> partition_key;
672+
std::optional<String> sort_key;
673+
674+
if (metadata_obj)
675+
{
676+
auto id_to_name = buildIdToNameMap(metadata_obj);
677+
678+
partition_key = composeList(findActivePartitionFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ false);
679+
sort_key = composeList(findActiveSortFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ true);
680+
}
681+
682+
return {partition_key, sort_key};
683+
}
684+
685+
}
686+
495687
void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object)
496688
{
497689
auto configuration_ptr = configuration.lock();
@@ -526,10 +718,11 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
526718
total_bytes = summary_object->getValue<Int64>(f_total_files_size);
527719
}
528720

721+
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
529722
relevant_snapshot = IcebergSnapshot{
530723
getManifestList(local_context, getProperFilePathFromMetadataInfo(
531724
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location)),
532-
relevant_snapshot_id, total_rows, total_bytes};
725+
relevant_snapshot_id, total_rows, total_bytes, partition_key, sorting_key};
533726

534727
if (!snapshot->has(f_schema_id))
535728
throw Exception(
@@ -973,6 +1166,29 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
9731166
return result;
9741167
}
9751168

1169+
std::optional<String> IcebergMetadata::partitionKey(ContextPtr) const
1170+
{
1171+
SharedLockGuard lock(mutex);
1172+
if (relevant_snapshot->partition_key.has_value())
1173+
{
1174+
return relevant_snapshot->partition_key;
1175+
}
1176+
1177+
return std::nullopt;
1178+
}
1179+
1180+
std::optional<String> IcebergMetadata::sortingKey(ContextPtr) const
1181+
{
1182+
SharedLockGuard lock(mutex);
1183+
if (relevant_snapshot->sorting_key.has_value())
1184+
{
1185+
return relevant_snapshot->sorting_key;
1186+
}
1187+
1188+
return std::nullopt;
1189+
}
1190+
1191+
9761192
ObjectIterator IcebergMetadata::iterate(
9771193
const ActionsDAG * filter_dag,
9781194
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,9 @@ class IcebergMetadata : public IDataLakeMetadata
7575
std::optional<size_t> totalRows(ContextPtr Local_context) const override;
7676
std::optional<size_t> totalBytes(ContextPtr Local_context) const override;
7777

78+
std::optional<String> partitionKey(ContextPtr) const override;
79+
std::optional<String> sortingKey(ContextPtr) const override;
80+
7881
protected:
7982
ObjectIterator iterate(
8083
const ActionsDAG * filter_dag,

src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ struct IcebergSnapshot
1616
Int64 snapshot_id;
1717
std::optional<size_t> total_rows;
1818
std::optional<size_t> total_bytes;
19+
std::optional<String> partition_key;
20+
std::optional<String> sorting_key;
1921
};
2022

2123
struct IcebergHistoryRecord

src/Storages/System/StorageSystemTables.cpp

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <QueryPipeline/Pipe.h>
2323
#include <QueryPipeline/QueryPipelineBuilder.h>
2424
#include <Storages/MergeTree/MergeTreeData.h>
25+
#include <Storages/ObjectStorage/StorageObjectStorage.h>
26+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
2527
#include <Storages/SelectQueryInfo.h>
2628
#include <Storages/StorageView.h>
2729
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
@@ -595,18 +597,56 @@ class TablesBlockSource : public ISource
595597
ASTPtr expression_ptr;
596598
if (columns_mask[src_index++])
597599
{
598-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
599-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
600-
else
601-
res_columns[res_index++]->insertDefault();
600+
bool inserted = false;
601+
602+
// Extract from specific DataLake metadata if suitable
603+
if (auto * obj = typeid_cast<StorageObjectStorage *>(table.get()))
604+
{
605+
if (auto * dl_meta = obj->getExternalMetadata(context))
606+
{
607+
if (auto p = dl_meta->partitionKey(context); p.has_value())
608+
{
609+
res_columns[res_index++]->insert(*p);
610+
inserted = true;
611+
}
612+
}
613+
614+
}
615+
616+
if (!inserted)
617+
{
618+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
619+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
620+
else
621+
res_columns[res_index++]->insertDefault();
622+
}
602623
}
603624

604625
if (columns_mask[src_index++])
605626
{
606-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
607-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
608-
else
609-
res_columns[res_index++]->insertDefault();
627+
bool inserted = false;
628+
629+
// Extract from specific DataLake metadata if suitable
630+
if (auto * obj = typeid_cast<StorageObjectStorage *>(table.get()))
631+
{
632+
if (auto * dl_meta = obj->getExternalMetadata(context))
633+
{
634+
if (auto p = dl_meta->sortingKey(context); p.has_value())
635+
{
636+
res_columns[res_index++]->insert(*p);
637+
inserted = true;
638+
}
639+
}
640+
641+
}
642+
643+
if (!inserted)
644+
{
645+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
646+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
647+
else
648+
res_columns[res_index++]->insertDefault();
649+
}
610650
}
611651

612652
if (columns_mask[src_index++])

0 commit comments

Comments
 (0)