Skip to content

Commit d490cb1

Browse files
committed
Fix lookup and get schema deadlock.
1 parent 24bec29 commit d490cb1

File tree

5 files changed

+54
-50
lines changed

5 files changed

+54
-50
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -169,28 +169,35 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
169169
}
170170

171171
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
172-
return lookupClient
173-
.prefixLookup(tableBucket, bucketKeyBytes)
174-
.thenApply(
175-
result -> {
176-
List<InternalRow> rowList = new ArrayList<>(result.size());
177-
for (byte[] valueBytes : result) {
178-
if (valueBytes == null) {
179-
continue;
180-
}
181-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
182-
InternalRow row;
183-
if (value.schemaId == tableInfo.getSchemaId()) {
184-
row = value.row;
185-
} else {
186-
Schema schema = schemaGetter.getSchema(value.schemaId);
187-
row =
188-
ProjectedRow.from(schema, tableInfo.getSchema())
189-
.replaceRow(value.row);
190-
}
191-
rowList.add(row);
172+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
173+
174+
CompletableFuture.runAsync(
175+
() -> {
176+
try {
177+
List<byte[]> result =
178+
lookupClient.prefixLookup(tableBucket, bucketKeyBytes).get();
179+
List<InternalRow> rowList = new ArrayList<>(result.size());
180+
for (byte[] valueBytes : result) {
181+
if (valueBytes == null) {
182+
continue;
192183
}
193-
return new LookupResult(rowList);
194-
});
184+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
185+
InternalRow row;
186+
if (value.schemaId == tableInfo.getSchemaId()) {
187+
row = value.row;
188+
} else {
189+
Schema schema = schemaGetter.getSchema(value.schemaId);
190+
row =
191+
ProjectedRow.from(schema, tableInfo.getSchema())
192+
.replaceRow(value.row);
193+
}
194+
rowList.add(row);
195+
}
196+
future.complete(new LookupResult(rowList));
197+
} catch (Exception e) {
198+
future.complete(new LookupResult(Collections.emptyList()));
199+
}
200+
});
201+
return future;
195202
}
196203
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -132,25 +132,30 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
132132

133133
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
134134
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
135-
lookupClient
136-
.lookup(tableBucket, pkBytes)
137-
.thenApply(
138-
valueBytes -> {
139-
InternalRow row = null;
140-
if (valueBytes != null) {
141-
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
142-
if (value.schemaId == tableInfo.getSchemaId()) {
143-
row = value.row;
144-
} else {
145-
Schema schema = schemaGetter.getSchema(value.schemaId);
146-
checkNotNull(schema, "schema is null");
147-
row =
148-
ProjectedRow.from(schema, tableInfo.getSchema())
149-
.replaceRow(value.row);
150-
}
135+
CompletableFuture<LookupResult> future = new CompletableFuture<>();
136+
137+
CompletableFuture.runAsync(
138+
() -> {
139+
try {
140+
byte[] valueBytes = lookupClient.lookup(tableBucket, pkBytes).get();
141+
InternalRow row = null;
142+
if (valueBytes != null) {
143+
ValueDecoder.Value value = kvValueDecoder.decodeValue(valueBytes);
144+
if (value.schemaId == tableInfo.getSchemaId()) {
145+
row = value.row;
146+
} else {
147+
Schema schema = schemaGetter.getSchema(value.schemaId);
148+
checkNotNull(schema, "schema is null");
149+
row =
150+
ProjectedRow.from(schema, tableInfo.getSchema())
151+
.replaceRow(value.row);
151152
}
152-
153-
return new LookupResult(row);
154-
});
153+
}
154+
future.complete(new LookupResult(row));
155+
} catch (Exception e) {
156+
future.completeExceptionally(e);
157+
}
158+
});
159+
return future;
155160
}
156161
}

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.apache.fluss.types.StringType;
5757

5858
import org.apache.commons.lang3.StringUtils;
59-
import org.junit.jupiter.api.AfterEach;
6059
import org.junit.jupiter.api.Disabled;
6160
import org.junit.jupiter.api.Test;
6261
import org.junit.jupiter.params.ParameterizedTest;
@@ -93,12 +92,6 @@
9392
/** IT case for {@link FlussTable}. */
9493
class FlussTableITCase extends ClientToServerITCaseBase {
9594

96-
@AfterEach
97-
protected void teardown() throws Exception {
98-
admin.dropDatabase(DATA1_TABLE_PATH_PK.getDatabaseName(), true, true).get();
99-
super.teardown();
100-
}
101-
10295
@Test
10396
void testGetDescriptor() throws Exception {
10497
createTable(DATA1_TABLE_PATH_PK, DATA1_TABLE_DESCRIPTOR_PK, false);

fluss-common/src/test/java/org/apache/fluss/record/FileLogProjectionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ void beforeEach() {
6767
testingSchemaGetter.updateLatestSchemaInfo(new SchemaInfo(TestData.DATA2_SCHEMA, 2));
6868
}
6969

70-
// TODO: add tests for nested types
7170
@Test
7271
void testSetCurrentProjection() throws Exception {
7372
short schemaId = (short) 2;
@@ -283,6 +282,7 @@ void testIllegalByteOrder(byte recordBatchMagic) throws Exception {
283282
Integer.MAX_VALUE))
284283
.isInstanceOf(EOFException.class)
285284
.hasMessageContaining("Failed to read `arrow header` from file channel");
285+
fileLogRecords2.close();
286286
}
287287

288288
@ParameterizedTest

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/lookup/FlinkLookupFunctionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,6 @@ void testSchemaChange() throws Exception {
195195
lookupFunction.close();
196196

197197
// start lookup job after schema change.
198-
admin.getTableSchema(tablePath, 1).get();
199198
lookupFunction =
200199
new FlinkLookupFunction(
201200
clientConf,

0 commit comments

Comments
 (0)