Skip to content

Kafka Connect: Support VARIANT when record convert#15283

Open
seokyun-ha-toss wants to merge 4 commits intoapache:mainfrom
seokyun-ha-toss:support-variant-for-sink-connector
Open

Kafka Connect: Support VARIANT when record convert#15283
seokyun-ha-toss wants to merge 4 commits intoapache:mainfrom
seokyun-ha-toss:support-variant-for-sink-connector

Conversation

@seokyun-ha-toss
Copy link

Summary

Add support for converting arbitrary Java objects (e.g. Map<String, Object>, lists, primitives) into Iceberg Variant type in the Kafka Connect RecordConverter. Nested maps and lists are converted recursively so that structures like {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} are correctly represented as a single Variant.

Motivation

Kafka Connect payloads often come as schema-less or JSON-like maps. To write them into Iceberg tables with a Variant column, the connector must convert these Java objects into the Variant format (metadata + value) and support nested maps/arrays without losing structure or key names.

Behaviour

Input Result
Primitives (String, int, long, boolean, etc.) Single metadata (empty) + corresponding Variant primitive.
Flat map e.g. {"a": 1, "b": "x"} One metadata with keys ["a", "b"], one ShreddedObject with two fields.
Nested map e.g. {"user": {"name": "alice", "address": {"city": "Seoul", "zip": "12345"}}} One shared metadata for all keys; root and nested objects as ShreddedObjects with consistent field IDs.
Lists Converted to VariantArray with elements converted recursively.
Already Variant / ByteBuffer Pass-through or Variant.from(ByteBuffer) where appropriate.

Relates

Thanks, Good Day!

return convertLocalDateTime(value);
}

protected Variant convertVariantValue(Object value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason this is protected? If it is intended to be overridden it would be good to document the contract that implementors must follow (and what the default implementation does).

I guess other methods in this class don't do that, but seems like maybe we should start?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I followed the existing pattern of the convertXXX() methods, which are all defined as protected. For consistency, I kept the same visibility here.

Please let me know if you think it should be adjusted.

if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
List<String> names = Lists.newArrayList();
map.keySet().stream().map(Object::toString).forEach(names::add);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do the iteration once by visiting the entry set below?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also seems more consistent with the loop below on line 509

return convertLocalDateTime(value);
}

protected Variant convertVariantValue(Object value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if it is generally done in this project but it might be good to consider having protection against stack overflow on recursion

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're recursing through the schema/object structure, we don't assume the need to protect against stack overflow (either you can traverse the structure or not). I guess there may be some way to construct an object that will cycle, but that's not typical with how the object mapping works in kafka (it would probably fail upstream of this point anyway).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. However, as @danielcweeks mentioned, actual Kafka values don't have cycles, so the recursion ends at some point.

In my opinion, the stack overflow concern is unavoidable when traversing arbitrarily nested values. It depends on the size and depth of the input data, and in many cases we cannot control it.

Also, some methods (convertValue(), convertStructValue(), convertListValue()) already work recursively.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern was more around, very deeply nested values (not necessarily infinite recursion), but if we don't check these in the project, then I don't have concern for not doing so here

* Collects all field names (map keys) from the entire object tree. Used to build a single
* VariantMetadata for the whole Variant (required for nested maps).
*/
private static List<String> collectFieldNames(Object value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like you can maybe avoid some object churn by passing through a Set that gets elements added to as it goes down on recursion?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to @emkornfield's comment. We should avoid navigating the object multiple times.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! I'll apply your feedbacks! Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if two pass is strictly avoidable as it looks like we are trying to sort keys globally?

if (value instanceof byte[]) {
return Variants.of(ByteBuffer.wrap((byte[]) value));
}
if (value instanceof BigDecimal) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be folded underneath Number handling?

if (num.doubleValue() == num.longValue()) {
return Variants.of(num.longValue());
}
return Variants.of(num.doubleValue());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems safer to throw an exception here, otherwise we potentially have data loss (or at least this should be optional)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or we should convert it to BigDecimal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with failing if it's not a known type. We shouldn't try to massage numeric values if we don't know exactly what is being represented.

assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be more maintainable to write tests against convertVariantValue instead of the static helpers.

Also it seems like there should be tests for convertVariantValue specifically?

if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
List<String> names = Lists.newArrayList();
map.keySet().stream().map(Object::toString).forEach(names::add);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variant keys must be strings, but an arbitrary map passed in through KC does not have that requirement, so we should protect against it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants