diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java index 85ae9f9168a..754e1b199b4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/BlockingDaVinciRecordTransformer.java @@ -2,6 +2,7 @@ import com.linkedin.venice.annotation.Experimental; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.avro.Schema; @@ -56,4 +57,9 @@ public void onStartVersionIngestion(boolean isCurrentVersion) { public void onEndVersionIngestion(int currentVersion) { this.recordTransformer.onEndVersionIngestion(currentVersion); } + + @Override + public void close() throws IOException { + this.recordTransformer.close(); + } } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java index 4a5a98c39e5..c2572b6bdcb 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/DaVinciRecordTransformer.java @@ -5,6 +5,7 @@ import com.linkedin.venice.compression.VeniceCompressor; import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -29,7 +30,7 @@ * @param the type of the output value */ @Experimental -public abstract class DaVinciRecordTransformer { +public abstract class DaVinciRecordTransformer implements Closeable { /** * Version of the store of when the transformer is initialized. 
*/ @@ -130,6 +131,10 @@ public void onEndVersionIngestion(int currentVersion) { return; } + public boolean useUniformInputValueSchema() { + return false; + } + // Final methods below /** diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java index 66ea96f4e9d..0bb09ad5a9f 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java @@ -320,6 +320,9 @@ public abstract class StoreIngestionTask implements Runnable, Closeable { protected final ChunkAssembler chunkAssembler; private final Optional cacheBackend; + private final Schema recordTransformerInputValueSchema; + private final AvroGenericDeserializer recordTransformerKeyDeserializer; + private final SparseConcurrentList recordTransformerDeserializersByPutSchemaId; private DaVinciRecordTransformer recordTransformer; protected final String localKafkaServer; @@ -475,18 +478,20 @@ public StoreIngestionTask( if (recordTransformerConfig != null && recordTransformerConfig.getRecordTransformerFunction() != null) { Schema keySchema = schemaRepository.getKeySchema(storeName).getSchema(); - Schema inputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema(); + this.recordTransformerKeyDeserializer = new AvroGenericDeserializer(keySchema, keySchema); + this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema(); Schema outputValueSchema = recordTransformerConfig.getOutputValueSchema(); DaVinciRecordTransformer clientRecordTransformer = recordTransformerConfig.getRecordTransformerFunction() - .apply(versionNumber, keySchema, inputValueSchema, outputValueSchema); + .apply(versionNumber, keySchema, this.recordTransformerInputValueSchema, 
outputValueSchema); this.recordTransformer = new BlockingDaVinciRecordTransformer( clientRecordTransformer, keySchema, - inputValueSchema, + this.recordTransformerInputValueSchema, outputValueSchema, clientRecordTransformer.getStoreRecordsInDaVinci()); + this.recordTransformerDeserializersByPutSchemaId = new SparseConcurrentList<>(); versionedIngestionStats.registerTransformerLatencySensor(storeName, versionNumber); versionedIngestionStats.registerTransformerLifecycleStartLatency(storeName, versionNumber); @@ -503,6 +508,10 @@ public StoreIngestionTask( versionNumber, LatencyUtils.getElapsedTimeFromMsToMs(startTime), endTime); + } else { + this.recordTransformerKeyDeserializer = null; + this.recordTransformerInputValueSchema = null; + this.recordTransformerDeserializersByPutSchemaId = null; } this.localKafkaServer = this.kafkaProps.getProperty(KAFKA_BOOTSTRAP_SERVERS); @@ -3703,15 +3712,12 @@ private int processKafkaDataMessage( } // Check if put.getSchemaId is positive, if not default to 1 + // TODO: Write a test for chunked records... it does not seem right to transform negative schemas IDs into 1 int putSchemaId = put.getSchemaId() > 0 ? 
put.getSchemaId() : 1; if (recordTransformer != null) { long recordTransformerStartTime = System.currentTimeMillis(); ByteBuffer valueBytes = put.getPutValue(); - Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema(); - Lazy recordDeserializer = - Lazy.of(() -> new AvroGenericDeserializer<>(valueSchema, valueSchema)); - ByteBuffer assembledObject = chunkAssembler.bufferAndAssembleRecord( consumerRecord.getTopicPartition(), putSchemaId, @@ -3726,12 +3732,22 @@ private int processKafkaDataMessage( return 0; } - SchemaEntry keySchema = schemaRepository.getKeySchema(storeName); - Lazy lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema)); + Lazy lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes)); Lazy lazyValue = Lazy.of(() -> { try { ByteBuffer decompressedAssembledObject = compressor.get().decompress(assembledObject); - return recordDeserializer.get().deserialize(decompressedAssembledObject); + + RecordDeserializer recordDeserializer = + this.recordTransformerDeserializersByPutSchemaId.computeIfAbsent(putSchemaId, i -> { + Schema valueSchema = schemaRepository.getValueSchema(storeName, putSchemaId).getSchema(); + if (this.recordTransformer.useUniformInputValueSchema()) { + return new AvroGenericDeserializer<>(valueSchema, this.recordTransformerInputValueSchema); + } else { + return new AvroGenericDeserializer<>(valueSchema, valueSchema); + } + }); + + return recordDeserializer.deserialize(decompressedAssembledObject); } catch (IOException e) { throw new RuntimeException(e); } @@ -3801,8 +3817,7 @@ private int processKafkaDataMessage( } if (recordTransformer != null) { - SchemaEntry keySchema = schemaRepository.getKeySchema(storeName); - Lazy lazyKey = Lazy.of(() -> deserializeAvroObjectAndReturn(ByteBuffer.wrap(keyBytes), keySchema)); + Lazy lazyKey = Lazy.of(() -> this.recordTransformerKeyDeserializer.deserialize(keyBytes)); 
recordTransformer.processDelete(lazyKey); // This is called here after processDelete because if the user stores their data somewhere other than @@ -4027,10 +4042,6 @@ private void deserializeValue( } } - private Object deserializeAvroObjectAndReturn(ByteBuffer input, SchemaEntry schemaEntry) { - return new AvroGenericDeserializer<>(schemaEntry.getSchema(), schemaEntry.getSchema()).deserialize(input); - } - private void maybeCloseInactiveIngestionTask() { LOGGER.warn("{} Has expired due to not being subscribed to any partitions for too long.", ingestionTaskName); if (!consumerActionsQueue.isEmpty()) { @@ -4096,6 +4107,7 @@ public synchronized void close() { versionNumber, LatencyUtils.getElapsedTimeFromMsToMs(startTime), endTime); + Utils.closeQuietlyWithErrorLogged(this.recordTransformer); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java index fab664eed59..1b156c24a61 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/config/DaVinciConfigTest.java @@ -10,6 +10,7 @@ import com.linkedin.davinci.client.DaVinciRecordTransformerConfig; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import org.apache.avro.Schema; import org.testng.annotations.Test; @@ -34,6 +35,11 @@ public DaVinciRecordTransformerResult transform(Lazy key, Lazy public void processPut(Lazy key, Lazy value) { return; } + + @Override + public void close() throws IOException { + + } } @Test diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java index c8076da06de..f2d83aec1c6 100644 --- 
a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/transformer/TestStringRecordTransformer.java @@ -3,6 +3,7 @@ import com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import org.apache.avro.Schema; import org.apache.avro.util.Utf8; @@ -37,4 +38,9 @@ public DaVinciRecordTransformerResult transform(Lazy key, Lazy< public void processPut(Lazy key, Lazy value) { return; } + + @Override + public void close() throws IOException { + + } } diff --git a/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java b/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java index 5215dfb80ec..7b166f4e038 100644 --- a/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java +++ b/integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformer.java @@ -1,6 +1,6 @@ package com.linkedin.venice.duckdb; -import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.FAIL; +import static com.linkedin.venice.sql.AvroToSQL.UnsupportedTypeHandling.SKIP; import com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; @@ -9,7 +9,9 @@ import com.linkedin.venice.sql.InsertProcessor; import com.linkedin.venice.sql.SQLUtils; import com.linkedin.venice.sql.TableDefinition; +import com.linkedin.venice.utils.concurrent.CloseableThreadLocal; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -34,6 +36,10 @@ public class DuckDBDaVinciRecordTransformer private final String 
versionTableName; private final String duckDBUrl; private final Set columnsToProject; + private final CloseableThreadLocal connection; + private final CloseableThreadLocal deletePreparedStatement; + private final CloseableThreadLocal upsertPreparedStatement; + private final InsertProcessor upsertProcessor; public DuckDBDaVinciRecordTransformer( int storeVersion, @@ -49,6 +55,31 @@ public DuckDBDaVinciRecordTransformer( this.versionTableName = buildStoreNameWithVersion(storeVersion); this.duckDBUrl = "jdbc:duckdb:" + baseDir + "/" + duckDBFilePath; this.columnsToProject = columnsToProject; + String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key"); // TODO: Fix this, it is + // broken + String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, inputValueSchema, columnsToProject); + this.connection = CloseableThreadLocal.withInitial(() -> { + try { + return DriverManager.getConnection(duckDBUrl); + } catch (SQLException e) { + throw new VeniceException("Failed to connect to DB!", e); + } + }); + this.deletePreparedStatement = CloseableThreadLocal.withInitial(() -> { + try { + return this.connection.get().prepareStatement(deleteStatement); + } catch (SQLException e) { + throw new VeniceException("Failed to create PreparedStatement!", e); + } + }); + this.upsertPreparedStatement = CloseableThreadLocal.withInitial(() -> { + try { + return this.connection.get().prepareStatement(upsertStatement); + } catch (SQLException e) { + throw new VeniceException("Failed to create PreparedStatement!", e); + } + }); + this.upsertProcessor = AvroToSQL.upsertProcessor(keySchema, inputValueSchema, columnsToProject); } @Override @@ -60,38 +91,18 @@ public DaVinciRecordTransformerResult transform(Lazy key, Lazy value) { - // TODO: Pre-allocate the upsert statement and everything that goes into it, as much as possible. 
- Schema keySchema = key.get().getSchema(); - Schema valueSchema = value.get().getSchema(); - String upsertStatement = AvroToSQL.upsertStatement(versionTableName, keySchema, valueSchema, this.columnsToProject); - - // ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool? - try (Connection connection = DriverManager.getConnection(duckDBUrl)) { - // TODO: Pre-allocate the upsert processor as well - InsertProcessor upsertProcessor = AvroToSQL.upsertProcessor(keySchema, valueSchema, this.columnsToProject); - - // TODO: Pre-allocate the prepared statement (consider thread-local if it's not thread safe) - try (PreparedStatement preparedStatement = connection.prepareStatement(upsertStatement)) { - upsertProcessor.process(key.get(), value.get(), preparedStatement); - } - } catch (SQLException e) { - throw new RuntimeException(e); - } + this.upsertProcessor.process(key.get(), value.get(), this.upsertPreparedStatement.get()); } @Override public void processDelete(Lazy key) { - // Unable to convert to prepared statement as table and column names can't be parameterized - // ToDo make delete non-hardcoded on primaryKey - String deleteStatement = String.format(deleteStatementTemplate, versionTableName, "key"); - - // ToDo: Instead of creating a connection on every call, have a long-term connection. Maybe a connection pool? - try (Connection connection = DriverManager.getConnection(duckDBUrl); - PreparedStatement stmt = connection.prepareStatement(deleteStatement)) { + try { + PreparedStatement stmt = this.deletePreparedStatement.get(); + // TODO: Fix this, it is broken. 
stmt.setString(1, key.get().get("key").toString()); stmt.execute(); } catch (SQLException e) { - throw new RuntimeException(e); + throw new VeniceException("Failed to execute delete!", e); } } @@ -104,7 +115,7 @@ public void onStartVersionIngestion(boolean isCurrentVersion) { getKeySchema(), getOutputValueSchema(), this.columnsToProject, - FAIL, + SKIP, true); TableDefinition existingTableDefinition = SQLUtils.getTableDefinition(this.versionTableName, connection); if (existingTableDefinition == null) { @@ -151,6 +162,10 @@ public void onEndVersionIngestion(int currentVersion) { } } + public boolean useUniformInputValueSchema() { + return true; + } + public String getDuckDBUrl() { return duckDBUrl; } @@ -158,4 +173,11 @@ public String getDuckDBUrl() { public String buildStoreNameWithVersion(int version) { return storeNameWithoutVersionInfo + "_v" + version; } + + @Override + public void close() throws IOException { + this.deletePreparedStatement.close(); + this.upsertPreparedStatement.close(); + this.connection.close(); + } } diff --git a/integrations/venice-duckdb/src/test/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformerTest.java b/integrations/venice-duckdb/src/test/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformerTest.java index 3f6e7644dac..5b913d73702 100644 --- a/integrations/venice-duckdb/src/test/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformerTest.java +++ b/integrations/venice-duckdb/src/test/java/com/linkedin/venice/duckdb/DuckDBDaVinciRecordTransformerTest.java @@ -12,6 +12,7 @@ import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.lazy.Lazy; import java.io.File; +import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -42,10 +43,10 @@ public void deleteClassHash() { } @Test - public void testRecordTransformer() { + public void testRecordTransformer() throws IOException { String tempDir = Utils.getTempDataDirectory().getAbsolutePath(); - 
DuckDBDaVinciRecordTransformer recordTransformer = new DuckDBDaVinciRecordTransformer( + try (DuckDBDaVinciRecordTransformer recordTransformer = new DuckDBDaVinciRecordTransformer( storeVersion, SINGLE_FIELD_RECORD_SCHEMA, NAME_RECORD_V1_SCHEMA, @@ -53,42 +54,44 @@ public void testRecordTransformer() { false, tempDir, storeName, - columnsToProject); + columnsToProject)) { + assertTrue(recordTransformer.useUniformInputValueSchema()); - Schema keySchema = recordTransformer.getKeySchema(); - assertEquals(keySchema.getType(), Schema.Type.RECORD); + Schema keySchema = recordTransformer.getKeySchema(); + assertEquals(keySchema.getType(), Schema.Type.RECORD); - Schema outputValueSchema = recordTransformer.getOutputValueSchema(); - assertEquals(outputValueSchema.getType(), Schema.Type.RECORD); + Schema outputValueSchema = recordTransformer.getOutputValueSchema(); + assertEquals(outputValueSchema.getType(), Schema.Type.RECORD); - recordTransformer.onStartVersionIngestion(true); + recordTransformer.onStartVersionIngestion(true); - GenericRecord keyRecord = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA); - keyRecord.put("key", "key"); - Lazy lazyKey = Lazy.of(() -> keyRecord); + GenericRecord keyRecord = new GenericData.Record(SINGLE_FIELD_RECORD_SCHEMA); + keyRecord.put("key", "key"); + Lazy lazyKey = Lazy.of(() -> keyRecord); - GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); - valueRecord.put("firstName", "Duck"); - valueRecord.put("lastName", "Goose"); - Lazy lazyValue = Lazy.of(() -> valueRecord); + GenericRecord valueRecord = new GenericData.Record(NAME_RECORD_V1_SCHEMA); + valueRecord.put("firstName", "Duck"); + valueRecord.put("lastName", "Goose"); + Lazy lazyValue = Lazy.of(() -> valueRecord); - DaVinciRecordTransformerResult transformerResult = recordTransformer.transform(lazyKey, lazyValue); - recordTransformer.processPut(lazyKey, lazyValue); - assertEquals(transformerResult.getResult(), 
DaVinciRecordTransformerResult.Result.UNCHANGED); - // Result will be empty when it's UNCHANGED - assertNull(transformerResult.getValue()); - assertNull(recordTransformer.transformAndProcessPut(lazyKey, lazyValue)); + DaVinciRecordTransformerResult transformerResult = recordTransformer.transform(lazyKey, lazyValue); + recordTransformer.processPut(lazyKey, lazyValue); + assertEquals(transformerResult.getResult(), DaVinciRecordTransformerResult.Result.UNCHANGED); + // Result will be empty when it's UNCHANGED + assertNull(transformerResult.getValue()); + assertNull(recordTransformer.transformAndProcessPut(lazyKey, lazyValue)); - recordTransformer.processDelete(lazyKey); + recordTransformer.processDelete(lazyKey); - assertFalse(recordTransformer.getStoreRecordsInDaVinci()); + assertFalse(recordTransformer.getStoreRecordsInDaVinci()); - int classHash = recordTransformer.getClassHash(); + int classHash = recordTransformer.getClassHash(); - DaVinciRecordTransformerUtility recordTransformerUtility = - recordTransformer.getRecordTransformerUtility(); - assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash)); - assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash)); + DaVinciRecordTransformerUtility recordTransformerUtility = + recordTransformer.getRecordTransformerUtility(); + assertTrue(recordTransformerUtility.hasTransformerLogicChanged(classHash)); + assertFalse(recordTransformerUtility.hasTransformerLogicChanged(classHash)); + } } @Test diff --git a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/CloseableThreadLocal.java b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/CloseableThreadLocal.java index e169c165aaf..baa79c15c3a 100644 --- a/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/CloseableThreadLocal.java +++ b/internal/venice-client-common/src/main/java/com/linkedin/venice/utils/concurrent/CloseableThreadLocal.java @@ -20,6 
+20,10 @@ public class CloseableThreadLocal implements AutoClosea private final Set valueSet = new ConcurrentSkipListSet<>(new HashCodeComparator<>()); private final ThreadLocal threadLocal; + public static CloseableThreadLocal withInitial(Supplier initialValue) { + return new CloseableThreadLocal(initialValue); + } + /** * Creates a closeable thread local. The initial value of the * variable is determined by invoking the {@code get} method diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java index d0d9f58def7..18ad52ec13d 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestIntToStringRecordTransformer.java @@ -3,6 +3,7 @@ import com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import org.apache.avro.Schema; @@ -30,4 +31,9 @@ public DaVinciRecordTransformerResult transform(Lazy key, Lazy< public void processPut(Lazy key, Lazy value) { return; } + + @Override + public void close() throws IOException { + + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java index 25a7fb39fca..260d1d3646c 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestSkipResultRecordTransformer.java @@ -3,6 +3,7 @@ 
import com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema; @@ -55,4 +56,8 @@ public void put(Integer key, String value) { inMemoryDB.put(key, value); } + @Override + public void close() throws IOException { + + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java index aba2c0baf69..7c6fea86b3f 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestStringRecordTransformer.java @@ -3,6 +3,7 @@ import com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.avro.Schema; @@ -61,4 +62,8 @@ public void put(Integer key, String value) { inMemoryDB.put(key, value); } + @Override + public void close() throws IOException { + + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java index c1a5320fa08..a93a25a5ce9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestUnchangedResultRecordTransformer.java @@ -3,6 +3,7 @@ import 
com.linkedin.davinci.client.DaVinciRecordTransformer; import com.linkedin.davinci.client.DaVinciRecordTransformerResult; import com.linkedin.venice.utils.lazy.Lazy; +import java.io.IOException; import org.apache.avro.Schema; @@ -27,4 +28,8 @@ public void processPut(Lazy key, Lazy value) { return; } + @Override + public void close() throws IOException { + + } }