6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -875,6 +875,12 @@
<td>Boolean</td>
<td>Enable query auth to give Catalog the opportunity to perform column level and row level permission validation on queries.</td>
</tr>
<tr>
<td><h5>read-optimized</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Improves reading performance by only scanning files which do not need merging. For primary-key tables, only scans files on the topmost level. For append tables, acts like a normal append table.</td>
</tr>
<tr>
<td><h5>read.batch-size</h5></td>
<td style="word-wrap: break-word;">1024</td>
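As the documentation entry above suggests, the option is intended to be toggled per query. A minimal sketch of doing so through a Flink SQL dynamic table options hint (the catalog and table names are illustrative; the pattern matches the test added at the end of this change):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ReadOptimizedHintExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.useCatalog("paimon"); // assumes a Paimon catalog named 'paimon' is registered
        // Only files that do not need merging are scanned (topmost level for primary-key tables).
        tEnv.executeSql("SELECT * FROM t /*+ OPTIONS('read-optimized'='true') */").print();
    }
}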
9 changes: 9 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -733,6 +733,15 @@ public InlineElement getDescription() {
"Implying how often to perform an optimization compaction, this configuration is used to "
+ "ensure the query timeliness of the read-optimized system table.");

public static final ConfigOption<Boolean> READ_OPTIMIZED =
key("read-optimized")
.booleanType()
.defaultValue(false)
.withDescription(
"Improves reading performance by only scanning files which does not need merging. "
+ "For primary-key tables, only scans files on the topmost level. "
+ "For append tables, acts like the normal append table.");

public static final ConfigOption<MemorySize> COMPACTION_TOTAL_SIZE_THRESHOLD =
key("compaction.total-size-threshold")
.memoryType()
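For context on how the new option is consumed: the Flink factory change below reads it as a raw string from the catalog table options and parses it with java.lang.Boolean, rather than through a typed read of the ConfigOption. A minimal sketch of that check against an illustrative options map:

import java.util.HashMap;
import java.util.Map;

import static java.lang.Boolean.parseBoolean;
import static org.apache.paimon.CoreOptions.READ_OPTIMIZED;

public class ReadOptimizedCheckSketch {
    public static void main(String[] args) {
        // Illustrative stand-in for the catalog table options the factory receives.
        Map<String, String> options = new HashMap<>();
        options.put(READ_OPTIMIZED.key(), "true");

        // Mirrors the factory check: parseBoolean(null) is false, so the option defaults to off.
        boolean readOptimized = parseBoolean(options.get(READ_OPTIMIZED.key()));
        System.out.println("read-optimized enabled: " + readOptimized);
    }
}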
@@ -36,6 +36,8 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -70,6 +72,7 @@
import static java.lang.Boolean.parseBoolean;
import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
import static org.apache.paimon.CoreOptions.LOG_CONSISTENCY;
import static org.apache.paimon.CoreOptions.READ_OPTIMIZED;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.STREAMING_READ_MODE;
import static org.apache.paimon.CoreOptions.StartupMode.FROM_SNAPSHOT;
@@ -110,6 +113,11 @@ && parseBoolean(options.get(SCAN_BOUNDED.key()))) {
}
if (origin instanceof SystemCatalogTable) {
return new SystemTableSource(table, unbounded, context.getObjectIdentifier());
} else if (!unbounded && parseBoolean(options.get(READ_OPTIMIZED.key()))) {
Contributor comment: It seems READ_OPTIMIZED is not a generic option; it only works for Flink SQL.

Table roTable =
SystemTableLoader.load(
ReadOptimizedTable.READ_OPTIMIZED, (FileStoreTable) table);
return new SystemTableSource(roTable, unbounded, context.getObjectIdentifier());
} else {
return new DataTableSource(
context.getObjectIdentifier(),
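The new branch above wraps the data table in the same ReadOptimizedTable view that backs Paimon's read-optimized system table, so the option is effectively a per-query switch onto that view. A minimal sketch of the wrapping call in isolation, assuming an already-loaded FileStoreTable:

import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.table.system.SystemTableLoader;

public class ReadOptimizedViewSketch {
    // Mirrors the call added in the factory: returns the read-optimized
    // system-table view over the given data table.
    static Table toReadOptimizedView(FileStoreTable table) {
        return SystemTableLoader.load(ReadOptimizedTable.READ_OPTIMIZED, table);
    }
}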
@@ -36,6 +36,7 @@
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
@@ -1091,6 +1092,48 @@ private void assertStreamingResult(CloseableIterator<Row> it, List<Row> expected
assertThat(actual).hasSameElementsAs(expected);
}

@Test
public void testReadOptimizedWithHint() throws Exception {
TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);

tEnv.executeSql(
String.format(
"CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');",
getTempDirPath()));
tEnv.useCatalog("PAIMON");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
tEnv.executeSql("USE test_db").await();
tEnv.executeSql(
"CREATE TABLE t ("
+ " k INT,"
+ " v STRING,"
+ " hh INT,"
+ " dt STRING,"
+ " PRIMARY KEY (k, dt, hh) NOT ENFORCED"
+ ") PARTITIONED BY (dt, hh) WITH ("
+ " 'bucket' = '1'"
+ ")");
tEnv.executeSql(
"INSERT INTO t VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')")
.await();

List<Row> beforeFullCompact = new ArrayList<>();
tEnv.executeSql("SELECT * FROM `t` /*+ OPTIONS('read-optimized'='true') */")
.collect()
.forEachRemaining(beforeFullCompact::add);
assertThat(beforeFullCompact).isEmpty();

tEnv.executeSql("CALL sys.compact(`table` => 'test_db.t', compact_strategy => 'full')")
.await();

List<Row> afterFullCompact = new ArrayList<>();
tEnv.executeSql("SELECT * FROM `t` /*+ OPTIONS('read-optimized'='true') */")
.collect()
.forEachRemaining(afterFullCompact::add);
assertThat(afterFullCompact).size().isEqualTo(3);
}

// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
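As a follow-up to the test above: because the hint is routed to the read-optimized system table, the same rows should also be reachable by querying that system table directly. A minimal sketch, assuming the conventional '$ro' system-table suffix (illustrative, not part of this change):

import org.apache.flink.table.api.TableEnvironment;

public class ReadOptimizedSystemTableSketch {
    // Illustrative: the hint-based query in the test should match querying the
    // read-optimized system table directly ('$ro' suffix assumed).
    static void queryReadOptimized(TableEnvironment tEnv) {
        tEnv.executeSql("SELECT * FROM `t$ro`").print();
    }
}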