Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -450,13 +449,25 @@ protected void waitUntilBucketSynced(TableBucket tb) {

protected void checkDataInPaimonPrimaryKeyTable(
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath);
for (InternalRow expectedRow : expectedRows) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
}
retry(
Duration.ofMinutes(1),
() -> {
try (CloseableIterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath)) {
Map<Integer, String> actualRows = new HashMap<>();
while (paimonRowIterator.hasNext()) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
actualRows.put(row.getInt(0), row.getString(1).toString());
}

Map<Integer, String> expectedRowsByKey = new HashMap<>();
for (InternalRow expectedRow : expectedRows) {
expectedRowsByKey.put(
expectedRow.getInt(0), expectedRow.getString(1).toString());
}
assertThat(actualRows).isEqualTo(expectedRowsByKey);
}
});
}

protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
Expand All @@ -475,24 +486,30 @@ protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowClos

protected void checkFlussOffsetsInSnapshot(
TablePath tablePath, Map<TableBucket, Long> expectedOffsets) throws Exception {
FileStoreTable table =
(FileStoreTable)
getPaimonCatalog()
.getTable(
Identifier.create(
tablePath.getDatabaseName(),
tablePath.getTableName()));
Snapshot snapshot = table.snapshotManager().latestSnapshot();
assertThat(snapshot).isNotNull();

String offsetFile = snapshot.properties().get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
Map<TableBucket, Long> recordedOffsets =
new LakeTable(
new LakeTable.LakeSnapshotMetadata(
// don't care about snapshot id
-1, new FsPath(offsetFile), null))
.getOrReadLatestTableSnapshot()
.getBucketLogEndOffset();
assertThat(recordedOffsets).isEqualTo(expectedOffsets);
retry(
Duration.ofMinutes(1),
() -> {
FileStoreTable table =
(FileStoreTable)
getPaimonCatalog()
.getTable(
Identifier.create(
tablePath.getDatabaseName(),
tablePath.getTableName()));
Snapshot snapshot = table.snapshotManager().latestSnapshot();
assertThat(snapshot).isNotNull();

String offsetFile =
snapshot.properties().get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
assertThat(offsetFile).isNotNull();
Map<TableBucket, Long> recordedOffsets =
new LakeTable(
new LakeTable.LakeSnapshotMetadata(
// don't care about snapshot id
-1, new FsPath(offsetFile), null))
.getOrReadLatestTableSnapshot()
.getBucketLogEndOffset();
assertThat(recordedOffsets).isEqualTo(expectedOffsets);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import java.util.stream.Stream;

import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;

/** IT case for tiering tables to paimon. */
Expand Down Expand Up @@ -496,19 +497,39 @@ private Tuple2<Long, TableDescriptor> createPartitionedTable(
private void checkDataInPaimonAppendOnlyTable(
TablePath tablePath, List<InternalRow> expectedRows, long startingOffset)
throws Exception {
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath);
Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
while (paimonRowIterator.hasNext()) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
InternalRow flussRow = flussRowIterator.next();
assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
// system columns are always the last three: __bucket, __offset, __timestamp
int offsetIndex = row.getFieldCount() - 2;
assertThat(row.getLong(offsetIndex)).isEqualTo(startingOffset++);
}
assertThat(flussRowIterator.hasNext()).isFalse();
retry(
Duration.ofMinutes(1),
() -> {
try (CloseableIterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath)) {
List<String> actualRows = new ArrayList<>();
while (paimonRowIterator.hasNext()) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
// system columns are always the last three:
// __bucket, __offset, __timestamp
int offsetIndex = row.getFieldCount() - 2;
actualRows.add(
row.getInt(0)
+ "|"
+ row.getString(1).toString()
+ "|"
+ row.getLong(offsetIndex));
}

List<String> expectedPaimonRows = new ArrayList<>();
long offset = startingOffset;
for (InternalRow flussRow : expectedRows) {
expectedPaimonRows.add(
flussRow.getInt(0)
+ "|"
+ flussRow.getString(1).toString()
+ "|"
+ offset++);
}
assertThat(actualRows)
.containsExactlyInAnyOrderElementsOf(expectedPaimonRows);
}
});
}

private void checkDataInPaimonAppendOnlyPartitionedTable(
Expand All @@ -517,19 +538,41 @@ private void checkDataInPaimonAppendOnlyPartitionedTable(
List<InternalRow> expectedRows,
long startingOffset)
throws Exception {
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath, partitionSpec);
Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
while (paimonRowIterator.hasNext()) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
InternalRow flussRow = flussRowIterator.next();
assertThat(row.getInt(0)).isEqualTo(flussRow.getInt(0));
assertThat(row.getString(1).toString()).isEqualTo(flussRow.getString(1).toString());
assertThat(row.getString(2).toString()).isEqualTo(flussRow.getString(2).toString());
// the idx 3 is __bucket, so use 4
assertThat(row.getLong(4)).isEqualTo(startingOffset++);
}
assertThat(flussRowIterator.hasNext()).isFalse();
retry(
Duration.ofMinutes(1),
() -> {
try (CloseableIterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
getPaimonRowCloseableIterator(tablePath, partitionSpec)) {
List<String> actualRows = new ArrayList<>();
while (paimonRowIterator.hasNext()) {
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
// the idx 3 is __bucket, so use 4
actualRows.add(
row.getInt(0)
+ "|"
+ row.getString(1).toString()
+ "|"
+ row.getString(2).toString()
+ "|"
+ row.getLong(4));
}

List<String> expectedPaimonRows = new ArrayList<>();
long offset = startingOffset;
for (InternalRow flussRow : expectedRows) {
expectedPaimonRows.add(
flussRow.getInt(0)
+ "|"
+ flussRow.getString(1).toString()
+ "|"
+ flussRow.getString(2).toString()
+ "|"
+ offset++);
}
assertThat(actualRows)
.containsExactlyInAnyOrderElementsOf(expectedPaimonRows);
}
});
}

private CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
Expand Down