Skip to content

Commit a475aee

Browse files
committed
1. reduce the re-calculate of file projection.
2. fix test
1 parent afb3948 commit a475aee

File tree

10 files changed

+136
-47
lines changed

10 files changed

+136
-47
lines changed

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,6 @@ void testUpsertWithSmallBuffer() throws Exception {
237237
}
238238
}
239239

240-
// todo: 测试写入一个过于提前或者为0的schema后会如何
241240
@Test
242241
void testPutAndLookup() throws Exception {
243242
TablePath tablePath = TablePath.of("test_db_1", "test_put_and_lookup_table");

fluss-client/src/test/java/org/apache/fluss/client/table/scanner/log/DefaultCompletedFetchTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,6 @@ private MemoryLogRecords genRecordsWithProjection(
266266
fileLogRecords.flush();
267267

268268
FileLogProjection fileLogProjection = new FileLogProjection();
269-
// todo: 调整测试
270269
fileLogProjection.setCurrentProjection(
271270
DATA2_TABLE_ID,
272271
testingSchemaGetter,

fluss-common/src/main/java/org/apache/fluss/metadata/Schema.java

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
import java.util.Collections;
3636
import java.util.HashSet;
3737
import java.util.List;
38-
import java.util.Map;
3938
import java.util.Objects;
4039
import java.util.Optional;
4140
import java.util.Set;
41+
import java.util.concurrent.atomic.AtomicInteger;
4242
import java.util.stream.Collectors;
4343
import java.util.stream.IntStream;
4444

@@ -56,11 +56,9 @@
5656
public final class Schema implements Serializable {
5757

5858
private static final long serialVersionUID = 1L;
59-
6059
private static final Schema EMPTY = Schema.newBuilder().build();
6160

6261
private final List<Column> columns;
63-
private final Map<Integer, Column> columnById;
6462
private final @Nullable PrimaryKey primaryKey;
6563
private final RowType rowType;
6664

@@ -82,8 +80,6 @@ private Schema(List<Column> columns, @Nullable PrimaryKey primaryKey, int highes
8280
new DataField(
8381
column.getName(), column.getDataType()))
8482
.collect(Collectors.toList()));
85-
columnById =
86-
columns.stream().collect(Collectors.toMap(Column::getColumnId, column -> column));
8783
this.highestFieldId = highestFieldId;
8884
}
8985

@@ -207,10 +203,11 @@ public static Schema.Builder newBuilder() {
207203
public static final class Builder {
208204
private final List<Column> columns;
209205
private @Nullable PrimaryKey primaryKey;
210-
private @Nullable Integer highestFieldId;
206+
private AtomicInteger highestFieldId;
211207

212208
private Builder() {
213209
columns = new ArrayList<>();
210+
highestFieldId = new AtomicInteger(-1);
214211
}
215212

216213
/** Adopts all members from the given schema. */
@@ -219,11 +216,12 @@ public Builder fromSchema(Schema schema) {
219216
if (schema.primaryKey != null) {
220217
primaryKeyNamed(schema.primaryKey.constraintName, schema.primaryKey.columnNames);
221218
}
219+
this.highestFieldId = new AtomicInteger(schema.highestFieldId);
222220
return this;
223221
}
224222

225223
public Builder highestFieldId(int highestFieldId) {
226-
this.highestFieldId = highestFieldId;
224+
this.highestFieldId = new AtomicInteger(highestFieldId);
227225
return this;
228226
}
229227

@@ -287,8 +285,7 @@ public Builder fromColumns(List<Column> inputColumns) {
287285
public Builder column(String columnName, DataType dataType) {
288286
checkNotNull(columnName, "Column name must not be null.");
289287
checkNotNull(dataType, "Data type must not be null.");
290-
short defaultColumnId = (short) columns.size();
291-
columns.add(new Column(columnName, dataType, null, defaultColumnId));
288+
columns.add(new Column(columnName, dataType, null, highestFieldId.incrementAndGet()));
292289
return this;
293290
}
294291

@@ -362,18 +359,15 @@ public Builder primaryKeyNamed(String constraintName, List<String> columnNames)
362359
public Schema build() {
363360
Integer maximumColumnId =
364361
columns.stream().map(Column::getColumnId).max(Integer::compareTo).orElse(0);
365-
if (highestFieldId == null) {
366-
highestFieldId = maximumColumnId;
367-
} else {
368-
checkState(
369-
highestFieldId >= maximumColumnId,
370-
"Highest field id must be greater than or equal to the maximum column id.");
371-
}
362+
363+
checkState(
364+
columns.isEmpty() || highestFieldId.get() >= maximumColumnId,
365+
"Highest field id must be greater than or equal to the maximum column id.");
372366

373367
checkState(
374368
columns.stream().map(Column::getColumnId).distinct().count() == columns.size(),
375369
"Column ids must be unique.");
376-
return new Schema(columns, primaryKey, highestFieldId);
370+
return new Schema(columns, primaryKey, highestFieldId.get());
377371
}
378372
}
379373

fluss-common/src/main/java/org/apache/fluss/record/FileLogProjection.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public class FileLogProjection {
103103
private SchemaGetter schemaGetter;
104104
private long tableId;
105105
private ArrowCompressionInfo compressionInfo;
106-
private int[] selectedFieldIds;
106+
private int[] selectedFieldPositions;
107107

108108
public FileLogProjection() {
109109
this.outputStream = new ByteArrayOutputStream();
@@ -118,11 +118,18 @@ public void setCurrentProjection(
118118
long tableId,
119119
SchemaGetter schemaGetter,
120120
ArrowCompressionInfo compressionInfo,
121-
int[] selectedFields) {
121+
int[] selectedFieldIds) {
122122
this.tableId = tableId;
123123
this.schemaGetter = schemaGetter;
124124
this.compressionInfo = compressionInfo;
125-
this.selectedFieldIds = selectedFields;
125+
126+
// Currently, only add last column is supported.Thus selectedFieldPositions is always same
127+
// from same selectedFieldIds.
128+
// TODO: if support drop column or add column in middle, this selectedFieldPositions should
129+
// be re-calculated for each schema.
130+
this.selectedFieldPositions =
131+
selectedFieldPositions(
132+
schemaGetter.getLatestSchemaInfo().getSchema(), selectedFieldIds);
126133
}
127134

128135
/**
@@ -405,16 +412,14 @@ private void setCurrentSchema(short schemaId) {
405412
if (projectionsCache.containsKey(projectionKey)) {
406413
// the schema and projection should identical for the same table id.
407414
currentProjection = projectionsCache.get(projectionKey);
408-
if (!Arrays.equals(currentProjection.selectedFieldIds, selectedFieldIds)
415+
if (!Arrays.equals(currentProjection.selectedFieldPositions, selectedFieldPositions)
409416
|| !currentProjection.schema.equals(rowType)) {
410417
throw new InvalidColumnProjectionException(
411418
"The schema and projection should be identical for the same table id.");
412419
}
413420
return;
414421
}
415422

416-
int[] selectedFieldPositions = checkProjection(schema, selectedFieldIds);
417-
418423
// initialize the projection util information
419424
Schema arrowSchema = ArrowUtils.toArrowSchema(rowType);
420425
BitSet selection = toBitSet(arrowSchema.getFields().size(), selectedFieldPositions);
@@ -454,12 +459,11 @@ private void setCurrentSchema(short schemaId) {
454459
rowType,
455460
metadataLength,
456461
bodyCompression,
457-
selectedFieldPositions,
458-
selectedFieldIds);
462+
selectedFieldPositions);
459463
projectionsCache.put(projectionKey, currentProjection);
460464
}
461465

462-
int[] checkProjection(org.apache.fluss.metadata.Schema schema, int[] projectedFields) {
466+
int[] selectedFieldPositions(org.apache.fluss.metadata.Schema schema, int[] projectedFields) {
463467
Map<Integer, Integer> columnIdPositions = new HashMap<>();
464468
List<Integer> columnIds = schema.getColumnIds();
465469
for (int i = 0; i < columnIds.size(); i++) {
@@ -521,7 +525,6 @@ static final class ProjectionInfo {
521525
final int arrowMetadataLength;
522526
final ArrowBodyCompression bodyCompression;
523527
final int[] selectedFieldPositions;
524-
final int[] selectedFieldIds;
525528

526529
private ProjectionInfo(
527530
BitSet nodesProjection,
@@ -530,16 +533,14 @@ private ProjectionInfo(
530533
RowType schema,
531534
int arrowMetadataLength,
532535
ArrowBodyCompression bodyCompression,
533-
int[] selectedFieldPositions,
534-
int[] selectedFieldIds) {
536+
int[] selectedFieldPositions) {
535537
this.nodesProjection = nodesProjection;
536538
this.buffersProjection = buffersProjection;
537539
this.bufferCount = bufferCount;
538540
this.schema = schema;
539541
this.arrowMetadataLength = arrowMetadataLength;
540542
this.bodyCompression = bodyCompression;
541543
this.selectedFieldPositions = selectedFieldPositions;
542-
this.selectedFieldIds = selectedFieldIds;
543544
}
544545
}
545546

fluss-common/src/main/java/org/apache/fluss/record/KvRecordReadContext.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ public class KvRecordReadContext implements KvRecordBatch.ReadContext {
3131

3232
private final KvFormat kvFormat;
3333
private final SchemaGetter schemaGetter;
34-
35-
// todo: Modify to use cache.
3634
private final Map<Integer, RowDecoder> rowDecoderCache;
3735

3836
private KvRecordReadContext(KvFormat kvFormat, SchemaGetter schemaGetter) {
@@ -48,8 +46,6 @@ public static KvRecordReadContext createReadContext(
4846

4947
@Override
5048
public RowDecoder getRowDecoder(int schemaId) {
51-
// assume schema id are always not changed for now.
52-
// todo: however, schema will change here.
5349
return rowDecoderCache.computeIfAbsent(
5450
schemaId,
5551
(id) -> {

fluss-common/src/main/java/org/apache/fluss/record/ValueRecordReadContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828

2929
/** A default implementation of {@link ValueRecordBatch.ReadContext} . */
3030
public class ValueRecordReadContext implements ValueRecordBatch.ReadContext {
31-
32-
// todo: use cache later.
3331
private final Map<Integer, RowDecoder> rowDecoderCache;
3432
private final SchemaGetter schemaGetter;
3533
private final KvFormat kvFormat;

fluss-common/src/main/java/org/apache/fluss/row/encode/ValueDecoder.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,24 +25,23 @@
2525
import org.apache.fluss.row.decode.RowDecoder;
2626
import org.apache.fluss.types.DataType;
2727

28-
import java.util.HashMap;
2928
import java.util.Map;
3029

3130
import static org.apache.fluss.row.encode.ValueEncoder.SCHEMA_ID_LENGTH;
31+
import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
3232

3333
/**
3434
* A decoder to decode a schema id and {@link BinaryRow} from a byte array value which is encoded by
3535
* {@link ValueEncoder#encodeValue(short, BinaryRow)}.
3636
*/
3737
public class ValueDecoder {
3838

39-
// todo: use cache.
4039
private final Map<Short, RowDecoder> rowDecoders;
4140
private final SchemaGetter schemaGetter;
4241
private final KvFormat kvFormat;
4342

4443
public ValueDecoder(SchemaGetter schemaGetter, KvFormat kvFormat) {
45-
this.rowDecoders = new HashMap<>();
44+
this.rowDecoders = newConcurrentHashMap();
4645
this.schemaGetter = schemaGetter;
4746
this.kvFormat = kvFormat;
4847
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/SchemaChangeEvent.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.fluss.metadata.SchemaInfo;
2121
import org.apache.fluss.metadata.TablePath;
2222

23+
import java.util.Objects;
24+
2325
/** An event for schema change. */
2426
public class SchemaChangeEvent implements CoordinatorEvent {
2527
private final TablePath tablePath;
@@ -37,4 +39,24 @@ public TablePath getTablePath() {
3739
public SchemaInfo getSchemaInfo() {
3840
return schemaInfo;
3941
}
42+
43+
@Override
44+
public String toString() {
45+
return "SchemaChangeEvent{" + "tablePath=" + tablePath + ", schemaInfo=" + schemaInfo + '}';
46+
}
47+
48+
@Override
49+
public boolean equals(Object o) {
50+
if (!(o instanceof SchemaChangeEvent)) {
51+
return false;
52+
}
53+
SchemaChangeEvent that = (SchemaChangeEvent) o;
54+
return Objects.equals(tablePath, that.tablePath)
55+
&& Objects.equals(schemaInfo, that.schemaInfo);
56+
}
57+
58+
@Override
59+
public int hashCode() {
60+
return Objects.hash(tablePath, schemaInfo);
61+
}
4062
}

0 commit comments

Comments
 (0)