Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[da-vinci] Enable compatibility of DVRT for BLOB transfer #1280

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,6 @@ public DaVinciBackend(
}

if (backendConfig.isBlobTransferManagerEnabled()) {
if (recordTransformerFunction != null) {
throw new VeniceException("DaVinciRecordTransformer doesn't support blob transfer.");
}

blobTransferManager = BlobTransferUtil.getP2PBlobTransferManagerForDVCAndStart(
configLoader.getVeniceServerConfig().getDvcP2pBlobTransferServerPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.StorageInitializationException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.serializer.AvroGenericDeserializer;
import com.linkedin.venice.serializer.AvroSerializer;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.Optional;
import org.apache.avro.Schema;


Expand Down Expand Up @@ -74,24 +74,20 @@ public final ByteBuffer prependSchemaIdToHeader(ByteBuffer valueBytes, int schem
/**
* @return true if transformer logic has changed since the last time the class was loaded
*/
public boolean hasTransformerLogicChanged(int classHash) {
public boolean hasTransformerLogicChanged(AbstractStorageEngine storageEngine, int partitionId, int classHash) {
try {
String classHashPath = String.format("./classHash-%d.txt", recordTransformer.getStoreVersion());
File f = new File(classHashPath);
if (f.exists()) {
try (BufferedReader br = new BufferedReader(new FileReader(classHashPath))) {
int storedClassHash = Integer.parseInt(br.readLine());
if (storedClassHash == classHash) {
return false;
}
}
}

try (FileWriter fw = new FileWriter(classHashPath)) {
fw.write(String.valueOf(classHash));
Optional<OffsetRecord> offsetRecord = storageEngine.getPartitionOffset(partitionId);
Integer offsetRecordClassHash = offsetRecord.map(OffsetRecord::getTransformerClassHash).orElse(null);
if (!Objects.equals(offsetRecordClassHash, classHash)) {
offsetRecord.ifPresent(record -> {
record.setTransformerClassHash(classHash);
storageEngine.putPartitionOffset(partitionId, record);
});
return true;
} else {
return false;
}
return true;
} catch (IOException e) {
} catch (IllegalArgumentException | StorageInitializationException e) {
throw new VeniceException("Failed to check if transformation logic has changed", e);
}
}
Expand All @@ -101,18 +97,17 @@ public boolean hasTransformerLogicChanged(int classHash) {
*/
public final void onRecovery(
AbstractStorageEngine storageEngine,
Integer partition,
int partitionId,
Lazy<VeniceCompressor> compressor) {
// ToDo: Store class hash in RocksDB to support blob transfer
int classHash = recordTransformer.getClassHash();
boolean transformerLogicChanged = hasTransformerLogicChanged(classHash);
boolean transformerLogicChanged = hasTransformerLogicChanged(storageEngine, partitionId, classHash);

if (!recordTransformer.getStoreRecordsInDaVinci() || transformerLogicChanged) {
// Bootstrap from VT
storageEngine.clearPartitionOffset(partition);
storageEngine.clearPartitionOffset(partitionId);
} else {
// Bootstrap from local storage
AbstractStorageIterator iterator = storageEngine.getIterator(partition);
AbstractStorageIterator iterator = storageEngine.getIterator(partitionId);
for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
byte[] keyBytes = iterator.key();
byte[] valueBytes = iterator.value();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.davinci.transformer;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -18,8 +18,10 @@
import com.linkedin.davinci.store.AbstractStorageEngine;
import com.linkedin.davinci.store.AbstractStorageIterator;
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.File;
import java.util.Optional;
import org.apache.avro.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -61,13 +63,22 @@ public void testRecordTransformer() {
recordTransformer.processDelete(lazyKey);

assertFalse(recordTransformer.getStoreRecordsInDaVinci());

AbstractStorageEngine storageEngine = mock(AbstractStorageEngine.class);
int classHash = recordTransformer.getClassHash();
int partitionNumber = 1;
OffsetRecord nullOffsetRecord = mock(OffsetRecord.class);
when(nullOffsetRecord.getTransformerClassHash()).thenReturn(null);

OffsetRecord matchingOffsetRecord = mock(OffsetRecord.class);
when(matchingOffsetRecord.getTransformerClassHash()).thenReturn(classHash);

DaVinciRecordTransformerUtility<Integer, String> recordTransformerUtility =
recordTransformer.getRecordTransformerUtility();
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash));
when(storageEngine.getPartitionOffset(partitionNumber)).thenReturn(Optional.of(nullOffsetRecord));
assertTrue(recordTransformerUtility.hasTransformerLogicChanged(storageEngine, partitionNumber, classHash));
verify(storageEngine, times(1)).putPartitionOffset(eq(partitionNumber), any(OffsetRecord.class));
when(storageEngine.getPartitionOffset(partitionNumber)).thenReturn(Optional.of(matchingOffsetRecord));
assertFalse(recordTransformerUtility.hasTransformerLogicChanged(storageEngine, partitionNumber, classHash));
}

@Test
Expand All @@ -86,15 +97,6 @@ public void testOnRecovery() {
int partitionNumber = 1;
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, times(1)).clearPartitionOffset(partitionNumber);

// Reset the mock to clear previous interactions
reset(storageEngine);

// Execute the onRecovery method again to test the case where the classHash file exists
when(storageEngine.getIterator(partitionNumber)).thenReturn(iterator);
recordTransformer.onRecovery(storageEngine, partitionNumber, compressor);
verify(storageEngine, never()).clearPartitionOffset(partitionNumber);
verify(storageEngine, times(1)).getIterator(partitionNumber);
Comment on lines -89 to -97
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we set up the mocks so that this passes?

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,22 @@ CharSequence guidToUtf8(GUID guid) {
return new Utf8(GuidUtils.getCharSequenceFromGuid(guid));
}

public Integer getTransformerClassHash() {
return partitionState.getTransformerClassHash();
}

public void setTransformerClassHash(Integer classHash) {
this.partitionState.transformerClassHash = classHash;
}

@Override
public String toString() {
return "OffsetRecord{" + "localVersionTopicOffset=" + getLocalVersionTopicOffset() + ", upstreamOffset="
+ getPartitionUpstreamOffsetString() + ", leaderTopic=" + getLeaderTopic() + ", offsetLag=" + getOffsetLag()
+ ", eventTimeEpochMs=" + getMaxMessageTimeInMs() + ", latestProducerProcessingTimeInMs="
+ getLatestProducerProcessingTimeInMs() + ", isEndOfPushReceived=" + isEndOfPushReceived() + ", databaseInfo="
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + '}';
+ getDatabaseInfo() + ", realTimeProducerState=" + getRealTimeProducerState() + ", transformerClassHash="
+ getTransformerClassHash() + '}';
Comment on lines +309 to +324
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we make it explicit that this belongs to the record transformer? i.e. recordTransformerClassHash

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@
"avro.java.string": "String"
},
"default": {}
},
{
"name": "transformerClassHash",
"doc": "An integer hash code of the DaVinci record transformer for compatibility checks during BLOB transfer",
"type": ["null", "int"],
"default": null
Comment on lines +191 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: DaVinci record transformer -> DaVinciRecordTransformer

This will also be utilized when blob transfer isn't enabled. Can we change the phrasing to "during bootstrapping?"

}
]
}