Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.time.Duration;
import java.util.Optional;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.netty.http.client.HttpClient;

/**
Expand Down Expand Up @@ -52,6 +53,11 @@ public interface ConnectionContext {
*/
RootProvider getRootProvider();

/**
* The {@link Scheduler} to use for token operations
*/
Scheduler getTokenScheduler();

/**
* Attempt to explicitly trust the TLS certificate of an endpoint. Implementations can choose whether any actual trusting will happen.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
Expand Down Expand Up @@ -74,6 +76,7 @@ abstract class _DefaultConnectionContext implements ConnectionContext {
@PreDestroy
public final void dispose() {
getConnectionProvider().ifPresent(ConnectionProvider::dispose);
getTokenScheduler().dispose();
getThreadPool().dispose();

try {
Expand Down Expand Up @@ -154,6 +157,12 @@ public Mono<Void> trust(String host, int port) {
.orElse(Mono.empty());
}

@Override
@Value.Derived
public Scheduler getTokenScheduler() {
return Schedulers.newSingle(String.format("token-provider-%s/%d", getApiHost(), getPort().orElse(DEFAULT_PORT)));
}

/**
* Additional configuration for the underlying HttpClient
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider {
private final ConcurrentMap<ConnectionContext, Mono<String>> refreshTokens =
new ConcurrentHashMap<>(1);

private final ConcurrentMap<ConnectionContext, Mono<String>> activeTokenRequests =
new ConcurrentHashMap<>(1);

/**
* The client id. Defaults to {@code cf}.
*/
Expand Down Expand Up @@ -297,30 +300,107 @@ private void setAuthorization(HttpHeaders headers) {
headers.set(AUTHORIZATION, String.format("Basic %s", encoded));
}

private Mono<String> token(ConnectionContext connectionContext) {
Mono<String> cached =
this.refreshTokens
.getOrDefault(connectionContext, Mono.empty())
.flatMap(
refreshToken ->
refreshToken(connectionContext, refreshToken)
.doOnSubscribe(
s ->
LOGGER.debug(
"Negotiating using refresh"
+ " token")))
.switchIfEmpty(
primaryToken(connectionContext)
private Mono<String> token(final ConnectionContext connectionContext) {
/*
* Beware of issue #1146!
* There can be both multiple concurrent callers coming here during creation of the Mono
* and there can be concurrent callers during the subscription/execution of the Mono:
* - The latter is very harmful: This may lead to concurrent execution of the logic
* written in requestToken and hence to multiple requests to the UAA server using
* the same *value for the refresh token*! The UAA server will sequentialize them,
* one will go through just as normal and a new refresh token gets issued.
* The UAA server invalidates the old refresh token. The second request then arrives
* with the old refresh token and gets rejected. In an earlier version this led
* to caching the second request, hence to cache an error. This caused deadlocks.
* - The first is only "not nice", if the second issue is resolved: It causes that
* we will request two access tokens from the UAA server shortly after the other,
* but having use appropriate refresh tokens sequentially. The second request
* simply is to be considered waste.
*
* The coding below fixes both issues: It ensures that the execution of the Mono
* is synchronized and it ensures that two threads arriving to fetch the JWT in a
* non-caching situation does not trigger "wasteful" requests to the UAA server.
*/

/*
* We use Mono.defer to ensure that the execution of the locking not happens
* during creation of the Mono (where it is of little relevance), but
* during the execution/subscription.
*/
return Mono.defer(
() -> {
// Check if there's already an active token request
final Mono<String> existingRequest =
this.activeTokenRequests.get(connectionContext);
if (existingRequest != null) {
LOGGER.debug(
"Reusing existing UAA JWT token request for connection context");
return existingRequest;
}

final Mono<String> baseTokenRequest =
createTokenRequest(connectionContext)
.doOnSubscribe(
s -> LOGGER.debug("Starting new UAA JWT token request"))
.doOnSuccess(
token ->
LOGGER.debug(
"UAA JWT token request completed"
+ " successfully"))
.doOnError(
error ->
LOGGER.debug(
"UAA JWT token request failed", error))
.doFinally(
signal -> {
// Clear the active request when done (success or
// error)
this.activeTokenRequests.remove(connectionContext);
});

// Apply cache duration from connection context
final Mono<String> newTokenRequest =
connectionContext
.getCacheDuration()
.map(baseTokenRequest::cache)
.orElseGet(baseTokenRequest::cache)
/*
* Ensure execution on single thread.
* This prevents sending requests to the UAA server with expired refresh tokens.
*/
.publishOn(connectionContext.getTokenScheduler());

// Store the active request atomically
final Mono<String> actualRequest =
this.activeTokenRequests.putIfAbsent(
connectionContext, newTokenRequest);
if (actualRequest != null) {
// Another thread beat us to it, use their request. This prevents "wasteful"
// requests.
LOGGER.debug(
"Another thread created token request first, using theirs instead");
return actualRequest;
}

// We successfully stored our request, use it
return newTokenRequest;
});
}

private Mono<String> createTokenRequest(final ConnectionContext connectionContext) {
return this.refreshTokens
.getOrDefault(connectionContext, Mono.empty())
.flatMap(
refreshToken ->
refreshToken(connectionContext, refreshToken)
.doOnSubscribe(
s ->
LOGGER.debug(
"Negotiating using token"
+ " provider")));

return connectionContext
.getCacheDuration()
.map(cached::cache)
.orElseGet(cached::cache)
"Negotiating using refresh token")))
.switchIfEmpty(
primaryToken(connectionContext)
.doOnSubscribe(
s -> LOGGER.debug("Negotiating using token provider")))
.checkpoint();
}

Expand Down
Loading