Skip to content

Commit 1d1c893

Browse files
committed
GH-1321 - Revamped event publication registry and JDBC implementation.
1 parent 1200c92 commit 1d1c893

File tree

65 files changed

+2914
-91
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2914
-91
lines changed

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/EventPublication.java

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Optional;
2020
import java.util.UUID;
2121

22+
import org.jspecify.annotations.Nullable;
2223
import org.springframework.context.ApplicationEvent;
2324
import org.springframework.context.PayloadApplicationEvent;
2425

@@ -92,12 +93,69 @@ default boolean isCompleted() {
9293
return getCompletionDate().isPresent();
9394
}
9495

96+
/**
97+
* Return the publication's {@link Status}.
98+
*
99+
* @return will never be {@literal null}.
100+
* @since 2.0
101+
*/
102+
Status getStatus();
103+
104+
/**
105+
* Returns the last time the {@link EventPublication} was resubmitted.
106+
*
107+
* @return can be {@literal null}.
108+
* @since 2.0
109+
*/
110+
@Nullable
111+
Instant getLastResubmissionDate();
112+
113+
/**
114+
* Returns the number of completion attempts.
115+
*
116+
* @since 2.0
117+
*/
118+
int getCompletionAttempts();
119+
95120
/*
96121
* (non-Javadoc)
97122
* @see java.lang.Comparable#compareTo(java.lang.Object)
98123
*/
99-
@SuppressWarnings("javadoc")
100124
default int compareTo(EventPublication that) {
101125
return this.getPublicationDate().compareTo(that.getPublicationDate());
102126
}
127+
128+
/**
129+
* The status of an {@link EventPublication}.
130+
*
131+
* @author Oliver Drotbohm
132+
* @since 2.0
133+
*/
134+
enum Status {
135+
136+
/**
137+
* The event publication has been published initially.
138+
*/
139+
PUBLISHED,
140+
141+
/**
142+
* An event listener has picked up the publication and is processing it.
143+
*/
144+
PROCESSING,
145+
146+
/**
147+
* The processing of the publication has successfully completed.
148+
*/
149+
COMPLETED,
150+
151+
/**
152+
* The processing ended up in a failure.
153+
*/
154+
FAILED,
155+
156+
/**
157+
* A previously failed publication has been resubmitted for processing.
158+
*/
159+
RESUBMITTED;
160+
}
103161
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events;
17+
18+
/**
19+
* All uncompleted event publications.
20+
*
21+
* @author Oliver Drotbohm
22+
* @since 2.0
23+
*/
24+
public interface FailedEventPublications {
25+
26+
/**
27+
* Initiate the re-submission of failed {@link EventPublication} according to the given {@link ResubmissionOptions}.
28+
*
29+
* @param options must not be {@literal null}.
30+
*/
31+
void resubmit(ResubmissionOptions options);
32+
}

