Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions misc/python/materialize/checks/all_checks/kafka_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -1498,3 +1498,91 @@ def validate(self) -> Testdrive:

# TODO: kafka-verify-data when it can deal with being run twice, to check the actual partitioning
"""))


@externally_idempotent(False)
class KafkaTopicMetadataRefreshInterval(Check):
"""Regression test for SS-112 (#36461).

The commit rejects `TOPIC METADATA REFRESH INTERVAL < 1s` at planning time
for Kafka sources and sinks. Because catalog bootstrap re-plans every
object's persisted `create_sql`, an environment that already has such an
object -- created on an older version that accepted it -- panics
environmentd on upgrade ("invalid persisted SQL") and crash-loops the whole
environment. We create the objects on the old base version and assert the
environment still boots and they survive the upgrade.
"""

def _can_run(self, e: Executor) -> bool:
# The < 1s rejection landed in v26.30. Only reproducible when the base
# version still accepts the value; otherwise the CREATE in initialize()
# fails up-front. Also makes this a no-op in non-upgrade scenarios
# (base == current, rejecting build).
return self.base_version < MzVersion.parse_mz("v26.30.0-dev")

def initialize(self) -> Testdrive:
# '999ms' source / '500ms' sink: accepted and functional pre-rejection,
# rejected when the new planner re-plans their create_sql on bootstrap.
# ('0s' -- the literal SS-112 case -- is avoided: it crash-loops the
# storage task on the old version; any sub-second value triggers the
# bootstrap panic equally well.)
return Testdrive(dedent("""
$ kafka-create-topic topic=tmri-source partitions=1

$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=tmri-source
"k1":"v1"

> CREATE SOURCE tmri_source_src
FROM KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-tmri-source-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '999ms'
)
> CREATE TABLE tmri_source FROM SOURCE tmri_source_src (REFERENCE "testdrive-tmri-source-${testdrive.seed}")
KEY FORMAT JSON VALUE FORMAT JSON ENVELOPE UPSERT

