diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index 3a7a9d6a712d4..47c172b7791ac 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -361,12 +361,10 @@
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
-      <version>${confluent.version}</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>common-config</artifactId>
-      <version>${confluent.version}</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
@@ -376,7 +374,10 @@
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
-      <version>${confluent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>io.confluent</groupId>
+      <artifactId>kafka-protobuf-serializer</artifactId>
    </dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 024712f8cdd22..6215e99d66533 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -24,6 +24,8 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
import javax.annotation.concurrent.Immutable;
import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
@@ -120,6 +122,12 @@ public class KafkaSourceConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Kafka consumer strategy for reading data.");
+  public static final ConfigProperty<String> KAFKA_PROTO_VALUE_DESERIALIZER_CLASS = ConfigProperty
+ .key(PREFIX + "proto.value.deserializer.class")
+ .defaultValue(ByteArrayDeserializer.class.getName())
+ .sinceVersion("0.15.0")
+ .withDocumentation("Kafka Proto Payload Deserializer Class");
+
/**
* Kafka reset offset strategies.
*/
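For context, a minimal sketch of how a caller might set the new deserializer property. The registry URL property and its value below are illustrative assumptions and not part of this patch.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.KafkaSourceConfig;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ProtoDeserializerConfigSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Default is ByteArrayDeserializer; switch to Confluent's deserializer to resolve
    // schemas from a schema registry instead of a generated proto class.
    props.put(KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
        KafkaProtobufDeserializer.class.getName());
    // Assumed registry location, for illustration only.
    props.put("schema.registry.url", "http://localhost:8081");
    System.out.println(props);
  }
}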
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
index 246be5f8ec614..4673eceed1577 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java
@@ -60,7 +60,6 @@ public void configure(Map<String, ?> configs, boolean isKey) {
/**
* We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline.
*
- * @param includeSchemaAndVersion
* @param topic
* @param isKey
* @param payload
@@ -70,13 +69,12 @@ public void configure(Map<String, ?> configs, boolean isKey) {
*/
@Override
protected Object deserialize(
- boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchema)
throws SerializationException {
- return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
+ return super.deserialize(topic, isKey, payload, sourceSchema);
}
  protected TypedProperties getConvertToTypedProperties(Map<String, ?> configs) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
index 7d6981efb40d6..a4b485e1634ef 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/ProtoClassBasedSchemaProvider.java
@@ -32,13 +32,8 @@
import java.util.Collections;
import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
-import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
-import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
-import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
/**
* A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
@@ -75,10 +70,7 @@ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jss
super(props, jssc);
checkRequiredConfigProperties(props, Collections.singletonList(PROTO_SCHEMA_CLASS_NAME));
String className = getStringWithAltKeys(config, PROTO_SCHEMA_CLASS_NAME);
- boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
- int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
- boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
- ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
+ ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
} catch (Exception e) {
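A minimal sketch of how the new factory consolidates the three schema-shaping properties; it assumes only the config keys and the isWrappedPrimitivesAsRecords() getter that appear elsewhere in this patch.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

public class SchemaConfigFromPropertiesSketch {
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Only override what you need; unset keys fall back to the config defaults.
    props.setProperty(
        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
    ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
    System.out.println(schemaConfig.isWrappedPrimitivesAsRecords()); // true
  }
}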
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
new file mode 100644
index 0000000000000..78ef25e9a040b
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/converter/ProtoSchemaToAvroSchemaConverter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
+import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;
+
+import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;
+
+import java.io.IOException;
+
+/**
+ * Converts a protobuf schema from the schema registry to an Avro schema.
+ */
+public class ProtoSchemaToAvroSchemaConverter implements SchemaRegistryProvider.SchemaConverter {
+ private final ProtoConversionUtil.SchemaConfig schemaConfig;
+
+ public ProtoSchemaToAvroSchemaConverter(TypedProperties config) {
+ this.schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(config);
+ }
+
+ @Override
+ public String convert(String schema) throws IOException {
+ ProtobufSchema protobufSchema = new ProtobufSchema(schema);
+ return ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString();
+ }
+}
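A minimal usage sketch for the new converter; the toy .proto definition below is an assumed example, not a schema from this repository.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter;

public class ProtoToAvroConverterSketch {
  public static void main(String[] args) throws Exception {
    // A toy protobuf definition, in the form the schema registry would return it.
    String protoSchema = "syntax = \"proto3\";\n"
        + "message User {\n"
        + "  string name = 1;\n"
        + "  int64 signup_ts = 2;\n"
        + "}\n";
    // Wrapped-primitive, recursion-depth, and timestamp handling come from the same
    // ProtoClassBasedSchemaProviderConfig properties used by the class-based provider.
    TypedProperties props = new TypedProperties();
    String avroSchema = new ProtoSchemaToAvroSchemaConverter(props).convert(protoSchema);
    System.out.println(avroSchema);
  }
}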
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
index 1dc731b5f95d8..a56c991bebd17 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ProtoKafkaSource.java
@@ -19,9 +19,12 @@
package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -31,6 +34,8 @@
import org.apache.hudi.utilities.streamer.StreamContext;
import com.google.protobuf.Message;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
@@ -52,8 +57,8 @@
* Reads protobuf serialized Kafka data, based on a provided class name.
*/
 public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {
-
- private final String className;
+  private final Option<String> className;
+ private final String deserializerName;
public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
@@ -63,11 +68,18 @@ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Sp
public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics,
new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext), streamContext.getSourceProfileSupplier()));
- checkRequiredConfigProperties(props, Collections.singletonList(
- ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
- props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
- props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
- className = getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME);
+ this.deserializerName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
+ if (!deserializerName.equals(ByteArrayDeserializer.class.getName()) && !deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
+ throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource");
+ }
+ if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
+ checkRequiredConfigProperties(props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
+ className = Option.of(getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
+ } else {
+ className = Option.empty();
+ }
+ props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
+ props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, deserializerName);
this.offsetGen = new KafkaOffsetGen(props);
if (this.shouldAddOffsets) {
throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported");
@@ -76,9 +88,17 @@ public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContex
@Override
  protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) {
- ProtoDeserializer deserializer = new ProtoDeserializer(className);
- return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
- LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+ if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
+ ValidationUtils.checkArgument(
+ className.isPresent(),
+ ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
+ ProtoDeserializer deserializer = new ProtoDeserializer(className.get());
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
+ } else {
+ return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
+ LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
+ }
}
private static class ProtoDeserializer implements Serializable {
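In summary, the source now supports two modes selected by the deserializer property. A hedged sketch of the two configurations follows; the proto class name and registry URL are illustrative assumptions, while the config keys come from this patch.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ProtoKafkaSourceModesSketch {
  public static void main(String[] args) {
    // Mode 1 (default): ByteArrayDeserializer + a generated proto class on the classpath.
    TypedProperties byteArrayMode = new TypedProperties();
    byteArrayMode.setProperty(
        ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(),
        "com.example.MyProtoMessage"); // hypothetical generated class

    // Mode 2: Confluent KafkaProtobufDeserializer + schema registry; no proto class required.
    TypedProperties confluentMode = new TypedProperties();
    confluentMode.setProperty(KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
        KafkaProtobufDeserializer.class.getName());
    confluentMode.setProperty("schema.registry.url", "http://localhost:8081"); // assumed URL
    System.out.println(byteArrayMode + "\n" + confluentMode);
  }
}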
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
index cf8532d65c855..c16c7e085cb1f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/ProtoConversionUtil.java
@@ -17,15 +17,18 @@
package org.apache.hudi.utilities.sources.helpers;
+import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.internal.schema.HoodieSchemaException;
import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DoubleValue;
+import com.google.protobuf.DynamicMessage;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
@@ -56,7 +59,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
+import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
+import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;
/**
* A utility class to help translate from Proto to Avro.
@@ -74,6 +82,17 @@ public static Schema getAvroSchemaForMessageClass(Class clazz, SchemaConfig sche
return new AvroSupport(schemaConfig).getSchema(clazz);
}
+ /**
+ * Creates an Avro {@link Schema} for the provided {@link Descriptors.Descriptor}.
+ * Intended for use when the descriptor is provided by an external registry.
+ * @param descriptor The protobuf descriptor
+ * @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema
+ * @return An Avro schema
+ */
+ public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) {
+ return new AvroSupport(schemaConfig).getSchema(descriptor);
+ }
+
/**
* Converts the provided {@link Message} into an avro {@link GenericRecord} with the provided schema.
* @param schema target schema to convert into
@@ -101,6 +120,13 @@ public SchemaConfig(boolean wrappedPrimitivesAsRecords, int maxRecursionDepth, b
this.timestampsAsRecords = timestampsAsRecords;
}
+ public static SchemaConfig fromProperties(TypedProperties props) {
+ boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
+ int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
+ boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
+ return new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
+ }
+
public boolean isWrappedPrimitivesAsRecords() {
return wrappedPrimitivesAsRecords;
}
@@ -157,11 +183,11 @@ private AvroSupport(SchemaConfig schemaConfig) {
this.timestampsAsRecords = schemaConfig.isTimestampsAsRecords();
}
- public static GenericRecord convert(Schema schema, Message message) {
+ static GenericRecord convert(Schema schema, Message message) {
return (GenericRecord) convertObject(schema, message);
}
- public Schema getSchema(Class c) {
+ Schema getSchema(Class c) {
return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c, wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords), key -> {
try {
Object descriptor = c.getMethod("getDescriptor").invoke(null);
@@ -177,6 +203,16 @@ public Schema getSchema(Class c) {
});
}
+ /**
+ * Translates a Proto Message descriptor into an Avro Schema.
+   * Does not cache the result because an external registry may evolve the schema, which could otherwise leave a stale Avro schema in the cache.
+ * @param descriptor the descriptor for the proto message
+ * @return an avro schema
+ */
+ Schema getSchema(Descriptors.Descriptor descriptor) {
+ return getMessageSchema(descriptor, new CopyOnWriteMap<>(), getNamespace(descriptor.getFullName()));
+ }
+
private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
    List<String> symbols = new ArrayList<>(enumDescriptor.getValues().size());
for (Descriptors.EnumValueDescriptor valueDescriptor : enumDescriptor.getValues()) {
@@ -402,7 +438,21 @@ private static Object convertObject(Schema schema, Object value) {
if (value instanceof Message) {
// check if this is a Timestamp
if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
- return Timestamps.toMicros((Timestamp) value);
+ if (value instanceof Timestamp) {
+ return Timestamps.toMicros((Timestamp) value);
+ } else if (value instanceof DynamicMessage) {
+ Timestamp.Builder builder = Timestamp.newBuilder();
+ ((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> {
+ if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) {
+ builder.setSeconds((Long) fieldValue);
+ } else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) {
+ builder.setNanos((Integer) fieldValue);
+ }
+ });
+ return Timestamps.toMicros(builder.build());
+ } else {
+ throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName());
+ }
} else {
tmpValue = getWrappedValue(value);
}
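To illustrate the new DynamicMessage branch above: registry-backed deserialization can surface google.protobuf.Timestamp values as DynamicMessage instances, which the patch rebuilds before converting to micros. A minimal, self-contained sketch (sample seconds/nanos values are arbitrary):

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;

public class DynamicTimestampSketch {
  public static void main(String[] args) {
    // Emulate what a schema-registry deserializer may hand back for a timestamp field.
    DynamicMessage dynamicTs = DynamicMessage.newBuilder(Timestamp.getDescriptor())
        .setField(Timestamp.getDescriptor().findFieldByName("seconds"), 1700000000L)
        .setField(Timestamp.getDescriptor().findFieldByName("nanos"), 123000)
        .build();
    // Rebuild a Timestamp from the seconds/nanos fields, mirroring the branch above.
    Timestamp.Builder builder = Timestamp.newBuilder();
    dynamicTs.getAllFields().forEach((field, value) -> {
      if (field.getName().equals("seconds")) {
        builder.setSeconds((Long) value);
      } else if (field.getName().equals("nanos")) {
        builder.setNanos((Integer) value);
      }
    });
    System.out.println(Timestamps.toMicros(builder.build())); // 1700000000000123
  }
}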
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
index 16d190ac45d15..4fa582209ae17 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java
@@ -93,7 +93,7 @@ private IndexedRecord createExtendUserRecord() {
}
/**
- * Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}.
+ * Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}.
*/
@Test
public void testKafkaAvroSchemaDeserializer() {
@@ -105,7 +105,7 @@ public void testKafkaAvroSchemaDeserializer() {
avroDeserializer.configure(new HashMap(config), false);
bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
// record is serialized in orig schema and deserialized using same schema.
- assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
+ assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema));
IndexedRecord avroRecordWithAllField = createExtendUserRecord();
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
@@ -115,12 +115,12 @@ public void testKafkaAvroSchemaDeserializer() {
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
// record is serialized w/ evolved schema, and deserialized w/ evolved schema
- IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
+ IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema);
assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema);
// read old record w/ evolved schema.
- IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema);
+ IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema);
// record won't be equal to original record as we read w/ evolved schema. "age" will be added w/ default value of null
assertNotEquals(avroRecord, actualRec);
GenericRecord genericRecord = (GenericRecord) actualRec;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
new file mode 100644
index 0000000000000..fed4bc5e0ed2e
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/converter/TestProtoSchemaToAvroSchemaConverter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.schema.converter;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
+import org.apache.hudi.utilities.test.proto.Parent;
+
+import org.apache.avro.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class TestProtoSchemaToAvroSchemaConverter {
+ @Test
+ void testConvert() throws Exception {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(), Parent.class.getName());
+ Schema.Parser parser = new Schema.Parser();
+ String actual = new ProtoSchemaToAvroSchemaConverter(properties).convert(getProtoSchemaString());
+ Schema actualSchema = new Schema.Parser().parse(actual);
+
+ Schema expectedSchema = parser.parse(getClass().getClassLoader().getResourceAsStream("schema-provider/proto/parent_schema_recursive_default_limit.avsc"));
+ assertEquals(expectedSchema, actualSchema);
+ }
+
+ private String getProtoSchemaString() throws IOException, URISyntaxException {
+ return new String(Files.readAllBytes(Paths.get(getClass().getClassLoader().getResource("schema-provider/proto/recursive.proto").toURI())));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index 662cd1dd985f9..b63c7c29a24da 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -24,6 +24,7 @@
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.schema.ProtoClassBasedSchemaProvider;
import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.streamer.DefaultStreamContext;
import org.apache.hudi.utilities.streamer.SourceFormatAdapter;
import org.apache.hudi.utilities.test.proto.Nested;
@@ -37,10 +38,14 @@
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
+import com.google.protobuf.Message;
import com.google.protobuf.StringValue;
import com.google.protobuf.UInt32Value;
import com.google.protobuf.UInt64Value;
+import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.Timestamps;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
+import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -55,6 +60,7 @@
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -64,13 +70,16 @@
import java.util.stream.IntStream;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.apache.hudi.utilities.config.KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Tests against {@link ProtoKafkaSource}.
*/
public class TestProtoKafkaSource extends BaseTestKafkaSource {
+ private static final JsonFormat.Printer PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();
private static final Random RANDOM = new Random();
+ private static final String MOCK_REGISTRY_URL = "mock://127.0.0.1:8081";
protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
@@ -93,6 +102,28 @@ protected SourceFormatAdapter createSource(TypedProperties props) {
return new SourceFormatAdapter(protoKafkaSource);
}
+ @Test
+ public void testProtoKafkaSourceWithConfluentProtoDeserialization() {
+ final String topic = TEST_TOPIC_PREFIX + "testProtoKafkaSourceWithConfluentDeserializer";
+ testUtils.createTopic(topic, 2);
+ TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
+ props.put(KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
+ "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
+ props.put("schema.registry.url", MOCK_REGISTRY_URL);
+ props.put("hoodie.streamer.schemaprovider.registry.url", MOCK_REGISTRY_URL);
+ props.setProperty(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS.key(), "true");
+ // class name is not required so we'll remove it
+ props.remove(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key());
+ SchemaProvider schemaProvider = new SchemaRegistryProvider(props, jsc());
+ ProtoKafkaSource protoKafkaSource = new ProtoKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
+    List<Message> messages = createSampleMessages(1000);
+ sendMessagesToKafkaWithConfluentSerializer(topic, 2, messages);
+ // Assert messages are read correctly
+    JavaRDD<Message> messagesRead = protoKafkaSource.fetchNext(Option.empty(), 1000).getBatch().get();
+ assertEquals(messages.stream().map(this::protoToJson).collect(Collectors.toSet()),
+ new HashSet<>(messagesRead.map(message -> PRINTER.print(message)).collect()));
+ }
+
@Test
public void testProtoKafkaSourceWithFlattenWrappedPrimitives() {
@@ -196,7 +227,7 @@ private static Nested generateRandomNestedMessage() {
@Override
protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
     List<Message> messages = createSampleMessages(count);
-    try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties())) {
+    try (Producer<String, byte[]> producer = new KafkaProducer<>(getProducerProperties(false))) {
for (int i = 0; i < messages.size(); i++) {
// use consistent keys to get even spread over partitions for test expectations
producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i).toByteArray()));
@@ -204,14 +235,38 @@ protected void sendMessagesToKafka(String topic, int count, int numPartitions) {
}
}
- private Properties getProducerProperties() {
+  private void sendMessagesToKafkaWithConfluentSerializer(String topic, int numPartitions, List<Message> messages) {
+    try (Producer<String, Message> producer = new KafkaProducer<>(getProducerProperties(true))) {
+ for (int i = 0; i < messages.size(); i++) {
+ // use consistent keys to get even spread over partitions for test expectations
+ producer.send(new ProducerRecord<>(topic, Integer.toString(i % numPartitions), messages.get(i)));
+ }
+ }
+ }
+
+ private Properties getProducerProperties(boolean useConfluentProtobufSerializer) {
Properties props = new Properties();
props.put("bootstrap.servers", testUtils.brokerAddress());
- props.put("value.serializer", ByteArraySerializer.class.getName());
- // Key serializer is required.
+ if (useConfluentProtobufSerializer) {
+ props.put("value.serializer", KafkaProtobufSerializer.class.getName());
+ props.put("value.deserializer", KafkaProtobufDeserializer.class.getName());
+ props.put("schema.registry.url", MOCK_REGISTRY_URL);
+ props.put("auto.register.schemas", "true");
+ } else {
+ props.put("value.serializer", ByteArraySerializer.class.getName());
+ // Key serializer is required.
+ }
props.put("key.serializer", StringSerializer.class.getName());
// wait for all in-sync replicas to ack sends
props.put("acks", "all");
return props;
}
+
+ private String protoToJson(Message input) {
+ try {
+ return PRINTER.print(input);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to convert proto to json", e);
+ }
+ }
}
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index b992e5bbeb8c4..10c1cccd4e2ec 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -133,6 +133,7 @@
                  <include>io.confluent:common-config</include>
                  <include>io.confluent:common-utils</include>
                  <include>io.confluent:kafka-schema-registry-client</include>
+                 <include>io.confluent:kafka-protobuf-serializer</include>
                  <include>io.dropwizard.metrics:metrics-core</include>
                  <include>io.dropwizard.metrics:metrics-graphite</include>
                  <include>io.dropwizard.metrics:metrics-jmx</include>
diff --git a/packaging/hudi-utilities-slim-bundle/pom.xml b/packaging/hudi-utilities-slim-bundle/pom.xml
index 3919b103465c4..0a2c271e6ce5c 100644
--- a/packaging/hudi-utilities-slim-bundle/pom.xml
+++ b/packaging/hudi-utilities-slim-bundle/pom.xml
@@ -119,6 +119,7 @@
                  <include>io.confluent:common-config</include>
                  <include>io.confluent:common-utils</include>
                  <include>io.confluent:kafka-schema-registry-client</include>
+                 <include>io.confluent:kafka-protobuf-serializer</include>
                  <include>io.dropwizard.metrics:metrics-core</include>
                  <include>io.dropwizard.metrics:metrics-graphite</include>
                  <include>io.dropwizard.metrics:metrics-jmx</include>
diff --git a/pom.xml b/pom.xml
index 3b9cc55562a2a..2fd99672a2f94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
2.4.5
3.1.1.4
3.4.1.1
-    <confluent.version>5.3.4</confluent.version>
+    <confluent.version>5.5.0</confluent.version>
2.17
3.0.1-b12
1.10.1
@@ -934,6 +934,11 @@
        <version>${glassfish.el.version}</version>
        <scope>provided</scope>
      </dependency>
+     <dependency>
+       <groupId>org.glassfish.jersey.ext</groupId>
+       <artifactId>jersey-bean-validation</artifactId>
+       <version>${glassfish.version}</version>
+     </dependency>
@@ -1772,6 +1777,33 @@
+
+
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-avro-serializer</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>common-config</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>common-utils</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-schema-registry-client</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>io.confluent</groupId>
+        <artifactId>kafka-protobuf-serializer</artifactId>
+        <version>${confluent.version}</version>
+      </dependency>