[dvc][compat] Add blob transfer support for DaVinciRecordTransformer #1471

Status: Open · wants to merge 9 commits into base: main
.gitignore (2 changes: 0 additions & 2 deletions)

@@ -23,5 +23,3 @@ _site/
 Gemfile.lock
 .bundles_cache
 docs/vendor/
-clients/da-vinci-client/classHash*.txt
-integrations/venice-duckdb/classHash*.txt

Contributor: Why do we remove this? classHash should not be included.

Contributor (Author): This file will no longer get generated, as the classHash is now stored in the offset record.
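For readers following the thread, a minimal sketch of the new round trip, using only the accessors this PR adds (the class name and the literal hash value are illustrative):

import com.linkedin.venice.kafka.protocol.state.PartitionState;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;

public class ClassHashRoundTripSketch {
  public static void main(String[] args) {
    InternalAvroSpecificSerializer<PartitionState> serializer =
        AvroProtocolDefinition.PARTITION_STATE.getSerializer();
    OffsetRecord offsetRecord = new OffsetRecord(serializer);

    // A fresh offset record carries no transformer hash yet.
    System.out.println(offsetRecord.getRecordTransformerClassHash()); // null

    // Ingestion persists the current transformer's hash alongside the offset...
    offsetRecord.setRecordTransformerClassHash(12345);

    // ...so recovery can compare it against the freshly computed hash, with no
    // side file on disk -- which is what makes blob transfer workable.
    System.out.println(offsetRecord.getRecordTransformerClassHash()); // 12345
  }
}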

DaVinciBackend.java

@@ -126,6 +126,12 @@ public DaVinciBackend(
     LOGGER.info("Creating Da Vinci backend with managed clients: {}", managedClients);
     try {
       VeniceServerConfig backendConfig = configLoader.getVeniceServerConfig();
+
+      if (backendConfig.isDatabaseChecksumVerificationEnabled() && recordTransformerConfig != null) {
+        // The checksum verification will fail because DVRT transforms the values
+        throw new VeniceException("DaVinciRecordTransformer cannot be used with database checksum verification.");
+      }
+
       useDaVinciSpecificExecutionStatusForError = backendConfig.useDaVinciSpecificExecutionStatusForError();
       writeBatchingPushStatus = backendConfig.getDaVinciPushStatusCheckIntervalInMs() >= 0;
       this.configLoader = configLoader;

@@ -295,10 +301,6 @@ public DaVinciBackend(
     }

     if (backendConfig.isBlobTransferManagerEnabled()) {
-      if (recordTransformerConfig != null) {
-        throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
-      }
-
       aggVersionedBlobTransferStats =
           new AggVersionedBlobTransferStats(metricsRepository, storeRepository, configLoader.getVeniceServerConfig());
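A self-contained illustration of why these two features conflict (CRC32 stands in for whatever checksum the ingestion path actually uses, which this diff does not show): the writer checksums the bytes it produced, while a node running DVRT persists transformed bytes, so verification can never succeed.

import java.util.zip.CRC32;

public class ChecksumMismatchSketch {
  public static void main(String[] args) {
    byte[] producedValue = "original-value".getBytes();
    byte[] transformedValue = "TRANSFORMED-VALUE".getBytes(); // what DVRT persists

    CRC32 writerChecksum = new CRC32();
    writerChecksum.update(producedValue); // computed upstream, before transformation

    CRC32 nodeChecksum = new CRC32();
    nodeChecksum.update(transformedValue); // computed over what was actually stored

    // The transformer changed the bytes between the two computations, so
    // verification fails even though nothing was corrupted.
    System.out.println(writerChecksum.getValue() == nodeChecksum.getValue()); // false
  }
}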

BlockingDaVinciRecordTransformer.java

@@ -1,6 +1,10 @@
 package com.linkedin.davinci.client;

+import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.venice.annotation.Experimental;
+import com.linkedin.venice.compression.VeniceCompressor;
+import com.linkedin.venice.kafka.protocol.state.PartitionState;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.lazy.Lazy;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;

@@ -58,6 +62,19 @@ public void onEndVersionIngestion(int currentVersion) {
     this.recordTransformer.onEndVersionIngestion(currentVersion);
   }

+  public void internalOnRecovery(
+      AbstractStorageEngine storageEngine,
+      int partitionId,
+      InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
+      Lazy<VeniceCompressor> compressor) {
+    // We wrap onRecovery because the class hash calculation grabs the name of the concrete class invoking it.
+    // If we invoked onRecovery directly from this class, the class hash would be calculated from the contents of
+    // BlockingDaVinciRecordTransformer, not from the user's implementation of DVRT.
+    // We also can't override onRecovery like the other methods, because it is final and its implementation
+    // should never be overridden.
+    this.recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
+  }
+
   @Override
   public void close() throws IOException {
     this.recordTransformer.close();
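A self-contained sketch of the pitfall that comment describes, under its stated assumption that the hash is derived from the concrete runtime class (names simplified, not the real API):

public class ClassHashDelegationSketch {
  static class UserTransformer {
    public final int getClassHash() {
      // The real implementation hashes the class's contents; the key property
      // is that it is computed from this.getClass(), i.e. the runtime class.
      return this.getClass().getName().hashCode();
    }
  }

  static class BlockingWrapper extends UserTransformer {
    private final UserTransformer delegate;

    BlockingWrapper(UserTransformer delegate) {
      this.delegate = delegate;
    }

    int wrongHash() {
      return getClassHash(); // resolves against BlockingWrapper -- the wrong class
    }

    int correctHash() {
      return delegate.getClassHash(); // resolves against the user's class
    }
  }

  public static void main(String[] args) {
    BlockingWrapper wrapper = new BlockingWrapper(new UserTransformer());
    System.out.println(wrapper.wrongHash() == wrapper.correctHash()); // false
  }
}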

DaVinciRecordTransformer.java

@@ -4,6 +4,8 @@
 import com.linkedin.venice.annotation.Experimental;
 import com.linkedin.venice.compression.VeniceCompressor;
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.kafka.protocol.state.PartitionState;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.lazy.Lazy;
 import java.io.Closeable;
 import java.io.IOException;

@@ -212,9 +214,10 @@ public final int getClassHash() {
    */
   public final void onRecovery(
       AbstractStorageEngine storageEngine,
-      Integer partition,
+      int partitionId,
+      InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
       Lazy<VeniceCompressor> compressor) {
-    recordTransformerUtility.onRecovery(storageEngine, partition, compressor);
+    recordTransformerUtility.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
   }

DaVinciRecordTransformerUtility.java

@@ -3,16 +3,15 @@
 import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.davinci.store.AbstractStorageIterator;
 import com.linkedin.venice.compression.VeniceCompressor;
 import com.linkedin.venice.exceptions.VeniceException;
+import com.linkedin.venice.kafka.protocol.state.PartitionState;
+import com.linkedin.venice.offsets.OffsetRecord;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.serializer.AvroGenericDeserializer;
 import com.linkedin.venice.serializer.AvroSerializer;
 import com.linkedin.venice.utils.lazy.Lazy;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import org.apache.avro.Schema;

@@ -80,45 +79,40 @@ public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schemaId) {
   /**
    * @return true if transformer logic has changed since the last time the class was loaded
    */
-  public boolean hasTransformerLogicChanged(int classHash) {
-    try {
-      String classHashPath = String.format("./classHash-%d.txt", recordTransformer.getStoreVersion());
-      File f = new File(classHashPath);
-      if (f.exists()) {
-        try (BufferedReader br = new BufferedReader(new FileReader(classHashPath))) {
-          int storedClassHash = Integer.parseInt(br.readLine());
-          if (storedClassHash == classHash) {
-            return false;
-          }
-        }
-      }
+  public boolean hasTransformerLogicChanged(int currentClassHash, OffsetRecord offsetRecord) {
+    Integer persistedClassHash = offsetRecord.getRecordTransformerClassHash();

-      try (FileWriter fw = new FileWriter(classHashPath)) {
-        fw.write(String.valueOf(classHash));
-      }
-      return true;
-    } catch (IOException e) {
-      throw new VeniceException("Failed to check if transformation logic has changed", e);
+    if (persistedClassHash != null && persistedClassHash == currentClassHash) {
+      return false;
     }
+    return true;
   }

   /**
    * Bootstraps the client after it comes online.
    */
   public final void onRecovery(
       AbstractStorageEngine storageEngine,
-      Integer partition,
+      int partitionId,
+      InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer,
       Lazy<VeniceCompressor> compressor) {
-    // ToDo: Store class hash in RocksDB to support blob transfer
     int classHash = recordTransformer.getClassHash();
-    boolean transformerLogicChanged = hasTransformerLogicChanged(classHash);
+    Optional<OffsetRecord> optionalOffsetRecord = storageEngine.getPartitionOffset(partitionId);
+    OffsetRecord offsetRecord = optionalOffsetRecord.orElseGet(() -> new OffsetRecord(partitionStateSerializer));
+
+    boolean transformerLogicChanged = hasTransformerLogicChanged(classHash, offsetRecord);

     if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) {
       // Bootstrap from VT
-      storageEngine.clearPartitionOffset(partition);
+      storageEngine.clearPartitionOffset(partitionId);
+
+      // Offset record is deleted, so create a new one and persist it
+      offsetRecord = new OffsetRecord(partitionStateSerializer);
+      offsetRecord.setRecordTransformerClassHash(classHash);
+      storageEngine.putPartitionOffset(partitionId, offsetRecord);
     } else {
       // Bootstrap from local storage
-      AbstractStorageIterator iterator = storageEngine.getIterator(partition);
+      AbstractStorageIterator iterator = storageEngine.getIterator(partitionId);
       for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
         byte[] keyBytes = iterator.key();
         byte[] valueBytes = iterator.value();

StoreIngestionTask.java

@@ -323,7 +323,7 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {
   private final Schema recordTransformerInputValueSchema;
   private final AvroGenericDeserializer recordTransformerKeyDeserializer;
   private final SparseConcurrentList<AvroGenericDeserializer> recordTransformerDeserializersByPutSchemaId;
-  private DaVinciRecordTransformer recordTransformer;
+  private BlockingDaVinciRecordTransformer recordTransformer;

   protected final String localKafkaServer;
   protected final int localKafkaClusterId;

@@ -650,7 +650,7 @@ public synchronized void subscribePartition(PubSubTopicPartition topicPartition,
     int partitionNumber = topicPartition.getPartitionNumber();

     if (recordTransformer != null) {
-      recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
+      recordTransformer.internalOnRecovery(storageEngine, partitionNumber, partitionStateSerializer, compressor);
     }

     partitionToPendingConsumerActionCountMap.computeIfAbsent(partitionNumber, x -> new AtomicInteger(0))
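The field's declared type changes because internalOnRecovery exists only on the wrapper class. A stub sketch of that constraint (stand-in types, not the real signatures):

class DaVinciRecordTransformerStub {
  // onRecovery(...) is final on the real base class; there is no
  // internalOnRecovery here.
}

class BlockingDaVinciRecordTransformerStub extends DaVinciRecordTransformerStub {
  void internalOnRecovery() {
    // delegates to the wrapped user transformer in the real code
  }
}

public class FieldTypeSketch {
  // Declared as the base type (as before this PR), the call below would not compile.
  private final BlockingDaVinciRecordTransformerStub recordTransformer =
      new BlockingDaVinciRecordTransformerStub();

  void subscribePartition() {
    recordTransformer.internalOnRecovery();
  }

  public static void main(String[] args) {
    new FieldTypeSketch().subscribePartition();
  }
}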

RecordTransformerTest.java

@@ -18,25 +18,21 @@
 import com.linkedin.davinci.store.AbstractStorageEngine;
 import com.linkedin.davinci.store.AbstractStorageIterator;
 import com.linkedin.venice.compression.VeniceCompressor;
+import com.linkedin.venice.kafka.protocol.state.PartitionState;
+import com.linkedin.venice.offsets.OffsetRecord;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.lazy.Lazy;
-import java.io.File;
+import java.util.Optional;
 import org.apache.avro.Schema;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;


 public class RecordTransformerTest {
   static final int storeVersion = 1;
-
-  @BeforeMethod
-  @AfterClass
-  public void deleteClassHash() {
-    File file = new File(String.format("./classHash-%d.txt", storeVersion));
-    if (file.exists()) {
-      assertTrue(file.delete());
-    }
-  }
+  static final int partitionId = 0;
+  static final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
+      AvroProtocolDefinition.PARTITION_STATE.getSerializer();

   @Test
   public void testRecordTransformer() {

@@ -66,8 +62,13 @@ public void testRecordTransformer() {

     DaVinciRecordTransformerUtility<Integer, String> recordTransformerUtility =
         recordTransformer.getRecordTransformerUtility();
-    assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
-    assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
+    OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
+
+    assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
+
+    offsetRecord.setRecordTransformerClassHash(classHash);
+
+    assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
   }

   @Test
@@ -87,18 +88,24 @@ public void testOnRecovery() {
     AbstractStorageEngine storageEngine = mock(AbstractStorageEngine.class);
     Lazy<VeniceCompressor> compressor = Lazy.of(() -> mock(VeniceCompressor.class));

-    int partitionNumber = 1;
-    recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
-    verify(storageEngine, times(1)).clearPartitionOffset(partitionNumber);
+    OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
+    when(storageEngine.getPartitionOffset(partitionId)).thenReturn(Optional.of(offsetRecord));
+
+    recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
+    verify(storageEngine, times(1)).clearPartitionOffset(partitionId);

     // Reset the mock to clear previous interactions
     reset(storageEngine);

-    // Execute the onRecovery method again to test the case where the classHash file exists
-    when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator);
-    recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
-    verify(storageEngine, never()).clearPartitionOffset(partitionNumber);
-    verify(storageEngine, times(1)).getIterator(partitionNumber);
+    offsetRecord.setRecordTransformerClassHash(recordTransformer.getClassHash());
+    assertEquals((int) offsetRecord.getRecordTransformerClassHash(), recordTransformer.getClassHash());
+    when(storageEngine.getPartitionOffset(partitionId)).thenReturn(Optional.of(offsetRecord));
+
+    // Execute the onRecovery method again to test the case where the classHash exists
+    when(storageEngine.getIterator(partitionId)).thenReturn(iterator);
+    recordTransformer.onRecovery(storageEngine, partitionId, partitionStateSerializer, compressor);
+    verify(storageEngine, never()).clearPartitionOffset(partitionId);
+    verify(storageEngine, times(1)).getIterator(partitionId);
   }

   @Test
@@ -134,5 +141,4 @@ public void testBlockingRecordTransformer() {

     recordTransformer.onEndVersionIngestion(2);
   }
-
 }

Integration test (class name not shown in this excerpt)

@@ -66,7 +66,6 @@
 import org.apache.logging.log4j.Logger;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;

@@ -107,16 +106,6 @@ public void cleanUp() {
     Utils.closeQuietlyWithErrorLogged(cluster);
   }

-  @BeforeMethod
-  @AfterClass
-  public void deleteClassHash() {
-    int storeVersion = 1;
-    File file = new File(String.format("./classHash-%d.txt", storeVersion));
-    if (file.exists()) {
-      assertTrue(file.delete());
-    }
-  }
-
   @Test(timeOut = TEST_TIMEOUT)
   public void testRecordTransformer() throws Exception {
     DaVinciConfig clientConfig = new DaVinciConfig();

DuckDBDaVinciRecordTransformerTest.java

@@ -10,9 +10,12 @@

 import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
 import com.linkedin.davinci.client.DaVinciRecordTransformerUtility;
+import com.linkedin.venice.kafka.protocol.state.PartitionState;
+import com.linkedin.venice.offsets.OffsetRecord;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.lazy.Lazy;
-import java.io.File;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;

@@ -24,25 +27,17 @@
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;


 public class DuckDBDaVinciRecordTransformerTest {
   static final int storeVersion = 1;
+  static final int partitionId = 0;
+  static final InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer =
+      AvroProtocolDefinition.PARTITION_STATE.getSerializer();
   static final String storeName = "test_store";
   private final Set<String> columnsToProject = Collections.emptySet();

-  @BeforeMethod
-  @AfterClass
-  public void deleteClassHash() {
-    File file = new File(String.format("./classHash-%d.txt", storeVersion));
-    if (file.exists()) {
-      assertTrue(file.delete());
-    }
-  }
-
   @Test
   public void testRecordTransformer() throws IOException {
     String tempDir = Utils.getTempDataDirectory().getAbsolutePath();

@@ -90,8 +85,13 @@ public void testRecordTransformer() throws IOException {

     DaVinciRecordTransformerUtility<GenericRecord, GenericRecord> recordTransformerUtility =
         recordTransformer.getRecordTransformerUtility();
-    assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
-    assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
+    OffsetRecord offsetRecord = new OffsetRecord(partitionStateSerializer);
+
+    assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
+
+    offsetRecord.setRecordTransformerClassHash(classHash);
+
+    assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash, offsetRecord));
   }
 }

OffsetRecord.java

@@ -305,6 +305,15 @@ public void setPendingReportIncPushVersionList(List<String> incPushVersionList) {
     partitionState.pendingReportIncrementalPushVersions = new ArrayList<>(incPushVersionList);
   }

+  public Integer getRecordTransformerClassHash() {
+    Integer classHash = partitionState.getRecordTransformerClassHash();
+    return classHash;
+  }
+
+  public void setRecordTransformerClassHash(int classHash) {
+    this.partitionState.setRecordTransformerClassHash(classHash);
+  }
+
   /**
    * It may be useful to cache this mapping. TODO: Explore GC tuning later.
    *

@@ -321,7 +330,8 @@ public String toString() {
         + getPartitionUpstreamOffsetString() + ", leaderTopic=" + getLeaderTopic() + ", offsetLag=" + getOffsetLag()
         + ", eventTimeEpochMs=" + getMaxMessageTimeInMs() + ", latestProducerProcessingTimeInMs="
         + getLatestProducerProcessingTimeInMs() + ", isEndOfPushReceived=" + isEndOfPushReceived() + ", databaseInfo="
-        + getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + '}';
+        + getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + ", recordTransformerClassHash="
+        + getRecordTransformerClassHash() + '}';
   }

AvroProtocolDefinition.java

@@ -52,7 +52,7 @@ public enum AvroProtocolDefinition {
   /**
    * Used to persist the state of a partition in Storage Nodes, including offset,
    * Data Ingest Validation state, etc.
    */
-  PARTITION_STATE(24, 14, PartitionState.class),
+  PARTITION_STATE(24, 15, PartitionState.class),

   /**
    * Used to persist state related to a store-version, including Start of Buffer Replay
   
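A self-contained sketch of the convention at work here, assuming (from the pattern of this enum, not confirmed by the diff) that the first constructor argument is the protocol's magic byte and the second its current schema version: adding the recordTransformerClassHash field to PartitionState is a schema change, so the persisted protocol's version is bumped from 14 to 15 while readers stay able to decode records written under older versions.

public enum ProtocolVersionSketch {
  PARTITION_STATE(24, 15); // was (24, 14) before the new field existed

  private final int magicByte; // identifies the protocol in serialized headers (assumed meaning)
  private final int currentProtocolVersion; // bumped on every schema change

  ProtocolVersionSketch(int magicByte, int currentProtocolVersion) {
    this.magicByte = magicByte;
    this.currentProtocolVersion = currentProtocolVersion;
  }

  public int getCurrentProtocolVersion() {
    return currentProtocolVersion;
  }
}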