-
Notifications
You must be signed in to change notification settings - Fork 8
Antalya 25.6.5: Expose IcebergS3 partition_key and sorting_key in system.tables #959
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a8c82cf
4980ec2
15bd458
d76eaeb
f790be1
69c3025
c082b3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -529,6 +529,197 @@ bool IcebergMetadata::update(const ContextPtr & local_context) | |
return previous_snapshot_schema_id != relevant_snapshot_schema_id; | ||
} | ||
|
||
namespace | ||
{ | ||
|
||
using IdToName = std::unordered_map<Int32, String>; | ||
|
||
IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj) | ||
{ | ||
IdToName map; | ||
if (!metadata_obj || !metadata_obj->has("current-schema-id") || !metadata_obj->has("schemas")) | ||
return map; | ||
|
||
const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id"); | ||
auto schemas = metadata_obj->getArray("schemas"); | ||
if (!schemas) | ||
return map; | ||
|
||
for (size_t i = 0; i < schemas->size(); ++i) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
and the same way in other cycles over JSON::Array There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does not look any better: in range-based loop, type will be wrong and will need to be converted explicitly later. This will not be better than it is already: for (const auto & v : *schemas)
{
const auto & schema = v.extract<Poco::JSON::Object::Ptr>(); |
||
{ | ||
auto schema = schemas->getObject(i); | ||
|
||
if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id)) | ||
continue; | ||
|
||
if (auto fields = schema->getArray("fields")) | ||
{ | ||
for (size_t j = 0; j < fields->size(); ++j) | ||
{ | ||
auto f = fields->getObject(j); | ||
if (!f || !f->has("id") || !f->has("name")) | ||
continue; | ||
map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name")); | ||
} | ||
} | ||
break; | ||
} | ||
return map; | ||
} | ||
|
||
String formatTransform( | ||
const String & transform, | ||
const Poco::JSON::Object::Ptr & field_obj, | ||
const IdToName & id_to_name) | ||
{ | ||
Int32 source_id = (field_obj && field_obj->has("source-id")) | ||
? field_obj->getValue<Int32>("source-id") | ||
: -1; | ||
|
||
const auto it = id_to_name.find(source_id); | ||
const String col = (it != id_to_name.end()) ? it->second : ("col_" + toString(source_id)); | ||
|
||
String base = transform; | ||
String param; | ||
if (const auto lpos = transform.find('['); lpos != String::npos && transform.back() == ']') | ||
{ | ||
base = transform.substr(0, lpos); | ||
param = transform.substr(lpos + 1, transform.size() - lpos - 2); // strip [ and ] | ||
} | ||
|
||
String result; | ||
if (base == "identity") | ||
result = col; | ||
else if (base == "year" || base == "month" || base == "day" || base == "hour") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am I right that these types can't have There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, correct |
||
result = base + "(" + col + ")"; | ||
else if (base != "void") | ||
{ | ||
if (!param.empty()) | ||
result = base + "(" + param + ", " + col + ")"; | ||
else | ||
result = base + "(" + col + ")"; | ||
} | ||
return result; | ||
} | ||
|
||
/// Locates the array of partition fields that is currently in effect.
///
/// A top-level "partition-spec" array wins when the key is present. Otherwise the entry of
/// "partition-specs" whose "spec-id" equals "default-spec-id" is looked up and its "fields"
/// array is returned. Returns nullptr when nothing suitable is found.
Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj)
{
    if (!metadata_obj)
        return nullptr;

    if (metadata_obj->has("partition-spec"))
        return metadata_obj->getArray("partition-spec");

    /// No "partition-spec": fall back to "partition-specs" + "default-spec-id".
    const bool has_spec_list = metadata_obj->has("partition-specs") && metadata_obj->has("default-spec-id");
    if (!has_spec_list)
        return nullptr;

    const auto wanted_spec_id = metadata_obj->getValue<Int32>("default-spec-id");
    auto spec_list = metadata_obj->getArray("partition-specs");
    if (!spec_list)
        return nullptr;

    for (size_t idx = 0; idx < spec_list->size(); ++idx)
    {
        auto candidate = spec_list->getObject(idx);
        if (!candidate || !candidate->has("spec-id"))
            continue;
        if (candidate->getValue<Int32>("spec-id") != wanted_spec_id)
            continue;

        /// First matching spec decides the result, even if it has no "fields".
        if (candidate->has("fields"))
            return candidate->getArray("fields");
        return nullptr;
    }

    return nullptr;
}
|
||
/// Locates the "fields" array of the sort order selected by "default-sort-order-id".
///
/// Returns nullptr when the metadata object is null, when either "default-sort-order-id" or
/// "sort-orders" is missing, when no entry has a matching "order-id", or when the matching
/// entry has no "fields" key.
Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj)
{
    if (!metadata_obj)
        return nullptr;

    const bool has_sort_info = metadata_obj->has("default-sort-order-id") && metadata_obj->has("sort-orders");
    if (!has_sort_info)
        return nullptr;

    const auto wanted_order_id = metadata_obj->getValue<Int32>("default-sort-order-id");
    auto order_list = metadata_obj->getArray("sort-orders");
    if (!order_list)
        return nullptr;

    for (size_t idx = 0; idx < order_list->size(); ++idx)
    {
        auto candidate = order_list->getObject(idx);
        if (!candidate || !candidate->has("order-id"))
            continue;
        if (candidate->getValue<Int32>("order-id") != wanted_order_id)
            continue;

        /// First matching order decides the result, even if it has no "fields".
        if (candidate->has("fields"))
            return candidate->getArray("fields");
        return nullptr;
    }

    return nullptr;
}
|
||
/// Renders a list of Iceberg partition or sort fields as a comma-separated expression list.
///
/// Every element of `fields` is formatted with formatTransform using its "transform" value
/// ("identity" when the key is absent). Elements that are not objects, or whose formatted
/// expression is empty, are skipped.
///
/// When `lookup_sort_modifiers` is true, the field's "direction" and "null-order" values (if
/// present) are appended as " ASC" / " DESC" and " NULLS FIRST" / " NULLS LAST". Each modifier
/// is separated from the preceding text by a single space, e.g. `col DESC NULLS LAST`.
///
/// Returns an empty string when `fields` is null, empty, or yields no expressions.
String composeList(
    const Poco::JSON::Array::Ptr & fields,
    const IdToName & id_to_name,
    bool lookup_sort_modifiers)
{
    if (!fields || fields->size() == 0)
        return {};

    String res;
    for (size_t i = 0; i < fields->size(); ++i)
    {
        auto field = fields->getObject(i);
        if (!field)
            continue;

        const String transform = field->has("transform") ? field->getValue<String>("transform") : "identity";
        String expr = formatTransform(transform, field, id_to_name);
        if (expr.empty())
            continue;

        if (lookup_sort_modifiers)
        {
            if (field->has("direction"))
            {
                const auto direction = field->getValue<String>("direction");
                /// Leading space keeps the modifier from fusing with the expression.
                expr += (Poco::icompare(direction, "desc") == 0) ? " DESC" : " ASC";
            }
            if (field->has("null-order"))
            {
                const auto null_order = field->getValue<String>("null-order");
                expr += (Poco::icompare(null_order, "nulls-last") == 0) ? " NULLS LAST" : " NULLS FIRST";
            }
        }

        /// `expr` is never empty here, so a non-empty `res` means at least one item precedes it.
        if (!res.empty())
            res += ", ";
        res += expr;
    }

    return res;
}
|
||
/// Builds the textual partition key and sorting key from a table metadata object.
///
/// Returns {partition_key, sort_key}. Both optionals are empty when `metadata_obj` is null;
/// otherwise both are set to whatever composeList produces (possibly an empty string).
std::pair<std::optional<String>, std::optional<String>> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj)
{
    std::optional<String> partition_key;
    std::optional<String> sort_key;

    if (!metadata_obj)
        return {partition_key, sort_key};

    /// Field id -> column name mapping, shared by both keys.
    auto id_to_name = buildIdToNameMap(metadata_obj);

    auto partition_fields = findActivePartitionFields(metadata_obj);
    partition_key = composeList(partition_fields, id_to_name, /*lookup_sort_modifiers=*/ false);

    auto sort_fields = findActiveSortFields(metadata_obj);
    sort_key = composeList(sort_fields, id_to_name, /*lookup_sort_modifiers=*/ true);

    return {partition_key, sort_key};
}
|
||
} | ||
|
||
void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object) | ||
{ | ||
auto configuration_ptr = configuration.lock(); | ||
|
@@ -563,10 +754,11 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec | |
total_bytes = summary_object->getValue<Int64>(f_total_files_size); | ||
} | ||
|
||
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object); | ||
relevant_snapshot = IcebergSnapshot{ | ||
getManifestList(local_context, getProperFilePathFromMetadataInfo( | ||
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())), | ||
relevant_snapshot_id, total_rows, total_bytes}; | ||
relevant_snapshot_id, total_rows, total_bytes, partition_key, sorting_key}; | ||
|
||
if (!snapshot->has(f_schema_id)) | ||
throw Exception( | ||
|
@@ -1011,6 +1203,19 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons | |
return result; | ||
} | ||
|
||
/// Returns the partition key stored in the relevant snapshot, read under a shared lock on `mutex`.
/// The context argument is unused.
std::optional<String> IcebergMetadata::partitionKey(ContextPtr) const
{
    SharedLockGuard lock(mutex);
    const auto & snapshot = *relevant_snapshot;
    return snapshot.partition_key;
}
|
||
/// Returns the sorting key stored in the relevant snapshot, read under a shared lock on `mutex`.
/// The context argument is unused.
std::optional<String> IcebergMetadata::sortingKey(ContextPtr) const
{
    SharedLockGuard lock(mutex);
    const auto & snapshot = *relevant_snapshot;
    return snapshot.sorting_key;
}
|
||
|
||
ObjectIterator IcebergMetadata::iterate( | ||
const ActionsDAG * filter_dag, | ||
FileProgressCallback callback, | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which docs or sample metadata object is this implementation based on?
Is it just https://iceberg.apache.org/spec/?h=schema+id#table-metadata-fields or is there anything else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, based on docs and test