Skip to content

feat(kafka): add kafka channel bindings #1388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ public class KafkaChannelTopicConfiguration {
*/
@Min(value = -1, message = "retention.ms must be greater or equals to -1")
@JsonProperty("retention.ms")
private Integer retentionMs;
private Long retentionMs;

/**
Copy link
Preview

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the field type from Integer to Long is a breaking change that could affect existing consumers of this API. Consider maintaining backward compatibility or documenting this breaking change.

Suggested change
/**
/**
* Deprecated getter for backward compatibility. Will be removed in future versions.
* @deprecated Use {@link #getRetentionMsLong()} instead.
*/
@Deprecated
public Integer getRetentionMs() {
return retentionMs != null ? retentionMs.intValue() : null;
}
/**
* Preferred getter for retentionMs as Long.
*/
public Long getRetentionMsLong() {
return retentionMs;
}
/**
* Deprecated setter for backward compatibility. Will be removed in future versions.
* @deprecated Use {@link #setRetentionMs(Long)} instead.
*/
@Deprecated
@JsonSetter("retention.ms")
public void setRetentionMs(Integer retentionMs) {
this.retentionMs = retentionMs != null ? retentionMs.longValue() : null;
}
/**
* Preferred setter for retentionMs as Long.
*/
@JsonSetter("retention.ms")
public void setRetentionMs(Long retentionMs) {
this.retentionMs = retentionMs;
}
/**

Copilot uses AI. Check for mistakes.

* The <a href="https://kafka.apache.org/documentation/#topicconfigs_retention.bytes">retention.bytes</a>
* configuration option.
*/
@Min(value = -1, message = "retention.bytes must be greater or equals to -1")
@JsonProperty("retention.bytes")
private Integer retentionBytes;
private Long retentionBytes;

/**
* The <a href="https://kafka.apache.org/documentation/#topicconfigs_delete.retention.ms">delete.retention.ms</a>
* configuration option.
*/
@PositiveOrZero
@JsonProperty("delete.retention.ms")
private Integer deleteRetentionMs;
private Long deleteRetentionMs;

