Skip to content

Commit 904013c

Browse files
author
Daniel Bustamante Ospina
authored
Merge pull request #86 from reactive-commons/feature/eda
Feature/eda
2 parents 2dc8443 + 925c059 commit 904013c

File tree

64 files changed

+3176
-55
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+3176
-55
lines changed

async/async-commons-api/async-commons-api.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ dependencies {
77
api project(':domain-events-api')
88
compileOnly 'io.projectreactor:reactor-core'
99
testImplementation 'io.projectreactor:reactor-test'
10+
implementation 'io.cloudevents:cloudevents-json-jackson:2.5.0'
1011
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,25 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import reactor.core.publisher.Mono;
56

67
public interface DirectAsyncGateway {
78
<T> Mono<Void> sendCommand(Command<T> command, String targetName);
9+
10+
<T> Mono<Void> sendCommand(Command<T> command, String targetName, String domain);
11+
12+
Mono<Void> sendCommand(CloudEvent command, String targetName);
13+
14+
Mono<Void> sendCommand(CloudEvent command, String targetName, String domain);
15+
816
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type);
17+
18+
<T, R> Mono<R> requestReply(AsyncQuery<T> query, String targetName, Class<R> type, String domain);
19+
20+
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type);
21+
22+
<R extends CloudEvent> Mono<R> requestReply(CloudEvent query, String targetName, Class<R> type, String domain);
23+
924
<T> Mono<Void> reply(T response, From from);
1025
}

async/async-commons-api/src/main/java/org/reactivecommons/async/api/HandlerRegistry.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.reactivecommons.async.api;
22

3+
import io.cloudevents.CloudEvent;
4+
import io.cloudevents.core.provider.EventFormatProvider;
5+
import io.cloudevents.jackson.JsonFormat;
36
import lombok.AccessLevel;
47
import lombok.Getter;
58
import lombok.NoArgsConstructor;
@@ -13,24 +16,36 @@
1316

1417
import java.lang.reflect.ParameterizedType;
1518
import java.util.List;
19+
import java.util.Map;
20+
import java.util.concurrent.ConcurrentHashMap;
1621
import java.util.concurrent.CopyOnWriteArrayList;
1722

1823
@Getter
1924
@NoArgsConstructor(access = AccessLevel.PACKAGE)
2025
public class HandlerRegistry {
21-
22-
private final List<RegisteredEventListener<?>> eventListeners = new CopyOnWriteArrayList<>();
26+
public static final String DEFAULT_DOMAIN = "app";
27+
private final Map<String, List<RegisteredEventListener<?>>> domainEventListeners = new ConcurrentHashMap<>();
2328
private final List<RegisteredEventListener<?>> dynamicEventHandlers = new CopyOnWriteArrayList<>();
2429
private final List<RegisteredEventListener<?>> eventNotificationListener = new CopyOnWriteArrayList<>();
2530
private final List<RegisteredQueryHandler<?, ?>> handlers = new CopyOnWriteArrayList<>();
2631
private final List<RegisteredCommandHandler<?>> commandHandlers = new CopyOnWriteArrayList<>();
2732

33+
2834
public static HandlerRegistry register() {
29-
return new HandlerRegistry();
35+
HandlerRegistry instance = new HandlerRegistry();
36+
instance.domainEventListeners.put(DEFAULT_DOMAIN, new CopyOnWriteArrayList<>());
37+
return instance;
38+
}
39+
40+
public <T> HandlerRegistry listenDomainEvent(String domain, String eventName, EventHandler<T> handler, Class<T> eventClass) {
41+
domainEventListeners.computeIfAbsent(domain, ignored -> new CopyOnWriteArrayList<>())
42+
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
43+
return this;
3044
}
3145

3246
public <T> HandlerRegistry listenEvent(String eventName, EventHandler<T> handler, Class<T> eventClass) {
33-
eventListeners.add(new RegisteredEventListener<>(eventName, handler, eventClass));
47+
domainEventListeners.computeIfAbsent(DEFAULT_DOMAIN, ignored -> new CopyOnWriteArrayList<>())
48+
.add(new RegisteredEventListener<>(eventName, handler, eventClass));
3449
return this;
3550
}
3651

@@ -67,7 +82,21 @@ public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> han
6782
}
6883