> CREATE TABLE tmri_sink_table (f1 INTEGER)
> INSERT INTO tmri_sink_table VALUES (1)
> CREATE SINK tmri_sink FROM tmri_sink_table
INTO KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-tmri-sink-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '500ms'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM
"""))

def manipulate(self) -> list[Testdrive]:
# Only flow data; phase 2 runs on the new (rejecting) build, so no
# sub-second-interval DDL here.
return [
Testdrive(dedent(s))
for s in [
"""
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=tmri-source
"k2":"v2"
> INSERT INTO tmri_sink_table VALUES (2)
""",
"""
$ kafka-ingest format=bytes key-format=bytes key-terminator=: topic=tmri-source
"k3":"v3"
> INSERT INTO tmri_sink_table VALUES (3)
""",
]
]

def validate(self) -> Testdrive:
# Reaching validate() means bootstrap survived the upgrade; also assert
# both objects are still present and healthy.
return Testdrive(dedent("""
$ set-sql-timeout duration=120s
> SELECT count(*) FROM tmri_source
3
> SELECT count(*) FROM tmri_sink_table
3
> SELECT count(*) FROM mz_sources WHERE name = 'tmri_source_src'
1
> SELECT count(*) FROM mz_sinks WHERE name = 'tmri_sink'
1
> SELECT status FROM mz_internal.mz_sink_statuses WHERE name = 'tmri_sink'
running
"""))
65 changes: 65 additions & 0 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// by the Apache License, Version 2.0.

use std::collections::{BTreeMap, BTreeSet};
use std::time::Duration;

use base64::prelude::*;
use maplit::btreeset;
Expand Down Expand Up @@ -153,6 +154,7 @@ pub(crate) async fn migrate(
ast_rewrite_create_sink_partition_strategy(stmt)?;
ast_rewrite_sql_server_constraints(stmt)?;
ast_rewrite_add_missing_index_ids(tx, stmt)?;
ast_rewrite_kafka_metadata_refresh_intervals(stmt)?;
Ok(())
})?;

Expand Down Expand Up @@ -1039,3 +1041,66 @@ fn ast_rewrite_add_missing_index_ids(

Ok(())
}

fn ast_rewrite_kafka_metadata_refresh_intervals(
stmt: &mut Statement<Raw>,
) -> Result<(), anyhow::Error> {
use mz_repr::strconv::parse_interval;
use mz_sql::ast::{
CreateSinkConnection, CreateSourceConnection, KafkaSinkConfigOptionName,
KafkaSourceConfigOptionName, Value, WithOptionValue,
};
let interval: Option<&mut String> = match stmt {
Statement::CreateSource(stmt) => {
if let CreateSourceConnection::Kafka { options, .. } = &mut stmt.connection {
options.iter_mut().find_map(|option| {
if matches!(
option.name,
KafkaSourceConfigOptionName::TopicMetadataRefreshInterval
) && let Some(WithOptionValue::Value(Value::String(ref mut s))) =

@martykulma martykulma Jun 29, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think TopicMetadataRefreshInterval can also be Value::Number (TOPIC METADATA REFRESH INTERVAL = 0.5) and Value::Interval (TOPIC METADATA REFRESH INTERVAL = INTERVAL '0.5s')

Ultimately, its rust type is Duration in src/sql/src/kafka_util.rs, but I think that gets parsed out using TryFromValue in with_options.rs - which parses it as an interval.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, QA LLM review flagged the same thing

option.value
{
Some(s)
} else {
None
}
})
} else {
None
}
}
Statement::CreateSink(stmt) => {
if let CreateSinkConnection::Kafka { options, .. } = &mut stmt.connection {
options.iter_mut().find_map(|option| {
if matches!(
option.name,
KafkaSinkConfigOptionName::TopicMetadataRefreshInterval
) && let Some(WithOptionValue::Value(Value::String(ref mut s))) =
option.value
{
Some(s)
} else {
None
}
})
} else {
None
}
}
_ => None,
};

let Some(interval) = interval else {
return Ok(());
};

let interval_dur = parse_interval(interval)
.map_err(|e| anyhow::anyhow!("failed to parse interval: {e}"))?
.duration()?;

if interval_dur < Duration::from_secs(1) {
*interval = "1s".to_string();
}

Ok(())
}
13 changes: 7 additions & 6 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1205,7 +1205,7 @@ fn plan_kafka_source_connection(
let KafkaSourceConfigOptionExtracted {
group_id_prefix,
topic,
mut topic_metadata_refresh_interval,
topic_metadata_refresh_interval,
start_timestamp: _, // purified into `start_offset`
start_offset,
seen: _,
Expand All @@ -1226,9 +1226,10 @@ fn plan_kafka_source_connection(
// would result in a runtime error for the source.
sql_bail!("TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour");
}
if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
// We enforce a minimum of 1 second refresh interval to prevent overloading the topic
topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
if topic_metadata_refresh_interval < Duration::from_secs(1) {
// This is a librdkafka-enforced restriction that, if violated,
// would result in a runtime error for the source.
sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
}
let metadata_columns = include_metadata
.into_iter()
Expand Down Expand Up @@ -3805,7 +3806,7 @@ fn kafka_sink_builder(
transactional_id_prefix,
legacy_ids,
topic_config,
mut topic_metadata_refresh_interval,
topic_metadata_refresh_interval,
topic_partition_count,
topic_replication_factor,
seen: _,
Expand Down Expand Up @@ -3836,7 +3837,7 @@ fn kafka_sink_builder(
} else if topic_metadata_refresh_interval < MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL {
// We enforce a minimum of 1 second here to prevent excessive refreshes, and ensure that
// tokio::time::interval receives a valid (positive) duration.
topic_metadata_refresh_interval = MIN_KAFKA_TOPIC_METADATA_REFRESH_INTERVAL;
sql_bail!("TOPIC METADATA REFRESH INTERVAL must be at least 1 second");
}

let assert_positive = |val: Option<i32>, name: &str| {
Expand Down
11 changes: 11 additions & 0 deletions test/testdrive/kafka-sink-errors.td
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ contains:column referenced in KEY does not exist: f2
ENVELOPE DEBEZIUM
contains:LEGACY IDs option is not supported

! CREATE SINK zero_interval_sink
IN CLUSTER ${arg.single-replica-cluster}
FROM v1
INTO KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-kafka-sink-zero-interval-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '0s'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM
contains:TOPIC METADATA REFRESH INTERVAL must be at least 1 second

#
# Sink dependencies
#
Expand Down
23 changes: 0 additions & 23 deletions test/testdrive/kafka-sinks.td
Original file line number Diff line number Diff line change
Expand Up @@ -315,27 +315,6 @@ contains:Expected one of PARTITION or SNAPSHOT or VERSION or COMMIT, found SIZE
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM

> CREATE SINK zero_interval_sink
IN CLUSTER ${arg.single-replica-cluster}
FROM src_tbl
INTO KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-kafka-sink-zero-interval-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '0s'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM

> CREATE SINK clamp_refresh_interval_sink
IN CLUSTER ${arg.single-replica-cluster}
FROM src_tbl
INTO KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-kafka-sink-clamp-interval-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '1ms'
)
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE DEBEZIUM


# All sinks are unlinked
> SELECT bool_and(size IS NULL) FROM mz_sinks;
true
Expand All @@ -357,5 +336,3 @@ snk7 kafka snk7_cluster ""
snk8 kafka snk8_cluster ""
snk9 kafka snk9_cluster ""
snk_unsigned kafka snk_unsigned_cluster ""
zero_interval_sink kafka ${arg.single-replica-cluster} ""
clamp_refresh_interval_sink kafka ${arg.single-replica-cluster} ""
3 changes: 2 additions & 1 deletion test/testdrive/kafka-source-errors.td
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ contains:cannot convert negative interval to duration
)
contains:TOPIC METADATA REFRESH INTERVAL cannot be greater than 1 hour

> CREATE SOURCE clamp_topic_metadata_refresh_interval
! CREATE SOURCE bad_topic_metadata_refresh_interval
IN CLUSTER ${arg.single-replica-cluster}
FROM KAFKA CONNECTION kafka_conn (
TOPIC 'testdrive-thetopic-${testdrive.seed}',
TOPIC METADATA REFRESH INTERVAL '1ms'
)
contains:TOPIC METADATA REFRESH INTERVAL must be at least 1 second
Loading