Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ DEFINE_mBool(variant_use_cloud_schema_dict_cache, "true");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mInt32(variant_max_json_key_length, "255");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
DEFINE_mBool(variant_enable_duplicate_json_path_check, "false");
DEFINE_mBool(enable_vertical_compact_variant_subcolumns, "true");
DEFINE_mBool(enable_variant_doc_sparse_write_subcolumns, "true");
// Maximum depth of nested arrays to track with NestedGroup
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,8 @@ DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
DECLARE_mInt32(variant_max_json_key_length);
// If false, treat a string with invalid json format as a plain string value; if true, throw an exception instead.
DECLARE_mBool(variant_throw_exeception_on_invalid_json);
// Enable duplicate path check when parsing json into variant subcolumns/jsonb.
DECLARE_mBool(variant_enable_duplicate_json_path_check);
// Enable vertical compact subcolumns of variant column
DECLARE_mBool(enable_vertical_compact_variant_subcolumns);
DECLARE_mBool(enable_variant_doc_sparse_write_subcolumns);
Expand Down
6 changes: 4 additions & 2 deletions be/src/core/data_type_serde/data_type_variant_serde.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string>

#include "common/cast_set.h"
#include "common/config.h"
#include "common/exception.h"
#include "common/status.h"
#include "core/assert_cast.h"
Expand Down Expand Up @@ -107,10 +108,11 @@ Status DataTypeVariantSerDe::serialize_one_cell_to_json(const IColumn& column, i

Status DataTypeVariantSerDe::deserialize_one_cell_from_json(IColumn& column, Slice& slice,
const FormatOptions& options) const {
ParseConfig config;
ParseConfig parse_config;
parse_config.check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
StringRef json_ref(slice.data, slice.size);
RETURN_IF_CATCH_EXCEPTION(
variant_util::parse_json_to_variant(column, json_ref, nullptr, config));
variant_util::parse_json_to_variant(column, json_ref, nullptr, parse_config));
return Status::OK();
}

Expand Down
37 changes: 33 additions & 4 deletions be/src/exec/common/variant_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2016,36 +2016,64 @@ void parse_json_to_variant_impl(IColumn& column, const char* src, size_t length,
}
};

auto is_plain_path = [](const PathInData& path) {
for (const auto& part : path.get_parts()) {
if (part.is_nested || part.anonymous_array_level != 0) {
return false;
}
}
return true;
};

auto get_or_create_subcolumn = [&](const PathInData& path, size_t index_hint,
const FieldInfo& field_info) -> ColumnVariant::Subcolumn* {
if (column_variant.get_subcolumn(path, index_hint) == nullptr) {
auto* subcolumn = column_variant.get_subcolumn(path, index_hint);
if (subcolumn == nullptr && !path.empty() && config.check_duplicate_json_path &&
is_plain_path(path)) {
const auto& path_name = path.get_path();
for (auto& entry : column_variant.get_subcolumns()) {
if (entry->path.get_path() == path_name && is_plain_path(entry->path)) {
subcolumn = &entry->data;
break;
}
}
}
if (subcolumn == nullptr) {
if (path.has_nested_part()) {
column_variant.add_nested_subcolumn(path, field_info, old_num_rows);
} else {
column_variant.add_sub_column(path, old_num_rows);
}
subcolumn = column_variant.get_subcolumn(path, index_hint);
}
auto* subcolumn = column_variant.get_subcolumn(path, index_hint);
if (!subcolumn) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
path.get_path());
}
return subcolumn;
};

auto normalize_plain_path = [&](const PathInData& path) {
if (!config.check_duplicate_json_path || path.empty() || !is_plain_path(path)) {
return path;
}
return PathInData(path.get_path());
};

