Skip to content

Commit 24bec29

Browse files
committed
1. modified based on CR
2. flink write with smaller row
1 parent 08d295e commit 24bec29

File tree

48 files changed

+394
-386
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+394
-386
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Table getTable(TablePath tablePath, SchemaInfo schemaInfo) {
112112
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
113113
Admin admin = getOrCreateAdmin();
114114
TableInfo tableInfo = admin.getTableInfo(tablePath).join();
115-
return new FlussTable(this, tablePath, TableInfo.of(tableInfo, schemaInfo));
115+
return new FlussTable(this, tablePath, tableInfo.withNewSchema(schemaInfo));
116116
}
117117

118118
public MetadataUpdater getMetadataUpdater() {

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.fluss.metadata.TableBucket;
2828
import org.apache.fluss.metadata.TableInfo;
2929
import org.apache.fluss.row.InternalRow;
30-
import org.apache.fluss.row.PruneRow;
30+
import org.apache.fluss.row.ProjectedRow;
3131
import org.apache.fluss.row.encode.KeyEncoder;
3232
import org.apache.fluss.row.encode.ValueDecoder;
3333
import org.apache.fluss.types.RowType;
@@ -185,7 +185,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
185185
} else {
186186
Schema schema = schemaGetter.getSchema(value.schemaId);
187187
row =
188-
PruneRow.from(schema, tableInfo.getSchema())
188+
ProjectedRow.from(schema, tableInfo.getSchema())
189189
.replaceRow(value.row);
190190
}
191191
rowList.add(row);

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.apache.fluss.metadata.TableBucket;
2828
import org.apache.fluss.metadata.TableInfo;
2929
import org.apache.fluss.row.InternalRow;
30-
import org.apache.fluss.row.PruneRow;
30+
import org.apache.fluss.row.ProjectedRow;
3131
import org.apache.fluss.row.encode.KeyEncoder;
3232
import org.apache.fluss.row.encode.ValueDecoder;
3333
import org.apache.fluss.types.RowType;
@@ -132,7 +132,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
132132

133133
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
134134
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
135-
return lookupClient
135+
lookupClient
136136
.lookup(tableBucket, pkBytes)
137137
.thenApply(
138138
valueBytes -> {
@@ -145,7 +145,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
145145
Schema schema = schemaGetter.getSchema(value.schemaId);
146146
checkNotNull(schema, "schema is null");
147147
row =
148-
PruneRow.from(schema, tableInfo.getSchema())
148+
ProjectedRow.from(schema, tableInfo.getSchema())
149149
.replaceRow(value.row);
150150
}
151151
}

fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,17 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30-
import java.util.HashMap;
3130
import java.util.Map;
3231
import java.util.concurrent.TimeUnit;
3332

