Skip to content

Add a heartbeat executor for SSE emitters #34878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.web.servlet.config.annotation;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -32,6 +33,7 @@
* Helps with configuring options for asynchronous request processing.
*
* @author Rossen Stoyanchev
* @author Réda Housni Alaoui
* @since 3.2
*/
public class AsyncSupportConfigurer {
Expand All @@ -44,6 +46,8 @@ public class AsyncSupportConfigurer {

private final List<DeferredResultProcessingInterceptor> deferredResultInterceptors = new ArrayList<>();

private @Nullable Duration sseHeartbeatPeriod;


/**
* The provided task executor is used for the following:
Expand Down Expand Up @@ -99,6 +103,14 @@ public AsyncSupportConfigurer registerDeferredResultInterceptors(
return this;
}

/**
* Configure the SSE heartbeat period.
* @param sseHeartbeatPeriod The SSE heartbeat period
*/
public AsyncSupportConfigurer setSseHeartbeatPeriod(Duration sseHeartbeatPeriod) {
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
return this;
}

protected @Nullable AsyncTaskExecutor getTaskExecutor() {
return this.taskExecutor;
Expand All @@ -116,4 +128,8 @@ protected List<DeferredResultProcessingInterceptor> getDeferredResultInterceptor
return this.deferredResultInterceptors;
}

protected @Nullable Duration getSseHeartbeatPeriod() {
return this.sseHeartbeatPeriod;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

import jakarta.servlet.ServletContext;
import org.jspecify.annotations.Nullable;
Expand Down Expand Up @@ -693,6 +694,7 @@ public RequestMappingHandlerAdapter requestMappingHandlerAdapter(
}
adapter.setCallableInterceptors(configurer.getCallableInterceptors());
adapter.setDeferredResultInterceptors(configurer.getDeferredResultInterceptors());
Optional.ofNullable(configurer.getSseHeartbeatPeriod()).ifPresent(adapter::setSseHeartbeatPeriod);

return adapter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package org.springframework.web.servlet.mvc.method.annotation;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -54,6 +56,8 @@
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.converter.support.AllEncompassingFormHttpMessageConverter;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
import org.springframework.ui.ModelMap;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -123,6 +127,7 @@
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @author Sebastien Deleuze
* @author Réda Housni Alaoui
* @since 3.1
* @see HandlerMethodArgumentResolver
* @see HandlerMethodReturnValueHandler
Expand Down Expand Up @@ -201,6 +206,9 @@ public class RequestMappingHandlerAdapter extends AbstractHandlerMethodAdapter

private final Map<ControllerAdviceBean, Set<Method>> modelAttributeAdviceCache = new LinkedHashMap<>();

private TaskScheduler taskScheduler = new SimpleAsyncTaskScheduler();

private @Nullable Duration sseHeartbeatPeriod;

/**
* Provide resolvers for custom argument types. Custom resolvers are ordered
Expand Down Expand Up @@ -526,6 +534,20 @@ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDisc
this.parameterNameDiscoverer = parameterNameDiscoverer;
}

/**
* Set the {@link TaskScheduler}
*/
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}

/**
* Sets the heartbeat period that will be used to periodically prob the SSE connection health
*/
public void setSseHeartbeatPeriod(@Nullable Duration sseHeartbeatPeriod) {
this.sseHeartbeatPeriod = sseHeartbeatPeriod;
}

/**
* A {@link ConfigurableBeanFactory} is expected for resolving expressions
* in method argument default values.
Expand Down Expand Up @@ -733,9 +755,12 @@ private List<HandlerMethodReturnValueHandler> getDefaultReturnValueHandlers() {
handlers.add(new ModelAndViewMethodReturnValueHandler());
handlers.add(new ModelMethodProcessor());
handlers.add(new ViewMethodReturnValueHandler());

SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor = Optional.ofNullable(sseHeartbeatPeriod)
.map(period -> new SseEmitterHeartbeatExecutor(taskScheduler, period)).orElse(null);
handlers.add(new ResponseBodyEmitterReturnValueHandler(getMessageConverters(),
this.reactiveAdapterRegistry, this.taskExecutor, this.contentNegotiationManager,
initViewResolvers(), initLocaleResolver()));
initViewResolvers(), initLocaleResolver(), sseEmitterHeartbeatExecutor));
handlers.add(new StreamingResponseBodyReturnValueHandler());
handlers.add(new HttpEntityMethodProcessor(getMessageConverters(),
this.contentNegotiationManager, this.requestResponseBodyAdvice, this.errorResponseInterceptors));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;

Expand Down Expand Up @@ -89,6 +90,7 @@
* </ul>
*
* @author Rossen Stoyanchev
* @author Réda Housni Alaoui
* @since 4.2
*/
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
Expand All @@ -101,6 +103,8 @@ public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodRetur

private final LocaleResolver localeResolver;

@Nullable
private final SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor;

/**
* Simple constructor with reactive type support based on a default instance of
Expand Down Expand Up @@ -143,11 +147,32 @@ public ResponseBodyEmitterReturnValueHandler(
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver) {

this(messageConverters, registry, executor, manager, viewResolvers, localeResolver, null);
}

/**
* Constructor that with added arguments for view rendering.
* @param messageConverters converters to write emitted objects with
* @param registry for reactive return value type support
* @param executor for blocking I/O writes of items emitted from reactive types
* @param manager for detecting streaming media types
* @param viewResolvers resolvers for fragment stream rendering
* @param localeResolver the {@link LocaleResolver} for fragment stream rendering
* @param sseEmitterHeartbeatExecutor for sending periodic events to SSE clients
* @since 6.2
*/
public ResponseBodyEmitterReturnValueHandler(
List<HttpMessageConverter<?>> messageConverters,
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager,
List<ViewResolver> viewResolvers, @Nullable LocaleResolver localeResolver,
@Nullable SseEmitterHeartbeatExecutor sseEmitterHeartbeatExecutor) {

Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
this.sseMessageConverters = initSseConverters(messageConverters);
this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager, null);
this.viewResolvers = viewResolvers;
this.localeResolver = (localeResolver != null ? localeResolver : new AcceptHeaderLocaleResolver());
this.sseEmitterHeartbeatExecutor = sseEmitterHeartbeatExecutor;
}

private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
Expand Down Expand Up @@ -239,6 +264,9 @@ public void handleReturnValue(@Nullable Object returnValue, MethodParameter retu
}

emitter.initialize(emitterHandler);
if (emitter instanceof SseEmitter sseEmitter) {
Optional.ofNullable(sseEmitterHeartbeatExecutor).ifPresent(handler -> handler.register(sseEmitter));
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.jspecify.annotations.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
Expand All @@ -41,17 +45,22 @@
* @author Juergen Hoeller
* @author Sam Brannen
* @author Brian Clozel
* @author Réda Housni Alaoui
* @since 4.2
*/
public class SseEmitter extends ResponseBodyEmitter {

private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitter.class);

private static final MediaType TEXT_PLAIN = new MediaType("text", "plain", StandardCharsets.UTF_8);

/**
* Guards access to write operations on the response.
*/
private final Lock writeLock = new ReentrantLock();

private volatile @Nullable Long lastEmissionNanoTime;

/**
* Create a new SseEmitter instance.
*/
Expand Down Expand Up @@ -134,12 +143,31 @@ public void send(SseEventBuilder builder) throws IOException {
this.writeLock.lock();
try {
super.send(dataToSend);
this.lastEmissionNanoTime = System.nanoTime();
}
finally {
this.writeLock.unlock();
}
}

void notifyOfHeartbeatTick(Duration heartbeatPeriod) {
boolean skip = Optional.ofNullable(lastEmissionNanoTime)
.map(lastEmissionNanoTime -> System.nanoTime() - lastEmissionNanoTime)
.map(nanoTimeElapsedSinceLastEmission -> nanoTimeElapsedSinceLastEmission < heartbeatPeriod.toNanos())
.orElse(false);
if (skip) {
return;
}
LOGGER.trace("Sending heartbeat to {}", this);
SseEmitter.SseEventBuilder eventBuilder = SseEmitter.event().name("ping").data("ping", MediaType.TEXT_PLAIN);
try {
send(eventBuilder);
} catch (IOException | RuntimeException e) {
// According to SseEmitter's Javadoc, the container itself will call SseEmitter#completeWithError
LOGGER.debug(e.getMessage());
}
}

@Override
public String toString() {
return "SseEmitter@" + ObjectUtils.getIdentityHexString(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.web.servlet.mvc.method.annotation;


import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.TaskScheduler;

/**
* @author Réda Housni Alaoui
*/
class SseEmitterHeartbeatExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterHeartbeatExecutor.class);

private final TaskScheduler taskScheduler;
private final Set<SseEmitter> emitters = ConcurrentHashMap.newKeySet();

private final Object lifecycleMonitor = new Object();

private final Duration period;

@Nullable
private volatile ScheduledFuture<?> taskFuture;

public SseEmitterHeartbeatExecutor(TaskScheduler taskScheduler, Duration period) {
this.taskScheduler = taskScheduler;
this.period = period;
}

public void register(SseEmitter emitter) {
startIfNeeded();

Runnable closeCallback = () -> emitters.remove(emitter);
emitter.onCompletion(closeCallback);
emitter.onError(t -> closeCallback.run());
emitter.onTimeout(closeCallback);

emitters.add(emitter);
}

boolean isRegistered(SseEmitter emitter) {
return emitters.contains(emitter);
}

private void startIfNeeded() {
if (taskFuture != null) {
return;
}
synchronized (lifecycleMonitor) {
if (taskFuture != null) {
return;
}
taskFuture = taskScheduler.scheduleAtFixedRate(this::notifyEmitters, period);
}
}

private void notifyEmitters() {
LOGGER.atDebug().log(() -> "Notifying %s emitter(s)".formatted(emitters.size()));

for (SseEmitter emitter : emitters) {
if (Thread.currentThread().isInterrupted()) {
return;
}
emitter.notifyOfHeartbeatTick(period);
}
}
}
Loading