Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.config.cluster.ColumnPositionType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.FsPathAndFileName;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionInfo;
Expand Down Expand Up @@ -94,11 +96,13 @@
import org.apache.fluss.rpc.messages.RegisterProducerOffsetsRequest;
import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest;
import org.apache.fluss.rpc.protocol.MergeMode;
import org.apache.fluss.utils.InstantiationUtils;
import org.apache.fluss.utils.json.DataTypeJsonSerde;
import org.apache.fluss.utils.json.JsonSerdeUtils;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -653,10 +657,22 @@ public static PbAddColumn toPbAddColumn(TableChange.AddColumn addColumn) {
if (addColumn.getComment() != null) {
pbAddColumn.setComment(addColumn.getComment());
}
if (addColumn.getAggFunction().isPresent()) {
pbAddColumn.setSerializedAggFunction(
serializeAggFunction(addColumn.getAggFunction().get()));
}

return pbAddColumn;
}

private static byte[] serializeAggFunction(AggFunction aggFunction) {
try {
return InstantiationUtils.serializeObject(aggFunction);
} catch (IOException e) {
throw new FlussRuntimeException("Failed to serialize aggregation function.", e);
}
}

public static PbDropColumn toPbDropColumn(TableChange.DropColumn dropColumn) {
return new PbDropColumn().setColumnName(dropColumn.getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,62 @@ void testAlterTableColumn() throws Exception {
.hasMessageContaining("Column nested_row already exists");
}

@Test
void testAlterAggregationTableColumnWithAggFunction() throws Exception {
TablePath tablePath = TablePath.of("test_db", "alter_aggregation_table_column");
Map<String, String> properties = new HashMap<>();
properties.put(ConfigOptions.TABLE_MERGE_ENGINE.key(), "aggregation");
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("id", DataTypes.INT())
.column("value", DataTypes.BIGINT(), AggFunctions.SUM())
.primaryKey("id")
.build())
.distributedBy(3, "id")
.properties(properties)
.build();
admin.createTable(tablePath, tableDescriptor, false).get();

admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"new_value",
DataTypes.BIGINT(),
"new aggregate column",
TableChange.ColumnPosition.last(),
AggFunctions.SUM())),
false)
.get();

SchemaInfo schemaInfo = admin.getTableSchema(tablePath).get();
assertThat(schemaInfo.getSchema().getAggFunction("new_value")).hasValue(AggFunctions.SUM());
}

@Test
void testAlterNonAggregationTableColumnWithAggFunction() throws Exception {
TablePath tablePath = TablePath.of("test_db", "alter_non_aggregation_table_column");
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false).get();

assertThatThrownBy(
() ->
admin.alterTable(
tablePath,
Collections.singletonList(
TableChange.addColumn(
"new_value",
DataTypes.BIGINT(),
"new aggregate column",
TableChange.ColumnPosition.last(),
AggFunctions.SUM())),
false)
.get())
.hasMessageContaining(
"Aggregation function is only supported for aggregation merge engine table");
}

