Skip to content
Open
3 changes: 3 additions & 0 deletions event-gateway/gateway-runtime/configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ metrics_port = 9003
# Default Kafka brokers. Channels can override with broker-driver.config.brokers.
brokers = ["localhost:9092"]
consumer_group_prefix = "event-gateway"
# Partitions and replication factor used for internal compacted topics.
compact_topic_partitions = 1
compact_topic_replication_factor = 1
tls = false
# Optional PEM CA file for self-signed or private Kafka CAs.
# tls_ca_file = "/etc/event-gateway/kafka/ca.crt"
Expand Down
36 changes: 24 additions & 12 deletions event-gateway/gateway-runtime/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@ type ServerConfig struct {

// KafkaConfig holds Kafka connection settings.
type KafkaConfig struct {
Brokers []string `koanf:"brokers"`
ConsumerGroupPrefix string `koanf:"consumer_group_prefix"`
TLS bool `koanf:"tls"`
TLSCAFile string `koanf:"tls_ca_file"`
TLSCertFile string `koanf:"tls_cert_file"`
TLSKeyFile string `koanf:"tls_key_file"`
TLSServerName string `koanf:"tls_server_name"`
SASLMechanism string `koanf:"sasl_mechanism"`
SASLUsername string `koanf:"sasl_username"`
SASLPassword string `koanf:"sasl_password"`
Brokers []string `koanf:"brokers"`
ConsumerGroupPrefix string `koanf:"consumer_group_prefix"`
CompactTopicPartitions int `koanf:"compact_topic_partitions"`
CompactTopicReplicationFactor int `koanf:"compact_topic_replication_factor"`
TLS bool `koanf:"tls"`
TLSCAFile string `koanf:"tls_ca_file"`
TLSCertFile string `koanf:"tls_cert_file"`
TLSKeyFile string `koanf:"tls_key_file"`
TLSServerName string `koanf:"tls_server_name"`
SASLMechanism string `koanf:"sasl_mechanism"`
SASLUsername string `koanf:"sasl_username"`
SASLPassword string `koanf:"sasl_password"`
}

// WebSubConfig holds WebSub-specific settings.
Expand Down Expand Up @@ -111,8 +113,10 @@ func DefaultConfig() *Config {
MetricsPort: 9003,
},
Kafka: KafkaConfig{
Brokers: []string{"localhost:9092"},
ConsumerGroupPrefix: "event-gateway",
Brokers: []string{"localhost:9092"},
ConsumerGroupPrefix: "event-gateway",
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
},
WebSub: WebSubConfig{
VerificationTimeoutSeconds: 10,
Expand Down Expand Up @@ -223,6 +227,8 @@ func mapEnvValue(path, value string) interface{} {
"server.websocket_port",
"server.admin_port",
"server.metrics_port",
"kafka.compact_topic_partitions",
"kafka.compact_topic_replication_factor",
"websub.verification_timeout_seconds",
"websub.delivery_max_retries",
"websub.delivery_initial_delay_ms",
Expand Down Expand Up @@ -295,6 +301,12 @@ func validateKafkaConfig(kafkaCfg KafkaConfig) error {
if len(kafkaCfg.Brokers) == 0 {
return fmt.Errorf("kafka.brokers must contain at least one broker")
}
if kafkaCfg.CompactTopicPartitions <= 0 {
return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", kafkaCfg.CompactTopicPartitions)
}
if kafkaCfg.CompactTopicReplicationFactor <= 0 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", kafkaCfg.CompactTopicReplicationFactor)
}

if kafkaCfg.TLS {
if strings.TrimSpace(kafkaCfg.TLSCAFile) != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"math"
"os"
"strings"

Expand All @@ -34,29 +35,39 @@ import (

// ConnectionConfig holds the Kafka connection settings used by the driver.
type ConnectionConfig struct {
Brokers []string
TLS bool
TLSCAFile string
TLSCertFile string
TLSKeyFile string
TLSServerName string
SASLMechanism string
SASLUsername string
SASLPassword string
Brokers []string
CompactTopicPartitions int
CompactTopicReplicationFactor int
TLS bool
TLSCAFile string
TLSCertFile string
TLSKeyFile string
TLSServerName string
SASLMechanism string
SASLUsername string
SASLPassword string
}

// ResolveConnectionConfig merges global runtime config with per-binding overrides.
func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]interface{}) (ConnectionConfig, error) {
cfg := ConnectionConfig{
Brokers: append([]string(nil), global.Brokers...),
TLS: global.TLS,
TLSCAFile: global.TLSCAFile,
TLSCertFile: global.TLSCertFile,
TLSKeyFile: global.TLSKeyFile,
TLSServerName: global.TLSServerName,
SASLMechanism: global.SASLMechanism,
SASLUsername: global.SASLUsername,
SASLPassword: global.SASLPassword,
Brokers: append([]string(nil), global.Brokers...),
CompactTopicPartitions: global.CompactTopicPartitions,
CompactTopicReplicationFactor: global.CompactTopicReplicationFactor,
TLS: global.TLS,
TLSCAFile: global.TLSCAFile,
TLSCertFile: global.TLSCertFile,
TLSKeyFile: global.TLSKeyFile,
TLSServerName: global.TLSServerName,
SASLMechanism: global.SASLMechanism,
SASLUsername: global.SASLUsername,
SASLPassword: global.SASLPassword,
}
if cfg.CompactTopicPartitions <= 0 {
return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions)
}
if cfg.CompactTopicReplicationFactor <= 0 {
return ConnectionConfig{}, fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor)
}

if overrides != nil {
Expand All @@ -65,6 +76,16 @@ func ResolveConnectionConfig(global config.KafkaConfig, overrides map[string]int
} else if ok {
cfg.Brokers = brokers
}
if v, ok, err := intOverride(overrides["compact_topic_partitions"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
cfg.CompactTopicPartitions = v
}
if v, ok, err := intOverride(overrides["compact_topic_replication_factor"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
cfg.CompactTopicReplicationFactor = v
}
if v, ok, err := boolOverride(overrides["tls"]); err != nil {
return ConnectionConfig{}, err
} else if ok {
Expand Down Expand Up @@ -135,6 +156,18 @@ func validateConnectionConfig(cfg ConnectionConfig) error {
if len(cfg.Brokers) == 0 {
return fmt.Errorf("kafka brokers must not be empty")
}
if cfg.CompactTopicPartitions <= 0 {
return fmt.Errorf("kafka.compact_topic_partitions must be a positive integer, got %d", cfg.CompactTopicPartitions)
}
if cfg.CompactTopicPartitions > math.MaxInt32 {
return fmt.Errorf("kafka.compact_topic_partitions must be <= %d, got %d", math.MaxInt32, cfg.CompactTopicPartitions)
}
if cfg.CompactTopicReplicationFactor <= 0 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be a positive integer, got %d", cfg.CompactTopicReplicationFactor)
}
if cfg.CompactTopicReplicationFactor > math.MaxInt16 {
return fmt.Errorf("kafka.compact_topic_replication_factor must be <= %d, got %d", math.MaxInt16, cfg.CompactTopicReplicationFactor)
}

if !cfg.TLS {
if cfg.TLSCAFile != "" || cfg.TLSCertFile != "" || cfg.TLSKeyFile != "" || cfg.TLSServerName != "" {
Expand Down Expand Up @@ -264,6 +297,29 @@ func boolOverride(value interface{}) (bool, bool, error) {
return v, true, nil
}

func intOverride(value interface{}) (int, bool, error) {
if value == nil {
return 0, false, nil
}
switch v := value.(type) {
case int:
return v, true, nil
case float64:
if math.IsNaN(v) || math.IsInf(v, 0) {
return 0, false, fmt.Errorf("expected integer Kafka config override, got non-finite float64 %v", v)
}
if v != math.Trunc(v) {
return 0, false, fmt.Errorf("expected integer Kafka config override, got non-integer float64 %v", v)
}
if v < math.MinInt32 || v > math.MaxInt32 {
return 0, false, fmt.Errorf("expected integer Kafka config override within [%d, %d], got float64 %v", math.MinInt32, math.MaxInt32, v)
}
return int(v), true, nil
default:
return 0, false, fmt.Errorf("expected int override, got %T", value)
}
}

func stringOverride(value interface{}) (string, bool, error) {
if value == nil {
return "", false, nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package kafka

import (
"math"
"os"
"path/filepath"
"reflect"
"strings"
"testing"

runtimeconfig "github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/config"
Expand All @@ -17,13 +19,15 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) {
}

global := runtimeconfig.KafkaConfig{
Brokers: []string{"broker-1:9092"},
TLS: true,
TLSCAFile: caPath,
TLSServerName: "global-kafka",
SASLMechanism: "plain",
SASLUsername: "global-user",
SASLPassword: "global-pass",
Brokers: []string{"broker-1:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: caPath,
TLSServerName: "global-kafka",
SASLMechanism: "plain",
SASLUsername: "global-user",
SASLPassword: "global-pass",
}

resolved, err := ResolveConnectionConfig(global, map[string]interface{}{
Expand Down Expand Up @@ -57,7 +61,10 @@ func TestResolveConnectionConfig_MergesGlobalAndOverrides(t *testing.T) {
}

func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) {
resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{
resolved, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
}, map[string]interface{}{
"brokers": []interface{}{"broker:9092"},
"sasl_mechanism": "plain",
"sasl_username": " user-with-spaces ",
Expand All @@ -77,8 +84,10 @@ func TestResolveConnectionConfig_PreservesOpaqueCredentials(t *testing.T) {

func TestResolveConnectionConfig_RequiresTLSWhenTLSFilesAreConfigured(t *testing.T) {
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLSCAFile: "/tmp/ca.crt",
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLSCAFile: "/tmp/ca.crt",
}, nil)
if err == nil {
t.Fatalf("expected error when TLS files are set with TLS disabled")
Expand All @@ -93,26 +102,33 @@ func TestResolveConnectionConfig_ValidatesReadableTLSFiles(t *testing.T) {
}

_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLS: true,
TLSCAFile: caPath,
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: caPath,
}, nil)
if err != nil {
t.Fatalf("expected readable CA file to validate, got %v", err)
}

_, err = ResolveConnectionConfig(runtimeconfig.KafkaConfig{
Brokers: []string{"broker:9092"},
TLS: true,
TLSCAFile: filepath.Join(tempDir, "missing.crt"),
Brokers: []string{"broker:9092"},
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
TLS: true,
TLSCAFile: filepath.Join(tempDir, "missing.crt"),
}, nil)
if err == nil {
t.Fatalf("expected missing CA file to fail validation")
}
}

func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) {
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{}, map[string]interface{}{
_, err := ResolveConnectionConfig(runtimeconfig.KafkaConfig{
CompactTopicPartitions: 1,
CompactTopicReplicationFactor: 1,
}, map[string]interface{}{
"brokers": []interface{}{"broker:9092"},
"sasl_mechanism": "scram-sha-512",
"sasl_username": "user",
Expand All @@ -121,3 +137,55 @@ func TestResolveConnectionConfig_RequiresSASLCredentials(t *testing.T) {
t.Fatalf("expected missing SASL password to fail validation")
}
}

func TestIntOverride_AcceptsIntegerFloat64(t *testing.T) {
got, ok, err := intOverride(float64(3))
if err != nil {
t.Fatalf("expected integer float64 override to succeed, got %v", err)
}
if !ok {
t.Fatalf("expected integer float64 override to be accepted")
}
if got != 3 {
t.Fatalf("expected integer float64 override to convert to 3, got %d", got)
}
}

func TestIntOverride_RejectsInvalidFloat64(t *testing.T) {
tests := []struct {
name string
value float64
wantErr string
}{
{
name: "non integer",
value: 3.5,
wantErr: "non-integer",
},
{
name: "out of bounds",
value: float64(math.MaxInt32) + 1,
wantErr: "within",
},
{
name: "non finite",
value: math.NaN(),
wantErr: "non-finite",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, ok, err := intOverride(tt.value)
if err == nil {
t.Fatalf("expected float64 override %v to fail", tt.value)
}
if ok {
t.Fatalf("expected invalid float64 override %v to be rejected", tt.value)
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Fatalf("expected error %q to contain %q", err.Error(), tt.wantErr)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,15 @@ func (e *KafkaBrokerDriver) EnsureTopics(ctx context.Context, topics []string) e

// EnsureCompactedTopic creates a compacted topic if it does not already exist.
func (e *KafkaBrokerDriver) EnsureCompactedTopic(ctx context.Context, topic string) error {
resp, err := e.admin.CreateTopics(ctx, 1, 1, map[string]*string{
"cleanup.policy": kadm.StringPtr("compact"),
}, topic)
resp, err := e.admin.CreateTopics(
ctx,
int32(e.cfg.CompactTopicPartitions),
int16(e.cfg.CompactTopicReplicationFactor),
map[string]*string{
"cleanup.policy": kadm.StringPtr("compact"),
},
topic,
)
if err != nil {
return fmt.Errorf("failed to create compacted topic %s: %w", topic, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ func (cm *ConsumerManager) createConsumer(groupID string, topics []string, callb
}

// consumerGroupID generates a unique, safe consumer group ID for a callback URL.
// Format: {prefix}-websub-{sha256(callbackURL)[:16]}
// Format: {prefix}-websub-{sha256(callbackURL)[:32]}
func (cm *ConsumerManager) consumerGroupID(callbackURL string) string {
h := sha256.Sum256([]byte(callbackURL))
return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:16]
return cm.groupPrefix + "-websub-" + hex.EncodeToString(h[:])[:32]
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}