Skip to content

[codex] Fix schema registry auth for direct StreamNative catalogs#390

Open
xylaaaaa wants to merge 1 commit into
apache:trino-435from
xylaaaaa:codex/streamnative-schema-registry-auth-fix
Open

[codex] Fix schema registry auth for direct StreamNative catalogs#390
xylaaaaa wants to merge 1 commit into
apache:trino-435from
xylaaaaa:codex/streamnative-schema-registry-auth-fix

Conversation

@xylaaaaa
Copy link
Copy Markdown

What changed

  • preserve full schema registry URLs for the Kafka connector's Confluent path and normalize host:port inputs to https://host:port
  • explicitly keep Schema Registry Basic Auth properties in the final client properties passed to CachedSchemaRegistryClient
  • add focused tests for URL normalization and Basic Auth property propagation

Why

Apache Doris was able to reach StreamNative Kafka through Routine Load and Kafka Connect, but Kafka Catalog failed in the Confluent schema-registry path. The failure had two parts:

  1. the schema registry URL was reduced to host text, which broke direct URL construction
  2. Basic Auth properties were not reliably present in the final schema-registry client configuration

This patch fixes the direct StreamNative path used by Kafka Catalog.

Validation

  • mvn -pl plugin/trino-kafka -Dtest=TestConfluentSchemaRegistryConfig,TestConfluentModule -DfailIfNoTests=false test
  • manual Doris validation against StreamNative:
    • SHOW DATABASES FROM sn_kafka_sr_fixed
    • SHOW TABLES after SWITCH sn_kafka_sr_fixed; USE \default``
    • SELECT * FROM doris_schema_avro LIMIT 5

Notes

  • This PR targets the direct Schema Registry access path.
  • It does not try to address local proxy-specific behavior.

@xylaaaaa xylaaaaa marked this pull request as ready for review May 14, 2026 03:19
Copilot AI review requested due to automatic review settings May 14, 2026 03:19
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Updates Confluent Schema Registry configuration and integration to (a) normalize registry URLs as HTTPS by default and (b) improve schema registry client property handling (notably basic auth propagation).

Changes:

  • Switch schema registry URL config from Set<HostAddress> to List<String> and normalize host:port entries to https://...
  • Add subject-mapping config coverage and URL normalization tests
  • Refactor auth wiring and introduce buildSchemaRegistryClientProperties(...) to merge provider properties and fill in confluent/basic-auth keys

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

File Description
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentSchemaRegistryConfig.java Normalize registry URLs to HTTPS and change URL type to list of strings
plugin/trino-kafka/src/main/java/io/trino/plugin/kafka/schema/confluent/ConfluentModule.java Conditional auth binding refactor; property merge/build helper added
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentSchemaRegistryConfig.java Add tests for URL normalization and subject-mapping parsing; update defaults/mappings
plugin/trino-kafka/src/test/java/io/trino/plugin/kafka/schema/confluent/TestConfluentModule.java New test validating merged schema-registry basic auth properties

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +114 to +121
configBinder(binder).bindConfig(BasicAuthConfig.class);
install(conditionalModule(
ConfluentSchemaRegistryConfig.class,
schemaRegistryConfig -> schemaRegistryConfig.getConfluentSchemaRegistryAuthType() == BASIC_AUTH,
authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class)
.to(ConfluentSchemaRegistryBasicAuth.class)
.in(Scopes.SINGLETON),
authBinder -> authBinder.bind(SchemaRegistryClientPropertiesProvider.class)
Comment on lines +153 to +160
Map<String, Object> schemaRegistryClientProperties = propertiesProviders.stream()
.map(SchemaRegistryClientPropertiesProvider::getSchemaRegistryClientProperties)
.flatMap(properties -> properties.entrySet().stream())
.collect(java.util.stream.Collectors.toMap(
Map.Entry::getKey,
entry -> (Object) entry.getValue(),
(left, right) -> right,
HashMap::new));
return stream(splitter.split(nodes))
.map(ConfluentSchemaRegistryConfig::toHostAddress)
.collect(toImmutableSet());
.map(ConfluentSchemaRegistryConfig::normalizeUrl)
Comment on lines +43 to +44
.setConfluentSubjectsCacheRefreshInterval(new Duration(1, SECONDS))
.setConfluentSchemaRegistrySubjectMapping(null));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants