From aa5bb0dda34bf643d61e96f51a456cf876c0a0eb Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Sun, 12 May 2024 19:59:45 -0400 Subject: [PATCH] [HUDI-4732] Add support for confluent schema registry with proto (#11070) Co-authored-by: Y Ethan Guo --- hudi-utilities/pom.xml | 7 ++- .../utilities/config/KafkaSourceConfig.java | 8 +++ .../deser/KafkaAvroSchemaDeserializer.java | 4 +- .../schema/ProtoClassBasedSchemaProvider.java | 10 +-- .../ProtoSchemaToAvroSchemaConverter.java | 43 +++++++++++++ .../utilities/sources/ProtoKafkaSource.java | 40 +++++++++--- .../sources/helpers/ProtoConversionUtil.java | 56 ++++++++++++++++- .../TestKafkaAvroSchemaDeserializer.java | 8 +-- .../TestProtoSchemaToAvroSchemaConverter.java | 50 +++++++++++++++ .../sources/TestProtoKafkaSource.java | 63 +++++++++++++++++-- packaging/hudi-utilities-bundle/pom.xml | 1 + packaging/hudi-utilities-slim-bundle/pom.xml | 1 + pom.xml | 34 +++++++++- 13 files changed, 288 insertions(+), 37 deletions(-) create mode 100644 hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 3a7a9d6a712d4..47c172b7791ac 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -361,12 +361,10 @@ io.confluent kafka-avro-serializer - ${confluent.version} io.confluent common-config - ${confluent.version} io.confluent @@ -376,7 +374,10 @@ io.confluent kafka-schema-registry-client - ${confluent.version} + + + io.confluent + kafka-protobuf-serializer diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java index 024712f8cdd22..6215e99d66533 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java @@ -24,6 +24,8 @@ import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.HoodieConfig; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; + import javax.annotation.concurrent.Immutable; import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX; @@ -120,6 +122,12 @@ public class KafkaSourceConfig extends HoodieConfig { .markAdvanced() .withDocumentation("Kafka consumer strategy for reading data."); + public static final ConfigProperty KAFKA_PROTO_VALUE_DESERIALIZER_CLASS = ConfigProperty + .key(PREFIX + "proto.value.deserializer.class") + .defaultValue(ByteArrayDeserializer.class.getName()) + .sinceVersion("0.15.0") + .withDocumentation("Kafka Proto Payload Deserializer Class"); + /** * Kafka reset offset strategies. */ diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java index 246be5f8ec614..4673eceed1577 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java @@ -60,7 +60,6 @@ public void configure(Map configs, boolean isKey) { /** * We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline. 
* - * @param includeSchemaAndVersion * @param topic * @param isKey * @param payload @@ -70,13 +69,12 @@ public void configure(Map configs, boolean isKey) { */ @Override protected Object deserialize( - boolean includeSchemaAndVersion, String topic, Boolean isKey, byte[] payload, Schema readerSchema) throws SerializationException { - return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema); + return super.deserialize(topic, isKey, payload, sourceSchema); } protected TypedProperties getConvertToTypedProperties(Map configs) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java index 7d6981efb40d6..a4b485e1634ef 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java @@ -32,13 +32,8 @@ import java.util.Collections; import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties; -import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; -import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys; import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME; -import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH; -import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS; -import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS; /** * A schema provider that takes in a class name for a generated protobuf class that is on the classpath. @@ -75,10 +70,7 @@ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jss super(props, jssc); checkRequiredConfigProperties(props, Collections.singletonList(PROTO_SCHEMA_CLASS_NAME)); String className = getStringWithAltKeys(config, PROTO_SCHEMA_CLASS_NAME); - boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS); - int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH); - boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS); - ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords); + ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props); try { schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString(); } catch (Exception e) { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java new file mode 100644 index 0000000000000..78ef25e9a040b --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.schema.converter; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.schema.SchemaRegistryProvider; +import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil; + +import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema; + +import java.io.IOException; + +/** + * Converts a protobuf schema from the schema registry to an Avro schema. + */ +public class ProtoSchemaToAvroSchemaConverter implements SchemaRegistryProvider.SchemaConverter { + private final ProtoConversionUtil.SchemaConfig schemaConfig; + + public ProtoSchemaToAvroSchemaConverter(TypedProperties config) { + this.schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(config); + } + + @Override + public String convert(String schema) throws IOException { + ProtobufSchema protobufSchema = new ProtobufSchema(schema); + return ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java index 1dc731b5f95d8..a56c991bebd17 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java @@ -19,9 +19,12 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.ConfigUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.utilities.UtilHelpers; +import org.apache.hudi.utilities.config.KafkaSourceConfig; import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig; import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics; @@ -31,6 +34,8 @@ import org.apache.hudi.utilities.streamer.StreamContext; import com.google.protobuf.Message; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.api.java.JavaRDD; @@ -52,8 +57,8 @@ * Reads protobuf serialized Kafka data, based on a provided class name. 
*/ public class ProtoKafkaSource extends KafkaSource> { - - private final String className; + private final Option className; + private final String deserializerName; public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession, SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) { @@ -63,11 +68,18 @@ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Sp public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) { super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics, new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext), streamContext.getSourceProfileSupplier())); - checkRequiredConfigProperties(props, Collections.singletonList( - ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME)); - props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class); - props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class); - className = getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME); + this.deserializerName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true); + if (!deserializerName.equals(ByteArrayDeserializer.class.getName()) && !deserializerName.equals(KafkaProtobufDeserializer.class.getName())) { + throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource"); + } + if (deserializerName.equals(ByteArrayDeserializer.class.getName())) { + checkRequiredConfigProperties(props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME)); + className = Option.of(getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME)); + } else { + className = Option.empty(); + } + props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName()); + props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, deserializerName); this.offsetGen = new KafkaOffsetGen(props); if (this.shouldAddOffsets) { throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported"); @@ -76,9 +88,17 @@ public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContex @Override protected JavaRDD toBatch(OffsetRange[] offsetRanges) { - ProtoDeserializer deserializer = new ProtoDeserializer(className); - return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, - LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value())); + if (deserializerName.equals(ByteArrayDeserializer.class.getName())) { + ValidationUtils.checkArgument( + className.isPresent(), + ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present."); + ProtoDeserializer deserializer = new ProtoDeserializer(className.get()); + return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value())); + } else { + return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges, + LocationStrategies.PreferConsistent()).map(ConsumerRecord::value); + } } private static class ProtoDeserializer implements Serializable { diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java index cf8532d65c855..c16c7e085cb1f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java @@ -17,15 +17,18 @@ package org.apache.hudi.utilities.sources.helpers; +import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.internal.schema.HoodieSchemaException; import com.google.protobuf.BoolValue; import com.google.protobuf.ByteString; import com.google.protobuf.BytesValue; import com.google.protobuf.Descriptors; import com.google.protobuf.DoubleValue; +import com.google.protobuf.DynamicMessage; import com.google.protobuf.FloatValue; import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; @@ -56,7 +59,12 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys; +import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH; +import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS; +import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS; /** * A utility class to help translate from Proto to Avro. @@ -74,6 +82,17 @@ public static Schema getAvroSchemaForMessageClass(Class clazz, SchemaConfig sche return new AvroSupport(schemaConfig).getSchema(clazz); } + /** + * Creates an Avro {@link Schema} for the provided {@link Descriptors.Descriptor}. + * Intended for use when the descriptor is provided by an external registry. + * @param descriptor The protobuf descriptor + * @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema + * @return An Avro schema + */ + public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) { + return new AvroSupport(schemaConfig).getSchema(descriptor); + } + /** * Converts the provided {@link Message} into an avro {@link GenericRecord} with the provided schema. 
* @param schema target schema to convert into @@ -101,6 +120,13 @@ public SchemaConfig(boolean wrappedPrimitivesAsRecords, int maxRecursionDepth, b this.timestampsAsRecords = timestampsAsRecords; } + public static SchemaConfig fromProperties(TypedProperties props) { + boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS); + int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH); + boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS); + return new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords); + } + public boolean isWrappedPrimitivesAsRecords() { return wrappedPrimitivesAsRecords; } @@ -157,11 +183,11 @@ private AvroSupport(SchemaConfig schemaConfig) { this.timestampsAsRecords = schemaConfig.isTimestampsAsRecords(); } - public static GenericRecord convert(Schema schema, Message message) { + static GenericRecord convert(Schema schema, Message message) { return (GenericRecord) convertObject(schema, message); } - public Schema getSchema(Class c) { + Schema getSchema(Class c) { return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c, wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords), key -> { try { Object descriptor = c.getMethod("getDescriptor").invoke(null); @@ -177,6 +203,16 @@ public Schema getSchema(Class c) { }); } + /** + * Translates a Proto Message descriptor into an Avro Schema. + * Does not cache since external system may evolve the schema and that can result in a stale version of the avro schema. + * @param descriptor the descriptor for the proto message + * @return an avro schema + */ + Schema getSchema(Descriptors.Descriptor descriptor) { + return getMessageSchema(descriptor, new CopyOnWriteMap<>(), getNamespace(descriptor.getFullName())); + } + private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) { List symbols = new ArrayList<>(enumDescriptor.getValues().size()); for (Descriptors.EnumValueDescriptor valueDescriptor : enumDescriptor.getValues()) { @@ -402,7 +438,21 @@ private static Object convertObject(Schema schema, Object value) { if (value instanceof Message) { // check if this is a Timestamp if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) { - return Timestamps.toMicros((Timestamp) value); + if (value instanceof Timestamp) { + return Timestamps.toMicros((Timestamp) value); + } else if (value instanceof DynamicMessage) { + Timestamp.Builder builder = Timestamp.newBuilder(); + ((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> { + if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) { + builder.setSeconds((Long) fieldValue); + } else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) { + builder.setNanos((Integer) fieldValue); + } + }); + return Timestamps.toMicros(builder.build()); + } else { + throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName()); + } } else { tmpValue = getWrappedValue(value); } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java index 16d190ac45d15..4fa582209ae17 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java +++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java @@ -93,7 +93,7 @@ private IndexedRecord createExtendUserRecord() { } /** - * Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}. + * Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}. */ @Test public void testKafkaAvroSchemaDeserializer() { @@ -105,7 +105,7 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer.configure(new HashMap(config), false); bytesOrigRecord = avroSerializer.serialize(topic, avroRecord); // record is serialized in orig schema and deserialized using same schema. - assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema)); + assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema)); IndexedRecord avroRecordWithAllField = createExtendUserRecord(); byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField); @@ -115,12 +115,12 @@ public void testKafkaAvroSchemaDeserializer() { avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config)); avroDeserializer.configure(new HashMap(config), false); // record is serialized w/ evolved schema, and deserialized w/ evolved schema - IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema); + IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema); assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual); assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema); // read old record w/ evolved schema. - IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema); + IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema); // record won't be equal to original record as we read w/ evolved schema. "age" will be added w/ default value of null assertNotEquals(avroRecord, actualRec); GenericRecord genericRecord = (GenericRecord) actualRec; diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java new file mode 100644 index 0000000000000..fed4bc5e0ed2e --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.schema.converter; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig; +import org.apache.hudi.utilities.test.proto.Parent; + +import org.apache.avro.Schema; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class TestProtoSchemaToAvroSchemaConverter { + @Test + void testConvert() throws Exception { + TypedProperties properties = new TypedProperties(); + properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName()); + Schema.Parser parser = new Schema.Parser(); + String actual = new ProtoSchemaToAvroSchemaConverter(properties).convert(getProtoSchemaString()); + Schema actualSchema = new Schema.Parser().parse(actual); + + Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc")); + assertEquals(expectedSchema, actualSchema); + } + + private String getProtoSchemaString() throws IOException, URISyntaxException { + return new String(Files.readAllBytes(Paths.get(getClass().getClassLoader().getResource("schema-provider/proto/recursive.proto").toURI()))); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java index 662cd1dd985f9..b63c7c29a24da 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java @@ -24,6 +24,7 @@ import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig; import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider; import org.apache.hudi.utilities.schema.SchemaProvider; +import org.apache.hudi.utilities.schema.SchemaRegistryProvider; import org.apache.hudi.utilities.streamer.DefaultStreamContext; import org.apache.hudi.utilities.streamer.SourceFormatAdapter; import org.apache.hudi.utilities.test.proto.Nested; @@ -37,10 +38,14 @@ import com.google.protobuf.FloatValue; import com.google.protobuf.Int32Value; import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; import com.google.protobuf.StringValue; import com.google.protobuf.UInt32Value; import com.google.protobuf.UInt64Value; +import com.google.protobuf.util.JsonFormat; import com.google.protobuf.util.Timestamps; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; +import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer; import org.apache.avro.generic.GenericRecord; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; @@ -55,6 +60,7 @@ import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; @@ -64,13 +70,16 @@ import java.util.stream.IntStream; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS; import static org.junit.jupiter.api.Assertions.assertEquals; /** * Tests against {@link ProtoKafkaSource}. 
*/ public class TestProtoKafkaSource extends BaseTestKafkaSource { + private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace(); private static final Random RANDOM = new Random(); + private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081"; protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) { TypedProperties props = new TypedProperties(); @@ -93,6 +102,28 @@ protected SourceFormatAdapter createSource(TypedProperties props) { return new SourceFormatAdapter(protoKafkaSource); } + @Test + public void testProtoKafkaSourceWithConfluentProtoDeserialization() { + final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceWithConfluentDeserializer"; + testUtils.createTopic(topic, 2); + TypedProperties props = createPropsForKafkaSource(topic, null, "earliest"); + props.put(KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(), + "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"); + props.put("schema.registry.url", MOCK_REGISTRY_URL); + props.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL); + props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true"); + // class name is not required so we'll remove it + props.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key()); + SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc()); + ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics); + List messages = createSampleMessages(1000); + sendMessagesToKafkaWithConfluentSerializer(topic, 2, messages); + // Assert messages are read correctly + JavaRDD messagesRead = protoKafkaSource.fetchNext(Option.empty(), 1000).getBatch().get(); + assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()), + new HashSet<>(messagesRead.map(message -> PRINTER.print(message)).collect())); + } + @Test public void testProtoKafkaSourceWithFlattenWrappedPrimitives() { @@ -196,7 +227,7 @@ private static Nested generateRandomNestedMessage() { @Override protected void sendMessagesToKafka(String topic, int count, int numPartitions) { List messages = createSampleMessages(count); - try (Producer producer = new KafkaProducer<>(getProducerProperties())) { + try (Producer producer = new KafkaProducer<>(getProducerProperties(false))) { for (int i = 0; i < messages.size(); i++) { // use consistent keys to get even spread over partitions for test expectations producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i).toByteArray())); @@ -204,14 +235,38 @@ protected void sendMessagesToKafka(String topic, int count, int numPartitions) { } } - private Properties getProducerProperties() { + private void sendMessagesToKafkaWithConfluentSerializer(String topic, int numPartitions, List messages) { + try (Producer producer = new KafkaProducer<>(getProducerProperties(true))) { + for (int i = 0; i < messages.size(); i++) { + // use consistent keys to get even spread over partitions for test expectations + producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i))); + } + } + } + + private Properties getProducerProperties(boolean useConfluentProtobufSerializer) { Properties props = new Properties(); props.put("bootstrap.servers", testUtils.brokerAddress()); - props.put("value.serializer", ByteArraySerializer.class.getName()); - // Key serializer is required. 
+ if (useConfluentProtobufSerializer) { + props.put("value.serializer", KafkaProtobufSerializer.class.getName()); + props.put("value.deserializer", KafkaProtobufDeserializer.class.getName()); + props.put("schema.registry.url", MOCK_REGISTRY_URL); + props.put("auto.register.schemas", "true"); + } else { + props.put("value.serializer", ByteArraySerializer.class.getName()); + // Key serializer is required. + } props.put("key.serializer", StringSerializer.class.getName()); // wait for all in-sync replicas to ack sends props.put("acks", "all"); return props; } + + private String protoToJson(Message input) { + try { + return PRINTER.print(input); + } catch (Exception e) { + throw new RuntimeException("Failed to convert proto to json", e); + } + } } diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml index b992e5bbeb8c4..10c1cccd4e2ec 100644 --- a/packaging/hudi-utilities-bundle/pom.xml +++ b/packaging/hudi-utilities-bundle/pom.xml @@ -133,6 +133,7 @@ io.confluent:common-config io.confluent:common-utils io.confluent:kafka-schema-registry-client + io.confluent:kafka-protobuf-serializer io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite io.dropwizard.metrics:metrics-jmx diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml index 3919b103465c4..0a2c271e6ce5c 100644 --- a/packaging/hudi-utilities-slim-bundle/pom.xml +++ b/packaging/hudi-utilities-slim-bundle/pom.xml @@ -119,6 +119,7 @@ io.confluent:common-config io.confluent:common-utils io.confluent:kafka-schema-registry-client + io.confluent:kafka-protobuf-serializer io.dropwizard.metrics:metrics-core io.dropwizard.metrics:metrics-graphite io.dropwizard.metrics:metrics-jmx diff --git a/pom.xml b/pom.xml index 3b9cc55562a2a..2fd99672a2f94 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ 2.4.5 3.1.1.4 3.4.1.1 - 5.3.4 + 5.5.0 2.17 3.0.1-b12 1.10.1 @@ -934,6 +934,11 @@ ${glassfish.el.version} provided + + org.glassfish.jersey.ext + jersey-bean-validation + ${glassfish.version} + @@ -1772,6 +1777,33 @@ + + + + io.confluent + kafka-avro-serializer + ${confluent.version} + + + io.confluent + common-config + ${confluent.version} + + + io.confluent + common-utils + ${confluent.version} + + + io.confluent + kafka-schema-registry-client + ${confluent.version} + + + io.confluent + kafka-protobuf-serializer + ${confluent.version} +