33+
import static org.apache.fluss.utils.MapUtils.newConcurrentHashMap;
34+
3435
/** Schema getter for client. */
3536
@Internal
3637
public class ClientSchemaGetter implements SchemaGetter {
3738
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
3839

3940
private final TablePath tablePath;
40-
// todo: 改为cache.
4141
private final Map<Integer, Schema> schemasById;
4242
private final Admin admin;
4343
private SchemaInfo latestSchemaInfo;
@@ -46,7 +46,7 @@ public ClientSchemaGetter(TablePath tablePath, SchemaInfo latestSchemaInfo, Admi
4646
this.tablePath = tablePath;
4747
this.latestSchemaInfo = latestSchemaInfo;
4848
this.admin = admin;
49-
this.schemasById = new HashMap<>();
49+
this.schemasById = newConcurrentHashMap();
5050
schemasById.put(latestSchemaInfo.getSchemaId(), latestSchemaInfo.getSchema());
5151
}
5252

@@ -56,9 +56,8 @@ public Schema getSchema(int schemaId) {
5656
schemaId,
5757
(id) -> {
5858
try {
59-
// todo: 测试这一步总是会一定概率卡住(有时候卡几分钟恢复),这个可以排查一下。
6059
SchemaInfo schemaInfo =
61-
admin.getTableSchema(tablePath, schemaId).get(5, TimeUnit.MINUTES);
60+
admin.getTableSchema(tablePath, schemaId).get(1, TimeUnit.MINUTES);
6261
if (id > latestSchemaInfo.getSchemaId()) {
6362
latestSchemaInfo = schemaInfo;
6463
}
@@ -77,11 +76,5 @@ public SchemaInfo getLatestSchemaInfo() {
7776
}
7877

7978
@Override
80-
public void release() {
81-
try {
82-
admin.close();
83-
} catch (Exception e) {
84-
throw new RuntimeException(e);
85-
}
86-
}
79+
public void release() {}
8780
}

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/KvSnapshotBatchScanner.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,16 @@ public KvSnapshotBatchScanner(
121121
initReaderAsynchronously();
122122
}
123123

124+
/**
125+
* Fetch data from snapshot.
126+
*
127+
* <p>If the snapshot file reader is not ready in given maximum block time, return an empty
128+
* iterator. If the reader is ready, always return the reader if there remains any record in the
129+
* reader, otherwise, return null.
130+
*
131+
* @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE}
132+
* milliseconds)
133+
*/
124134
@Nullable
125135
@Override
126136
public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.fluss.row.GenericRow;
3535
import org.apache.fluss.row.InternalRow;
3636
import org.apache.fluss.row.ProjectedRow;
37-
import org.apache.fluss.row.PruneRow;
3837
import org.apache.fluss.rpc.gateway.TabletServerGateway;
3938
import org.apache.fluss.rpc.messages.LimitScanRequest;
4039
import org.apache.fluss.rpc.messages.LimitScanResponse;
@@ -144,7 +143,7 @@ private List<InternalRow> parseLimitScanResponse(LimitScanResponse limitScanResp
144143
InternalRow row = record.getRow();
145144
if (targetSchemaId != record.schemaId()) {
146145
row =
147-
PruneRow.from(
146+
ProjectedRow.from(
148147
schemaGetter.getSchema(record.schemaId()),
149148
schemaGetter.getSchema(targetSchemaId))
150149
.replaceRow(row);

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/SnapshotFilesReader.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.rocksdb.RocksIteratorWrapper;
2727
import org.apache.fluss.row.InternalRow;
2828
import org.apache.fluss.row.ProjectedRow;
29-
import org.apache.fluss.row.PruneRow;
3029
import org.apache.fluss.row.encode.ValueDecoder;
3130
import org.apache.fluss.utils.CloseableIterator;
3231
import org.apache.fluss.utils.CloseableRegistry;
@@ -160,7 +159,7 @@ public InternalRow next() {
160159
InternalRow originRow = originValue.row;
161160
if (targetSchemaId != originValue.schemaId) {
162161
originRow =
163-
PruneRow.from(schemaGetter.getSchema(originValue.schemaId), targetSchema)
162+
ProjectedRow.from(schemaGetter.getSchema(originValue.schemaId), targetSchema)
164163
.replaceRow(originRow);
165164
}
166165

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -921,6 +921,8 @@ public void close() {
921921

922922
writerBufferPool.close();
923923
arrowWriterPool.close();
924+
// Release all the memory segments.
925+
bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory());
924926
bufferAllocator.close();
925927
}
926928

fluss-client/src/main/java/org/apache/fluss/client/write/WriteRecord.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,6 @@ public int getEstimatedSizeInBytes() {
206206
return estimatedSizeInBytes;
207207
}
208208

209-
public TableInfo getTableInfo() {
210-
return tableInfo;
211-
}
212-
213209
public int getSchemaId() {
214210
return tableInfo.getSchemaId();
215211
}

fluss-client/src/main/java/org/apache/fluss/client/write/WriterClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ public WriterClient(
120120
conf.get(ConfigOptions.CLIENT_WRITER_DYNAMIC_CREATE_PARTITION_ENABLED),
121121
this::maybeAbortBatches);
122122
} catch (Throwable t) {
123+
LOG.error("Failed to construct writer.", t);
123124
close(Duration.ofMillis(0));
124125
throw new FlussRuntimeException(
125126
String.format(

0 commit comments

Comments
 (0)