Skip to content

Commit 279a97d

Browse files
committed
POC: Support Add Column.
# Conflicts: # fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
1 parent 3fcde27 commit 279a97d

File tree

194 files changed

+5437
-835
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

194 files changed

+5437
-835
lines changed

fluss-client/src/main/java/org/apache/fluss/client/Connection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.admin.Admin;
2222
import org.apache.fluss.client.table.Table;
2323
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.metadata.SchemaInfo;
2425
import org.apache.fluss.metadata.TablePath;
2526

2627
import javax.annotation.concurrent.ThreadSafe;
@@ -56,6 +57,16 @@ public interface Connection extends AutoCloseable {
5657
/** Retrieve a new Table client to operate data in table. */
5758
Table getTable(TablePath tablePath);
5859

60+
/**
61+
* Retrieve a new Table client to operate data in table with specific schema. When performing
62+
* read, write and loop operations, this schema will be used.
63+
*
64+
* @param tablePath the path of the table to operate on
65+
* @param schemaInfo the schema information to be used for the table operations
66+
* @return a new Table client instance for the specified table and schema
67+
*/
68+
Table getTable(TablePath tablePath, SchemaInfo schemaInfo);
69+
5970
/** Close the connection and release all resources. */
6071
@Override
6172
void close() throws Exception;

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.exception.FlussRuntimeException;
3535
import org.apache.fluss.fs.FileSystem;
36+
import org.apache.fluss.metadata.SchemaInfo;
3637
import org.apache.fluss.metadata.TableInfo;
3738
import org.apache.fluss.metadata.TablePath;
3839
import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -104,6 +105,14 @@ public Table getTable(TablePath tablePath) {
104105
return new FlussTable(this, tablePath, tableInfo);
105106
}
106107

108+
@Override
109+
public Table getTable(TablePath tablePath, SchemaInfo schemaInfo) {
110+
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
111+
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
112+
tableInfo = TableInfo.of(tableInfo, schemaInfo);
113+
return new FlussTable(this, tablePath, tableInfo);
114+
}
115+
107116
public RpcClient getRpcClient() {
108117
return rpcClient;
109118
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import java.util.concurrent.CompletableFuture;
9191
import java.util.stream.Collectors;
9292

93+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.addPbAlterSchemas;
9394
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9495
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9596
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -249,12 +250,17 @@ public CompletableFuture<Void> alterTable(
249250
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
250251
tablePath.validate();
251252
AlterTableRequest request = new AlterTableRequest();
252-
253253
List<PbAlterConfig> pbFlussTableChanges =
254254
tableChanges.stream()
255+
.filter(tableChange -> !(tableChange instanceof TableChange.SchemaChange))
255256
.map(ClientRpcMessageUtils::toPbAlterConfigs)
256257
.collect(Collectors.toList());
257258

259+
List<TableChange> schemaChanges =
260+
tableChanges.stream()
261+
.filter(tableChange -> tableChange instanceof TableChange.SchemaChange)
262+
.collect(Collectors.toList());
263+
addPbAlterSchemas(request, schemaChanges);
258264
request.addAllConfigChanges(pbFlussTableChanges)
259265
.setIgnoreIfNotExists(ignoreIfNotExists)
260266
.setTablePath()

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.PruneRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -68,8 +69,11 @@ class PrefixKeyLookuper implements Lookuper {
6869
/** Decode the lookup bytes to result row. */
6970
private final ValueDecoder kvValueDecoder;
7071

72+
private final SchemaGetter schemaGetter;
73+
7174
public PrefixKeyLookuper(
7275
TableInfo tableInfo,
76+
SchemaGetter schemaGetter,
7377
MetadataUpdater metadataUpdater,
7478
LookupClient lookupClient,
7579
List<String> lookupColumnNames) {
@@ -91,10 +95,8 @@ public PrefixKeyLookuper(
9195
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
9296
: null;
9397
this.kvValueDecoder =
94-
new ValueDecoder(
95-
RowDecoder.create(
96-
tableInfo.getTableConfig().getKvFormat(),
97-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
98+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
99+
this.schemaGetter = schemaGetter;
98100
}
99101

100102
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
@@ -174,7 +176,17 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
174176
if (valueBytes == null) {
175177
continue;
176178
}
177-
rowList.add(kvValueDecoder.decodeValue(valueBytes).row);
179+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
180+
InternalRow row;
181+
if (value.schemaId == tableInfo.getSchemaId()) {
182+
row = value.row;
183+
} else {
184+
Schema schema = schemaGetter.getSchema(value.schemaId);
185+
row =
186+
PruneRow.from(schema, tableInfo.getSchema())
187+
.replaceRow(value.row);
188+
}
189+
rowList.add(row);
178190
}
179191
return new LookupResult(rowList);
180192
});

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.PruneRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -65,8 +66,13 @@ class PrimaryKeyLookuper implements Lookuper {
6566
/** Decode the lookup bytes to result row. */
6667
private final ValueDecoder kvValueDecoder;
6768

69+
private final SchemaGetter schemaGetter;
70+
6871
public PrimaryKeyLookuper(
69-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
72+
TableInfo tableInfo,
73+
SchemaGetter schemaGetter,
74+
MetadataUpdater metadataUpdater,
75+
LookupClient lookupClient) {
7076
checkArgument(
7177
tableInfo.hasPrimaryKey(),
7278
"Log table %s doesn't support lookup",
@@ -94,10 +100,8 @@ public PrimaryKeyLookuper(
94100
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
95101
: null;
96102
this.kvValueDecoder =
97-
new ValueDecoder(
98-
RowDecoder.create(
99-
tableInfo.getTableConfig().getKvFormat(),
100-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
103+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
104+
this.schemaGetter = schemaGetter;
101105
}
102106

103107
@Override
@@ -129,10 +133,19 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
129133
.lookup(tableBucket, pkBytes)
130134
.thenApply(
131135
valueBytes -> {
132-
InternalRow row =
133-
valueBytes == null
134-
? null
135-
: kvValueDecoder.decodeValue(valueBytes).row;
136+
InternalRow row = null;
137+
if (valueBytes != null) {
138+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
139+
if (value.schemaId == tableInfo.getSchemaId()) {
140+
row = value.row;
141+
} else {
142+
Schema schema = schemaGetter.getSchema(value.schemaId);
143+
row =
144+
PruneRow.from(schema, tableInfo.getSchema())
145+
.replaceRow(value.row);
146+
}
147+
}
148+
136149
return new LookupResult(row);
137150
});
138151
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.metadata.SchemaGetter;
2122
import org.apache.fluss.metadata.TableInfo;
2223

2324
import javax.annotation.Nullable;
@@ -28,39 +29,46 @@
2829
public class TableLookup implements Lookup {
2930

3031
private final TableInfo tableInfo;
32+
private final SchemaGetter schemaGetter;
3133
private final MetadataUpdater metadataUpdater;
3234
private final LookupClient lookupClient;
3335

3436
@Nullable private final List<String> lookupColumnNames;
3537

3638
public TableLookup(
37-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
38-
this(tableInfo, metadataUpdater, lookupClient, null);
39+
TableInfo tableInfo,
40+
SchemaGetter schemaGetter,
41+
MetadataUpdater metadataUpdater,
42+
LookupClient lookupClient) {
43+
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
3944
}
4045

4146
private TableLookup(
4247
TableInfo tableInfo,
48+
SchemaGetter schemaGetter,
4349
MetadataUpdater metadataUpdater,
4450
LookupClient lookupClient,
4551
@Nullable List<String> lookupColumnNames) {
4652
this.tableInfo = tableInfo;
53+
this.schemaGetter = schemaGetter;
4754
this.metadataUpdater = metadataUpdater;
4855
this.lookupClient = lookupClient;
4956
this.lookupColumnNames = lookupColumnNames;
5057
}
5158

5259
@Override
5360
public Lookup lookupBy(List<String> lookupColumnNames) {
54-
return new TableLookup(tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
61+
return new TableLookup(
62+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
5563
}
5664

5765
@Override
5866
public Lookuper createLookuper() {
5967
if (lookupColumnNames == null) {
60-
return new PrimaryKeyLookuper(tableInfo, metadataUpdater, lookupClient);
68+
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
6169
} else {
6270
return new PrefixKeyLookuper(
63-
tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
71+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
6472
}
6573
}
6674
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.admin.Admin;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaGetter;
24+
import org.apache.fluss.metadata.SchemaInfo;
25+
import org.apache.fluss.metadata.TablePath;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.concurrent.TimeUnit;
33+
34+
/** Schema getter for client. */
35+
@Internal
36+
public class ClientSchemaGetter implements SchemaGetter {
37+
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
38+
39+
private final TablePath tablePath;
40+
// todo: 改为cache.
41+
private final Map<Integer, Schema> schemasById;
42+
private final Admin admin;
43+
private SchemaInfo latestSchemaInfo;
44+
45+
public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admin admin) {
46+
this.tablePath = tablePath;
47+
this.latestSchemaInfo = latestSchemaInfo;
48+
this.admin = admin;
49+
this.schemasById = new HashMap<>();
50+
schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema());
51+
}
52+
53+
@Override
54+
public Schema getSchema(int schemaId) {
55+
return schemasById.computeIfAbsent(
56+
schemaId,
57+
(id) -> {
58+
try {
59+
// todo: 测试这一步总是会一定概率卡住(有时候卡几分钟恢复),这个可以排查一下。
60+
SchemaInfo schemaInfo =
61+
admin.getTableSchema(tablePath, schemaId).get(5, TimeUnit.MINUTES);
62+
if (id > latestSchemaInfo.getSchemaId()) {
63+
latestSchemaInfo = schemaInfo;
64+
}
65+
return schemaInfo.getSchema();
66+
67+
} catch (Exception e) {
68+
LOG.warn("Failed to get schema for table: " + tablePath);
69+
throw new RuntimeException(e);
70+
}
71+
});
72+
}
73+
74+
@Override
75+
public SchemaInfo getLatestSchemaInfo() {
76+
return latestSchemaInfo;
77+
}
78+
79+
@Override
80+
public void release() {
81+
try {
82+
admin.close();
83+
} catch (Exception e) {
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)