Skip to content

Commit b25e144

Browse files
garyrussellartembilan
authored andcommitted
GH-563: Add default @KafkaHandler
Resolves #563 Invoke a default handler if no match on payload. Basically a copy of similar code in Spring AMQP.
1 parent 56db7ea commit b25e144

File tree

8 files changed

+248
-23
lines changed

8 files changed

+248
-23
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaHandler.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,4 +48,12 @@
4848
@Documented
4949
public @interface KafkaHandler {
5050

51+
/**
52+
* When true, designate that this is the default fallback method if the payload type
53+
* matches no other {@link KafkaHandler} method. Only one method can be so designated.
54+
* @return true if this is the default method.
55+
* @since 2.1.3
56+
*/
57+
boolean isDefault() default false;
58+
5159
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -333,12 +333,20 @@ private Set<KafkaListener> findListenerAnnotations(Method method) {
333333
private void processMultiMethodListeners(Collection<KafkaListener> classLevelListeners, List<Method> multiMethods,
334334
Object bean, String beanName) {
335335
List<Method> checkedMethods = new ArrayList<Method>();
336+
Method defaultMethod = null;
336337
for (Method method : multiMethods) {
337-
checkedMethods.add(checkProxy(method, bean));
338+
Method checked = checkProxy(method, bean);
339+
if (AnnotationUtils.findAnnotation(method, KafkaHandler.class).isDefault()) {
340+
final Method toAssert = defaultMethod;
341+
Assert.state(toAssert == null, () -> "Only one @KafkaHandler can be marked 'isDefault', found: "
342+
+ toAssert.toString() + " and " + method.toString());
343+
defaultMethod = checked;
344+
}
345+
checkedMethods.add(checked);
338346
}
339347
for (KafkaListener classLevelListener : classLevelListeners) {
340-
MultiMethodKafkaListenerEndpoint<K, V> endpoint = new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods,
341-
bean);
348+
MultiMethodKafkaListenerEndpoint<K, V> endpoint =
349+
new MultiMethodKafkaListenerEndpoint<K, V>(checkedMethods, defaultMethod, bean);
342350
endpoint.setBeanFactory(this.beanFactory);
343351
processListener(endpoint, classLevelListener, bean, bean.getClass(), beanName);
344352
}
@@ -680,7 +688,7 @@ private <T> Collection<T> getBeansOfType(Class<T> type) {
680688
*/
681689
private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
682690

683-
private DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
691+
private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
684692

685693
private MessageHandlerMethodFactory messageHandlerMethodFactory;
686694

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.springframework.kafka.listener.adapter.DelegatingInvocableHandler;
2424
import org.springframework.kafka.listener.adapter.HandlerAdapter;
2525
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
26+
import org.springframework.lang.Nullable;
2627
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2728

2829
/**
@@ -41,20 +42,44 @@ public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerE
4142

4243
private final List<Method> methods;
4344

45+
private final Method defaultMethod;
46+
47+
/**
48+
* Construct an instance for the provided methods and bean with no default method.
49+
* @param methods the methods.
50+
* @param bean the bean.
51+
*/
4452
public MultiMethodKafkaListenerEndpoint(List<Method> methods, Object bean) {
53+
this(methods, null, bean);
54+
}
55+
56+
/**
57+
* Construct an instance for the provided methods, default method and bean.
58+
* @param methods the methods.
59+
* @param defaultMethod the default method.
60+
* @param bean the bean.
61+
* @since 2.1.3
62+
*/
63+
public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method defaultMethod, Object bean) {
4564
this.methods = methods;
65+
this.defaultMethod = defaultMethod;
4666
setBean(bean);
4767
}
4868

4969
@Override
5070
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
5171
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
72+
InvocableHandlerMethod defaultHandler = null;
5273
for (Method method : this.methods) {
53-
invocableHandlerMethods.add(getMessageHandlerMethodFactory()
54-
.createInvocableHandlerMethod(getBean(), method));
74+
InvocableHandlerMethod handler = getMessageHandlerMethodFactory()
75+
.createInvocableHandlerMethod(getBean(), method);
76+
invocableHandlerMethods.add(handler);
77+
if (method.equals(this.defaultMethod)) {
78+
defaultHandler = handler;
79+
}
5580
}
5681
DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
57-
getBean(), getResolver(), getBeanExpressionContext());
82+
defaultHandler, getBean(), getResolver(), getBeanExpressionContext());
5883
return new HandlerAdapter(delegatingHandler);
5984
}
6085

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.expression.common.TemplateParserContext;
3636
import org.springframework.expression.spel.standard.SpelExpressionParser;
3737
import org.springframework.kafka.KafkaException;
38+
import org.springframework.lang.Nullable;
3839
import org.springframework.messaging.Message;
3940
import org.springframework.messaging.handler.annotation.Header;
4041
import org.springframework.messaging.handler.annotation.Payload;
@@ -61,6 +62,8 @@ public class DelegatingInvocableHandler {
6162

6263
private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();
6364

65+
private final InvocableHandlerMethod defaultHandler;
66+
6467
private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new HashMap<>();
6568

6669
private final Object bean;
@@ -78,7 +81,23 @@ public class DelegatingInvocableHandler {
7881
*/
7982
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object bean,
8083
BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
84+
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext);
85+
}
86+
87+
/**
88+
* Construct an instance with the supplied handlers for the bean.
89+
* @param handlers the handlers.
90+
* @param defaultHandler the default handler.
91+
* @param bean the bean.
92+
* @param beanExpressionResolver the resolver.
93+
* @param beanExpressionContext the context.
94+
* @since 2.1.3
95+
*/
96+
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
97+
@Nullable InvocableHandlerMethod defaultHandler,
98+
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
8199
this.handlers = new ArrayList<>(handlers);
100+
this.defaultHandler = defaultHandler;
82101
this.bean = bean;
83102
this.resolver = beanExpressionResolver;
84103
this.beanExpressionContext = beanExpressionContext;
@@ -174,13 +193,19 @@ protected InvocableHandlerMethod findHandlerForPayload(Class<? extends Object> p
174193
for (InvocableHandlerMethod handler : this.handlers) {
175194
if (matchHandlerMethod(payloadClass, handler)) {
176195
if (result != null) {
177-
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
178-
result.getMethod().getName() + " and " + handler.getMethod().getName());
196+
boolean resultIsDefault = result.equals(this.defaultHandler);
197+
if (!handler.equals(this.defaultHandler) && !resultIsDefault) {
198+
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
199+
result.getMethod().getName() + " and " + handler.getMethod().getName());
200+
}
201+
if (!resultIsDefault) {
202+
continue; // otherwise replace the result with the actual match
203+
}
179204
}
180205
result = handler;
181206
}
182207
}
183-
return result;
208+
return result != null ? result : this.defaultHandler;
184209
}
185210

186211
protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, InvocableHandlerMethod handler) {
@@ -221,4 +246,8 @@ public String getMethodNameFor(Object payload) {
221246
return handlerForPayload == null ? "no match" : handlerForPayload.getMethod().toGenericString(); //NOSONAR
222247
}
223248

249+
public boolean hasDefaultHandler() {
250+
return this.defaultHandler != null;
251+
}
252+
224253
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2016 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -47,6 +47,13 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
4747
if (this.invokerHandlerMethod != null) {
4848
return this.invokerHandlerMethod.invoke(message, providedArgs);
4949
}
50+
else if (this.delegatingHandler.hasDefaultHandler()) {
51+
// Needed to avoid returning raw Message which matches Object
52+
Object[] args = new Object[providedArgs.length + 1];
53+
args[0] = message.getPayload();
54+
System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);
55+
return this.delegatingHandler.invoke(message, args);
56+
}
5057
else {
5158
return this.delegatingHandler.invoke(message, providedArgs);
5259
}

0 commit comments

Comments
 (0)