@Test
void testCreateInvalidDatabaseAndTable() throws Exception {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.annotation.Nullable;

import java.util.Objects;
import java.util.Optional;

/** {@link TableChange} represents the modification of the Fluss Table. */
public interface TableChange {
Expand All @@ -42,7 +43,21 @@ static AddColumn addColumn(
DataType dataType,
@Nullable String comment,
ColumnPosition position) {
return new AddColumn(columnName, dataType, comment, position);
return new AddColumn(columnName, dataType, comment, position, null);
}

/**
* A table change to add the column with specified position and aggregation function.
*
* @return a TableChange represents the modification.
*/
static AddColumn addColumn(
String columnName,
DataType dataType,
@Nullable String comment,
ColumnPosition position,
@Nullable AggFunction aggFunction) {
return new AddColumn(columnName, dataType, comment, position, aggFunction);
}

/**
Expand Down Expand Up @@ -230,15 +245,21 @@ class AddColumn implements SchemaChange {
private final String name;
private final DataType dataType;
private final @Nullable String comment;
private final @Nullable AggFunction aggFunction;

private final ColumnPosition position;

private AddColumn(
String name, DataType dataType, @Nullable String comment, ColumnPosition position) {
String name,
DataType dataType,
@Nullable String comment,
ColumnPosition position,
@Nullable AggFunction aggFunction) {
this.name = name;
this.dataType = dataType;
this.comment = comment;
this.position = position;
this.aggFunction = aggFunction;
}

public String getName() {
Expand All @@ -258,6 +279,10 @@ public ColumnPosition getPosition() {
return position;
}

public Optional<AggFunction> getAggFunction() {
return Optional.ofNullable(aggFunction);
}

@Override
public String toString() {
return "AddColumn{"
Expand All @@ -269,6 +294,8 @@ public String toString() {
+ ", comment='"
+ comment
+ '\''
+ ", aggFunction="
+ aggFunction
+ ", position="
+ position
+ '}';
Expand Down
1 change: 1 addition & 0 deletions fluss-rpc/src/main/proto/FlussApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,7 @@ message PbAddColumn {
required bytes data_type_json = 2;
optional string comment = 3;
required int32 column_position_type = 4; // LAST=0,FIRST=1,AFTER=3
optional bytes serialized_agg_function = 5;
}

message PbDropColumn {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.DatabaseInfo;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.MergeEngineType;
import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.SchemaInfo;
Expand Down Expand Up @@ -75,7 +76,9 @@
import java.util.Set;
import java.util.concurrent.Callable;

import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAggregationFunctionParameters;
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateAlterTableProperties;
import static org.apache.fluss.server.utils.TableDescriptorValidation.validateNoAggregationFunctions;

/** A manager for metadata. */
public class MetadataManager {
Expand Down Expand Up @@ -440,6 +443,7 @@ public void alterTableSchema(
if (!schemaChanges.isEmpty()) {
Schema newSchema =
SchemaUpdate.applySchemaChanges(table.getSchema(), schemaChanges);
validateAlterTableSchema(table, newSchema);
LakeCatalog.Context lakeCatalogContext =
new CoordinatorService.DefaultLakeCatalogContext(
false,
Expand Down Expand Up @@ -473,6 +477,17 @@ public void alterTableSchema(
}
}

static void validateAlterTableSchema(TableInfo table, Schema newSchema) {
if (table.getTableConfig()
.getMergeEngineType()
.map(MergeEngineType.AGGREGATION::equals)
.orElse(false)) {
validateAggregationFunctionParameters(newSchema);
} else {
validateNoAggregationFunctions(newSchema);
}
}

private void syncSchemaChangesToLake(
TablePath tablePath,
TableInfo tableInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,12 @@ private SchemaUpdate addColumn(TableChange.AddColumn addColumn) {
}

// Delegate the actual addition to the builder
builder.column(addColumn.getName(), addColumn.getDataType());
if (addColumn.getAggFunction().isPresent()) {
builder.column(
addColumn.getName(), addColumn.getDataType(), addColumn.getAggFunction().get());
} else {
builder.column(addColumn.getName(), addColumn.getDataType());
}

// Fixed: Use null check for the String comment
String comment = addColumn.getComment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.config.cluster.ColumnPositionType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.metadata.AggFunction;
import org.apache.fluss.metadata.DatabaseChange;
import org.apache.fluss.metadata.DatabaseSummary;
import org.apache.fluss.metadata.PartitionSpec;
Expand Down Expand Up @@ -197,6 +199,7 @@
import org.apache.fluss.server.zk.data.PartitionRegistration;
import org.apache.fluss.server.zk.data.lake.LakeTable;
import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot;
import org.apache.fluss.utils.InstantiationUtils;
import org.apache.fluss.utils.json.DataTypeJsonSerde;
import org.apache.fluss.utils.json.JsonSerdeUtils;
import org.apache.fluss.utils.json.TableBucketOffsets;
Expand All @@ -206,6 +209,7 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -355,17 +359,33 @@ public static List<TableChange> toAddColumns(List<PbAddColumn> addColumns) {
return addColumns.stream()
.filter(Objects::nonNull)
.map(
pbAddColumn ->
TableChange.addColumn(
pbAddColumn.getColumnName(),
JsonSerdeUtils.readValue(
pbAddColumn.getDataTypeJson(),
DataTypeJsonSerde.INSTANCE),
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
toColumnPosition(pbAddColumn.getColumnPositionType())))
pbAddColumn -> {
AggFunction aggFunction =
pbAddColumn.hasSerializedAggFunction()
? deserializeAggFunction(
pbAddColumn.getSerializedAggFunction())
: null;
return TableChange.addColumn(
pbAddColumn.getColumnName(),
JsonSerdeUtils.readValue(
pbAddColumn.getDataTypeJson(),
DataTypeJsonSerde.INSTANCE),
pbAddColumn.hasComment() ? pbAddColumn.getComment() : null,
toColumnPosition(pbAddColumn.getColumnPositionType()),
aggFunction);
})
.collect(Collectors.toList());
}

private static AggFunction deserializeAggFunction(byte[] serializedAggFunction) {
try {
return InstantiationUtils.deserializeObject(
serializedAggFunction, AggFunction.class.getClassLoader());
} catch (IOException | ClassNotFoundException e) {
throw new FlussRuntimeException("Failed to deserialize aggregation function.", e);
}
}

public static List<TableChange.SchemaChange> toDropColumns(List<PbDropColumn> dropColumns) {
return dropColumns.stream()
.filter(Objects::nonNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ private static void checkArrowCompression(Configuration tableConf) {
private static void checkMergeEngine(
Configuration tableConf, boolean hasPrimaryKey, Schema schema) {
MergeEngineType mergeEngine = tableConf.get(ConfigOptions.TABLE_MERGE_ENGINE);
if (mergeEngine != MergeEngineType.AGGREGATION) {
validateNoAggregationFunctions(schema);
}
if (mergeEngine != null) {
if (!hasPrimaryKey) {
throw new InvalidConfigException(
Expand Down Expand Up @@ -377,6 +380,20 @@ private static void checkMergeEngine(
}
}

/** Validates that the schema doesn't contain any aggregation functions. */
public static void validateNoAggregationFunctions(Schema schema) {
for (Schema.Column column : schema.getColumns()) {
Optional<AggFunction> aggFunction = column.getAggFunction();
if (aggFunction.isPresent()) {
throw new InvalidConfigException(
String.format(
"Aggregation function is only supported for aggregation merge engine table, "
+ "but column '%s' has aggregation function '%s'.",
column.getName(), aggFunction.get()));
}
}
}

/**
* Validates aggregation function parameters in the schema.
*
Expand All @@ -388,7 +405,7 @@ private static void checkMergeEngine(
* @throws InvalidConfigException if any aggregation function has invalid parameters or data
* types
*/
private static void validateAggregationFunctionParameters(Schema schema) {
public static void validateAggregationFunctionParameters(Schema schema) {
// Get primary key columns for early exit
List<String> primaryKeys = schema.getPrimaryKeyColumnNames();

Expand Down