Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
private final Map<String, String> configOptions = new HashMap<>();
private FlussSerializationSchema<InputT> serializationSchema;
private boolean shuffleByBucketId = true;
// Optional list of columns for partial update. When set, upsert will only update these columns.
// The primary key columns must be fully specified in this list.
private List<String> partialUpdateColumns;

/** Set the bootstrap server for the sink. */
public FlussSinkBuilder<InputT> setBootstrapServers(String bootstrapServers) {
Expand All @@ -98,6 +102,23 @@ public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId)
return this;
}

/**
* Enable partial update by specifying the column names to update for upsert tables. Primary key
* columns must be included in this list.
*/
public FlussSinkBuilder<InputT> setPartialUpdateColumns(List<String> columns) {
this.partialUpdateColumns = columns;
return this;
}

/**
* Enable partial update by specifying the column names to update for upsert tables. Convenience
* varargs overload.
*/
public FlussSinkBuilder<InputT> setPartialUpdateColumns(String... columns) {
return setPartialUpdateColumns(columns == null ? null : Arrays.asList(columns));
}

/** Set a configuration option. */
public FlussSinkBuilder<InputT> setOption(String key, String value) {
configOptions.put(key, value);
Expand Down Expand Up @@ -153,12 +174,17 @@ public FlussSink<InputT> build() {

if (isUpsert) {
LOG.info("Initializing Fluss upsert sink writer ...");
int[] targetColumnIndexes =
computeTargetColumnIndexes(
tableRowType.getFieldNames(),
tableInfo.getPrimaryKeys(),
partialUpdateColumns);
writerBuilder =
new FlinkSink.UpsertSinkWriterBuilder<>(
tablePath,
flussConfig,
tableRowType,
null, // not support partialUpdateColumns yet
targetColumnIndexes,
numBucket,
bucketKeys,
partitionKeys,
Expand Down Expand Up @@ -193,4 +219,48 @@ private void validateConfiguration() {
checkNotNull(tableName, "Table name is required but not provided.");
checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
}

// -------------- Test-visible helper methods --------------
/**
* Computes target column indexes for partial updates. If {@code specifiedColumns} is null or
* empty, returns null indicating full update. Validates that all primary key columns are
* included in the specified columns.
*
* @param allFieldNames the list of all field names in table row type order
* @param primaryKeyNames the list of primary key column names
* @param specifiedColumns the optional list of columns specified for partial update
* @return the indexes into {@code allFieldNames} corresponding to {@code specifiedColumns}, or
* null for full update
* @throws IllegalArgumentException if a specified column does not exist or primary key coverage
* is incomplete
*/
static int[] computeTargetColumnIndexes(
List<String> allFieldNames,
List<String> primaryKeyNames,
List<String> specifiedColumns) {
if (specifiedColumns == null || specifiedColumns.isEmpty()) {
return null; // full update
}

// Map specified column names to indexes
int[] indexes = new int[specifiedColumns.size()];
for (int i = 0; i < specifiedColumns.size(); i++) {
String col = specifiedColumns.get(i);
int idx = allFieldNames.indexOf(col);
checkArgument(
idx >= 0, "Column '%s' not found in table schema: %s", col, allFieldNames);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Slightly more context can be provided in the message e.g. "Partial update column '%s' not found in table schema: %s'

indexes[i] = idx;
}

// Validate that all primary key columns are covered
for (String pk : primaryKeyNames) {
checkArgument(
specifiedColumns.contains(pk),
"Partial updates must include all primary key columns. Missing primary key column: %s. Provided columns: %s",
pk,
specifiedColumns);
}

return indexes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -171,12 +172,55 @@ void testFluentChaining() {
.setTable(tableName)
.setOption("key1", "value1")
.setOptions(new HashMap<>())
.setShuffleByBucketId(false);
.setShuffleByBucketId(false)
.setPartialUpdateColumns("id", "price");

// Verify the builder instance is returned
assertThat(chainedBuilder).isInstanceOf(FlussSinkBuilder.class);
}

@Test
void testComputeTargetColumnIndexesFullUpdate() {
int[] result =
FlussSinkBuilder.computeTargetColumnIndexes(
Arrays.asList("id", "name", "price"), Arrays.asList("id"), null);
assertThat(result).isNull();
}

@Test
void testComputeTargetColumnIndexesValidPartialIncludesPk() {
int[] result =
FlussSinkBuilder.computeTargetColumnIndexes(
Arrays.asList("id", "name", "price", "ts"),
Arrays.asList("id"),
Arrays.asList("id", "price"));
assertThat(result).containsExactly(0, 2);
}

@Test
void testComputeTargetColumnIndexesMissingPkThrows() {
assertThatThrownBy(
() ->
FlussSinkBuilder.computeTargetColumnIndexes(
Arrays.asList("id", "name", "price"),
Arrays.asList("id"),
Arrays.asList("name", "price")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Partial updates must include all primary key columns");
}

@Test
void testComputeTargetColumnIndexesUnknownColumnThrows() {
assertThatThrownBy(
() ->
FlussSinkBuilder.computeTargetColumnIndexes(
Arrays.asList("id", "name"),
Arrays.asList("id"),
Arrays.asList("id", "unknown")))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("not found in table schema");
}

// Helper method to get private field values using reflection
@SuppressWarnings("unchecked")
private <T> T getFieldValue(Object object, String fieldName) throws Exception {
Expand Down
Loading