Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[duckdb] Created a CLI tool to interact with DuckDBDaVinciRecordTransformer #1473

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,11 @@ ext.createDiffFile = { ->
// admin-tool
':!clients/venice-admin-tool/*',

// Duck Vinci Tool
':!integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/Arg.java',
':!integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/DuckVinciTool.java',
':!integrations/venice-duckdb/src/main/java/com/linkedin/venice/duckdb/Command.java',

// Keep this last
// Other files that have tests but are not executed in the regular unit test task
':!internal/alpini/*'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,12 @@ private VeniceConfigLoader buildVeniceConfig() {
kafkaBootstrapServers = backendConfig.getString(KAFKA_BOOTSTRAP_SERVERS);
}

String recordTransformerOutputValueSchema = "null";

if (daVinciConfig.isRecordTransformerEnabled() && recordTransformerConfig.getOutputValueSchema() != null) {
recordTransformerOutputValueSchema = recordTransformerConfig.getOutputValueSchema().toString();
}

VeniceProperties config = new PropertyBuilder().put(KAFKA_ADMIN_CLASS, ApacheKafkaAdminAdapter.class.getName())
.put(ROCKSDB_LEVEL0_FILE_NUM_COMPACTION_TRIGGER, 4) // RocksDB default config
.put(ROCKSDB_LEVEL0_SLOWDOWN_WRITES_TRIGGER, 20) // RocksDB default config
Expand All @@ -712,11 +718,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
RECORD_TRANSFORMER_VALUE_SCHEMA,
daVinciConfig.isRecordTransformerEnabled()
? recordTransformerConfig.getOutputValueSchema().toString()
: "null")
.put(RECORD_TRANSFORMER_VALUE_SCHEMA, recordTransformerOutputValueSchema)
.put(INGESTION_ISOLATION_CONFIG_PREFIX + "." + INGESTION_MEMORY_LIMIT, -1) // Explicitly disable memory limiter
// in Isolated Process
.put(backendConfig.toProperties())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ public class DaVinciRecordTransformerConfig {
private final Class outputValueClass;
private final Schema outputValueSchema;

/**
* Use this constructor if you don't intend on transforming records
* @param recordTransformerFunction the functional interface for creating a {@link DaVinciRecordTransformer}
*/
public DaVinciRecordTransformerConfig(DaVinciRecordTransformerFunctionalInterface recordTransformerFunction) {
this(recordTransformerFunction, null, null);
}

/**
* @param recordTransformerFunction the functional interface for creating a {@link DaVinciRecordTransformer}
* @param outputValueClass the class of the output value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,11 @@ public StoreIngestionTask(
this.recordTransformerInputValueSchema = schemaRepository.getSupersetOrLatestValueSchema(storeName).getSchema();
Schema outputValueSchema = recordTransformerConfig.getOutputValueSchema();

// User doesn't intend on transforming records. Use input value schema instead.
if (outputValueSchema == null) {
outputValueSchema = this.recordTransformerInputValueSchema;
}

DaVinciRecordTransformer clientRecordTransformer = recordTransformerConfig.getRecordTransformerFunction()
.apply(versionNumber, keySchema, this.recordTransformerInputValueSchema, outputValueSchema);

Expand Down
2 changes: 2 additions & 0 deletions integrations/venice-duckdb/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ dependencies {

api project(':clients:da-vinci-client')
api project(':internal:venice-client-common')
api project(':clients:venice-thin-client')

api project(':internal:venice-common')

implementation libraries.commonsCli
testImplementation project(':internal:venice-common')

integrationTestImplementation project(path: ':internal:venice-test-common', configuration: 'integrationTestUtils')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,7 @@ public void testRecordTransformer() throws Exception {
false,
tmpDir.getAbsolutePath(),
storeName,
columnsToProject),
GenericRecord.class,
NAME_RECORD_V1_SCHEMA);
columnsToProject));
clientConfig.setRecordTransformerConfig(recordTransformerConfig);

DaVinciClient<Integer, Object> clientWithRecordTransformer =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.linkedin.venice.duckdb;

/**
 * Command-line arguments accepted by the DuckVinci CLI tool.
 *
 * Each argument carries a long option name (used as {@code --long-name}), a short alias,
 * a flag indicating whether the option takes a value, and help text for usage output.
 */
public enum Arg {
  STORE_NAME("store-name", "sn", true, "The name of the Venice store you want to ingest from"),
  CLUSTER_DISCOVERY_D2_SERVICE_NAME(
      "cluster-discovery-d2-service-name", "cdd2sn", true,
      // Fixed grammar: "used to discovery" -> "used to discover"
      "The D2 service name used to discover venice clusters. Ex: venice-discovery"
  ),
  ZK_HOST_URL("zk-host-url", "zkurl", true, "The zk host url used by D2"),
  DUCKDB_OUTPUT_DIRECTORY(
      // Fixed typo in the short alias: "ouputdir" -> "outputdir"
      "duckdb-output-directory", "outputdir", true, "The directory of where you want your DuckDB file to be written to"
  ),
  DISABLE_LOG("disable-log", "dl", false, "Disable logs from internal classes. Only print command output on console"),
  FLAT_JSON("flat-json", "flj", false, "Display output as flat json, without pretty-print indentation and line breaks"),
  HELP("help", "h", false, "Show usage"),
  // Fixed typo: "SSl" -> "SSL"; also removed the stray trailing comma after the constant list
  SSL_CONFIG_PATH("ssl-config-path", "scp", true, "SSL config file path");

  private final String argName;
  private final String first;
  private final boolean parameterized;
  private final String helpText;

  /**
   * @param argName long option name, e.g. {@code store-name}
   * @param first short alias for the option
   * @param parameterized whether the option requires a value
   * @param helpText human-readable description shown in usage output
   */
  Arg(String argName, String first, boolean parameterized, String helpText) {
    this.argName = argName;
    this.first = first;
    this.parameterized = parameterized;
    this.helpText = helpText;
  }

  /** Returns the long option name; used wherever the arg is rendered as {@code --name}. */
  @Override
  public String toString() {
    return argName;
  }

  /** Returns the short alias for this option. */
  public String first() {
    return first;
  }

  /** Returns the long option name. */
  public String getArgName() {
    return argName;
  }

  /** Returns the help text shown in usage output. */
  public String getHelpText() {
    return helpText;
  }

  /** Returns true when the option takes a value, false for boolean flags. */
  public boolean isParameterized() {
    return parameterized;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.linkedin.venice.duckdb;

import static com.linkedin.venice.duckdb.Arg.CLUSTER_DISCOVERY_D2_SERVICE_NAME;
import static com.linkedin.venice.duckdb.Arg.DUCKDB_OUTPUT_DIRECTORY;
import static com.linkedin.venice.duckdb.Arg.STORE_NAME;
import static com.linkedin.venice.duckdb.Arg.ZK_HOST_URL;

import com.linkedin.venice.exceptions.VeniceException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;


/**
 * Contains all the possible commands for DuckVinciTool
 */
public enum Command {
  GET(
      "get", "Ingests all of the data from a Venice store into DuckDB",
      new Arg[] { STORE_NAME, CLUSTER_DISCOVERY_D2_SERVICE_NAME, ZK_HOST_URL }, new Arg[] { DUCKDB_OUTPUT_DIRECTORY }
  );

  /** Orders commands alphabetically by their command name. */
  public static final Comparator<Command> commandComparator = Comparator.comparing(cmd -> cmd.commandName);

  private final String commandName;
  private final String description;
  private final Arg[] requiredArgs;
  private final Arg[] optionalArgs;

  Command(String argName, String description, Arg[] requiredArgs) {
    this(argName, description, requiredArgs, new Arg[] {});
  }

  /**
   * @param argName the name the command is invoked by
   * @param description human-readable summary used in help output
   * @param requiredArgs args that must be present for this command
   * @param optionalArgs args that may optionally be supplied
   */
  Command(String argName, String description, Arg[] requiredArgs, Arg[] optionalArgs) {
    this.commandName = argName;
    this.description = description;
    this.requiredArgs = requiredArgs;
    this.optionalArgs = optionalArgs;
  }

  /** Returns the command's name as typed on the command line. */
  @Override
  public String toString() {
    return commandName;
  }

  public Arg[] getRequiredArgs() {
    return requiredArgs;
  }

  public Arg[] getOptionalArgs() {
    return optionalArgs;
  }

  /**
   * Builds the full help description for this command: its summary followed by
   * the required and (if any) optional argument lists rendered as {@code --name}.
   */
  public String getDesc() {
    StringBuilder sb = new StringBuilder();
    if (!description.isEmpty()) {
      sb.append(description).append(". ");
    }

    String required =
        Arrays.stream(getRequiredArgs()).map(arg -> "--" + arg).collect(Collectors.joining(" "));
    sb.append("\nRequires: ").append(required);

    if (getOptionalArgs().length > 0) {
      String optional =
          Arrays.stream(getOptionalArgs()).map(arg -> "--" + arg).collect(Collectors.joining(" "));
      sb.append("\nOptional args: ").append(optional);
    }

    return sb.toString();
  }

  /**
   * Resolves a command by name. When no name was given, suggests commands whose
   * required args are all present on the parsed command line before failing.
   *
   * @throws VeniceException when no command matches the given name
   */
  public static Command getCommand(String name, CommandLine cmdLine) {
    for (Command candidate: values()) {
      if (candidate.commandName.equals(name)) {
        return candidate;
      }
    }

    if (name == null) {
      // No explicit command: look for commands whose required args were all supplied,
      // so the error message can point the user at likely intended commands.
      List<String> candidateCommands = Arrays.stream(Command.values())
          .filter(cmd -> Arrays.stream(cmd.getRequiredArgs()).allMatch(arg -> cmdLine.hasOption(arg.toString())))
          .map(cmd -> "--" + cmd)
          .collect(Collectors.toList());
      if (!candidateCommands.isEmpty()) {
        throw new VeniceException(
            "No command found, potential commands compatible with the provided parameters include: "
                + Arrays.toString(candidateCommands.toArray()));
      }
      throw new VeniceException(" No command found, Please specify a command, eg [--get] ");
    }

    throw new VeniceException("No Command found with name: " + name);
  }
}
Loading
Loading