/**
* The <a href="https://kafka.apache.org/documentation/#topicconfigs_max.message.bytes">max.message.bytes</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ void shouldSerializeKafkaChannel() throws IOException {
.cleanupPolicy(List.of(
KafkaChannelTopicCleanupPolicy.DELETE,
KafkaChannelTopicCleanupPolicy.COMPACT))
.retentionMs(604800000)
.retentionBytes(1000000000)
.deleteRetentionMs(86400000)
.retentionMs(604800000L)
.retentionBytes(1000000000L)
.deleteRetentionMs(86400000L)
.maxMessageBytes(1048588)
.build())
.build()))
Expand All @@ -111,9 +111,9 @@ void shouldSerializeKafkaTopic() throws IOException {
KafkaChannelTopicConfiguration.builder()
.cleanupPolicy(
List.of(KafkaChannelTopicCleanupPolicy.DELETE, KafkaChannelTopicCleanupPolicy.COMPACT))
.retentionMs(604800000)
.retentionBytes(1000000000)
.deleteRetentionMs(86400000)
.retentionMs(604800000L)
.retentionBytes(1000000000L)
.deleteRetentionMs(86400000L)
.maxMessageBytes(1048588)
.confluentKeySchemaValidation(true)
.confluentKeySubjectNameStrategy("TopicNameStrategy")
Expand Down
2 changes: 2 additions & 0 deletions springwolf-bindings/springwolf-kafka-binding/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ dependencies {
implementation libs.spring.core
implementation libs.spring.boot.autoconfigure

implementation libs.commons.lang3

implementation libs.jakarta.annotation.api

compileOnly libs.lombok
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.bindings.kafka.annotations;

import io.github.springwolf.core.asyncapi.annotations.AsyncChannelBinding;
import io.github.springwolf.core.asyncapi.annotations.AsyncListener;
import io.github.springwolf.core.asyncapi.annotations.AsyncPublisher;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* {@code @KafkaAsyncChannelBinding} is a method-level annotation used in combination with {@link AsyncPublisher} or @{@link AsyncListener}.
* It configures the channel binding for the Kafka protocol.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(value = {ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@AsyncChannelBinding
@Inherited
public @interface KafkaAsyncChannelBinding {

String topic() default "";

int partitions() default VALUE_NOT_SET;

int replicas() default VALUE_NOT_SET;

KafkaChannelTopicConfiguration topicConfiguration() default @KafkaChannelTopicConfiguration();

@Retention(RetentionPolicy.CLASS)
@Target({})
@interface KafkaChannelTopicConfiguration {

CleanupPolicy[] cleanup() default {};

long retentionMs() default VALUE_NOT_SET;

long retentionBytes() default VALUE_NOT_SET;

long deleteRetentionMs() default VALUE_NOT_SET;

int maxMessageBytes() default VALUE_NOT_SET;

enum CleanupPolicy {
COMPACT,
DELETE,
}
}

int VALUE_NOT_SET = Integer.MIN_VALUE;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.bindings.kafka.configuration;

import io.github.springwolf.bindings.kafka.scanners.channels.KafkaChannelBindingProcessor;
import io.github.springwolf.bindings.kafka.scanners.messages.KafkaMessageBindingProcessor;
import io.github.springwolf.bindings.kafka.scanners.operations.KafkaOperationBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.bindings.BindingProcessorPriority;
Expand All @@ -21,6 +22,13 @@
@StandaloneConfiguration
public class SpringwolfKafkaBindingAutoConfiguration {

@Bean
@Order(value = BindingProcessorPriority.PROTOCOL_BINDING)
@ConditionalOnMissingBean
public KafkaChannelBindingProcessor kafkaChannelBindingProcessor(StringValueResolver stringValueResolver) {
return new KafkaChannelBindingProcessor(stringValueResolver);
}

@Bean
@Order(value = BindingProcessorPriority.PROTOCOL_BINDING)
@ConditionalOnMissingBean
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.bindings.kafka.scanners.channels;

import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicCleanupPolicy;
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicConfiguration;
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncChannelBinding;
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.AbstractChannelBindingProcessor;
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ProcessedChannelBinding;
import org.apache.commons.lang3.StringUtils;
import org.springframework.util.StringValueResolver;

import java.util.Arrays;

public class KafkaChannelBindingProcessor extends AbstractChannelBindingProcessor<KafkaAsyncChannelBinding> {

public KafkaChannelBindingProcessor(StringValueResolver stringValueResolver) {
super(stringValueResolver);
}

protected ProcessedChannelBinding mapToChannelBinding(KafkaAsyncChannelBinding bindingAnnotation) {
KafkaChannelBinding.KafkaChannelBindingBuilder bindingBuilder = KafkaChannelBinding.builder();
if (StringUtils.isNotBlank(bindingAnnotation.topic())) {
bindingBuilder.topic(resolveOrNull(bindingAnnotation.topic()));
}
if (bindingAnnotation.partitions() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
bindingBuilder.partitions(bindingAnnotation.partitions());
}
if (bindingAnnotation.replicas() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
bindingBuilder.replicas(bindingAnnotation.replicas());
}
bindingBuilder.topicConfiguration(mapToTopicConfiguration(bindingAnnotation));

return new ProcessedChannelBinding("kafka", bindingBuilder.build());
}

private KafkaChannelTopicConfiguration mapToTopicConfiguration(KafkaAsyncChannelBinding bindingAnnotation) {
KafkaChannelTopicConfiguration.KafkaChannelTopicConfigurationBuilder topicConfiguration =
KafkaChannelTopicConfiguration.builder();

if (bindingAnnotation.topicConfiguration().cleanup().length > 0) {
topicConfiguration.cleanupPolicy(
Arrays.stream(bindingAnnotation.topicConfiguration().cleanup())
.map(this::toKafkaChannelTopicCleanupPolicy)
.toList());
}

if (bindingAnnotation.topicConfiguration().retentionMs() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
topicConfiguration.retentionMs(
bindingAnnotation.topicConfiguration().retentionMs());
}
if (bindingAnnotation.topicConfiguration().retentionBytes() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
topicConfiguration.retentionBytes(
bindingAnnotation.topicConfiguration().retentionBytes());
}
if (bindingAnnotation.topicConfiguration().deleteRetentionMs() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
topicConfiguration.deleteRetentionMs(
bindingAnnotation.topicConfiguration().deleteRetentionMs());
}
if (bindingAnnotation.topicConfiguration().maxMessageBytes() != KafkaAsyncChannelBinding.VALUE_NOT_SET) {
topicConfiguration.maxMessageBytes(
bindingAnnotation.topicConfiguration().maxMessageBytes());
}

KafkaChannelTopicConfiguration buildTopicConfiguration = topicConfiguration.build();
if (KafkaChannelTopicConfiguration.builder().build().equals(buildTopicConfiguration)) {
Copy link
Preview

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Creating a new builder instance and building it on every call is inefficient. Consider creating a static final empty configuration instance for comparison.

Suggested change
if (KafkaChannelTopicConfiguration.builder().build().equals(buildTopicConfiguration)) {
if (EMPTY_TOPIC_CONFIGURATION.equals(buildTopicConfiguration)) {

Copilot uses AI. Check for mistakes.

return null;
}
return buildTopicConfiguration;
}

private KafkaChannelTopicCleanupPolicy toKafkaChannelTopicCleanupPolicy(
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy cleanupType) {
return switch (cleanupType) {
case COMPACT -> KafkaChannelTopicCleanupPolicy.COMPACT;
case DELETE -> KafkaChannelTopicCleanupPolicy.DELETE;
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.bindings.kafka.scanners.channels;

import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelBinding;
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicCleanupPolicy;
import io.github.springwolf.asyncapi.v3.bindings.kafka.KafkaChannelTopicConfiguration;
import io.github.springwolf.bindings.kafka.annotations.KafkaAsyncChannelBinding;
import io.github.springwolf.core.asyncapi.scanners.bindings.channels.ProcessedChannelBinding;
import org.junit.jupiter.api.Test;
import org.springframework.util.StringValueResolver;

import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class KafkaChannelBindingProcessorTest {
private final StringValueResolver stringValueResolver = mock();
Copy link
Preview

Copilot AI Aug 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mock() method should specify the class being mocked for clarity: mock(StringValueResolver.class).

Suggested change
private final StringValueResolver stringValueResolver = mock();
private final StringValueResolver stringValueResolver = mock(StringValueResolver.class);

Copilot uses AI. Check for mistakes.

private final KafkaChannelBindingProcessor processor = new KafkaChannelBindingProcessor(stringValueResolver);

@Test
void processTest() throws NoSuchMethodException {
Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithAnnotation");

ProcessedChannelBinding binding = processor.process(method).get();

assertThat(binding.getType()).isEqualTo("kafka");
assertThat(binding.getBinding()).isEqualTo(new KafkaChannelBinding());
}

@Test
void processWithoutAnnotationTest() throws NoSuchMethodException {
Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithoutAnnotation");

Optional<ProcessedChannelBinding> binding = processor.process(method);

assertThat(binding).isNotPresent();
}

@Test
void processTestWithFullConfiguration() throws NoSuchMethodException {
when(stringValueResolver.resolveStringValue("test-topic")).thenReturn("resolved-test-topic");

Method method = KafkaChannelBindingProcessorTest.class.getMethod("methodWithFullConfiguration");

ProcessedChannelBinding binding = processor.process(method).get();

assertThat(binding.getType()).isEqualTo("kafka");
assertThat(binding.getBinding())
.isEqualTo(KafkaChannelBinding.builder()
.topic("resolved-test-topic")
.partitions(3)
.replicas(2)
.topicConfiguration(KafkaChannelTopicConfiguration.builder()
.cleanupPolicy(List.of(
KafkaChannelTopicCleanupPolicy.COMPACT, KafkaChannelTopicCleanupPolicy.DELETE))
.retentionMs(86400000L)
.retentionBytes(-1L)
.deleteRetentionMs(86400000L)
.maxMessageBytes(1048588)
.build())
.build());
}

@KafkaAsyncChannelBinding
public void methodWithAnnotation() {}

public void methodWithoutAnnotation() {}

@KafkaAsyncChannelBinding(
topic = "test-topic",
partitions = 3,
replicas = 2,
topicConfiguration =
@KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration(
cleanup = {
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy.COMPACT,
KafkaAsyncChannelBinding.KafkaChannelTopicConfiguration.CleanupPolicy.DELETE
},
retentionMs = 86400000L,
retentionBytes = -1L,
deleteRetentionMs = 86400000L,
maxMessageBytes = 1048588))
public void methodWithFullConfiguration() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* {@code @AsyncChannelBinding} is a meta-annotation used to identify Channel Binding annotations.
* </p>
* The annotations annotated with {@code @AsyncChannelBinding} are intended to provide the Channel Bindings
* Object documentation. Those implementations are usually available in its own plugin, like {@code springwolf-kafka-plugin}
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(value = {ElementType.ANNOTATION_TYPE})
@Inherited
public @interface AsyncChannelBinding {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// SPDX-License-Identifier: Apache-2.0
package io.github.springwolf.core.asyncapi.scanners.bindings.channels;

import io.github.springwolf.core.asyncapi.annotations.AsyncChannelBinding;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;

import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.ParameterizedType;
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Stream;

@Slf4j
@RequiredArgsConstructor
public abstract class AbstractChannelBindingProcessor<A> implements ChannelBindingProcessor {

private final StringValueResolver stringValueResolver;

private final Class<A> specificAnnotationClazz =
(Class<A>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];

@Override
public Optional<ProcessedChannelBinding> process(AnnotatedElement annotatedElement) {
return Arrays.stream(annotatedElement.getAnnotations())
.filter(annotation -> annotation.annotationType().isAnnotationPresent(AsyncChannelBinding.class))
.flatMap(this::tryCast)
.findAny()
.map(this::mapToChannelBinding);
}

/**
* Attempt to cast the annotation to the specific annotation
* <p>
* Casting might fail, when multiple, different binding annotations are used,
* which results in an (expected) exception.
* <p>
* If there is an option to previously test casting without casting, then lets change the code here.
*/
private Stream<A> tryCast(Annotation obj) {
try {
return Stream.of(specificAnnotationClazz.cast(obj));
} catch (ClassCastException ex) {
log.trace("Method has multiple bindings defined.", ex);
}
return Stream.empty();
}

protected abstract ProcessedChannelBinding mapToChannelBinding(A bindingAnnotation);

protected String resolveOrNull(String stringValue) {
return StringUtils.hasText(stringValue) ? stringValueResolver.resolveStringValue(stringValue) : null;
}
}
Loading
Loading