Skip to content

Commit 29bc84c

Browse files
committed
GH-891 - DefaultEventPublicationRegistry now properly unregisters in-progress publication on failed resubmission.
DefaultEventPublicationRegistry.processIncompletePublications(…) now actively unregisters the publication from being considered in progress after completion (either successful or failed). While CompletionRegisteringAdvisor should take care of that on the target listeners we now leave the publications in progress in consistent state independent of the actual target being invoked. Decrease log level of the failed listener invocation as it's not unusual for the listener to fail. Improved PublicationsInProgress by switching to a concurrent map internally to avoid ConcurrentModificationExceptions in case of multiple threads.
1 parent d5477d1 commit 29bc84c

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistry.java

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import java.time.Instant;
2121
import java.util.Collection;
2222
import java.util.HashSet;
23+
import java.util.Iterator;
2324
import java.util.Optional;
2425
import java.util.Set;
26+
import java.util.concurrent.ConcurrentHashMap;
2527
import java.util.function.Consumer;
2628
import java.util.function.Predicate;
2729
import java.util.stream.Stream;
@@ -216,9 +218,12 @@ public void processIncompletePublications(Predicate<EventPublication> filter,
216218

217219
} catch (Exception o_O) {
218220

219-
if (LOGGER.isErrorEnabled()) {
220-
LOGGER.error("Error republishing event publication " + it, o_O);
221+
if (LOGGER.isInfoEnabled()) {
222+
LOGGER.info("Error republishing event publication %s.".formatted(it), o_O);
221223
}
224+
225+
} finally {
226+
inProgress.unregister(it);
222227
}
223228
});
224229
}
@@ -249,6 +254,30 @@ public void destroy() {
249254
}
250255
}
251256

257+
/**
258+
* Returns all {@link PublicationsInProgress}.
259+
*
260+
* @return will never be {@literal null}.
261+
* @since 1.3
262+
*/
263+
PublicationsInProgress getPublicationsInProgress() {
264+
return inProgress;
265+
}
266+
267+
/**
268+
* Marks the given {@link TargetEventPublication} as failed.
269+
*
270+
* @param publication must not be {@literal null}.
271+
* @see #markFailed(Object, PublicationTargetIdentifier)
272+
* @since 1.3
273+
*/
274+
void markFailed(TargetEventPublication publication) {
275+
276+
Assert.notNull(publication, "TargetEventPublication must not be null!");
277+
278+
markFailed(publication.getEvent(), publication.getTargetIdentifier());
279+
}
280+
252281
private static String getConfirmationMessage(Collection<?> publications) {
253282

254283
var size = publications.size();
@@ -266,9 +295,9 @@ private static String getConfirmationMessage(Collection<?> publications) {
266295
* @author Oliver Drotbohm
267296
* @since 1.3
268297
*/
269-
static class PublicationsInProgress {
298+
static class PublicationsInProgress implements Iterable<TargetEventPublication> {
270299

271-
private final Set<TargetEventPublication> publications = new HashSet<>();
300+
private final Set<TargetEventPublication> publications = ConcurrentHashMap.newKeySet();
272301

273302
/**
274303
* Registers the given {@link TargetEventPublication} as currently processed.
@@ -301,6 +330,18 @@ void unregister(Object event, PublicationTargetIdentifier identifier) {
301330
.ifPresent(publications::remove);
302331
}
303332

333+
/**
334+
* Unregisters the {@link TargetEventPublication}..
335+
*
336+
* @param publication must not be {@literal null}.
337+
*/
338+
void unregister(TargetEventPublication publication) {
339+
340+
Assert.notNull(publication, "TargetEventPublication must not be null!");
341+
342+
publications.remove(publication);
343+
}
344+
304345
/**
305346
* Returns the {@link TargetEventPublication} associated with the given event and
306347
* {@link PublicationTargetIdentifier}.
@@ -318,5 +359,14 @@ Optional<TargetEventPublication> getPublication(Object event, PublicationTargetI
318359
.filter(it -> it.isAssociatedWith(event, identifier))
319360
.findFirst();
320361
}
362+
363+
/*
364+
* (non-Javadoc)
365+
* @see java.lang.Iterable#iterator()
366+
*/
367+
@Override
368+
public Iterator<TargetEventPublication> iterator() {
369+
return new HashSet<>(publications).iterator();
370+
}
321371
}
322372
}

spring-modulith-events/spring-modulith-events-core/src/test/java/org/springframework/modulith/events/core/DefaultEventPublicationRegistryUnitTests.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.time.Clock;
2424
import java.time.Instant;
2525
import java.time.ZoneId;
26+
import java.util.function.Consumer;
2627
import java.util.stream.Stream;
2728

2829
import org.junit.jupiter.api.Test;
@@ -39,17 +40,15 @@
3940
class DefaultEventPublicationRegistryUnitTests {
4041

4142
@Mock EventPublicationRepository repository;
42-
@Mock Clock clock;
4343

4444
@Test // GH-206
4545
void usesCustomClockIfConfigured() {
4646

4747
when(repository.create(any())).then(returnsFirstArg());
4848

4949
var now = Instant.now();
50-
var clock = Clock.fixed(now, ZoneId.systemDefault());
5150

52-
var registry = new DefaultEventPublicationRegistry(repository, clock);
51+
var registry = createRegistry(now);
5352

5453
var identifier = PublicationTargetIdentifier.of("id");
5554
var publications = registry.store(new Object(), Stream.of(identifier));
@@ -59,4 +58,40 @@ void usesCustomClockIfConfigured() {
5958
assertThat(it.getTargetIdentifier()).isEqualTo(identifier);
6059
});
6160
}
61+
62+
@Test // GH-819
63+
void removesFailingResubmissionFromInProgressPublications() {
64+
65+
when(repository.create(any())).then(returnsFirstArg());
66+
67+
var registry = createRegistry(Instant.now());
68+
var identifier = PublicationTargetIdentifier.of("id");
69+
70+
var failedPublications = registry.store(new Object(), Stream.of(identifier)).stream()
71+
.peek(registry::markFailed)
72+
.toList();
73+
74+
// Failed completions are not present in the in progress ones
75+
assertThat(registry.getPublicationsInProgress()).isEmpty();
76+
77+
when(repository.findIncompletePublications()).thenReturn(failedPublications);
78+
79+
registry.processIncompletePublications(__ -> true, failingConsumer(), null);
80+
81+
// Failed re-submissions are not held in the in progress ones, either.
82+
assertThat(registry.getPublicationsInProgress()).isEmpty();
83+
}
84+
85+
private DefaultEventPublicationRegistry createRegistry(Instant instant) {
86+
87+
var clock = Clock.fixed(instant, ZoneId.systemDefault());
88+
89+
return new DefaultEventPublicationRegistry(repository, clock);
90+
}
91+
92+
private Consumer<TargetEventPublication> failingConsumer() {
93+
return __ -> {
94+
throw new IllegalStateException();
95+
};
96+
}
6297
}

0 commit comments

Comments
 (0)