auto insert_into_subcolumn = [&](size_t i,
bool check_size_mismatch) -> ColumnVariant::Subcolumn* {
FieldInfo field_info;
get_field_info(values[i], &field_info);
if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
return nullptr;
}
auto* subcolumn = get_or_create_subcolumn(paths[i], i, field_info);
auto path = normalize_plain_path(paths[i]);
auto* subcolumn = get_or_create_subcolumn(path, i, field_info);
flush_defaults(subcolumn);
if (check_size_mismatch && subcolumn->size() != old_num_rows) {
throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
"subcolumn {} size missmatched, may contains duplicated entry",
paths[i].get_path());
path.get_path());
}
subcolumn->insert(std::move(values[i]), std::move(field_info));
return subcolumn;
Expand Down Expand Up @@ -2309,6 +2337,7 @@ Status parse_and_materialize_variant_columns(Block& block, const TabletSchema& t
// Deprecated legacy flatten-nested switch. Distinct from variant_enable_nested_group.
configs[i].deprecated_enable_flatten_nested =
tablet_schema.deprecated_variant_flatten_nested();
configs[i].check_duplicate_json_path = config::variant_enable_duplicate_json_path_check;
const auto& column = tablet_schema.column(variant_schema_pos[i]);
if (!column.is_variant_type()) {
return Status::InternalError("column is not variant type, column name: {}",
Expand Down
53 changes: 40 additions & 13 deletions be/src/util/json/json_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <algorithm>
#include <cassert>
#include <string_view>
#include <vector>

#include "common/cast_set.h"
// IWYU pragma: keep
Expand All @@ -46,6 +47,7 @@ std::optional<ParseResult> JSONDataParser<ParserImpl>::parse(const char* begin,
// deprecated_enable_flatten_nested controls nested path traversal
// NestedGroup expansion is now handled at storage layer
context.deprecated_enable_flatten_nested = config.deprecated_enable_flatten_nested;
context.check_duplicate_json_path = config.check_duplicate_json_path;
context.is_top_array = document.isArray();
traverse(document, context);
ParseResult result;
Expand All @@ -72,35 +74,57 @@ void JSONDataParser<ParserImpl>::traverse(const Element& element, ParseContext&
// Parse nested arrays to JsonbField
JsonbWriter writer;
traverseArrayAsJsonb(element.getArray(), writer);
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(Field::create_field<TYPE_JSONB>(
JsonbField(writer.getOutput()->getBuffer(), writer.getOutput()->getSize())));
tryAppendValue(
ctx, ctx.builder.get_parts(),
Field::create_field<TYPE_JSONB>(JsonbField(writer.getOutput()->getBuffer(),
writer.getOutput()->getSize())));
} else {
traverseArray(element.getArray(), ctx);
}
// Reset has_nested_in_flatten to false once this array has been traversed; otherwise it would incorrectly stay true for the next array.
ctx.has_nested_in_flatten = false;
} else {
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(getValueAsField(element));
tryAppendValue(ctx, ctx.builder.get_parts(), getValueAsField(element));
}
}

// Appends (path, value) to the parse context's parallel paths/values vectors.
// If duplicate-path checking is enabled — both via the per-call
// check_duplicate_path flag and the context-level check_duplicate_json_path
// flag — a path whose string form was already recorded in
// ctx.visited_path_names is rejected: nothing is appended and false is
// returned. Returns true when the value was appended.
template <typename ParserImpl>
bool JSONDataParser<ParserImpl>::tryAppendValue(ParseContext& ctx, const PathInData::Parts& path,
                                                Field&& value, bool check_duplicate_path) {
    if (check_duplicate_path && ctx.check_duplicate_json_path) {
        PathInData path_in_data(path);
        // emplace(...).second is false when the name was already in the set,
        // i.e. this JSON document contains a duplicate path.
        if (!ctx.visited_path_names.emplace(path_in_data.get_path()).second) {
            return false;
        }
    }
    ctx.paths.push_back(path);
    ctx.values.push_back(std::move(value));
    return true;
}

template <typename ParserImpl>
void JSONDataParser<ParserImpl>::traverseObject(const JSONObject& object, ParseContext& ctx) {
ctx.paths.reserve(ctx.paths.size() + object.size());
ctx.values.reserve(ctx.values.size() + object.size());
for (auto it = object.begin(); it != object.end(); ++it) {
const auto& [key, value] = *it;
auto check_key_length = [](const auto& key) {
const size_t max_key_length = cast_set<size_t>(config::variant_max_json_key_length);
if (key.size() > max_key_length) {
throw doris::Exception(
doris::ErrorCode::INVALID_ARGUMENT,
fmt::format("Key length exceeds maximum allowed size of {} bytes.",
max_key_length));
}
};
auto traverse_object_member = [&](const auto& key, const auto& value) {
check_key_length(key);
ctx.builder.append(key, false);
traverse(value, ctx);
ctx.builder.pop_back();
};

for (auto it = object.begin(); it != object.end(); ++it) {
const auto& [key, value] = *it;
traverse_object_member(key, value);
}
}

Expand Down Expand Up @@ -176,23 +200,24 @@ void JSONDataParser<ParserImpl>::traverseArray(const JSONArray& array, ParseCont
ParseArrayContext array_ctx;
array_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten;
array_ctx.is_top_array = ctx.is_top_array;
array_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path;
array_ctx.total_size = array.size();
for (auto it = array.begin(); it != array.end(); ++it) {
traverseArrayElement(*it, array_ctx);
++array_ctx.current_size;
}
auto&& arrays_by_path = array_ctx.arrays_by_path;
if (arrays_by_path.empty()) {
ctx.paths.push_back(ctx.builder.get_parts());
ctx.values.push_back(Field::create_field<TYPE_ARRAY>(Array()));
tryAppendValue(ctx, ctx.builder.get_parts(), Field::create_field<TYPE_ARRAY>(Array()));
} else {
ctx.paths.reserve(ctx.paths.size() + arrays_by_path.size());
ctx.values.reserve(ctx.values.size() + arrays_by_path.size());
for (auto it = arrays_by_path.begin(); it != arrays_by_path.end(); ++it) {
auto&& [path, path_array] = it->second;
/// Merge prefix path and path of array element.
ctx.paths.push_back(ctx.builder.append(path, true).get_parts());
ctx.values.push_back(Field::create_field<TYPE_ARRAY>(std::move(path_array)));
ctx.builder.append(path, true);
tryAppendValue(ctx, ctx.builder.get_parts(),
Field::create_field<TYPE_ARRAY>(std::move(path_array)));
ctx.builder.pop_back(path.size());
}
}
Expand All @@ -204,10 +229,12 @@ void JSONDataParser<ParserImpl>::traverseArrayElement(const Element& element,
ParseContext element_ctx;
element_ctx.has_nested_in_flatten = ctx.has_nested_in_flatten;
element_ctx.is_top_array = ctx.is_top_array;
element_ctx.check_duplicate_json_path = ctx.check_duplicate_json_path;
traverse(element, element_ctx);
auto& [_, paths, values, deprecated_flatten_nested, __, is_top_array] = element_ctx;
auto& paths = element_ctx.paths;
auto& values = element_ctx.values;

if (element_ctx.has_nested_in_flatten && is_top_array) {
if (element_ctx.has_nested_in_flatten && element_ctx.is_top_array) {
checkAmbiguousStructure(ctx, paths);
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/util/json/json_parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ void writeValueAsJsonb(const Element& element, JsonbWriter& writer) {

struct ParseConfig {
bool deprecated_enable_flatten_nested = false;
bool check_duplicate_json_path = false;
enum class ParseTo {
OnlySubcolumns = 0,
OnlyDocValueColumn = 1,
Expand All @@ -127,7 +128,9 @@ class JSONDataParser {
PathInDataBuilder builder;
std::vector<PathInData::Parts> paths;
std::vector<Field> values;
phmap::flat_hash_set<std::string> visited_path_names;
bool deprecated_enable_flatten_nested = false;
bool check_duplicate_json_path = false;
bool has_nested_in_flatten = false;
bool is_top_array = false;
};
Expand All @@ -141,10 +144,13 @@ class JSONDataParser {
KeyToSizes nested_sizes_by_key;
bool has_nested_in_flatten = false;
bool is_top_array = false;
bool check_duplicate_json_path = false;
};
void traverse(const Element& element, ParseContext& ctx);
void traverseObject(const JSONObject& object, ParseContext& ctx);
void traverseArray(const JSONArray& array, ParseContext& ctx);
bool tryAppendValue(ParseContext& ctx, const PathInData::Parts& path, Field&& value,
bool check_duplicate_path = true);
void traverseArrayElement(const Element& element, ParseArrayContext& ctx);
void checkAmbiguousStructure(const ParseArrayContext& ctx,
const std::vector<PathInData::Parts>& paths);
Expand Down
Loading
Loading