spring-modulith-events/spring-modulith-events-api/src/main/java/org/springframework/modulith/events/IncompleteEventPublications.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,12 @@ public interface IncompleteEventPublications {
4141
* @param duration must not be {@literal null}.
4242
*/
4343
void resubmitIncompletePublicationsOlderThan(Duration duration);
44+
45+
/**
46+
* Triggers the re-submission of incomplete {@link EventPublication}s matching the given {@link ResubmissionOptions}.
47+
*
48+
* @param options must not be {@literal null}.
49+
* @since 2.0
50+
*/
51+
void resubmitIncompletePublications(ResubmissionOptions options);
4452
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events;
17+
18+
import java.time.Duration;
19+
import java.util.function.Predicate;
20+
21+
import org.springframework.util.Assert;
22+
23+
/**
24+
* Options to be considered during {@link org.springframework.modulith.events.EventPublication} re-submission.
25+
*
26+
* @author Oliver Drotbohm
27+
* @since 2.0
28+
*/
29+
public class ResubmissionOptions {
30+
31+
private final int maxInFlight;
32+
private final int batchSize;
33+
private final Duration minAge;
34+
private final Predicate<EventPublication> filter;
35+
36+
private ResubmissionOptions(int maxInFlight, int batchSize, Duration minAge, Predicate<EventPublication> filter) {
37+
38+
Assert.isTrue(maxInFlight > 0, "Max in flight number must be greater than zero!");
39+
Assert.isTrue(batchSize > 0, "Batch size must be greater than zero!");
40+
Assert.notNull(minAge, "Minimum age must not be null!");
41+
Assert.isTrue(!minAge.isNegative(), "Minimum age must not be negative!");
42+
Assert.notNull(filter, "Filter must not be null!");
43+
44+
this.maxInFlight = maxInFlight;
45+
this.batchSize = batchSize;
46+
this.minAge = minAge;
47+
this.filter = filter;
48+
}
49+
50+
/**
51+
* Creates a new {@link ResubmissionOptions} with no bound for in-flight publications, a batch size of 100, no minimum
52+
* age and including all {@link EventPublication} instances.
53+
*
54+
* @return will never be {@literal null}.
55+
*/
56+
public static ResubmissionOptions defaults() {
57+
return new ResubmissionOptions(Integer.MAX_VALUE, 100, Duration.ZERO, __ -> true);
58+
}
59+
60+
public int getMaxInFlight() {
61+
return maxInFlight;
62+
}
63+
64+
/**
65+
* Configures the number of publications that are supposed to be in flight concurrently. This means that for each
66+
* re-submission attempt, only a number less than or equal to the configured value will be resubmitted.
67+
*
68+
* @param maxInFlight must not be less than or equal to zero.
69+
* @return will never be {@literal null}.
70+
*/
71+
public ResubmissionOptions withMaxInFlight(int maxInFlight) {
72+
return new ResubmissionOptions(maxInFlight, batchSize, minAge, filter);
73+
}
74+
75+
public int getBatchSize() {
76+
return batchSize;
77+
}
78+
79+
/**
80+
* Configures the batch size with which to read publications from the database.
81+
*
82+
* @param batchSize must not be less than or equal to zero.
83+
* @return will never be {@literal null}.
84+
*/
85+
public ResubmissionOptions withBatchSize(int batchSize) {
86+
return new ResubmissionOptions(maxInFlight, batchSize, minAge, filter);
87+
}
88+
89+
public Duration getMinAge() {
90+
return minAge;
91+
}
92+
93+
/**
94+
* Configures the minimum age of event publications to qualify for re-submission.
95+
*
96+
* @param minAge must not {@literal null} be negative.
97+
* @return will never be {@literal null}.
98+
*/
99+
public ResubmissionOptions withMinAge(Duration minAge) {
100+
return new ResubmissionOptions(maxInFlight, batchSize, minAge, filter);
101+
}
102+
103+
public Predicate<EventPublication> getFilter() {
104+
return filter;
105+
}
106+
107+
/**
108+
* Configures which {@link EventPublication}s to resubmit in a re-submission attempt.
109+
*
110+
* @param filter must not be {@literal null}.
111+
* @return will never be {@literal null}.
112+
*/
113+
public ResubmissionOptions withFilter(Predicate<EventPublication> filter) {
114+
return new ResubmissionOptions(maxInFlight, batchSize, minAge, filter);
115+
}
116+
}

spring-modulith-events/spring-modulith-events-core/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@
4848
<groupId>org.springframework.boot</groupId>
4949
<artifactId>spring-boot-autoconfigure</artifactId>
5050
</dependency>
51+
52+
<dependency>
53+
<groupId>org.springframework.boot</groupId>
54+
<artifactId>spring-boot-configuration-processor</artifactId>
55+
<optional>true</optional>
56+
</dependency>
5157

5258
<!-- Test -->
5359

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationAutoConfiguration.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
3434
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties;
3535
import org.springframework.boot.autoconfigure.task.TaskExecutionProperties.Shutdown;
36+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
3637
import org.springframework.context.Lifecycle;
3738
import org.springframework.context.annotation.Bean;
3839
import org.springframework.context.annotation.Import;
@@ -56,7 +57,8 @@
5657
* @author Dmitry Belyaev
5758
*/
5859
@AutoConfiguration
59-
@Import(AsyncEnablingConfiguration.class)
60+
@Import({ AsyncEnablingConfiguration.class, StalenessMonitorConfiguration.class })
61+
@EnableConfigurationProperties(StalenessProperties.class)
6062
public class EventPublicationAutoConfiguration extends EventPublicationConfiguration {
6163

6264
@Override

spring-modulith-events/spring-modulith-events-core/src/main/java/org/springframework/modulith/events/config/EventPublicationConfiguration.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class EventPublicationConfiguration {
4444
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
4545
DefaultEventPublicationRegistry eventPublicationRegistry(EventPublicationRepository repository,
4646
ObjectProvider<Clock> clock) {
47+
4748
return new DefaultEventPublicationRegistry(repository, clock.getIfAvailable(() -> Clock.systemUTC()));
4849
}
4950

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.modulith.events.config;
17+
18+
import java.time.Duration;
19+
20+
import org.springframework.util.Assert;
21+
22+
/**
23+
* General utilities.
24+
*
25+
* @author Oliver Drotbohm
26+
* @since 2.0
27+
*/
28+
class EventUtils {
29+
30+
/**
31+
* Creates a human-readable {@link String} from a {@link Duration}.
32+
*
33+
* @param duration must not be {@literal null}.
34+
* @return will never be {@literal null}.
35+
*/
36+
static String prettyPrint(Duration duration) {
37+
38+
Assert.notNull(duration, "Duration must not be null!");
39+
40+
return duration.toString()
41+
.substring(2)
42+
.replaceAll("(\\d[HMS])(?!$)", "$1 ")
43+
.toLowerCase();
44+
}
45+
}

0 commit comments

Comments
 (0)