Skip to content

Commit 9742f89

Browse files
swuferhong and loserwang1024
authored and committed
remove Connection.getTable with Schema Info.
1 parent a475aee commit 9742f89

File tree

10 files changed

+55
-160
lines changed

10 files changed

+55
-160
lines changed

fluss-client/src/main/java/org/apache/fluss/client/Connection.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.fluss.client.admin.Admin;
2222
import org.apache.fluss.client.table.Table;
2323
import org.apache.fluss.config.Configuration;
24-
import org.apache.fluss.metadata.SchemaInfo;
2524
import org.apache.fluss.metadata.TablePath;
2625

2726
import javax.annotation.concurrent.ThreadSafe;
@@ -57,16 +56,6 @@ public interface Connection extends AutoCloseable {
5756
/** Retrieve a new Table client to operate data in table. */
5857
Table getTable(TablePath tablePath);
5958

60-
/**
61-
* Retrieve a new Table client to operate data in table with specific schema. When performing
62-
* read, write and lookup operations, this schema will be used.
63-
*
64-
* @param tablePath the path of the table to operate on
65-
* @param schemaInfo the schema information to be used for the table operations
66-
* @return a new Table client instance for the specified table and schema
67-
*/
68-
Table getTable(TablePath tablePath, SchemaInfo schemaInfo);
69-
7059
/** Close the connection and release all resources. */
7160
@Override
7261
void close() throws Exception;

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import org.apache.fluss.config.Configuration;
3434
import org.apache.fluss.exception.FlussRuntimeException;
3535
import org.apache.fluss.fs.FileSystem;
36-
import org.apache.fluss.metadata.SchemaInfo;
37-
import org.apache.fluss.metadata.TableInfo;
3836
import org.apache.fluss.metadata.TablePath;
3937
import org.apache.fluss.metrics.registry.MetricRegistry;
4038
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -106,15 +104,6 @@ public Table getTable(TablePath tablePath) {
106104
return new FlussTable(this, tablePath, admin.getTableInfo(tablePath).join());
107105
}
108106

109-
@Override
110-
public Table getTable(TablePath tablePath, SchemaInfo schemaInfo) {
111-
// force to update the table info from server to avoid stale data in cache.
112-
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
113-
Admin admin = getOrCreateAdmin();
114-
TableInfo tableInfo = admin.getTableInfo(tablePath).join();
115-
return new FlussTable(this, tablePath, tableInfo.withNewSchema(schemaInfo));
116-
}
117-
118107
public MetadataUpdater getMetadataUpdater() {
119108
return metadataUpdater;
120109
}

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.fluss.metadata.LogFormat;
4040
import org.apache.fluss.metadata.MergeEngineType;
4141
import org.apache.fluss.metadata.Schema;
42-
import org.apache.fluss.metadata.SchemaInfo;
4342
import org.apache.fluss.metadata.TableBucket;
4443
import org.apache.fluss.metadata.TableChange;
4544
import org.apache.fluss.metadata.TableDescriptor;
@@ -79,7 +78,6 @@
7978
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
8079
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
8180
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
82-
import static org.apache.fluss.record.TestData.DATA2_SCHEMA;
8381
import static org.apache.fluss.record.TestData.DATA3_SCHEMA_PK;
8482
import static org.apache.fluss.testutils.DataTestUtils.assertRowValueEquals;
8583
import static org.apache.fluss.testutils.DataTestUtils.compactedRow;
@@ -278,8 +276,7 @@ void testPutAndLookup() throws Exception {
278276
false)
279277
.get();
280278
waitAllSchemaSync(tablePath, 2);
281-
Table newSchemaTable =
282-
conn.getTable(tableInfo.getTablePath(), new SchemaInfo(DATA2_SCHEMA, 2));
279+
Table newSchemaTable = conn.getTable(tableInfo.getTablePath());
283280
// schema change case1: read new data with new schema.
284281
verifyPutAndLookup(newSchemaTable, new Object[] {2, "b", "bb"});
285282
// schema change case2: read new data with old schema.
@@ -368,9 +365,7 @@ void testPutAndPrefixLookup() throws Exception {
368365
.get();
369366
waitAllSchemaSync(tablePath, 2);
370367
try (Connection connection = ConnectionFactory.createConnection(clientConf);
371-
Table newSchemaTable =
372-
connection.getTable(
373-
tableInfo.getTablePath(), new SchemaInfo(newSchema, 2))) {
368+
Table newSchemaTable = connection.getTable(tableInfo.getTablePath())) {
374369
// schema change case1: read new data with new schema.
375370
verifyPutAndLookup(
376371
newSchemaTable, new Object[] {1, "a", 4L, "value4", "add_column_value"});

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScannerITCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ void testScanSnapshotDuringSchemaChange() throws Exception {
126126

127127
// put into values with old schema.
128128
Map<TableBucket, List<InternalRow>> oldSchemaRowByBuckets = putRows(tableId, tablePath, 10);
129+
waitUntilAllSnapshotFinished(oldSchemaRowByBuckets.keySet(), 0);
129130

130131
// add a new column and rename an existing column
131132
admin.alterTable(
@@ -175,7 +176,7 @@ void testScanSnapshotDuringSchemaChange() throws Exception {
175176
}
176177

177178
// wait snapshot finish
178-
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 0);
179+
waitUntilAllSnapshotFinished(expectedRowByBuckets.keySet(), 1);
179180

180181
// test read snapshot with new Schema
181182
testSnapshotRead(tablePath, expectedRowByBuckets);

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,12 @@ public Builder fromColumns(List<Column> inputColumns) {
261261

262262
if (allMatchColumnId) {
263263
columns.addAll(inputColumns);
264+
highestFieldId =
265+
new AtomicInteger(
266+
inputColumns.stream()
267+
.mapToInt(Column::getColumnId)
268+
.max()
269+
.orElse(-1));
264270
} else {
265271
// If none of the columnIds are set, this may be a schema from an old version. Just use its
266272
// position as columnId.

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
package org.apache.fluss.flink.source;
1919

20-
import org.apache.fluss.client.Connection;
21-
import org.apache.fluss.client.ConnectionFactory;
22-
import org.apache.fluss.client.table.Table;
2320
import org.apache.fluss.config.Configuration;
2421
import org.apache.fluss.flink.source.deserializer.DeserializerInitContextImpl;
2522
import org.apache.fluss.flink.source.deserializer.FlussDeserializationSchema;
@@ -35,9 +32,6 @@
3532
import org.apache.fluss.flink.source.state.SourceEnumeratorState;
3633
import org.apache.fluss.lake.source.LakeSource;
3734
import org.apache.fluss.lake.source.LakeSplit;
38-
import org.apache.fluss.metadata.Schema;
39-
import org.apache.fluss.metadata.SchemaInfo;
40-
import org.apache.fluss.metadata.TableInfo;
4135
import org.apache.fluss.metadata.TablePath;
4236
import org.apache.fluss.predicate.Predicate;
4337
import org.apache.fluss.types.RowType;
@@ -56,8 +50,6 @@
5650

5751
import javax.annotation.Nullable;
5852

59-
import java.util.List;
60-
6153
/** Flink source for Fluss. */
6254
public class FlinkSource<OUT>
6355
implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {
@@ -184,14 +176,6 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
184176
FlinkSourceReaderMetrics flinkSourceReaderMetrics =
185177
new FlinkSourceReaderMetrics(context.metricGroup());
186178

187-
TableInfo tableInfo;
188-
try (Connection connection = ConnectionFactory.createConnection(flussConf);
189-
Table table = connection.getTable(tablePath)) {
190-
tableInfo = table.getTableInfo();
191-
}
192-
193-
Schema schema = tableInfo.getSchema();
194-
195179
deserializationSchema.open(
196180
new DeserializerInitContextImpl(
197181
context.metricGroup().addGroup("deserializer"),
@@ -200,16 +184,12 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
200184
FlinkRecordEmitter<OUT> recordEmitter = new FlinkRecordEmitter<>(deserializationSchema);
201185
// recall to projectedFields
202186

203-
int[] projectedFields = reCalculateProjectedFields(sourceOutputType, schema.getRowType());
204-
205187
return new FlinkSourceReader<>(
206188
elementsQueue,
207189
flussConf,
208190
tablePath,
209191
sourceOutputType,
210-
new SchemaInfo(schema, tableInfo.getSchemaId()),
211192
context,
212-
projectedFields,
213193
flinkSourceReaderMetrics,
214194
recordEmitter,
215195
lakeSource);
@@ -219,31 +199,4 @@ public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext conte
219199
public TypeInformation<OUT> getProducedType() {
220200
return deserializationSchema.getProducedType(sourceOutputType);
221201
}
222-
223-
/**
224-
* The projected fields for the fluss table from the source output types. Mapping based on
225-
* column name rather than column id.
226-
*
227-
* @return
228-
*/
229-
private static int[] reCalculateProjectedFields(
230-
RowType sourceOutputType, RowType flussRowType) {
231-
if (sourceOutputType.copy(false).equals(flussRowType.copy(false))) {
232-
return null;
233-
}
234-
235-
List<String> fieldNames = sourceOutputType.getFieldNames();
236-
int[] projectedFlussFields = new int[fieldNames.size()];
237-
for (int i = 0; i < fieldNames.size(); i++) {
238-
int fieldIndex = flussRowType.getFieldIndex(fieldNames.get(i));
239-
if (fieldIndex == -1) {
240-
throw new IllegalArgumentException(
241-
String.format(
242-
"The field %s is not found in the fluss table.",
243-
fieldNames.get(i)));
244-
}
245-
projectedFlussFields[i] = fieldIndex;
246-
}
247-
return projectedFlussFields;
248-
}
249202
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceReader.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.fluss.flink.source.split.SourceSplitState;
3232
import org.apache.fluss.lake.source.LakeSource;
3333
import org.apache.fluss.lake.source.LakeSplit;
34-
import org.apache.fluss.metadata.SchemaInfo;
3534
import org.apache.fluss.metadata.TableBucket;
3635
import org.apache.fluss.metadata.TablePath;
3736
import org.apache.fluss.types.RowType;
@@ -41,8 +40,6 @@
4140
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
4241
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
4342

44-
import javax.annotation.Nullable;
45-
4643
import java.util.Map;
4744
import java.util.Set;
4845
import java.util.function.Consumer;
@@ -57,9 +54,7 @@ public FlinkSourceReader(
5754
Configuration flussConfig,
5855
TablePath tablePath,
5956
RowType sourceOutputType,
60-
SchemaInfo schemaInfo,
6157
SourceReaderContext context,
62-
@Nullable int[] projectedFields,
6358
FlinkSourceReaderMetrics flinkSourceReaderMetrics,
6459
FlinkRecordEmitter<OUT> recordEmitter,
6560
LakeSource<LakeSplit> lakeSource) {
@@ -72,8 +67,6 @@ public FlinkSourceReader(
7267
flussConfig,
7368
tablePath,
7469
sourceOutputType,
75-
schemaInfo,
76-
projectedFields,
7770
flinkSourceReaderMetrics,
7871
lakeSource),
7972
(ignore) -> {}),

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java

Lines changed: 30 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.flink.source.reader;
1919

20+
import org.apache.fluss.annotation.VisibleForTesting;
2021
import org.apache.fluss.client.Connection;
2122
import org.apache.fluss.client.ConnectionFactory;
2223
import org.apache.fluss.client.table.Table;
@@ -36,7 +37,6 @@
3637
import org.apache.fluss.flink.source.split.SourceSplitBase;
3738
import org.apache.fluss.lake.source.LakeSource;
3839
import org.apache.fluss.lake.source.LakeSplit;
39-
import org.apache.fluss.metadata.SchemaInfo;
4040
import org.apache.fluss.metadata.TableBucket;
4141
import org.apache.fluss.metadata.TablePath;
4242
import org.apache.fluss.types.RowType;
@@ -58,7 +58,6 @@
5858
import java.time.Duration;
5959
import java.util.ArrayDeque;
6060
import java.util.ArrayList;
61-
import java.util.Arrays;
6261
import java.util.Collections;
6362
import java.util.HashMap;
6463
import java.util.HashSet;
@@ -120,21 +119,18 @@ public FlinkSourceSplitReader(
120119
Configuration flussConf,
121120
TablePath tablePath,
122121
RowType sourceOutputType,
123-
SchemaInfo schemaInfo,
124-
@Nullable int[] projectedFields,
125122
FlinkSourceReaderMetrics flinkSourceReaderMetrics,
126123
@Nullable LakeSource<LakeSplit> lakeSource) {
127124
this.flinkMetricRegistry =
128125
new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
129126
this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
130-
this.table = connection.getTable(tablePath, schemaInfo);
127+
this.table = connection.getTable(tablePath);
131128
this.sourceOutputType = sourceOutputType;
132129
this.boundedSplits = new ArrayDeque<>();
133130
this.subscribedBuckets = new HashMap<>();
134131
this.flinkSourceReaderMetrics = flinkSourceReaderMetrics;
135-
136-
sanityCheck(table.getTableInfo().getRowType(), projectedFields);
137-
this.projectedFields = projectedFields;
132+
this.projectedFields =
133+
reCalculateProjectedFields(sourceOutputType, table.getTableInfo().getRowType());
138134
this.logScanner = table.newScan().project(projectedFields).createLogScanner();
139135
this.stoppingOffsets = new HashMap<>();
140136
this.emptyLogSplits = new HashSet<>();
@@ -568,35 +564,33 @@ public void close() throws Exception {
568564
flinkMetricRegistry.close();
569565
}
570566

571-
private void sanityCheck(RowType flussTableRowType, @Nullable int[] projectedFields) {
572-
RowType tableRowType =
573-
projectedFields != null
574-
? flussTableRowType.project(projectedFields)
575-
: flussTableRowType;
576-
if (!sourceOutputType.copy(false).equals(tableRowType.copy(false))) {
577-
// The default nullability of Flink row type and Fluss row type might not be the same,
578-
// thus we need to compare the row type without nullability here.
579-
580-
final String flussSchemaMsg;
581-
if (projectedFields == null) {
582-
flussSchemaMsg = "\nFluss table schema: " + tableRowType;
583-
} else {
584-
flussSchemaMsg =
585-
"\nFluss table schema: "
586-
+ tableRowType
587-
+ " (projection "
588-
+ Arrays.toString(projectedFields)
589-
+ ")";
567+
/**
568+
* The projected fields for the fluss table from the source output types. Mapping based on
569+
* column name rather than column id.
570+
*/
571+
private static int[] reCalculateProjectedFields(
572+
RowType sourceOutputType, RowType flussRowType) {
573+
if (sourceOutputType.copy(false).equals(flussRowType.copy(false))) {
574+
return null;
575+
}
576+
577+
List<String> fieldNames = sourceOutputType.getFieldNames();
578+
int[] projectedFlussFields = new int[fieldNames.size()];
579+
for (int i = 0; i < fieldNames.size(); i++) {
580+
int fieldIndex = flussRowType.getFieldIndex(fieldNames.get(i));
581+
if (fieldIndex == -1) {
582+
throw new ValidationException(
583+
String.format(
584+
"The field %s is not found in the fluss table.",
585+
fieldNames.get(i)));
590586
}
591-
// Throw exception if the schema is not the same, this should rarely happen because we
592-
// only allow fluss tables derived from fluss catalog. But this can happen if an ALTER
593-
// TABLE command executed on the fluss table, after the job is submitted but before the
594-
// SinkFunction is opened.
595-
throw new ValidationException(
596-
"The Flink query schema is not matched to Fluss table schema. "
597-
+ "\nFlink query schema: "
598-
+ sourceOutputType
599-
+ flussSchemaMsg);
587+
projectedFlussFields[i] = fieldIndex;
600588
}
589+
return projectedFlussFields;
590+
}
591+
592+
@VisibleForTesting
593+
public int[] getProjectedFields() {
594+
return projectedFields;
601595
}
602596
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/FlinkSourceReaderTest.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
2727
import org.apache.fluss.flink.source.split.LogSplit;
2828
import org.apache.fluss.flink.utils.FlinkTestBase;
29-
import org.apache.fluss.metadata.Schema;
30-
import org.apache.fluss.metadata.SchemaInfo;
3129
import org.apache.fluss.metadata.TableBucket;
3230
import org.apache.fluss.metadata.TableDescriptor;
3331
import org.apache.fluss.metadata.TablePath;
@@ -179,9 +177,7 @@ private FlinkSourceReader createReader(
179177
flussConf,
180178
tablePath,
181179
sourceOutputType,
182-
new SchemaInfo(Schema.newBuilder().fromRowType(sourceOutputType).build(), 1),
183180
context,
184-
null,
185181
new FlinkSourceReaderMetrics(context.metricGroup()),
186182
recordEmitter,
187183
null);

0 commit comments

Comments
 (0)