Skip to content

Make Internal Compacted Topic Partition Count & Replication Factor Configurable#1933

Open
senthuran16 wants to merge 9 commits into
mainfrom
websub-kafka-improvements-2
Open

Make Internal Compacted Topic Partition Count & Replication Factor Configurable#1933
senthuran16 wants to merge 9 commits into
mainfrom
websub-kafka-improvements-2

Conversation

@senthuran16
Copy link
Copy Markdown
Member

@senthuran16 senthuran16 commented May 11, 2026

Purpose

Internal compacted Kafka topics were always created with 1 partition and replication factor 1, which ignores cluster policy and deployment-specific sizing needs.
Resolves #1882

Goals

  • make internal compacted topic partition count configurable
  • make internal compacted topic replication factor configurable
  • keep sensible defaults and reject invalid values early

Approach

  • added kafka.compact_topic_partitions and kafka.compact_topic_replication_factor to the existing runtime Kafka config path
  • threaded those values through Kafka connection resolution with defaulting and positive-value validation
  • replaced the hardcoded CreateTopics(..., 1, 1, ...) call in EnsureCompactedTopic(...) with the resolved config values

User stories

Operators can align internal compacted topic creation with cluster replication and partitioning requirements without patching code.

Documentation

N/A - config-only runtime behavior change; no product doc update included in this PR.

Automation tests

  • Unit tests

    Validated with existing package tests: env GOCACHE=/tmp/go-build-cache go test ./event-gateway/gateway-runtime/internal/config ./event-gateway/gateway-runtime/ internal/connectors/brokerdriver/kafka

  • Integration tests

    Not run in this turn

Security checks

Samples

N/A

Related PRs

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 11, 2026

Review Change Stack

📝 Walkthrough

Walkthrough

This pull request introduces configurable Kafka compacted topic partition and replication factor settings. The global Kafka runtime configuration gains two integer fields with default value 1. These values propagate to connection-level configuration, which supports per-binding overrides through a new numeric validation helper. The configuration values are then applied during compacted topic creation, replacing previous hardcoded partition and replica counts. Supporting test coverage validates override behavior, bounds checking, and numeric type handling. An unrelated change increases WebSub consumer group ID hash truncation from 16 to 32 characters for collision reduction.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 13.33% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Title check ✅ Passed The title directly summarizes the main change: making Kafka compacted topic partition count and replication factor configurable instead of hardcoded.
Description check ✅ Passed The pull request description addresses all required template sections with clear and complete information.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch websub-kafka-improvements-2

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go (1)

154-191: 💤 Low value

Consider expanding test coverage for intOverride.

The current tests validate non-integer, out-of-bounds, and NaN float64 inputs. Consider adding test cases for:

  • Positive/negative infinity: math.Inf(1) and math.Inf(-1)
  • Direct int type input (line 305 in config.go handles this case)
  • nil input
  • Invalid types like string

These additions would provide more comprehensive coverage of the helper's type handling.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go`
around lines 154 - 191, Expand TestIntOverride_RejectsInvalidFloat64 (or add a
new table-driven test) to cover more input types for intOverride: add cases for
math.Inf(1) and math.Inf(-1) expecting rejection with "non-finite", add a case
passing an int (e.g. 42) that should return ok==true and no error, add a nil
input expecting error and ok==false, and add an invalid type like a string
expecting error and ok==false; use the same pattern of t.Run and assertions
(check err nil/nonnull, ok flag, and error message contents) so the test
exercises intOverride's type handling branches.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go`:
- Around line 221-224: The consumerGroupID function now truncates the SHA256
hash to 32 chars instead of 16, which changes group IDs and will orphan existing
WebSub consumers; revert or migrate: update consumerGroupID (function
consumerGroupID in ConsumerManager) to preserve the previous format (use
hex.EncodeToString(h[:])[:16]) or implement compatibility by generating the old
16-char ID when a matching consumer group exists (or support both IDs for a
transition period), and add a clear release-note/migration entry describing the
breaking change and steps to migrate offsets if you choose the 32-char format.

---

Nitpick comments:
In
`@event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go`:
- Around line 154-191: Expand TestIntOverride_RejectsInvalidFloat64 (or add a
new table-driven test) to cover more input types for intOverride: add cases for
math.Inf(1) and math.Inf(-1) expecting rejection with "non-finite", add a case
passing an int (e.g. 42) that should return ok==true and no error, add a nil
input expecting error and ok==false, and add an invalid type like a string
expecting error and ok==false; use the same pattern of t.Run and assertions
(check err nil/nonnull, ok flag, and error message contents) so the test
exercises intOverride's type handling branches.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: f04cb9d3-f051-4665-bb28-7170537092ff

📥 Commits

Reviewing files that changed from the base of the PR and between 7bcef00 and 5fe0f5b.

📒 Files selected for processing (6)
  • event-gateway/gateway-runtime/configs/config.toml
  • event-gateway/gateway-runtime/internal/config/config.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/config_test.go
  • event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka/endpoint.go
  • event-gateway/gateway-runtime/internal/connectors/receiver/websub/consumer_manager.go

@senthuran16 senthuran16 changed the title Egw/kafka sub topic partition Make Internal Compacted Topic Partition Count & Replication Factor Configurable May 11, 2026
@senthuran16
Copy link
Copy Markdown
Member Author

Holding merging this PR until the test suite is ready

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Improvement]: Make the compacted topic's partition/replication settings configurable.

2 participants