[dvc][compat] Add blob transfer support for DaVinciRecordTransformer (#1471)

This PR makes DVRT (DaVinciRecordTransformer) compatible with blob transfer. Here are the changes made:

1. Throw an exception when database checksum verification is enabled. Because DVRT transforms values, a checksum computed over the transformed values can never match the expected checksum, so validation would always fail. Given the nature of this feature it makes sense not to support it, and checksum verification is off by default for DVC anyway.
2. Store the classHash in the offsetRecord so it can be transported during blob transfer. This ensures that a transferred blob is compatible with the current transformer logic (see the sketch after this list).
3. Removed file-based classHash persistence, as the hash is now stored in the offsetRecord.
4. Fixed a bug in SIT (StoreIngestionTask) where the classHash was calculated from the wrapper class, not the user's class.
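
For context, a minimal sketch of the compatibility check this enables. The OffsetRecord accessors appear in the diff below; the helper method itself is illustrative and not part of this PR:

// Illustrative helper, not actual Venice code: decide whether locally persisted
// (or blob-transferred) data was produced by the current transformer logic.
static boolean isCompatibleWithPersistedData(int currentClassHash, OffsetRecord offsetRecord) {
  // getRecordTransformerClassHash() returns null when no hash was ever recorded,
  // e.g. for data written before this change.
  Integer persistedClassHash = offsetRecord.getRecordTransformerClassHash();
  return persistedClassHash != null && persistedClassHash == currentClassHash;
}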
kvargha authored Feb 3, 2025
1 parent b014e48 commit 408757f
Showing 16 changed files with 498 additions and 116 deletions.
.gitignore (2 changes: 1 addition & 1 deletion)
@@ -24,4 +24,4 @@ Gemfile.lock
.bundles_cache
docs/vendor/
clients/da-vinci-client/classHash*.txt
integrations/venice-duckdb/classHash*.txt
integrations/venice-duckdb/classHash*.txt
DaVinciBackend.java
@@ -126,6 +126,12 @@ public DaVinciBackend(
LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
try {
VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();

if (backendConfig.isDatabaseChecksumVerificationEnabled() && recordTransformerConfig != null) {
// The checksum verification will fail because DVRT transforms the values
throw new VeniceException("DaVinciRecordTransformer cannot be used with database checksum verification.");
}

useDaVinciSpecificExecutionStatusForError = backendConfig.useDaVinciSpecificExecutionStatusForError();
writeBatchingPushStatus = backendConfig.getDaVinciPushStatusCheckIntervalInMs() >= 0;
this.configLoader = configLoader;
@@ -295,10 +301,6 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
if (recordTransformerConfig != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

aggVersionedBlobTransferStats =
new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());

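To see why the new guard is needed, here is a toy, self-contained example; CRC32 stands in for Venice's actual checksum algorithm, which this diff does not show:

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class ChecksumMismatchDemo {
  private static long crc(byte[] bytes) {
    CRC32 crc = new CRC32();
    crc.update(bytes);
    return crc.getValue();
  }

  public static void main(String[] args) {
    byte[] original = "value-from-version-topic".getBytes(StandardCharsets.UTF_8);
    // DVRT persists transform(value) rather than the value itself
    byte[] transformed = "TRANSFORMED:value-from-version-topic".getBytes(StandardCharsets.UTF_8);
    long expected = crc(original); // checksum computed over the ingested value
    long actual = crc(transformed); // checksum recomputed over what is on disk
    System.out.println(expected == actual); // false: verification would fail
  }
}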
BlockingDaVinciRecordTransformer.java
@@ -1,6 +1,10 @@
package com.linkedin.davinci.client;

import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -58,6 +62,19 @@ public void onEndVersionIngestion(int currentVersion) {
this.recordTransformer.onEndVersionIngestion(currentVersion);
}

public void internalOnRecovery(
AbstractStorageEngine storageEngine,
int partitionId,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
Lazy<VeniceCompressor> compressor) {
// We use a wrapper around onRecovery because the class hash calculation grabs the name of the class that
// invokes it. If we invoked onRecovery directly from this class, the class hash would be calculated from the
// contents of BlockingDaVinciRecordTransformer, not the user's implementation of DVRT.
// We also can't override onRecovery like the other methods, because onRecovery is final and its implementation
// should never be overridden.
this.recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
}

@Override
public void close() throws IOException {
this.recordTransformer.close();
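The pitfall this wrapper works around can be reproduced with a toy example (the name-based hash below is a stand-in; the real getClassHash implementation is not shown in this diff):

// Toy demo, not Venice code: identity derived from getClass() inside an
// inherited method reports the runtime (wrapper) class, not the base class.
class UserTransformer {
  int classHash() {
    // getClass() is virtual: it returns the runtime class of `this`
    return getClass().getName().hashCode();
  }
}

class BlockingWrapper extends UserTransformer {
  // Inherits classHash(); when called on a BlockingWrapper instance,
  // getClass() reports BlockingWrapper.
}

public class ClassHashDemo {
  public static void main(String[] args) {
    int userHash = new UserTransformer().classHash();
    int wrappedHash = new BlockingWrapper().classHash();
    // Prints false: the same logic yields different hashes, which is why the
    // wrapper must delegate to the user's transformer instance.
    System.out.println(userHash == wrappedHash);
  }
}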
DaVinciRecordTransformer.java
@@ -4,6 +4,8 @@
import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.Closeable;
import java.io.IOException;
@@ -212,9 +214,10 @@ public final int getClassHash() {
*/
public final void onRecovery(
AbstractStorageEngine storageEngine,
Integer partition,
int partitionId,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
Lazy<VeniceCompressor> compressor) {
recordTransformerUtility.onRecovery(storageEngine, partition, compressor);
recordTransformerUtility.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
}

/**
DaVinciRecordTransformerUtility.java
@@ -3,17 +3,18 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;


/**
@@ -23,6 +24,7 @@
* @param <O> the type of the output value
*/
public class DaVinciRecordTransformerUtility<K, O> {
private static final Logger LOGGER = LogManager.getLogger(DaVinciRecordTransformerUtility.class);
private final DaVinciRecordTransformer recordTransformer;
private final AvroGenericDeserializer<K> keyDeserializer;
private final AvroGenericDeserializer<O> outputValueDeserializer;
@@ -80,45 +82,48 @@ public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schem
/**
* @return true if transformer logic has changed since the last time the class was loaded
*/
// Removed: the old file-based classHash persistence.
public boolean hasTransformerLogicChanged(int classHash) {
  try {
    String classHashPath = String.format("./classHash-%d.txt", recordTransformer.getStoreVersion());
    File f = new File(classHashPath);
    if (f.exists()) {
      try (BufferedReader br = new BufferedReader(new FileReader(classHashPath))) {
        int storedClassHash = Integer.parseInt(br.readLine());
        if (storedClassHash == classHash) {
          return false;
        }
      }
    }

    try (FileWriter fw = new FileWriter(classHashPath)) {
      fw.write(String.valueOf(classHash));
    }
    return true;
  } catch (IOException e) {
    throw new VeniceException("Failed to check if transformation logic has changed", e);
  }
}

// Added: the classHash is now persisted in, and read from, the OffsetRecord.
public boolean hasTransformerLogicChanged(int currentClassHash, OffsetRecord offsetRecord) {
  Integer persistedClassHash = offsetRecord.getRecordTransformerClassHash();

  if (persistedClassHash != null && persistedClassHash == currentClassHash) {
    LOGGER.info("Transformer logic hasn't changed. Class hash = {}", currentClassHash);
    return false;
  }
  LOGGER.info(
      "A change in transformer logic has been detected. Persisted class hash = {}. New class hash = {}",
      persistedClassHash,
      currentClassHash);
  return true;
}

/**
* Bootstraps the client after it comes online.
*/
public final void onRecovery(
AbstractStorageEngine storageEngine,
Integer partition,
int partitionId,
InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
Lazy<VeniceCompressor> compressor) {
// ToDo: Store class hash in RocksDB to support blob transfer
int classHash = recordTransformer.getClassHash();
boolean transformerLogicChanged = hasTransformerLogicChanged(classHash);
Optional<OffsetRecord> optionalOffsetRecord = storageEngine.getPartitionOffset(partitionId);
OffsetRecord offsetRecord = optionalOffsetRecord.orElseGet(() -> new OffsetRecord(partitionStateSerializer));

boolean transformerLogicChanged = hasTransformerLogicChanged(classHash, offsetRecord);

if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) {
LOGGER.info("Bootstrapping directly from the VersionTopic for partition {}", partitionId);

// Bootstrap from VT
storageEngine.clearPartitionOffset(partition);
storageEngine.clearPartitionOffset(partitionId);

// Offset record is deleted, so create a new one and persist it
offsetRecord = new OffsetRecord(partitionStateSerializer);
offsetRecord.setRecordTransformerClassHash(classHash);
storageEngine.putPartitionOffset(partitionId, offsetRecord);
} else {
// Bootstrap from local storage
AbstractStorageIterator iterator = storageEngine.getIterator(partition);
LOGGER.info("Bootstrapping from local storage for partition {}", partitionId);
AbstractStorageIterator iterator = storageEngine.getIterator(partitionId);
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
byte[] valueBytes = iterator.value();
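Putting the added lines together, the new onRecovery flow looks like this (a simplified sketch assembled from the hunk above, not a verbatim copy):

// Sketch of the new bootstrap decision in onRecovery:
Optional<OffsetRecord> persisted = storageEngine.getPartitionOffset(partitionId);
OffsetRecord offsetRecord = persisted.orElseGet(() -> new OffsetRecord(partitionStateSerializer));

if (!recordTransformer.getStoreRecordsInDaVinci() || hasTransformerLogicChanged(classHash, offsetRecord)) {
  // Local data is unusable: wipe it and re-ingest from the VersionTopic,
  // recording the current classHash so future restarts (and blob transfers)
  // can verify compatibility.
  storageEngine.clearPartitionOffset(partitionId);
  offsetRecord = new OffsetRecord(partitionStateSerializer);
  offsetRecord.setRecordTransformerClassHash(classHash);
  storageEngine.putPartitionOffset(partitionId, offsetRecord);
} else {
  // Local data matches the current transformer logic: iterate over local
  // storage and replay the records through the transformer.
}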
StoreIngestionTask.java
@@ -322,7 +322,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
private final Schema recordTransformerInputValueSchema;
private final AvroGenericDeserializer recordTransformerKeyDeserializer;
private final SparseConcurrentList<AvroGenericDeserializer> recordTransformerDeserializersByPutSchemaId;
private DaVinciRecordTransformer recordTransformer;
private BlockingDaVinciRecordTransformer recordTransformer;

protected final String localKafkaServer;
protected final int localKafkaClusterId;
Expand Down Expand Up @@ -649,7 +649,7 @@ public synchronized void subscribePartition(PubSubTopicPartition topicPartition,
int partitionNumber = topicPartition.getPartitionNumber();

if (recordTransformer != null) {
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
recordTransformer.internalOnRecovery(storageEngine, partitionNumber, partitionStateSerializer, compressor);
}

partitionToPendingConsumerActionCountMap.computeIfAbsent(partitionNumber, x -> new AtomicInteger(0))
RecordTransformerTest.java
@@ -18,25 +18,21 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.File;
import java.util.Optional;
import org.apache.avro.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class RecordTransformerTest {
static final int storeVersion = 1;

@BeforeMethod
@AfterClass
public void deleteClassHash() {
File file = new File(String.format("./classHash-%d.txt", storeVersion));
if (file.exists()) {
assertTrue(file.delete());
}
}
static final int partitionId = 0;
static final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
AvroProtocolDefinition.PARTITION_STATE.getSerializer();

@Test
public void testRecordTransformer() {
@@ -66,8 +62,13 @@ public void testRecordTransformer() {

DaVinciRecordTransformerUtility<Integer, String> recordTransformerUtility =
recordTransformer.getRecordTransformerUtility();
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);

assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));

offsetRecord.setRecordTransformerClassHash(classHash);

assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
}

@Test
@@ -87,18 +88,30 @@ public void testOnRecovery() {
AbstractStorageEngine storageEngine = mock(AbstractStorageEngine.class);
Lazy<VeniceCompressor> compressor = Lazy.of(() -> mock(VeniceCompressor.class));

int partitionNumber = 1;
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, times(1)).clearPartitionOffset(partitionNumber);
OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
when(storageEngine.getPartitionOffset(partitionId)).thenReturn(Optional.of(offsetRecord));

recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
verify(storageEngine, times(1)).clearPartitionOffset(partitionId);

// Reset the mock to clear previous interactions
reset(storageEngine);

// Execute the onRecovery method again to test the case where the classHash file exists
when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator);
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, never()).clearPartitionOffset(partitionNumber);
verify(storageEngine, times(1)).getIterator(partitionNumber);
offsetRecord.setRecordTransformerClassHash(recordTransformer.getClassHash());
assertEquals((int) offsetRecord.getRecordTransformerClassHash(), recordTransformer.getClassHash());

// The class hash should survive the OffsetRecord being serialized and then deserialized
byte[] offsetRecordBytes = offsetRecord.toBytes();
OffsetRecord deserializedOffsetRecord = new OffsetRecord(offsetRecordBytes, partitionStateSerializer);
assertEquals((int) deserializedOffsetRecord.getRecordTransformerClassHash(), recordTransformer.getClassHash());

when(storageEngine.getPartitionOffset(partitionId)).thenReturn(Optional.of(offsetRecord));

// Execute the onRecovery method again to test the case where the classHash exists
when(storageEngine.getIterator(partitionId)).thenReturn(iterator);
recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
verify(storageEngine, never()).clearPartitionOffset(partitionId);
verify(storageEngine, times(1)).getIterator(partitionId);
}

@Test
@@ -134,5 +147,4 @@ public void testBlockingRecordTransformer() {

recordTransformer.onEndVersionIngestion(2);
}

}
DaVinciClientRecordTransformerTest.java
@@ -70,7 +70,6 @@
import org.apache.logging.log4j.Logger;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


@@ -115,16 +114,6 @@ public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(this.cluster);
}

@BeforeMethod
@AfterClass
public void deleteClassHash() {
int storeVersion = 1;
File file = new File(String.format("./classHash-%d.txt", storeVersion));
if (file.exists()) {
assertTrue(file.delete());
}
}

@Test(timeOut = TEST_TIMEOUT)
public void testRecordTransformer() throws Exception {
DaVinciConfig clientConfig = new DaVinciConfig();
DuckDBDaVinciRecordTransformerTest.java
@@ -10,9 +10,12 @@

import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.davinci.client.DaVinciRecordTransformerUtility;
import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -24,25 +27,17 @@
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class DuckDBDaVinciRecordTransformerTest {
static final int storeVersion = 1;
static final int partitionId = 0;
static final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
AvroProtocolDefinition.PARTITION_STATE.getSerializer();
static final String storeName = "test_store";
private final Set<String> columnsToProject = Collections.emptySet();

@BeforeMethod
@AfterClass
public void deleteClassHash() {
File file = new File(String.format("./classHash-%d.txt", storeVersion));
if (file.exists()) {
assertTrue(file.delete());
}
}

@Test
public void testRecordTransformer() throws IOException {
String tempDir = Utils.getTempDataDirectory().getAbsolutePath();
@@ -90,8 +85,13 @@ public void testRecordTransformer() throws IOException {

DaVinciRecordTransformerUtility<GenericRecord, GenericRecord> recordTransformerUtility =
recordTransformer.getRecordTransformerUtility();
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);

assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));

offsetRecord.setRecordTransformerClassHash(classHash);

assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
}
}

(Diffs for the remaining changed files are not shown.)