Flink: SQL support for dynamic iceberg sink #15279

Open

swapna267 wants to merge 6 commits into apache:main from swapna267:dynamic_sink_flink_sql

Conversation

@swapna267
Contributor

This PR introduces a SQL table connector for using the dynamic Iceberg sink.

Two new configuration options have been added to FlinkCreateTableOptions:

  • use-dynamic-iceberg-sink (boolean): Enable/disable dynamic sink functionality
  • dynamic-record-generator-impl (string): Fully qualified class name of the DynamicTableRecordGenerator implementation

Example SQL:

  CREATE TABLE dynamic_sink_table (
      id BIGINT,
      data STRING,
      database_name STRING,
      table_name STRING
  ) WITH (
      'connector' = 'iceberg',
      'catalog-type' = 'hadoop',
      'catalog-name' = 'my_catalog',
      'warehouse' = 's3://my-warehouse/',
      'use-dynamic-iceberg-sink' = 'true',
      'dynamic-record-generator-impl' = 'com.example.MyDynamicRecordGenerator',
      'table.props.write.format.default' = 'parquet',
      'table.props.write.target-file-size-bytes' = '134217728'
  );

  -- Insert data that will be routed to different tables based on database_name and table_name
  INSERT INTO dynamic_sink_table VALUES
      (1, 'record1', 'sales', 'orders'),
      (2, 'record2', 'sales', 'customers'),
      (3, 'record3', 'inventory', 'products');

Planning to provide a CustomVariantToDynamicRecordGenerator that can handle a Flink VARIANT type column and generate records of different schemas, landing in tables with the corresponding schema.
That will be added in a separate PR.

@github-actions github-actions bot added the flink label Feb 9, 2026
Contributor

@mxm mxm left a comment

Thanks @swapna267! This looks great.

}

@TestTemplate
public void testCreateDynamicIcebergSink() throws DatabaseAlreadyExistException {
Contributor

Could we verify this test works with both the old FlinkSink and the new IcebergSink?

Contributor Author

This test in particular tests only the DynamicIcebergSink, by setting use-dynamic-iceberg-sink to true.

But I also see that TestIcebergConnector is not testing the new IcebergSink code path. It's partially covered in TestFlinkTableSink (where Iceberg tables are created in the Iceberg catalog).

If my understanding is right, I'd prefer to put that into a separate PR.

Contributor

That makes sense. The test is fine as-is.

Comment on lines +145 to +150
String dynamicRecordGeneratorImpl =
    flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
Preconditions.checkNotNull(
    dynamicRecordGeneratorImpl,
    "%s must be specified when use-dynamic-iceberg-sink is true",
    FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());
Contributor

Should we add a test to verify these conditions?

Contributor Author

@swapna267 swapna267 Feb 11, 2026

Sure, I can add one. I don't see such detailed tests in general, and I'm also concerned about the time the tests already take to complete.

Contributor

We have many such tests for the Dynamic Sink. Not specifying the record generator will probably error when it's being created, but it would still be nice to check for the particular error message reported back to the user. I'll leave it up to you whether to add it.

Contributor Author

Added.

Contributor

@Guosmilesmile Guosmilesmile left a comment

Thanks for the PR! Left some comments.


private TableCreator createTableCreator() {
  final Map<String, String> tableProperties =
      org.apache.iceberg.util.PropertyUtil.propertiesWithPrefix(writeProps, "table.props.");
Contributor

If I’m not mistaken, if we want to set the table property write.parquet.row-group-size-bytes, do we need to specify it here as table.props.write.parquet.row-group-size-bytes? I think this should be documented and we should add a corresponding test case.

Contributor Author

Yes, right. When doing CREATE TABLE in the Flink catalog, we pass in the catalog configuration here.
The table.props. prefix is used to separate out the physical Iceberg table properties.
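For illustration only (this snippet is not from the PR; it just exercises the existing PropertyUtil.propertiesWithPrefix helper called in the hunk above), this is roughly how a table.props.-prefixed connector option ends up as a physical table property:

  // Illustration only: shows how the "table.props." prefix is stripped so the
  // remaining keys can be applied as Iceberg table properties.
  import java.util.Map;
  import org.apache.iceberg.util.PropertyUtil;

  public class TablePropsPrefixExample {
    public static void main(String[] args) {
      Map<String, String> connectorOptions =
          Map.of(
              "connector", "iceberg",
              "table.props.write.format.default", "parquet",
              "table.props.write.parquet.row-group-size-bytes", "134217728");

      // Keeps only the "table.props."-prefixed entries and drops the prefix.
      Map<String, String> tableProperties =
          PropertyUtil.propertiesWithPrefix(connectorOptions, "table.props.");

      // Prints the two write.* properties without the prefix (order may vary).
      System.out.println(tableProperties);
    }
  }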

Basic documentation about the connector is here:
https://iceberg.apache.org/docs/nightly/flink-connector/
Once we have all the functionality (the dynamic record generator impl is coming in the next PR), I'll add the details there.

I combined this into an existing test case: https://github.com/swapna267/iceberg/blob/bd2d500f07fb24d05111b6dabc9a8e77637a922c/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestIcebergConnector.java#L393
I can pull it out into a separate one if we think it's required.

@swapna267
Contributor Author

Thanks @mxm and @Guosmilesmile for the review. I replied to some of the comments.

.getCatalogLoader()
.loadCatalog()
.loadTable(TableIdentifier.of(databaseName(), tableName()));
assertThat(table.properties()).containsEntry("key1", "val1");
Contributor

Could we also verify the records written to the table?

Contributor Author

Added.

@mxm
Contributor

mxm commented Feb 12, 2026

LGTM, just some minor comments.

tableLoader, resolvedSchema, context.getConfiguration(), writeProps);
return new IcebergTableSink(
tableLoader,
resolvedCatalogTable.getResolvedSchema(),
Contributor

Why did we remove the filtering for the physical columns?

Context context, Configuration flinkConf, Map<String, String> writeProps) {
String dynamicRecordGeneratorImpl =
flinkConf.get(FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL);
Preconditions.checkNotNull(
Contributor

I have received several comments that, instead of checkNotNull, we should use checkArgument with a message like: Invalid dynamic record generator value: %s. %s must be specified when use-dynamic-iceberg-sink is true.
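A minimal sketch of what that could look like, reusing the variables from the quoted hunk above (illustration only, not the PR's actual code):

  // Sketch only: checkArgument with the suggested error message instead of checkNotNull.
  Preconditions.checkArgument(
      dynamicRecordGeneratorImpl != null,
      "Invalid dynamic record generator value: %s. %s must be specified when use-dynamic-iceberg-sink is true",
      dynamicRecordGeneratorImpl,
      FlinkCreateTableOptions.DYNAMIC_RECORD_GENERATOR_IMPL.key());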

return new IcebergTableSink(
catalogLoader,
dynamicRecordGeneratorImpl,
resolvedCatalogTable.getResolvedSchema(),
Contributor

Do we need something like this?

    ResolvedSchema resolvedSchema =
        ResolvedSchema.of(
            resolvedCatalogTable.getResolvedSchema().getColumns().stream()
                .filter(Column::isPhysical)
                .collect(Collectors.toList()));


private static FlinkCatalog createCatalogLoader(
Map<String, String> tableProps, String catalogName) {
Preconditions.checkNotNull(
Contributor

@pvary pvary Feb 17, 2026

use checkArgument and "standard error message"

Contributor

I see that this is only a move for this check. Do you think it would cause any issues if we change this to the new standard?

Comment on lines +178 to +180
org.apache.hadoop.conf.Configuration hadoopConf = FlinkCatalogFactory.clusterHadoopConf();
FlinkCatalogFactory factory = new FlinkCatalogFactory();
return (FlinkCatalog) factory.createCatalog(catalogName, tableProps, hadoopConf);
Contributor

nit: Just to get rid of the org.apache.hadoop.conf.Configuration

Suggested change (the hunk above becomes):

FlinkCatalogFactory factory = new FlinkCatalogFactory();
return (FlinkCatalog) factory.createCatalog(catalogName, tableProps, FlinkCatalogFactory.clusterHadoopConf());

@pvary
Contributor

pvary commented Feb 17, 2026

Nice stuff @swapna267!

Could you please update the documentation too?
