Skip to content

Commit 2ade0ce

Browse files
committed
POC: Support Add Column.
1 parent 3fcde27 commit 2ade0ce

File tree

188 files changed

+5389
-858
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

188 files changed

+5389
-858
lines changed

fluss-client/src/main/java/org/apache/fluss/client/Connection.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.fluss.client.admin.Admin;
2222
import org.apache.fluss.client.table.Table;
2323
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.metadata.SchemaInfo;
2425
import org.apache.fluss.metadata.TablePath;
2526

2627
import javax.annotation.concurrent.ThreadSafe;
@@ -56,6 +57,16 @@ public interface Connection extends AutoCloseable {
5657
/** Retrieve a new Table client to operate data in table. */
5758
Table getTable(TablePath tablePath);
5859

60+
/**
61+
* Retrieve a new Table client to operate data in table with specific schema. When performing
62+
* read, write and loop operations, this schema will be used.
63+
*
64+
* @param tablePath the path of the table to operate on
65+
* @param schemaInfo the schema information to be used for the table operations
66+
* @return a new Table client instance for the specified table and schema
67+
*/
68+
Table getTable(TablePath tablePath, SchemaInfo schemaInfo);
69+
5970
/** Close the connection and release all resources. */
6071
@Override
6172
void close() throws Exception;

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.exception.FlussRuntimeException;
3535
import org.apache.fluss.fs.FileSystem;
36+
import org.apache.fluss.metadata.SchemaInfo;
3637
import org.apache.fluss.metadata.TableInfo;
3738
import org.apache.fluss.metadata.TablePath;
3839
import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -104,6 +105,14 @@ public Table getTable(TablePath tablePath) {
104105
return new FlussTable(this, tablePath, tableInfo);
105106
}
106107

108+
@Override
109+
public Table getTable(TablePath tablePath, SchemaInfo schemaInfo) {
110+
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
111+
TableInfo tableInfo = metadataUpdater.getTableInfoOrElseThrow(tablePath);
112+
tableInfo = TableInfo.of(tableInfo, schemaInfo);
113+
return new FlussTable(this, tablePath, tableInfo);
114+
}
115+
107116
public RpcClient getRpcClient() {
108117
return rpcClient;
109118
}

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import java.util.concurrent.CompletableFuture;
9191
import java.util.stream.Collectors;
9292

93+
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.addPbAlterSchemas;
9394
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
9495
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
9596
import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
@@ -249,12 +250,17 @@ public CompletableFuture<Void> alterTable(
249250
TablePath tablePath, List<TableChange> tableChanges, boolean ignoreIfNotExists) {
250251
tablePath.validate();
251252
AlterTableRequest request = new AlterTableRequest();
252-
253253
List<PbAlterConfig> pbFlussTableChanges =
254254
tableChanges.stream()
255+
.filter(tableChange -> !(tableChange instanceof TableChange.SchemaChange))
255256
.map(ClientRpcMessageUtils::toPbAlterConfigs)
256257
.collect(Collectors.toList());
257258

259+
List<TableChange> schemaChanges =
260+
tableChanges.stream()
261+
.filter(tableChange -> tableChange instanceof TableChange.SchemaChange)
262+
.collect(Collectors.toList());
263+
addPbAlterSchemas(request, schemaChanges);
258264
request.addAllConfigChanges(pbFlussTableChanges)
259265
.setIgnoreIfNotExists(ignoreIfNotExists)
260266
.setTablePath()

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.PruneRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -68,8 +69,11 @@ class PrefixKeyLookuper implements Lookuper {
6869
/** Decode the lookup bytes to result row. */
6970
private final ValueDecoder kvValueDecoder;
7071

72+
private final SchemaGetter schemaGetter;
73+
7174
public PrefixKeyLookuper(
7275
TableInfo tableInfo,
76+
SchemaGetter schemaGetter,
7377
MetadataUpdater metadataUpdater,
7478
LookupClient lookupClient,
7579
List<String> lookupColumnNames) {
@@ -91,10 +95,8 @@ public PrefixKeyLookuper(
9195
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
9296
: null;
9397
this.kvValueDecoder =
94-
new ValueDecoder(
95-
RowDecoder.create(
96-
tableInfo.getTableConfig().getKvFormat(),
97-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
98+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
99+
this.schemaGetter = schemaGetter;
98100
}
99101

100102
private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumns) {
@@ -174,7 +176,17 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
174176
if (valueBytes == null) {
175177
continue;
176178
}
177-
rowList.add(kvValueDecoder.decodeValue(valueBytes).row);
179+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
180+
InternalRow row;
181+
if (value.schemaId == tableInfo.getSchemaId()) {
182+
row = value.row;
183+
} else {
184+
Schema schema = schemaGetter.getSchema(value.schemaId);
185+
row =
186+
PruneRow.from(schema, tableInfo.getSchema())
187+
.replaceRow(value.row);
188+
}
189+
rowList.add(row);
178190
}
179191
return new LookupResult(rowList);
180192
});

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,14 @@
2222
import org.apache.fluss.client.table.getter.PartitionGetter;
2323
import org.apache.fluss.exception.PartitionNotExistException;
2424
import org.apache.fluss.metadata.DataLakeFormat;
25+
import org.apache.fluss.metadata.Schema;
26+
import org.apache.fluss.metadata.SchemaGetter;
2527
import org.apache.fluss.metadata.TableBucket;
2628
import org.apache.fluss.metadata.TableInfo;
2729
import org.apache.fluss.row.InternalRow;
28-
import org.apache.fluss.row.decode.RowDecoder;
30+
import org.apache.fluss.row.PruneRow;
2931
import org.apache.fluss.row.encode.KeyEncoder;
3032
import org.apache.fluss.row.encode.ValueDecoder;
31-
import org.apache.fluss.types.DataType;
3233
import org.apache.fluss.types.RowType;
3334

3435
import javax.annotation.Nullable;
@@ -38,6 +39,7 @@
3839

3940
import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
4041
import static org.apache.fluss.utils.Preconditions.checkArgument;
42+
import static org.apache.fluss.utils.Preconditions.checkNotNull;
4143

4244
/** An implementation of {@link Lookuper} that lookups by primary key. */
4345
class PrimaryKeyLookuper implements Lookuper {
@@ -65,8 +67,13 @@ class PrimaryKeyLookuper implements Lookuper {
6567
/** Decode the lookup bytes to result row. */
6668
private final ValueDecoder kvValueDecoder;
6769

70+
private final SchemaGetter schemaGetter;
71+
6872
public PrimaryKeyLookuper(
69-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
73+
TableInfo tableInfo,
74+
SchemaGetter schemaGetter,
75+
MetadataUpdater metadataUpdater,
76+
LookupClient lookupClient) {
7077
checkArgument(
7178
tableInfo.hasPrimaryKey(),
7279
"Log table %s doesn't support lookup",
@@ -94,10 +101,8 @@ public PrimaryKeyLookuper(
94101
? new PartitionGetter(lookupRowType, tableInfo.getPartitionKeys())
95102
: null;
96103
this.kvValueDecoder =
97-
new ValueDecoder(
98-
RowDecoder.create(
99-
tableInfo.getTableConfig().getKvFormat(),
100-
tableInfo.getRowType().getChildren().toArray(new DataType[0])));
104+
new ValueDecoder(schemaGetter, tableInfo.getTableConfig().getKvFormat());
105+
this.schemaGetter = schemaGetter;
101106
}
102107

103108
@Override
@@ -129,10 +134,20 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
129134
.lookup(tableBucket, pkBytes)
130135
.thenApply(
131136
valueBytes -> {
132-
InternalRow row =
133-
valueBytes == null
134-
? null
135-
: kvValueDecoder.decodeValue(valueBytes).row;
137+
InternalRow row = null;
138+
if (valueBytes != null) {
139+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
140+
if (value.schemaId == tableInfo.getSchemaId()) {
141+
row = value.row;
142+
} else {
143+
Schema schema = schemaGetter.getSchema(value.schemaId);
144+
checkNotNull(schema, "schema is null");
145+
row =
146+
PruneRow.from(schema, tableInfo.getSchema())
147+
.replaceRow(value.row);
148+
}
149+
}
150+
136151
return new LookupResult(row);
137152
});
138153
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/TableLookup.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.metadata.SchemaGetter;
2122
import org.apache.fluss.metadata.TableInfo;
2223

2324
import javax.annotation.Nullable;
@@ -28,39 +29,46 @@
2829
public class TableLookup implements Lookup {
2930

3031
private final TableInfo tableInfo;
32+
private final SchemaGetter schemaGetter;
3133
private final MetadataUpdater metadataUpdater;
3234
private final LookupClient lookupClient;
3335

3436
@Nullable private final List<String> lookupColumnNames;
3537

3638
public TableLookup(
37-
TableInfo tableInfo, MetadataUpdater metadataUpdater, LookupClient lookupClient) {
38-
this(tableInfo, metadataUpdater, lookupClient, null);
39+
TableInfo tableInfo,
40+
SchemaGetter schemaGetter,
41+
MetadataUpdater metadataUpdater,
42+
LookupClient lookupClient) {
43+
this(tableInfo, schemaGetter, metadataUpdater, lookupClient, null);
3944
}
4045

4146
private TableLookup(
4247
TableInfo tableInfo,
48+
SchemaGetter schemaGetter,
4349
MetadataUpdater metadataUpdater,
4450
LookupClient lookupClient,
4551
@Nullable List<String> lookupColumnNames) {
4652
this.tableInfo = tableInfo;
53+
this.schemaGetter = schemaGetter;
4754
this.metadataUpdater = metadataUpdater;
4855
this.lookupClient = lookupClient;
4956
this.lookupColumnNames = lookupColumnNames;
5057
}
5158

5259
@Override
5360
public Lookup lookupBy(List<String> lookupColumnNames) {
54-
return new TableLookup(tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
61+
return new TableLookup(
62+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
5563
}
5664

5765
@Override
5866
public Lookuper createLookuper() {
5967
if (lookupColumnNames == null) {
60-
return new PrimaryKeyLookuper(tableInfo, metadataUpdater, lookupClient);
68+
return new PrimaryKeyLookuper(tableInfo, schemaGetter, metadataUpdater, lookupClient);
6169
} else {
6270
return new PrefixKeyLookuper(
63-
tableInfo, metadataUpdater, lookupClient, lookupColumnNames);
71+
tableInfo, schemaGetter, metadataUpdater, lookupClient, lookupColumnNames);
6472
}
6573
}
6674
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.client.admin.Admin;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaGetter;
24+
import org.apache.fluss.metadata.SchemaInfo;
25+
import org.apache.fluss.metadata.TablePath;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.concurrent.TimeUnit;
33+
34+
/** Schema getter for client. */
35+
@Internal
36+
public class ClientSchemaGetter implements SchemaGetter {
37+
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
38+
39+
private final TablePath tablePath;
40+
// todo: 改为cache.
41+
private final Map<Integer, Schema> schemasById;
42+
private final Admin admin;
43+
private SchemaInfo latestSchemaInfo;
44+
45+
public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admin admin) {
46+
this.tablePath = tablePath;
47+
this.latestSchemaInfo = latestSchemaInfo;
48+
this.admin = admin;
49+
this.schemasById = new HashMap<>();
50+
schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema());
51+
}
52+
53+
@Override
54+
public Schema getSchema(int schemaId) {
55+
return schemasById.computeIfAbsent(
56+
schemaId,
57+
(id) -> {
58+
try {
59+
// todo: 测试这一步总是会一定概率卡住(有时候卡几分钟恢复),这个可以排查一下。
60+
SchemaInfo schemaInfo =
61+
admin.getTableSchema(tablePath, schemaId).get(5, TimeUnit.MINUTES);
62+
if (id > latestSchemaInfo.getSchemaId()) {
63+
latestSchemaInfo = schemaInfo;
64+
}
65+
return schemaInfo.getSchema();
66+
67+
} catch (Exception e) {
68+
LOG.warn("Failed to get schema for table: " + tablePath);
69+
throw new RuntimeException(e);
70+
}
71+
});
72+
}
73+
74+
@Override
75+
public SchemaInfo getLatestSchemaInfo() {
76+
return latestSchemaInfo;
77+
}
78+
79+
@Override
80+
public void release() {
81+
try {
82+
admin.close();
83+
} catch (Exception e) {
84+
throw new RuntimeException(e);
85+
}
86+
}
87+
}

0 commit comments

Comments
 (0)