Skip to content

Commit 1124819

Browse files
committed
[flink] Supports read-optimized via hints
1 parent faba3a1 commit 1124819

File tree

3 files changed

+60
-0
lines changed

3 files changed

+60
-0
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,15 @@ public InlineElement getDescription() {
733733
"Implying how often to perform an optimization compaction, this configuration is used to "
734734
+ "ensure the query timeliness of the read-optimized system table.");
735735

736+
public static final ConfigOption<Boolean> READ_OPTIMIZED =
737+
key("read-optimized")
738+
.booleanType()
739+
.defaultValue(false)
740+
.withDescription(
741+
"Improves reading performance by only scanning files which does not need merging. "
742+
+ "For primary-key tables, only scans files on the topmost level. "
743+
+ "For append tables, acts like the normal append table.");
744+
736745
public static final ConfigOption<MemorySize> COMPACTION_TOTAL_SIZE_THRESHOLD =
737746
key("compaction.total-size-threshold")
738747
.memoryType()

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.apache.paimon.table.FileStoreTable;
3737
import org.apache.paimon.table.FileStoreTableFactory;
3838
import org.apache.paimon.table.Table;
39+
import org.apache.paimon.table.system.ReadOptimizedTable;
40+
import org.apache.paimon.table.system.SystemTableLoader;
3941
import org.apache.paimon.utils.Preconditions;
4042

4143
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -70,6 +72,7 @@
7072
import static java.lang.Boolean.parseBoolean;
7173
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
7274
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
75+
import static org.apache.paimon.CoreOptions.READ_OPTIMIZED;
7376
import static org.apache.paimon.CoreOptions.SCAN_MODE;
7477
import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
7578
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
@@ -110,6 +113,11 @@ && parseBoolean(options.get(SCAN_BOUNDED.key()))) {
110113
}
111114
if (origin instanceof SystemCatalogTable) {
112115
return new SystemTableSource(table, unbounded, context.getObjectIdentifier());
116+
} else if (!unbounded && parseBoolean(options.get(READ_OPTIMIZED.key()))) {
117+
Table roTable =
118+
SystemTableLoader.load(
119+
ReadOptimizedTable.READ_OPTIMIZED, (FileStoreTable) table);
120+
return new SystemTableSource(roTable, unbounded, context.getObjectIdentifier());
113121
} else {
114122
return new DataTableSource(
115123
context.getObjectIdentifier(),

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.table.api.TableEnvironment;
3737
import org.apache.flink.table.api.TableResult;
3838
import org.apache.flink.table.api.config.ExecutionConfigOptions;
39+
import org.apache.flink.table.api.config.TableConfigOptions;
3940
import org.apache.flink.types.Row;
4041
import org.apache.flink.types.RowKind;
4142
import org.apache.flink.util.CloseableIterator;
@@ -1091,6 +1092,48 @@ private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected
10911092
assertThat(actual).hasSameElementsAs(expected);
10921093
}
10931094

1095+
@Test
1096+
public void testReadOptimizedWithHint() throws Exception {
1097+
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
1098+
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
1099+
1100+
tEnv.executeSql(
1101+
String.format(
1102+
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');",
1103+
getTempDirPath()));
1104+
tEnv.useCatalog("PAIMON");
1105+
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
1106+
tEnv.executeSql("USE test_db").await();
1107+
tEnv.executeSql(
1108+
"CREATE TABLE T ("
1109+
+ " k INT,"
1110+
+ " v STRING,"
1111+
+ " hh INT,"
1112+
+ " dt STRING,"
1113+
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
1114+
+ ") PARTITIONED BY (dt, hh) WITH ("
1115+
+ " 'bucket' = '1'"
1116+
+ ")");
1117+
tEnv.executeSql(
1118+
"INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')")
1119+
.await();
1120+
1121+
List<Row> beforeFullCompact = new ArrayList<>();
1122+
tEnv.executeSql("SELECT * FROM `t` /*+ OPTIONS('read-optimized'='true') */")
1123+
.collect()
1124+
.forEachRemaining(beforeFullCompact::add);
1125+
assertThat(beforeFullCompact).isEmpty();
1126+
1127+
tEnv.executeSql("CALL sys.compact(`table` => 'test_db.t', compact_strategy => 'full')")
1128+
.await();
1129+
1130+
List<Row> afterFullCompact = new ArrayList<>();
1131+
tEnv.executeSql("SELECT * FROM `t` /*+ OPTIONS('read-optimized'='true') */")
1132+
.collect()
1133+
.forEachRemaining(afterFullCompact::add);
1134+
assertThat(afterFullCompact).size().isEqualTo(3);
1135+
}
1136+
10941137
// ------------------------------------------------------------------------
10951138
// Random Tests
10961139
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)