[duckdb] Operation Foie Gras (#1457)
Time to force-feed the duck. Pre-allocated all Avro deserializers, the
connection, the prepared statements, and the InsertProcessor, so that the
hot path has as little to do as possible.
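
A minimal sketch, not part of this commit, of the pre-allocation pattern
described above: a long-lived per-thread connection and prepared statement
are created once, so the hot path only binds parameters and executes. The
class name, method and SQL are hypothetical; the actual change uses Venice's
CloseableThreadLocal rather than plain ThreadLocal.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class PreallocatedUpsertWriter implements AutoCloseable {
  // One connection and one prepared statement per thread, created lazily once and reused.
  private final ThreadLocal<Connection> connection;
  private final ThreadLocal<PreparedStatement> upsert;

  public PreallocatedUpsertWriter(String jdbcUrl, String upsertSql) {
    this.connection = ThreadLocal.withInitial(() -> {
      try {
        return DriverManager.getConnection(jdbcUrl);
      } catch (SQLException e) {
        throw new RuntimeException("Failed to connect to DB!", e);
      }
    });
    this.upsert = ThreadLocal.withInitial(() -> {
      try {
        return connection.get().prepareStatement(upsertSql);
      } catch (SQLException e) {
        throw new RuntimeException("Failed to create PreparedStatement!", e);
      }
    });
  }

  // Hot path: no connection or statement allocation, just bind and execute.
  public void upsert(String key, String value) throws SQLException {
    PreparedStatement stmt = this.upsert.get();
    stmt.setString(1, key);
    stmt.setString(2, value);
    stmt.execute();
  }

  @Override
  public void close() throws SQLException {
    // Plain ThreadLocal only reaches the calling thread's instances; the commit's
    // CloseableThreadLocal also tracks and closes the instances created by other threads.
    upsert.get().close();
    connection.get().close();
  }
}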

Miscellaneous:

- Changed the unsupported type handling policy from FAIL to SKIP.

- Made DVRT implement Closeable.

- Added a new useUniformInputValueSchema() API to DVRT:

  - If false (default), SIT will hand the DVRT each incoming record in
    whatever schema it was written with.

  - If true (which the DuckDB DVRT does), SIT will use Avro schema
    evolution to decode incoming records into the uniform schema that was
    passed into the DVRT constructor (sketched below).
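
For illustration only (the schemas and record below are made up, not taken
from this change): the uniform-schema mode relies on standard Avro schema
resolution, where a GenericDatumReader constructed with a writer schema and a
different reader schema decodes bytes written with the former into a record
carrying the latter, filling any new fields from their defaults.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionDemo {
  public static void main(String[] args) throws Exception {
    // Hypothetical writer schema (an older value schema) and reader schema
    // (standing in for the superset/latest schema handed to the DVRT constructor).
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Value\",\"fields\":["
            + "{\"name\":\"name\",\"type\":\"string\"},"
            + "{\"name\":\"age\",\"type\":\"int\",\"default\":-1}]}");

    // Encode a record with the writer schema, as it would arrive off the wire.
    GenericRecord written = new GenericData.Record(writerSchema);
    written.put("name", "duck");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
    encoder.flush();

    // Uniform mode: decode with (writerSchema, readerSchema), so the result always
    // carries the uniform reader schema, with missing fields filled from defaults.
    Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord uniform =
        new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    System.out.println(uniform); // {"name": "duck", "age": -1}
  }
}
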
FelixGV authored Jan 18, 2025
1 parent 5a81893 commit 1b1a5fc
Showing 12 changed files with 157 additions and 72 deletions.
@@ -2,6 +2,7 @@

import com.linkedin.venice.annotation.Experimental;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.avro.Schema;

@@ -56,4 +57,9 @@ public void onStartVersionIngestion(boolean isCurrentVersion) {
public void onEndVersionIngestion(int currentVersion) {
this.recordTransformer.onEndVersionIngestion(currentVersion);
}

@Override
public void close() throws IOException {
this.recordTransformer.close();
}
}
@@ -5,6 +5,7 @@
import com.linkedin.venice.compression.VeniceCompressor;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
@@ -29,7 +30,7 @@
* @param <O> the type of the output value
*/
@Experimental
public abstract class DaVinciRecordTransformer<K, V, O> {
public abstract class DaVinciRecordTransformer<K, V, O> implements Closeable {
/**
* Version of the store of when the transformer is initialized.
*/
@@ -130,6 +131,10 @@ public void onEndVersionIngestion(int currentVersion) {
return;
}

public boolean useUniformInputValueSchema() {
return false;
}

// Final methods below

/**
@@ -320,6 +320,9 @@ public abstract class StoreIngestionTask implements Runnable, Closeable {

protected final ChunkAssembler chunkAssembler;
private final Optional<ObjectCacheBackend> cacheBackend;
private final Schema recordTransformerInputValueSchema;
private final AvroGenericDeserializer recordTransformerKeyDeserializer;
private final SparseConcurrentList<AvroGenericDeserializer> recordTransformerDeserializersByPutSchemaId;
private DaVinciRecordTransformer recordTransformer;

protected final String localKafkaServer;
@@ -475,18 +478,20 @@ public StoreIngestionTask(

if (recordTransformerConfig != null && recordTransformerConfig.getRecordTransformerFunction() != null) {
Schema keySchema = schemaRepository.getKeySchema(storeName).getSchema();
Schema inputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema();
this.recordTransformerKeyDeserializer = new AvroGenericDeserializer(keySchema, keySchema);
this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema();
Schema outputValueSchema = recordTransformerConfig.getOutputValueSchema();

DaVinciRecordTransformer clientRecordTransformer = recordTransformerConfig.getRecordTransformerFunction()
.apply(versionNumber, keySchema, inputValueSchema, outputValueSchema);
.apply(versionNumber, keySchema, this.recordTransformerInputValueSchema, outputValueSchema);

this.recordTransformer = new BlockingDaVinciRecordTransformer(
clientRecordTransformer,
keySchema,
inputValueSchema,
this.recordTransformerInputValueSchema,
outputValueSchema,
clientRecordTransformer.getStoreRecordsInDaVinci());
this.recordTransformerDeserializersByPutSchemaId = new SparseConcurrentList<>();

versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber);
versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber);
@@ -503,6 +508,10 @@ public StoreIngestionTask(
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
} else {
this.recordTransformerKeyDeserializer = null;
this.recordTransformerInputValueSchema = null;
this.recordTransformerDeserializersByPutSchemaId = null;
}

this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS);
@@ -3703,15 +3712,12 @@ private int processKafkaDataMessage(
}

// Check if put.getSchemaId is positive, if not default to 1
// TODO: Write a test for chunked records... it does not seem right to transform negative schemas IDs into 1
int putSchemaId = put.getSchemaId() > 0 ? put.getSchemaId() : 1;

if (recordTransformer != null) {
long recordTransformerStartTime = System.currentTimeMillis();
ByteBuffer valueBytes = put.getPutValue();
Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema();
Lazy<RecordDeserializer> recordDeserializer =
Lazy.of(() -> new AvroGenericDeserializer<>(valueSchema, valueSchema));

ByteBuffer assembledObject = chunkAssembler.bufferAndAssembleRecord(
consumerRecord.getTopicPartition(),
putSchemaId,
@@ -3726,12 +3732,22 @@ private int processKafkaDataMessage(
return 0;
}

SchemaEntry keySchema = schemaRepository.getKeySchema(storeName);
Lazy<Object> lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema));
Lazy<Object> lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes));
Lazy<Object> lazyValue = Lazy.of(() -> {
try {
ByteBuffer decompressedAssembledObject = compressor.get().decompress(assembledObject);
return recordDeserializer.get().deserialize(decompressedAssembledObject);

RecordDeserializer recordDeserializer =
this.recordTransformerDeserializersByPutSchemaId.computeIfAbsent(putSchemaId, i -> {
Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema();
if (this.recordTransformer.useUniformInputValueSchema()) {
return new AvroGenericDeserializer<>(valueSchema, this.recordTransformerInputValueSchema);
} else {
return new AvroGenericDeserializer<>(valueSchema, valueSchema);
}
});

return recordDeserializer.deserialize(decompressedAssembledObject);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -3801,8 +3817,7 @@ private int processKafkaDataMessage(
}

if (recordTransformer != null) {
SchemaEntry keySchema = schemaRepository.getKeySchema(storeName);
Lazy<Object> lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema));
Lazy<Object> lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes));
recordTransformer.processDelete(lazyKey);

// This is called here after processDelete because if the user stores their data somewhere other than
@@ -4027,10 +4042,6 @@ private void deserializeValue(
}
}

private Object deserializeAvroObjectAndReturn(ByteBuffer input, SchemaEntry schemaEntry) {
return new AvroGenericDeserializer<>(schemaEntry.getSchema(), schemaEntry.getSchema()).deserialize(input);
}

private void maybeCloseInactiveIngestionTask() {
LOGGER.warn("{} Has expired due to not being subscribed to any partitions for too long.", ingestionTaskName);
if (!consumerActionsQueue.isEmpty()) {
@@ -4096,6 +4107,7 @@ public synchronized void close() {
versionNumber,
LatencyUtils.getElapsedTimeFromMsToMs(startTime),
endTime);
Utils.closeQuietlyWithErrorLogged(this.recordTransformer);
}
}

@@ -10,6 +10,7 @@
import com.linkedin.davinci.client.DaVinciRecordTransformerConfig;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import org.apache.avro.Schema;
import org.testng.annotations.Test;

@@ -34,6 +35,11 @@ public DaVinciRecordTransformerResult<Integer> transform(Lazy<Integer> key, Lazy
public void processPut(Lazy<Integer> key, Lazy<Integer> value) {
return;
}

@Override
public void close() throws IOException {

}
}

@Test
@@ -3,6 +3,7 @@
import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.util.Utf8;

@@ -37,4 +38,9 @@ public DaVinciRecordTransformerResult<String> transform(Lazy<Integer> key, Lazy<
public void processPut(Lazy<Integer> key, Lazy<String> value) {
return;
}

@Override
public void close() throws IOException {

}
}
@@ -1,6 +1,6 @@
package com.linkedin.venice.duckdb;

import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.FAIL;
import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.SKIP;

import com.linkedin.davinci.client.DaVinciRecordTransformer;
import com.linkedin.davinci.client.DaVinciRecordTransformerResult;
@@ -9,7 +9,9 @@
import com.linkedin.venice.sql.InsertProcessor;
import com.linkedin.venice.sql.SQLUtils;
import com.linkedin.venice.sql.TableDefinition;
import com.linkedin.venice.utils.concurrent.CloseableThreadLocal;
import com.linkedin.venice.utils.lazy.Lazy;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -34,6 +36,10 @@ public class DuckDBDaVinciRecordTransformer
private final String versionTableName;
private final String duckDBUrl;
private final Set<String> columnsToProject;
private final CloseableThreadLocal<Connection> connection;
private final CloseableThreadLocal<PreparedStatement> deletePreparedStatement;
private final CloseableThreadLocal<PreparedStatement> upsertPreparedStatement;
private final InsertProcessor upsertProcessor;

public DuckDBDaVinciRecordTransformer(
int storeVersion,
@@ -49,6 +55,31 @@ public DuckDBDaVinciRecordTransformer(
this.versionTableName = buildStoreNameWithVersion(storeVersion);
this.duckDBUrl = "jdbc:duckdb:" + baseDir + "/" + duckDBFilePath;
this.columnsToProject = columnsToProject;
String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key"); // TODO: Fix this, it is
// broken
String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, inputValueSchema, columnsToProject);
this.connection = CloseableThreadLocal.withInitial(() -> {
try {
return DriverManager.getConnection(duckDBUrl);
} catch (SQLException e) {
throw new VeniceException("Failed to connect to DB!", e);
}
});
this.deletePreparedStatement = CloseableThreadLocal.withInitial(() -> {
try {
return this.connection.get().prepareStatement(deleteStatement);
} catch (SQLException e) {
throw new VeniceException("Failed to create PreparedStatement!", e);
}
});
this.upsertPreparedStatement = CloseableThreadLocal.withInitial(() -> {
try {
return this.connection.get().prepareStatement(upsertStatement);
} catch (SQLException e) {
throw new VeniceException("Failed to create PreparedStatement!", e);
}
});
this.upsertProcessor = AvroToSQL.upsertProcessor(keySchema, inputValueSchema, columnsToProject);
}

@Override
@@ -60,38 +91,18 @@ public DaVinciRecordTransformerResult<GenericRecord> transform(Lazy<GenericRecor

@Override
public void processPut(Lazy<GenericRecord> key, Lazy<GenericRecord> value) {
// TODO: Pre-allocate the upsert statement and everything that goes into it, as much as possible.
Schema keySchema = key.get().getSchema();
Schema valueSchema = value.get().getSchema();
String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, valueSchema, this.columnsToProject);

// ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool?
try (Connection connection = DriverManager.getConnection(duckDBUrl)) {
// TODO: Pre-allocate the upsert processor as well
InsertProcessor upsertProcessor = AvroToSQL.upsertProcessor(keySchema, valueSchema, this.columnsToProject);

// TODO: Pre-allocate the prepared statement (consider thread-local if it's not thread safe)
try (PreparedStatement preparedStatement = connection.prepareStatement(upsertStatement)) {
upsertProcessor.process(key.get(), value.get(), preparedStatement);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
this.upsertProcessor.process(key.get(), value.get(), this.upsertPreparedStatement.get());
}

@Override
public void processDelete(Lazy<GenericRecord> key) {
// Unable to convert to prepared statement as table and column names can't be parameterized
// ToDo make delete non-hardcoded on primaryKey
String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key");

// ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool?
try (Connection connection = DriverManager.getConnection(duckDBUrl);
PreparedStatement stmt = connection.prepareStatement(deleteStatement)) {
try {
PreparedStatement stmt = this.deletePreparedStatement.get();
// TODO: Fix this, it is broken.
stmt.setString(1, key.get().get("key").toString());
stmt.execute();
} catch (SQLException e) {
throw new RuntimeException(e);
throw new VeniceException("Failed to execute delete!");
}
}

@@ -104,7 +115,7 @@ public void onStartVersionIngestion(boolean isCurrentVersion) {
getKeySchema(),
getOutputValueSchema(),
this.columnsToProject,
FAIL,
SKIP,
true);
TableDefinition existingTableDefinition = SQLUtils.getTableDefinition(this.versionTableName, connection);
if (existingTableDefinition == null) {
@@ -151,11 +162,22 @@ public void onEndVersionIngestion(int currentVersion) {
}
}

public boolean useUniformInputValueSchema() {
return true;
}

public String getDuckDBUrl() {
return duckDBUrl;
}

public String buildStoreNameWithVersion(int version) {
return storeNameWithoutVersionInfo + "_v" + version;
}

@Override
public void close() throws IOException {
this.deletePreparedStatement.close();
this.upsertPreparedStatement.close();
this.connection.close();
}
}