
Commit 2b1a733

[server] Persist all non-sensitive lake properties into table options
1 parent: 32003fc

File tree: 6 files changed, 64 additions and 72 deletions

fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java

Lines changed: 16 additions & 0 deletions
@@ -23,8 +23,10 @@
 import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;

 /** Utilities of Fluss {@link ConfigOptions}. */
 @Internal
@@ -37,18 +39,32 @@ public class FlussConfigUtils {
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";

     public static final List<String> ALTERABLE_TABLE_OPTIONS;
+    public static final Set<String> SENSITIVE_TABLE_OPTIONS = new HashSet<>();

     static {
         TABLE_OPTIONS = extractConfigOptions("table.");
         CLIENT_OPTIONS = extractConfigOptions("client.");
         ALTERABLE_TABLE_OPTIONS =
                 Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+
+        SENSITIVE_TABLE_OPTIONS.add("password");
+        SENSITIVE_TABLE_OPTIONS.add("secret");
+        SENSITIVE_TABLE_OPTIONS.add("key");
     }

     public static boolean isTableStorageConfig(String key) {
         return key.startsWith(TABLE_PREFIX);
     }

+    public static boolean isTableLakeConfig(String dataLakeFormat, String key) {
+        if (dataLakeFormat == null) {
+            return false;
+        }
+
+        String dataLakeConfigPrefix = TABLE_PREFIX + "datalake." + dataLakeFormat + ".";
+        return key.startsWith(dataLakeConfigPrefix);
+    }
+
     public static boolean isAlterableTableOption(String key) {
         return ALTERABLE_TABLE_OPTIONS.contains(key);
     }
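For reference, a quick sketch of how the new helper classifies option keys. The demo class below is not part of the commit and assumes TABLE_PREFIX resolves to "table.".

import org.apache.fluss.config.FlussConfigUtils;

public class IsTableLakeConfigDemo {
    public static void main(String[] args) {
        // a key under "table.datalake.<format>." counts as a lake config for that format
        System.out.println(
                FlussConfigUtils.isTableLakeConfig(
                        "paimon", "table.datalake.paimon.file.format")); // true
        // a key for a different format (or any other key) does not
        System.out.println(
                FlussConfigUtils.isTableLakeConfig(
                        "paimon", "table.datalake.iceberg.file.format")); // false
        // with no datalake format set on the table, nothing is treated as a lake config
        System.out.println(
                FlussConfigUtils.isTableLakeConfig(
                        null, "table.datalake.paimon.file.format")); // false
    }
}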

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 4 additions & 10 deletions
@@ -452,11 +452,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                         () -> admin.createTable(tablePath, logTableWithoutBucketKeys1, false).get())
                 .cause()
                 .isInstanceOf(LakeTableAlreadyExistException.class)
-                .hasMessage(
-                        "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=3, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, bucket-key=c1,c2, file.format=parquet, fluss.k1=v1}, comment=null}. "
-                                + "Please first drop the table in Paimon catalog or use a new table name.");
+                .hasMessageContaining(
+                        "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible.");

         // create log table with different fields will throw exception
         TableDescriptor logTableWithoutBucketKeys2 =
@@ -471,11 +468,8 @@ void testCreateLakeEnableTableWithExistLakeTable() throws Exception {
                         () -> admin.createTable(tablePath, logTableWithoutBucketKeys2, false).get())
                 .cause()
                 .isInstanceOf(LakeTableAlreadyExistException.class)
-                .hasMessage(
-                        "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible. "
-                                + "Existing schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}, "
-                                + "new schema: UpdateSchema{fields=[`c1` STRING, `c2` INT, `c3` STRING, `__bucket` INT, `__offset` BIGINT, `__timestamp` TIMESTAMP(6) WITH LOCAL TIME ZONE], partitionKeys=[], primaryKeys=[], options={bucket=-1, fluss.table.replication.factor=1, fluss.table.datalake.enabled=true, fluss.table.datalake.format=paimon, partition.legacy-name=false, file.format=parquet, fluss.k1=v1}, comment=null}. "
-                                + "Please first drop the table in Paimon catalog or use a new table name.");
+                .hasMessageContaining(
+                        "The table `fluss`.`log_table_with_exist_lake_table` already exists in Paimon catalog, but the table schema is not compatible.");
     }

     @Test

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java

Lines changed: 30 additions & 7 deletions
@@ -119,12 +119,14 @@

 import java.io.UncheckedIOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;

+import static org.apache.fluss.config.FlussConfigUtils.SENSITIVE_TABLE_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
 import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -263,7 +265,7 @@ public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest req
                 lakeCatalogDynamicLoader.getLakeCatalogContainer();

         // Check table creation permissions based on table type
-        validateTableCreationPermission(tableDescriptor, tablePath);
+        validateTableCreationPermission(tableDescriptor);

         // apply system defaults if the config is not set
         tableDescriptor =
@@ -315,14 +317,11 @@ public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest reques
         List<TableChange> tableChanges = toTableChanges(request.getConfigChangesList());
         TablePropertyChanges tablePropertyChanges = toTablePropertyChanges(tableChanges);

-        LakeCatalogDynamicLoader.LakeCatalogContainer lakeCatalogContainer =
-                lakeCatalogDynamicLoader.getLakeCatalogContainer();
         metadataManager.alterTableProperties(
                 tablePath,
                 tableChanges,
                 tablePropertyChanges,
                 request.isIgnoreIfNotExists(),
-                lakeCatalogContainer.getLakeCatalog(),
                 lakeTableTieringManager,
                 new DefaultLakeCatalogContext(false, currentSession().getPrincipal()));

@@ -404,6 +403,18 @@ private TableDescriptor applySystemDefaults(
                             ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
         }

+        // add non-sensitive lake properties to the table properties
+        if (dataLakeEnabled) {
+            Map<String, String> tableLakeOptions =
+                    lakeCatalogDynamicLoader.getLakeCatalogContainer().getDefaultTableLakeOptions();
+            removeSensitiveTableOptions(tableLakeOptions);
+            if (tableLakeOptions != null && !tableLakeOptions.isEmpty()) {
+                Map<String, String> newProperties = new HashMap<>(newDescriptor.getProperties());
+                newProperties.putAll(tableLakeOptions);
+                newDescriptor = newDescriptor.withProperties(newProperties);
+            }
+        }
+
         // For tables with first_row or versioned merge engines, automatically set to IGNORE if
         // delete behavior is not set
         Configuration tableConf = Configuration.fromMap(tableDescriptor.getProperties());
@@ -427,6 +438,20 @@ private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) {
         return Boolean.parseBoolean(dataLakeEnabledValue);
     }

+    public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) {
+        if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
+            return;
+        }
+
+        Iterator<Map.Entry<String, String>> iterator = tableLakeOptions.entrySet().iterator();
+        while (iterator.hasNext()) {
+            String key = iterator.next().getKey().toLowerCase();
+            if (SENSITIVE_TABLE_OPTIONS.stream().anyMatch(key::contains)) {
+                iterator.remove();
+            }
+        }
+    }
+
     @Override
     public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
@@ -734,11 +759,9 @@ private void validateHeartbeatRequest(
      * Validates whether the table creation is allowed based on the table type and configuration.
      *
      * @param tableDescriptor the table descriptor to validate
-     * @param tablePath the table path for error reporting
      * @throws InvalidTableException if table creation is not allowed
      */
-    private void validateTableCreationPermission(
-            TableDescriptor tableDescriptor, TablePath tablePath) {
+    private void validateTableCreationPermission(TableDescriptor tableDescriptor) {
         boolean hasPrimaryKey = tableDescriptor.hasPrimaryKey();

         if (hasPrimaryKey) {
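A minimal standalone sketch of the filtering rule that removeSensitiveTableOptions applies before the lake defaults get persisted into table options. It uses a plain HashMap and made-up option names rather than the real Fluss classes; the match is a case-insensitive substring check against the sensitive tokens, so names like "access-key" are caught as well.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class SensitiveOptionFilterSketch {
    // same tokens as FlussConfigUtils.SENSITIVE_TABLE_OPTIONS
    private static final List<String> SENSITIVE = Arrays.asList("password", "secret", "key");

    public static void main(String[] args) {
        Map<String, String> lakeOptions = new HashMap<>();
        // illustrative option names, not real Fluss/Paimon defaults
        lakeOptions.put("table.datalake.paimon.file.format", "parquet");
        lakeOptions.put("table.datalake.paimon.s3.access-key", "AKIA...");  // contains "key"
        lakeOptions.put("table.datalake.paimon.s3.secret-key", "abc123");   // contains "secret" and "key"

        Iterator<Map.Entry<String, String>> it = lakeOptions.entrySet().iterator();
        while (it.hasNext()) {
            String key = it.next().getKey().toLowerCase();
            // drop any option whose lower-cased key contains a sensitive token
            if (SENSITIVE.stream().anyMatch(key::contains)) {
                it.remove();
            }
        }

        // only the non-sensitive option survives and would be persisted
        System.out.println(lakeOptions); // {table.datalake.paimon.file.format=parquet}
    }
}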

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 5 additions & 41 deletions
@@ -61,8 +61,6 @@

 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -82,14 +80,6 @@ public class MetadataManager {
     private final int maxBucketNum;
     private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;

-    public static final Set<String> SENSITIVE_TABLE_OPTIOINS = new HashSet<>();
-
-    static {
-        SENSITIVE_TABLE_OPTIOINS.add("password");
-        SENSITIVE_TABLE_OPTIOINS.add("secret");
-        SENSITIVE_TABLE_OPTIOINS.add("key");
-    }
-
     /**
      * Creates a new metadata manager.
      *
@@ -325,15 +315,12 @@ public void alterTableProperties(
             List<TableChange> tableChanges,
             TablePropertyChanges tablePropertyChanges,
             boolean ignoreIfNotExists,
-            @Nullable LakeCatalog lakeCatalog,
             LakeTableTieringManager lakeTableTieringManager,
             LakeCatalog.Context lakeCatalogContext) {
         try {
             // it throws TableNotExistException if the table or database not exists
             TableRegistration tableReg = getTableRegistration(tablePath);
             SchemaInfo schemaInfo = getLatestSchema(tablePath);
-            // we can't use MetadataManager#getTable here, because it will add the default
-            // lake options to the table properties, which may cause the validation failure
             TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);

             // validate the changes
@@ -357,7 +344,6 @@ public void alterTableProperties(
                     tableDescriptor,
                     newDescriptor,
                     tableChanges,
-                    lakeCatalog,
                     lakeCatalogContext);
             // update the table to zk
             TableRegistration updatedTableRegistration =
@@ -396,8 +382,10 @@ private void preAlterTableProperties(
             TableDescriptor tableDescriptor,
             TableDescriptor newDescriptor,
             List<TableChange> tableChanges,
-            LakeCatalog lakeCatalog,
             LakeCatalog.Context lakeCatalogContext) {
+        LakeCatalog lakeCatalog =
+                lakeCatalogDynamicLoader.getLakeCatalogContainer().getLakeCatalog();
+
         if (isDataLakeEnabled(newDescriptor)) {
             if (lakeCatalog == null) {
                 throw new InvalidAlterTableException(
@@ -506,20 +494,6 @@ private boolean isDataLakeEnabled(Map<String, String> properties) {
         return Boolean.parseBoolean(dataLakeEnabledValue);
     }

-    public void removeSensitiveTableOptions(Map<String, String> tableLakeOptions) {
-        if (tableLakeOptions == null || tableLakeOptions.isEmpty()) {
-            return;
-        }
-
-        Iterator<Map.Entry<String, String>> iterator = tableLakeOptions.entrySet().iterator();
-        while (iterator.hasNext()) {
-            String key = iterator.next().getKey().toLowerCase();
-            if (SENSITIVE_TABLE_OPTIOINS.stream().anyMatch(key::contains)) {
-                iterator.remove();
-            }
-        }
-    }
-
     public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
         Optional<TableRegistration> optionalTable;
         try {
@@ -533,10 +507,7 @@ public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
         }
         TableRegistration tableReg = optionalTable.get();
         SchemaInfo schemaInfo = getLatestSchema(tablePath);
-        Map<String, String> tableLakeOptions =
-                lakeCatalogDynamicLoader.getLakeCatalogContainer().getDefaultTableLakeOptions();
-        removeSensitiveTableOptions(tableLakeOptions);
-        return tableReg.toTableInfo(tablePath, schemaInfo, tableLakeOptions);
+        return tableReg.toTableInfo(tablePath, schemaInfo);
     }

     public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
@@ -559,14 +530,7 @@ public Map<TablePath, TableInfo> getTables(Collection<TablePath> tablePaths)
                 TableRegistration tableReg = tablePath2TableRegistrations.get(tablePath);
                 SchemaInfo schemaInfo = tablePath2SchemaInfos.get(tablePath);

-                result.put(
-                        tablePath,
-                        tableReg.toTableInfo(
-                                tablePath,
-                                schemaInfo,
-                                lakeCatalogDynamicLoader
-                                        .getLakeCatalogContainer()
-                                        .getDefaultTableLakeOptions()));
+                result.put(tablePath, tableReg.toTableInfo(tablePath, schemaInfo));
             }
         } catch (Exception e) {
             throw new FlussRuntimeException(

fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java

Lines changed: 8 additions & 0 deletions
@@ -49,6 +49,7 @@

 import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
+import static org.apache.fluss.config.FlussConfigUtils.isTableLakeConfig;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -75,6 +76,13 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
         // check properties should only contain table.* options,
         // and this cluster know it, and value is valid
         for (String key : tableConf.keySet()) {
+            // skip datalake configs
+            if (isTableLakeConfig(
+                    tableDescriptor.getProperties().get(ConfigOptions.TABLE_DATALAKE_FORMAT.key()),
+                    key)) {
+                continue;
+            }
+
             if (!TABLE_OPTIONS.containsKey(key)) {
                 if (isTableStorageConfig(key)) {
                     throw new InvalidConfigException(
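As a rough, self-contained illustration of the new skip in validateTableDescriptor: once a datalake format is set, options under its table.datalake.<format>. prefix are no longer rejected as unknown table.* options. The property names below are illustrative, and the real check also validates option values.

import java.util.HashMap;
import java.util.Map;

public class ValidationSkipSketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("table.datalake.enabled", "true");
        properties.put("table.datalake.format", "paimon");
        properties.put("table.datalake.paimon.file.format", "orc"); // lake option, not a Fluss table option

        String format = properties.get("table.datalake.format");
        for (String key : properties.keySet()) {
            // lake options carry the "table.datalake.<format>." prefix and are skipped,
            // so they no longer trip the "unknown table.* option" check
            if (format != null && key.startsWith("table.datalake." + format + ".")) {
                System.out.println("skipped lake option: " + key);
                continue;
            }
            System.out.println("validated as Fluss table option: " + key);
        }
    }
}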

fluss-server/src/main/java/org/apache/fluss/server/zk/data/TableRegistration.java

Lines changed: 1 addition & 14 deletions
@@ -17,7 +17,6 @@

 package org.apache.fluss.server.zk.data;

-import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.TableConfig;
 import org.apache.fluss.metadata.Schema;
@@ -87,20 +86,8 @@ public TableConfig getTableConfig() {
     }

     public TableInfo toTableInfo(TablePath tablePath, SchemaInfo schemaInfo) {
-        return toTableInfo(tablePath, schemaInfo, null);
-    }
-
-    public TableInfo toTableInfo(
-            TablePath tablePath,
-            SchemaInfo schemaInfo,
-            @Nullable Map<String, String> defaultTableLakeOptions) {
         Configuration properties = Configuration.fromMap(this.properties);
-        if (defaultTableLakeOptions != null) {
-            if (properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED)) {
-                // only make the lake options visible when the datalake is enabled on the table
-                defaultTableLakeOptions.forEach(properties::setString);
-            }
-        }
+
         return new TableInfo(
                 tablePath,
                 this.tableId,
