[HUDI-4732] Add support for confluent schema registry with proto (apache#11070)

Co-authored-by: Y Ethan Guo <[email protected]>
the-other-tim-brown and yihua authored May 12, 2024
1 parent 61f54a0 commit aa5bb0d
Showing 13 changed files with 288 additions and 37 deletions.
7 changes: 4 additions & 3 deletions hudi-utilities/pom.xml
@@ -361,12 +361,10 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
@@ -376,7 +374,10 @@
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-protobuf-serializer</artifactId>
</dependency>

<!-- Httpcomponents -->
KafkaSourceConfig.java

@@ -24,6 +24,8 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import javax.annotation.concurrent.Immutable;

import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
@@ -120,6 +122,12 @@ public class KafkaSourceConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("Kafka consumer strategy for reading data.");

public static final ConfigProperty<String> KAFKA_PROTO_VALUE_DESERIALIZER_CLASS = ConfigProperty
.key(PREFIX + "proto.value.deserializer.class")
.defaultValue(ByteArrayDeserializer.class.getName())
.sinceVersion("0.15.0")
.withDocumentation("Kafka Proto Payload Deserializer Class");

/**
* Kafka reset offset strategies.
*/
KafkaAvroSchemaDeserializer.java

@@ -60,7 +60,6 @@ public void configure(Map<String, ?> configs, boolean isKey) {
/**
* We need to inject sourceSchema instead of reader schema during deserialization or later stages of the pipeline.
*
* @param includeSchemaAndVersion
* @param topic
* @param isKey
* @param payload
@@ -70,13 +69,12 @@ public void configure(Map<String, ?> configs, boolean isKey) {
*/
@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchema)
throws SerializationException {
return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
return super.deserialize(topic, isKey, payload, sourceSchema);
}

protected TypedProperties getConvertToTypedProperties(Map<String, ?> configs) {
ProtoClassBasedSchemaProvider.java

@@ -32,13 +32,8 @@
import java.util.Collections;

import static org.apache.hudi.common.util.ConfigUtils.checkRequiredConfigProperties;
import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;

/**
* A schema provider that takes in a class name for a generated protobuf class that is on the classpath.
@@ -75,10 +70,7 @@ public ProtoClassBasedSchemaProvider(TypedProperties props, JavaSparkContext jss
super(props, jssc);
checkRequiredConfigProperties(props, Collections.singletonList(PROTO_SCHEMA_CLASS_NAME));
String className = getStringWithAltKeys(config, PROTO_SCHEMA_CLASS_NAME);
boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
ProtoConversionUtil.SchemaConfig schemaConfig = new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
ProtoConversionUtil.SchemaConfig schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(props);
try {
schemaString = ProtoConversionUtil.getAvroSchemaForMessageClass(ReflectionUtils.getClass(className), schemaConfig).toString();
} catch (Exception e) {
ProtoSchemaToAvroSchemaConverter.java (new file)

@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.utilities.schema.converter;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchema;

import java.io.IOException;

/**
* Converts a protobuf schema from the schema registry to an Avro schema.
*/
public class ProtoSchemaToAvroSchemaConverter implements SchemaRegistryProvider.SchemaConverter {
private final ProtoConversionUtil.SchemaConfig schemaConfig;

public ProtoSchemaToAvroSchemaConverter(TypedProperties config) {
this.schemaConfig = ProtoConversionUtil.SchemaConfig.fromProperties(config);
}

@Override
public String convert(String schema) throws IOException {
ProtobufSchema protobufSchema = new ProtobufSchema(schema);
return ProtoConversionUtil.getAvroSchemaForMessageDescriptor(protobufSchema.toDescriptor(), schemaConfig).toString();
}
}
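
A minimal usage sketch for the new converter, assuming default SchemaConfig settings; the inline proto3 definition stands in for the schema string a real Confluent registry would return and is purely illustrative.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.schema.converter.ProtoSchemaToAvroSchemaConverter;

import java.io.IOException;

public class ProtoSchemaConverterSketch {
  public static void main(String[] args) throws IOException {
    // Stand-in for the schema string the registry would return for a protobuf subject.
    String registrySchema =
        "syntax = \"proto3\";\n"
        + "message UserEvent {\n"
        + "  string id = 1;\n"
        + "  int64 amount = 2;\n"
        + "}\n";

    // Empty properties fall back to the ProtoClassBasedSchemaProviderConfig defaults
    // for wrapped primitives, recursion depth, and timestamp handling.
    ProtoSchemaToAvroSchemaConverter converter =
        new ProtoSchemaToAvroSchemaConverter(new TypedProperties());

    System.out.println(converter.convert(registrySchema)); // Avro schema as a JSON string
  }
}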
ProtoKafkaSource.java

@@ -19,9 +19,12 @@
package org.apache.hudi.utilities.sources;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
@@ -31,6 +34,8 @@
import org.apache.hudi.utilities.streamer.StreamContext;

import com.google.protobuf.Message;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
@@ -52,8 +57,8 @@
* Reads protobuf serialized Kafka data, based on a provided class name.
*/
public class ProtoKafkaSource extends KafkaSource<JavaRDD<Message>> {

private final String className;
private final Option<String> className;
private final String deserializerName;

public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider, HoodieIngestionMetrics metrics) {
@@ -63,11 +68,18 @@ public ProtoKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Sp
public ProtoKafkaSource(TypedProperties properties, JavaSparkContext sparkContext, SparkSession sparkSession, HoodieIngestionMetrics metrics, StreamContext streamContext) {
super(properties, sparkContext, sparkSession, SourceType.PROTO, metrics,
new DefaultStreamContext(UtilHelpers.getSchemaProviderForKafkaSource(streamContext.getSchemaProvider(), properties, sparkContext), streamContext.getSourceProfileSupplier()));
checkRequiredConfigProperties(props, Collections.singletonList(
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class);
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, ByteArrayDeserializer.class);
className = getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME);
this.deserializerName = ConfigUtils.getStringWithAltKeys(props, KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS, true);
if (!deserializerName.equals(ByteArrayDeserializer.class.getName()) && !deserializerName.equals(KafkaProtobufDeserializer.class.getName())) {
throw new HoodieReadFromSourceException("Only ByteArrayDeserializer and KafkaProtobufDeserializer are supported for ProtoKafkaSource");
}
if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
checkRequiredConfigProperties(props, Collections.singletonList(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
className = Option.of(getStringWithAltKeys(props, ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME));
} else {
className = Option.empty();
}
props.put(NATIVE_KAFKA_KEY_DESERIALIZER_PROP, StringDeserializer.class.getName());
props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, deserializerName);
this.offsetGen = new KafkaOffsetGen(props);
if (this.shouldAddOffsets) {
throw new HoodieReadFromSourceException("Appending kafka offsets to ProtoKafkaSource is not supported");
@@ -76,9 +88,17 @@ public ProtoKafkaSource(TypedProperties properties, JavaSparkContex

@Override
protected JavaRDD<Message> toBatch(OffsetRange[] offsetRanges) {
ProtoDeserializer deserializer = new ProtoDeserializer(className);
return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
if (deserializerName.equals(ByteArrayDeserializer.class.getName())) {
ValidationUtils.checkArgument(
className.isPresent(),
ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key() + " config must be present.");
ProtoDeserializer deserializer = new ProtoDeserializer(className.get());
return KafkaUtils.<String, byte[]>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(obj -> deserializer.parse(obj.value()));
} else {
return KafkaUtils.<String, Message>createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
LocationStrategies.PreferConsistent()).map(ConsumerRecord::value);
}
}
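
As the branches above suggest, the source now supports two setups. A hedged sketch of the properties each path needs follows; the generated class name is hypothetical and the registry URL is a placeholder, with any further Confluent client settings assumed to be passed through the same properties.

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.KafkaSourceConfig;
import org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig;

import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

public class ProtoKafkaSourcePropsSketch {

  // Default path: ByteArrayDeserializer plus a generated proto class on the classpath.
  static TypedProperties byteArrayDeserializerProps() {
    TypedProperties props = new TypedProperties();
    props.put(ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_CLASS_NAME.key(),
        "com.example.protos.UserEvent"); // hypothetical generated class
    return props;
  }

  // Registry path: Confluent KafkaProtobufDeserializer; no proto class name is required
  // because records arrive as already-parsed Message instances.
  static TypedProperties kafkaProtobufDeserializerProps() {
    TypedProperties props = new TypedProperties();
    props.put(KafkaSourceConfig.KAFKA_PROTO_VALUE_DESERIALIZER_CLASS.key(),
        KafkaProtobufDeserializer.class.getName());
    props.put("schema.registry.url", "http://localhost:8081"); // placeholder
    return props;
  }

  // Any other deserializer class is rejected by the constructor with a HoodieReadFromSourceException.
}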

private static class ProtoDeserializer implements Serializable {
ProtoConversionUtil.java

@@ -17,15 +17,18 @@

package org.apache.hudi.utilities.sources.helpers;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.HoodieSchemaException;

import com.google.protobuf.BoolValue;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DoubleValue;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.FloatValue;
import com.google.protobuf.Int32Value;
import com.google.protobuf.Int64Value;
@@ -56,7 +59,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.hudi.common.util.ConfigUtils.getBooleanWithAltKeys;
import static org.apache.hudi.common.util.ConfigUtils.getIntWithAltKeys;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_MAX_RECURSION_DEPTH;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS;
import static org.apache.hudi.utilities.config.ProtoClassBasedSchemaProviderConfig.PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS;

/**
* A utility class to help translate from Proto to Avro.
@@ -74,6 +82,17 @@ public static Schema getAvroSchemaForMessageClass(Class clazz, SchemaConfig sche
return new AvroSupport(schemaConfig).getSchema(clazz);
}

/**
* Creates an Avro {@link Schema} for the provided {@link Descriptors.Descriptor}.
* Intended for use when the descriptor is provided by an external registry.
* @param descriptor The protobuf descriptor
* @param schemaConfig configuration used to determine how to handle particular cases when converting from the proto schema
* @return An Avro schema
*/
public static Schema getAvroSchemaForMessageDescriptor(Descriptors.Descriptor descriptor, SchemaConfig schemaConfig) {
return new AvroSupport(schemaConfig).getSchema(descriptor);
}
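
For instance, a descriptor from any compiled message can be converted directly; protobuf's own Timestamp is used here only as a convenient stand-in for a registry-derived descriptor, and the SchemaConfig values are illustrative.

import org.apache.hudi.utilities.sources.helpers.ProtoConversionUtil;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
import org.apache.avro.Schema;

public class DescriptorToAvroSketch {
  public static void main(String[] args) {
    Descriptors.Descriptor descriptor = Timestamp.getDescriptor();
    // wrappedPrimitivesAsRecords=false, maxRecursionDepth=5, timestampsAsRecords=false (illustrative values)
    ProtoConversionUtil.SchemaConfig config = new ProtoConversionUtil.SchemaConfig(false, 5, false);
    Schema avroSchema = ProtoConversionUtil.getAvroSchemaForMessageDescriptor(descriptor, config);
    System.out.println(avroSchema.toString(true));
  }
}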

/**
* Converts the provided {@link Message} into an avro {@link GenericRecord} with the provided schema.
* @param schema target schema to convert into
@@ -101,6 +120,13 @@ public SchemaConfig(boolean wrappedPrimitivesAsRecords, int maxRecursionDepth, b
this.timestampsAsRecords = timestampsAsRecords;
}

public static SchemaConfig fromProperties(TypedProperties props) {
boolean wrappedPrimitivesAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_WRAPPED_PRIMITIVES_AS_RECORDS);
int maxRecursionDepth = getIntWithAltKeys(props, PROTO_SCHEMA_MAX_RECURSION_DEPTH);
boolean timestampsAsRecords = getBooleanWithAltKeys(props, PROTO_SCHEMA_TIMESTAMPS_AS_RECORDS);
return new ProtoConversionUtil.SchemaConfig(wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords);
}

public boolean isWrappedPrimitivesAsRecords() {
return wrappedPrimitivesAsRecords;
}
@@ -157,11 +183,11 @@ private AvroSupport(SchemaConfig schemaConfig) {
this.timestampsAsRecords = schemaConfig.isTimestampsAsRecords();
}

public static GenericRecord convert(Schema schema, Message message) {
static GenericRecord convert(Schema schema, Message message) {
return (GenericRecord) convertObject(schema, message);
}

public Schema getSchema(Class c) {
Schema getSchema(Class c) {
return SCHEMA_CACHE.computeIfAbsent(new SchemaCacheKey(c, wrappedPrimitivesAsRecords, maxRecursionDepth, timestampsAsRecords), key -> {
try {
Object descriptor = c.getMethod("getDescriptor").invoke(null);
@@ -177,6 +203,16 @@ public Schema getSchema(Class c) {
});
}

/**
* Translates a Proto Message descriptor into an Avro Schema.
* Does not cache since external system may evolve the schema and that can result in a stale version of the avro schema.
* @param descriptor the descriptor for the proto message
* @return an avro schema
*/
Schema getSchema(Descriptors.Descriptor descriptor) {
return getMessageSchema(descriptor, new CopyOnWriteMap<>(), getNamespace(descriptor.getFullName()));
}

private Schema getEnumSchema(Descriptors.EnumDescriptor enumDescriptor) {
List<String> symbols = new ArrayList<>(enumDescriptor.getValues().size());
for (Descriptors.EnumValueDescriptor valueDescriptor : enumDescriptor.getValues()) {
@@ -402,7 +438,21 @@ private static Object convertObject(Schema schema, Object value) {
if (value instanceof Message) {
// check if this is a Timestamp
if (LogicalTypes.timestampMicros().equals(schema.getLogicalType())) {
return Timestamps.toMicros((Timestamp) value);
if (value instanceof Timestamp) {
return Timestamps.toMicros((Timestamp) value);
} else if (value instanceof DynamicMessage) {
Timestamp.Builder builder = Timestamp.newBuilder();
((DynamicMessage) value).getAllFields().forEach((fieldDescriptor, fieldValue) -> {
if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.seconds")) {
builder.setSeconds((Long) fieldValue);
} else if (fieldDescriptor.getFullName().equals("google.protobuf.Timestamp.nanos")) {
builder.setNanos((Integer) fieldValue);
}
});
return Timestamps.toMicros(builder.build());
} else {
throw new HoodieSchemaException("Unexpected message type while handling timestamps: " + value.getClass().getName());
}
} else {
tmpValue = getWrappedValue(value);
}
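
A self-contained sketch of the two timestamp shapes this branch now handles, showing that a DynamicMessage round-trips to the same microsecond value; the epoch values are arbitrary.

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Timestamp;
import com.google.protobuf.util.Timestamps;

public class TimestampMicrosSketch {
  public static void main(String[] args) throws Exception {
    Timestamp generated = Timestamp.newBuilder()
        .setSeconds(1_700_000_000L)
        .setNanos(123_456_000)
        .build();

    // The same payload as a DynamicMessage, which registry-driven deserialization
    // can hand back instead of the generated Timestamp class.
    DynamicMessage dynamic =
        DynamicMessage.parseFrom(Timestamp.getDescriptor(), generated.toByteArray());

    // Rebuild a Timestamp from the dynamic fields, as the branch above does.
    Timestamp.Builder rebuilt = Timestamp.newBuilder();
    dynamic.getAllFields().forEach((field, value) -> {
      if (field.getName().equals("seconds")) {
        rebuilt.setSeconds((Long) value);
      } else if (field.getName().equals("nanos")) {
        rebuilt.setNanos((Integer) value);
      }
    });

    System.out.println(Timestamps.toMicros(generated));        // 1700000000123456
    System.out.println(Timestamps.toMicros(rebuilt.build()));  // 1700000000123456
  }
}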
Tests for KafkaAvroSchemaDeserializer

@@ -93,7 +93,7 @@ private IndexedRecord createExtendUserRecord() {
}

/**
* Tests {@link KafkaAvroSchemaDeserializer#deserialize(Boolean, String, Boolean, byte[], Schema)}.
* Tests {@link KafkaAvroSchemaDeserializer#deserialize(String, Boolean, byte[], Schema)}.
*/
@Test
public void testKafkaAvroSchemaDeserializer() {
@@ -105,7 +105,7 @@ public void testKafkaAvroSchemaDeserializer() {
avroDeserializer.configure(new HashMap(config), false);
bytesOrigRecord = avroSerializer.serialize(topic, avroRecord);
// record is serialized in orig schema and deserialized using same schema.
assertEquals(avroRecord, avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema));
assertEquals(avroRecord, avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema));

IndexedRecord avroRecordWithAllField = createExtendUserRecord();
byte[] bytesExtendedRecord = avroSerializer.serialize(topic, avroRecordWithAllField);
@@ -115,12 +115,12 @@ public void testKafkaAvroSchemaDeserializer() {
avroDeserializer = new KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
avroDeserializer.configure(new HashMap(config), false);
// record is serialized w/ evolved schema, and deserialized w/ evolved schema
IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesExtendedRecord, evolSchema);
IndexedRecord avroRecordWithAllFieldActual = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesExtendedRecord, evolSchema);
assertEquals(avroRecordWithAllField, avroRecordWithAllFieldActual);
assertEquals(avroRecordWithAllFieldActual.getSchema(), evolSchema);

// read old record w/ evolved schema.
IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(false, topic, false, bytesOrigRecord, origSchema);
IndexedRecord actualRec = (IndexedRecord) avroDeserializer.deserialize(topic, false, bytesOrigRecord, origSchema);
// record won't be equal to original record as we read w/ evolved schema. "age" will be added w/ default value of null
assertNotEquals(avroRecord, actualRec);
GenericRecord genericRecord = (GenericRecord) actualRec;
(Diffs for the remaining changed files were not loaded and are not shown.)