6984
public <T, R> HandlerRegistry serveQuery(String resource, QueryHandler<T, R> handler, Class<R> queryClass) {
70-
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
85+
if(queryClass == CloudEvent.class){
86+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) ->
87+
{
88+
CloudEvent query = EventFormatProvider
89+
.getInstance()
90+
.resolveFormat(JsonFormat.CONTENT_TYPE)
91+
.deserialize(message);
92+
93+
return handler.handle((R) query);
94+
95+
} , byte[].class));
96+
}
97+
else{
98+
handlers.add(new RegisteredQueryHandler<>(resource, (ignored, message) -> handler.handle(message), queryClass));
99+
}
71100
return this;
72101
}
73102

async/async-commons-api/src/test/java/org/reactivecommons/async/api/HandlerRegistryTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static org.assertj.core.api.Assertions.assertThat;
1818
import static org.mockito.Mockito.mock;
19+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
1920

2021
class HandlerRegistryTest {
2122
private final HandlerRegistry registry = HandlerRegistry.register();
@@ -27,7 +28,7 @@ void shouldListenEventWithTypeInferenceWhenClassInstanceIsUsed() {
2728

2829
registry.listenEvent(name, eventHandler);
2930

30-
assertThat(registry.getEventListeners())
31+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
3132
.anySatisfy(registered -> assertThat(registered)
3233
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
3334
.containsExactly(name, SomeDataClass.class, eventHandler)).hasSize(1);
@@ -43,7 +44,7 @@ void shouldRegisterPatternEventHandlerWithTypeInference() {
4344
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
4445
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
4546

46-
assertThat(registry.getEventListeners())
47+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
4748
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
4849
.usingRecursiveComparison()
4950
.isEqualTo(expectedRegisteredEventListener));
@@ -62,7 +63,7 @@ void shouldRegisterPatternEventHandler() {
6263
RegisteredEventListener<SomeDataClass> expectedRegisteredEventListener =
6364
new RegisteredEventListener<>(eventNamePattern, eventHandler, SomeDataClass.class);
6465

65-
assertThat(registry.getEventListeners())
66+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
6667
.anySatisfy(registeredEventListener -> assertThat(registeredEventListener)
6768
.usingRecursiveComparison()
6869
.isEqualTo(expectedRegisteredEventListener));
@@ -84,7 +85,7 @@ public void listenEvent() {
8485
EventHandler<SomeDataClass> handler = mock(EventHandler.class);
8586
registry.listenEvent(name, handler, SomeDataClass.class);
8687

87-
assertThat(registry.getEventListeners())
88+
assertThat(registry.getDomainEventListeners().get(DEFAULT_DOMAIN))
8889
.anySatisfy(registered -> assertThat(registered)
8990
.extracting(RegisteredEventListener::getPath, RegisteredEventListener::getInputClass, RegisteredEventListener::getHandler)
9091
.containsExactly(name, SomeDataClass.class, handler)).hasSize(1);

async/async-commons/src/main/java/org/reactivecommons/async/commons/converters/MessageConverter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.reactivecommons.async.commons.converters;
22

3+
import io.cloudevents.CloudEvent;
34
import org.reactivecommons.api.domain.Command;
45
import org.reactivecommons.api.domain.DomainEvent;
56
import org.reactivecommons.async.api.AsyncQuery;

async/async-commons/src/test/java/org/reactivecommons/async/commons/utils/matcher/KeyMatcherPerformanceWildcardTest.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,12 @@
88
import java.io.IOException;
99
import java.nio.file.Files;
1010
import java.nio.file.Paths;
11-
import java.util.*;
11+
import java.util.ArrayList;
12+
import java.util.HashMap;
13+
import java.util.HashSet;
14+
import java.util.List;
15+
import java.util.Map;
16+
import java.util.Set;
1217
import java.util.stream.Collectors;
1318

1419
class KeyMatcherPerformanceWildcardTest {
@@ -27,13 +32,13 @@ public void setUp() {
2732
File file = new File(classLoader.getResource("wildcard_names_for_matching.txt").getFile());
2833
File file2 = new File(classLoader.getResource("concrete_names_for_matching.txt").getFile());
2934
try {
30-
Set<String> names = new HashSet<>(Files
35+
Set<String> names = new HashSet<>(Files
3136
.readAllLines(Paths.get(file.getAbsolutePath())));
3237
candidates = names.stream()
3338
.collect(Collectors.toMap(name -> name, name -> name));
3439
testList = new ArrayList<>(new HashSet<>(Files
35-
.readAllLines(Paths.get(file2.getAbsolutePath()))));
36-
testResultList = new ArrayList<>(testList.size()*10);
40+
.readAllLines(Paths.get(file2.getAbsolutePath()))));
41+
testResultList = new ArrayList<>(testList.size() * 10);
3742
} catch (IOException e) {
3843
e.printStackTrace();
3944
}
@@ -43,14 +48,14 @@ public void setUp() {
4348
void keyMatcherLookupShouldPerformInLessThan30Micros() {
4449
final int size = testList.size();
4550
final long init = System.currentTimeMillis();
46-
for (int i = 0; i< size*10; ++i){
47-
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i%size)));
51+
for (int i = 0; i < size * 10; ++i) {
52+
testResultList.add(keyMatcher.match(candidates.keySet(), testList.get(i % size)));
4853
}
4954
final long end = System.currentTimeMillis();
5055

5156

5257
final long total = end - init;
53-
final double microsPerLookup = ((total+0.0)/testResultList.size())*1000;
58+
final double microsPerLookup = ((total + 0.0) / testResultList.size()) * 1000;
5459
System.out.println("Performed Lookups: " + testResultList.size());
5560
System.out.println("Total Execution Time: " + total + "ms");
5661
System.out.println("Microseconds per lookup: " + microsPerLookup + "us");
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
ext {
2+
artifactId = 'async-commons-rabbit-starter-eda'
3+
artifactDescription = 'Async Commons Starter EDA'
4+
}
5+
6+
dependencies {
7+
api project(':async-rabbit')
8+
compileOnly 'org.springframework.boot:spring-boot-starter'
9+
compileOnly 'org.springframework.boot:spring-boot-starter-actuator'
10+
11+
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
12+
13+
testImplementation 'io.projectreactor:reactor-test'
14+
testImplementation 'org.springframework.boot:spring-boot-starter-actuator'
15+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.reactivecommons.async;
2+
3+
import io.micrometer.core.instrument.MeterRegistry;
4+
import org.reactivecommons.async.commons.config.BrokerConfig;
5+
import org.reactivecommons.async.commons.converters.MessageConverter;
6+
import org.reactivecommons.async.commons.reply.ReactiveReplyRouter;
7+
import org.reactivecommons.async.rabbit.RabbitDirectAsyncGateway;
8+
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
9+
import org.reactivecommons.async.rabbit.config.ConnectionManager;
10+
11+
import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;
12+
13+
public class RabbitEDADirectAsyncGateway extends RabbitDirectAsyncGateway {
14+
private final ConnectionManager manager;
15+
16+
public RabbitEDADirectAsyncGateway(BrokerConfig config, ReactiveReplyRouter router, ConnectionManager manager, String exchange, MessageConverter converter, MeterRegistry meterRegistry) {
17+
super(config, router, manager.getSender(DEFAULT_DOMAIN), exchange, converter, meterRegistry);
18+
this.manager = manager;
19+
}
20+
21+
@Override
22+
protected ReactiveMessageSender resolveSender(String domain) {
23+
return manager.getSender(domain);
24+
}
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.CommandListenersConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(CommandListenersConfig.class)
14+
@Configuration
15+
public @interface EnableCommandListeners {
16+
}
17+
18+
19+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.reactivecommons.async.impl.config.annotations;
2+
3+
import org.reactivecommons.async.rabbit.config.DirectAsyncGatewayConfig;
4+
import org.springframework.context.annotation.Configuration;
5+
import org.springframework.context.annotation.Import;
6+
7+
import java.lang.annotation.*;
8+
9+
10+
@Retention(RetentionPolicy.RUNTIME)
11+
@Target({ElementType.TYPE})
12+
@Documented
13+
@Import(DirectAsyncGatewayConfig.class)
14+
@Configuration
15+
public @interface EnableDirectAsyncGateway {
16+
}
17+
18+
19+

0 commit comments

Comments
 (0)