Skip to content
Open
33 changes: 11 additions & 22 deletions event-gateway/gateway-runtime/cmd/event-gateway/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,18 @@ import (
// 3. Add bindings in channels.yaml — no changes to main.go or runtime needed
func registerConnectors(registry *connectors.Registry, cfg *config.Config) {
registry.RegisterBrokerDriver("kafka", func(brokerDriverCfg map[string]interface{}) (connectors.BrokerDriver, error) {
brokers := cfg.Kafka.Brokers // fallback to global config
if brokerDriverCfg != nil {
if b, ok := brokerDriverCfg["brokers"]; ok {
switch v := b.(type) {
case []interface{}:
parsed := make([]string, 0, len(v))
for _, item := range v {
if s, ok := item.(string); ok {
parsed = append(parsed, s)
}
}
if len(parsed) > 0 {
brokers = parsed
}
case []string:
if len(v) > 0 {
brokers = v
}
}
}
defaults := kafka.ConnectionConfig{
Brokers: cfg.Kafka.Brokers,
TLS: cfg.Kafka.TLS,
SASLMechanism: cfg.Kafka.SASLMechanism,
SASLUsername: cfg.Kafka.SASLUsername,
SASLPassword: cfg.Kafka.SASLPassword,
}
return kafka.NewBrokerDriver(brokers)
resolved, err := kafka.ResolveConnectionConfig(defaults, brokerDriverCfg)
if err != nil {
return nil, err
}
return kafka.NewBrokerDriver(resolved)
})

registry.RegisterReceiver("websub", func(ecfg connectors.ReceiverConfig) (connectors.Receiver, error) {
Expand All @@ -73,7 +63,6 @@ func registerConnectors(registry *connectors.Registry, cfg *config.Config) {
DeliveryConcurrency: cfg.WebSub.DeliveryConcurrency,
RuntimeID: cfg.RuntimeID,
ConsumerGroupPrefix: cfg.Kafka.ConsumerGroupPrefix,
Brokers: cfg.Kafka.Brokers,
})
})

Expand Down
5 changes: 5 additions & 0 deletions event-gateway/gateway-runtime/configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ metrics_port = 9003
# Default Kafka brokers. Channels can override with broker-driver.config.brokers.
brokers = ["localhost:9092"]
consumer_group_prefix = "event-gateway"
tls = false
# Supported values: plain, scram-sha-256, scram-sha-512
# sasl_mechanism = ""
# sasl_username = ""
# sasl_password = ""

[websub]
verification_timeout_seconds = 10
Expand Down
40 changes: 36 additions & 4 deletions event-gateway/gateway-runtime/internal/binding/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func TestWebSubApiTopicName(t *testing.T) {
{"repo-watcher", "v1", "issues", "repo-watcher.v1.issues"},
{"repo-watcher", "v1", "pull-requests", "repo-watcher.v1.pull-requests"},
{"order-api", "v2", "orders", "order-api.v2.orders"},
{"repo/watcher", "v1", "/api/er3", "repo_2f_watcher.v1._2f_api_2f_er3"},
{"repo_watcher", "v1/test", "pull_requests", "repo__watcher.v1_2f_test.pull__requests"},
}

for _, tt := range tests {
Expand All @@ -202,10 +204,40 @@ func TestWebSubApiTopicName(t *testing.T) {
}

func TestWebSubApiSubscriptionTopic(t *testing.T) {
got := WebSubApiSubscriptionTopic("repo-watcher", "v1")
expected := "repo-watcher.v1.__subscriptions"
if got != expected {
t.Errorf("WebSubApiSubscriptionTopic = %q, want %q", got, expected)
tests := []struct {
apiName string
version string
expected string
}{
{"repo-watcher", "v1", "repo-watcher.v1.__subscriptions"},
{"repo/watcher", "v1/test", "repo_2f_watcher.v1_2f_test.__subscriptions"},
}

for _, tt := range tests {
got := WebSubApiSubscriptionTopic(tt.apiName, tt.version)
if got != tt.expected {
t.Errorf("WebSubApiSubscriptionTopic(%q, %q) = %q, want %q",
tt.apiName, tt.version, got, tt.expected)
}
}
}

func TestNormalizeTopicSegment(t *testing.T) {
tests := []struct {
input string
expected string
}{
{"issues", "issues"},
{"/api/er3", "_2f_api_2f_er3"},
{"pull_requests", "pull__requests"},
{"v1/test", "v1_2f_test"},
{"topic#42", "topic_23_42"},
}

for _, tt := range tests {
if got := NormalizeTopicSegment(tt.input); got != tt.expected {
t.Errorf("NormalizeTopicSegment(%q) = %q, want %q", tt.input, got, tt.expected)
}
}
}

Expand Down
48 changes: 44 additions & 4 deletions event-gateway/gateway-runtime/internal/binding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package binding

import (
"fmt"
"path"
"strings"
)
Expand Down Expand Up @@ -97,15 +98,19 @@ type ChannelsConfig struct {
}

// WebSubApiTopicName derives a Kafka topic name for a WebSubApi channel.
// Format: {api-name}.{version}.{channel-name}
// Format: {normalized-api-name}.{normalized-version}.{normalized-channel-name}
// The logical WebSub channel name remains unchanged elsewhere; only the broker topic is normalized.
func WebSubApiTopicName(apiName, version, channelName string) string {
return apiName + "." + version + "." + channelName
return NormalizeTopicSegment(apiName) + "." +
NormalizeTopicSegment(version) + "." +
NormalizeTopicSegment(channelName)
}

// WebSubApiSubscriptionTopic derives the internal subscription sync topic for a WebSubApi.
// Format: {api-name}.{version}.__subscriptions
// Format: {normalized-api-name}.{normalized-version}.__subscriptions
func WebSubApiSubscriptionTopic(apiName, version string) string {
return apiName + "." + version + ".__subscriptions"
return NormalizeTopicSegment(apiName) + "." +
NormalizeTopicSegment(version) + ".__subscriptions"
}

// WebSubApiBasePath derives the shared WebSub HTTP base path for an API.
Expand Down Expand Up @@ -146,3 +151,38 @@ func ensureLeadingSlash(value string) string {
}
return "/" + value
}

// NormalizeTopicSegment converts a logical topic segment to a Kafka-safe name.
// It uses an escape format so unsupported characters do not collide with
// already-valid names:
// - [A-Za-z0-9.-] pass through unchanged
// - '_' becomes '__'
// - everything else becomes '_%x_' (for example '/' -> '_2f_')
func NormalizeTopicSegment(value string) string {
value = strings.TrimSpace(value)
if value == "" {
return ""
}

var normalized strings.Builder
normalized.Grow(len(value))

for _, r := range value {
switch {
case r >= 'a' && r <= 'z':
normalized.WriteRune(r)
case r >= 'A' && r <= 'Z':
normalized.WriteRune(r)
case r >= '0' && r <= '9':
normalized.WriteRune(r)
case r == '.', r == '-':
normalized.WriteRune(r)
case r == '_':
normalized.WriteString("__")
default:
normalized.WriteString(fmt.Sprintf("_%x_", r))
}
}

return normalized.String()
}
12 changes: 12 additions & 0 deletions event-gateway/gateway-runtime/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/knadh/koanf/providers/env"
"github.com/knadh/koanf/providers/file"
"github.com/knadh/koanf/v2"
"github.com/wso2/api-platform/event-gateway/gateway-runtime/internal/connectors/brokerdriver/kafka"
)

// Config is the top-level runtime configuration for the event gateway.
Expand Down Expand Up @@ -108,6 +109,7 @@ func DefaultConfig() *Config {
Kafka: KafkaConfig{
Brokers: []string{"localhost:9092"},
ConsumerGroupPrefix: "event-gateway",
TLS: false,
},
WebSub: WebSubConfig{
VerificationTimeoutSeconds: 10,
Expand Down Expand Up @@ -263,6 +265,16 @@ func validate(cfg *Config) error {
}
}

if err := kafka.ValidateConnectionConfig(kafka.ConnectionConfig{
Brokers: cfg.Kafka.Brokers,
TLS: cfg.Kafka.TLS,
SASLMechanism: cfg.Kafka.SASLMechanism,
SASLUsername: cfg.Kafka.SASLUsername,
SASLPassword: cfg.Kafka.SASLPassword,
}); err != nil {
return err
}

switch cfg.Logging.Level {
case "", "debug", "info", "warn", "error":
default:
Expand Down
115 changes: 115 additions & 0 deletions event-gateway/gateway-runtime/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ func TestLoadAppliesEnvironmentOverrides(t *testing.T) {
t.Setenv("APIP_EGW_SERVER_WEBSUB_TLS_CERT_FILE", certPath)
t.Setenv("APIP_EGW_SERVER_WEBSUB_TLS_KEY_FILE", keyPath)
t.Setenv("APIP_EGW_KAFKA_BROKERS", "kafka-1:9092,kafka-2:9092")
t.Setenv("APIP_EGW_KAFKA_TLS", "true")
t.Setenv("APIP_EGW_KAFKA_SASL_MECHANISM", "scram-sha-512")
t.Setenv("APIP_EGW_KAFKA_SASL_USERNAME", "env-user")
t.Setenv("APIP_EGW_KAFKA_SASL_PASSWORD", "env-pass")
t.Setenv("APIP_EGW_CONTROLPLANE_ENABLED", "true")
t.Setenv("APIP_EGW_CONTROLPLANE_XDS_ADDRESS", "xds:18001")
t.Setenv("APIP_EGW_POLICY_ENGINE_CONFIG_FILE", "/tmp/policies.toml")
Expand All @@ -40,6 +44,9 @@ websub_https_port = 8443

[kafka]
brokers = ["localhost:9092"]
sasl_mechanism = "plain"
sasl_username = "file-user"
sasl_password = "file-pass"

[controlplane]
enabled = false
Expand Down Expand Up @@ -75,6 +82,18 @@ enabled = false
if !reflect.DeepEqual(cfg.Kafka.Brokers, wantBrokers) {
t.Fatalf("expected brokers %v, got %v", wantBrokers, cfg.Kafka.Brokers)
}
if !cfg.Kafka.TLS {
t.Fatalf("expected kafka TLS to be enabled")
}
if cfg.Kafka.SASLMechanism != "scram-sha-512" {
t.Fatalf("expected kafka sasl mechanism scram-sha-512, got %q", cfg.Kafka.SASLMechanism)
}
if cfg.Kafka.SASLUsername != "env-user" {
t.Fatalf("expected kafka sasl username env-user, got %q", cfg.Kafka.SASLUsername)
}
if cfg.Kafka.SASLPassword != "env-pass" {
t.Fatalf("expected kafka sasl password env-pass, got %q", cfg.Kafka.SASLPassword)
}

if !cfg.ControlPlane.Enabled {
t.Fatalf("expected control plane to be enabled")
Expand Down Expand Up @@ -157,6 +176,102 @@ level = "trace"
}
}

func TestLoadKafkaSecurityFromConfigFile(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.toml")
if err := os.WriteFile(configPath, []byte(`
[kafka]
brokers = ["secure-kafka:9093"]
tls = true
sasl_mechanism = "plain"
sasl_username = "file-user"
sasl_password = "file-pass"
`), 0o644); err != nil {
t.Fatalf("write config: %v", err)
}

cfg, _, err := Load(configPath)
if err != nil {
t.Fatalf("load config: %v", err)
}

if !reflect.DeepEqual(cfg.Kafka.Brokers, []string{"secure-kafka:9093"}) {
t.Fatalf("unexpected kafka brokers: %v", cfg.Kafka.Brokers)
}
if !cfg.Kafka.TLS {
t.Fatalf("expected kafka TLS to be enabled")
}
if cfg.Kafka.SASLMechanism != "plain" || cfg.Kafka.SASLUsername != "file-user" || cfg.Kafka.SASLPassword != "file-pass" {
t.Fatalf("unexpected kafka security config: %+v", cfg.Kafka)
}
}

func TestLoadRejectsInvalidKafkaSASLConfig(t *testing.T) {
tests := []struct {
name string
configBody string
errContains string
}{
{
name: "invalid mechanism",
configBody: `
[kafka]
brokers = ["kafka:9092"]
sasl_mechanism = "oauth"
sasl_username = "user"
sasl_password = "pass"
`,
errContains: "must be one of",
},
{
name: "credentials without mechanism",
configBody: `
[kafka]
brokers = ["kafka:9092"]
sasl_username = "user"
sasl_password = "pass"
`,
errContains: "require sasl_mechanism",
},
{
name: "mechanism without username",
configBody: `
[kafka]
brokers = ["kafka:9092"]
sasl_mechanism = "scram-sha-256"
sasl_password = "pass"
`,
errContains: "requires both",
},
{
name: "mechanism without password",
configBody: `
[kafka]
brokers = ["kafka:9092"]
sasl_mechanism = "scram-sha-512"
sasl_username = "user"
`,
errContains: "requires both",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.toml")
if err := os.WriteFile(configPath, []byte(tt.configBody), 0o644); err != nil {
t.Fatalf("write config: %v", err)
}

_, _, err := Load(configPath)
if err == nil {
t.Fatalf("expected load to fail")
}
if !strings.Contains(err.Error(), tt.errContains) {
t.Fatalf("expected error containing %q, got %q", tt.errContains, err.Error())
}
})
}
}

func TestLoadRejectsMissingTLSFilesWhenEnabled(t *testing.T) {
configPath := filepath.Join(t.TempDir(), "config.toml")
missingCert := filepath.Join(t.TempDir(), "missing.crt")
Expand Down
Loading
Loading