Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,39 @@ public int maxRecordsPerMicroBatch() {
.parse();
}

public boolean asyncMicroBatchPlanningEnabled() {
return confParser
.booleanConf()
.option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
.sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
.parse();
}

public long streamingSnapshotPollingIntervalMs() {
return confParser
.longConf()
.option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
.parse();
}

public long asyncQueuePreloadFileLimit() {
return confParser
.longConf()
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
.parse();
}

public long asyncQueuePreloadRowLimit() {
return confParser
.longConf()
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
.parse();
}

public boolean preserveDataGrouping() {
return confParser
.booleanConf()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,19 @@ private SparkReadOptions() {}
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
"streaming-max-rows-per-micro-batch";

// Enable async micro batch planning
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
"async-micro-batch-planning-enabled";
// Polling interval for async planner to refresh table metadata (ms)
public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
"streaming-snapshot-polling-interval-ms";
public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 30000L;
// Initial queue preload limits for async micro batch planner
public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = "async-queue-preload-file-limit";
public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = "async-queue-preload-row-limit";
public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;

// Table path
public static final String PATH = "path";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,9 @@ private SparkSQLProperties() {}

// Prefix for custom snapshot properties
public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property.";

// Controls whether to enable async micro batch planning for session
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
"spark.sql.iceberg.async-micro-batch-planning-enabled";
public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
}
Loading