Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transport level retry #954

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class DefaultTransportOptions implements TransportOptions {
private final Map<String, String> parameters;
private final Function<List<String>, Boolean> onWarnings;
private boolean keepResponseBodyOnException;
private BackoffPolicy backoffPolicy;

public static final DefaultTransportOptions EMPTY = new DefaultTransportOptions();

Expand All @@ -49,10 +50,12 @@ public DefaultTransportOptions(
@Nullable HeaderMap headers,
@Nullable Map<String, String> parameters,
@Nullable Function<List<String>, Boolean> onWarnings,
boolean keepResponseBodyOnException
boolean keepResponseBodyOnException,
BackoffPolicy backoffPolicy
) {
this(headers,parameters,onWarnings);
this.keepResponseBodyOnException = keepResponseBodyOnException;
this.backoffPolicy = backoffPolicy;
}

public DefaultTransportOptions(
Expand All @@ -65,10 +68,11 @@ public DefaultTransportOptions(
Collections.emptyMap() : Collections.unmodifiableMap(parameters);
this.onWarnings = onWarnings;
this.keepResponseBodyOnException = false;
this.backoffPolicy = BackoffPolicy.noBackoff();
}

protected DefaultTransportOptions(AbstractBuilder<?> builder) {
this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException);
this(builder.headers, builder.parameters, builder.onWarnings, builder.keepResponseBodyOnException, builder.backoffPolicy);
}

public static DefaultTransportOptions of(@Nullable TransportOptions options) {
Expand Down Expand Up @@ -105,6 +109,11 @@ public boolean keepResponseBodyOnException() {
return keepResponseBodyOnException;
}

@Override
public BackoffPolicy backoffPolicy() {
return backoffPolicy;
}

@Override
public Builder toBuilder() {
return new Builder(this);
Expand All @@ -129,6 +138,7 @@ public abstract static class AbstractBuilder<BuilderT extends AbstractBuilder<Bu
private Map<String, String> parameters;
private Function<List<String>, Boolean> onWarnings;
private boolean keepResponseBodyOnException;
private BackoffPolicy backoffPolicy;

public AbstractBuilder() {
}
Expand All @@ -138,6 +148,7 @@ public AbstractBuilder(DefaultTransportOptions options) {
this.parameters = copyOrNull(options.parameters);
this.onWarnings = options.onWarnings;
this.keepResponseBodyOnException = options.keepResponseBodyOnException;
this.backoffPolicy = options.backoffPolicy;
}

protected abstract BuilderT self();
Expand All @@ -148,6 +159,12 @@ public BuilderT keepResponseBodyOnException(boolean value) {
return self();
}

@Override
public BuilderT backoffPolicy(BackoffPolicy policy) {
this.backoffPolicy = policy;
return self();
}

@Override
public BuilderT addHeader(String name, String value) {
if (name.equalsIgnoreCase(HeaderMap.CLIENT_META)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import co.elastic.clients.transport.instrumentation.Instrumentation;
import co.elastic.clients.transport.instrumentation.NoopInstrumentation;
import co.elastic.clients.transport.instrumentation.OpenTelemetryForElasticsearch;
import co.elastic.clients.transport.rest_client.RetryRestClientHttpClient;
import co.elastic.clients.util.ApiTypeHelper;
import co.elastic.clients.util.BinaryData;
import co.elastic.clients.util.ByteArrayBinaryData;
Expand Down Expand Up @@ -100,9 +101,15 @@ public ElasticsearchTransportBase(
@Nullable Instrumentation instrumentation
) {
this.mapper = jsonpMapper;
this.httpClient = httpClient;
this.transportOptions = httpClient.createOptions(options);

if (this.transportOptions.backoffPolicy()!=BackoffPolicy.noBackoff()){
this.httpClient = new RetryRestClientHttpClient(httpClient,this.transportOptions.backoffPolicy());
}
else {
this.httpClient = httpClient;
}

// If no instrumentation is provided, fallback to OpenTelemetry and ultimately noop
if (instrumentation == null) {
instrumentation = OpenTelemetryForElasticsearch.getDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public interface TransportOptions {

Builder toBuilder();

BackoffPolicy backoffPolicy();

default TransportOptions with(Consumer<Builder> fn) {
Builder builder = toBuilder();
fn.accept(builder);
Expand All @@ -73,5 +75,7 @@ interface Builder extends ObjectBuilder<TransportOptions> {
* streamed by the http library.
*/
Builder keepResponseBodyOnException(boolean value);

Builder backoffPolicy(BackoffPolicy policy);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package co.elastic.clients.transport.rest_client;

import org.elasticsearch.client.Cancellable;

import java.util.concurrent.CompletableFuture;

/**
* The {@code Future} implementation returned by async requests.
* It wraps the RestClient's cancellable and propagates cancellation.
*/
public class RequestFuture<T> extends CompletableFuture<T> {
private volatile Cancellable cancellable;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && cancellable != null) {
cancellable.cancel();
}
return cancelled;
}

public void setCancellable(Cancellable cancellable) {
this.cancellable = cancellable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,6 @@ public class RestClientHttpClient implements TransportHttpClient {

private static final ConcurrentHashMap<String, ContentType> ContentTypeCache = new ConcurrentHashMap<>();

/**
* The {@code Future} implementation returned by async requests.
* It wraps the RestClient's cancellable and propagates cancellation.
*/
private static class RequestFuture<T> extends CompletableFuture<T> {
private volatile Cancellable cancellable;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && cancellable != null) {
cancellable.cancel();
}
return cancelled;
}
}

private final RestClient restClient;

public RestClientHttpClient(RestClient restClient) {
Expand Down Expand Up @@ -110,7 +93,7 @@ public CompletableFuture<Response> performRequestAsync(
return future;
}

future.cancellable = restClient.performRequestAsync(restRequest, new ResponseListener() {
future.setCancellable(restClient.performRequestAsync(restRequest, new ResponseListener() {
@Override
public void onSuccess(org.elasticsearch.client.Response response) {
future.complete(new RestResponse(response));
Expand All @@ -120,7 +103,7 @@ public void onSuccess(org.elasticsearch.client.Response response) {
public void onFailure(Exception exception) {
future.completeExceptionally(exception);
}
});
}));

return future;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package co.elastic.clients.transport.rest_client;

import co.elastic.clients.transport.BackoffPolicy;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.Version;
import co.elastic.clients.transport.http.HeaderMap;
Expand All @@ -44,6 +45,8 @@ public class RestClientOptions implements TransportOptions {

boolean keepResponseBodyOnException;

BackoffPolicy backoffPolicy;

@VisibleForTesting
static final String CLIENT_META_VALUE = getClientMeta();
@VisibleForTesting
Expand All @@ -65,8 +68,9 @@ static RestClientOptions of(@Nullable TransportOptions options) {
return builder.build();
}

public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException) {
public RestClientOptions(RequestOptions options, boolean keepResponseBodyOnException, BackoffPolicy backoffPolicy) {
this.keepResponseBodyOnException = keepResponseBodyOnException;
this.backoffPolicy = backoffPolicy;
this.options = addBuiltinHeaders(options.toBuilder()).build();
}

Expand Down Expand Up @@ -107,6 +111,11 @@ public boolean keepResponseBodyOnException() {
return this.keepResponseBodyOnException;
}

@Override
public BackoffPolicy backoffPolicy() {
return backoffPolicy;
}

@Override
public Builder toBuilder() {
return new Builder(options.toBuilder());
Expand All @@ -118,6 +127,8 @@ public static class Builder implements TransportOptions.Builder {

private boolean keepResponseBodyOnException;

private BackoffPolicy backoffPolicy;

public Builder(RequestOptions.Builder builder) {
this.builder = builder;
}
Expand Down Expand Up @@ -197,14 +208,20 @@ public TransportOptions.Builder keepResponseBodyOnException(boolean value) {
return this;
}

@Override
public TransportOptions.Builder backoffPolicy(BackoffPolicy policy) {
this.backoffPolicy = policy;
return this;
}

@Override
public RestClientOptions build() {
return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException);
return new RestClientOptions(addBuiltinHeaders(builder).build(), keepResponseBodyOnException, backoffPolicy);
}
}

static RestClientOptions initialOptions() {
return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false);
return new RestClientOptions(SafeResponseConsumer.DEFAULT_REQUEST_OPTIONS, false, BackoffPolicy.noBackoff());
}

private static RequestOptions.Builder addBuiltinHeaders(RequestOptions.Builder builder) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package co.elastic.clients.transport.rest_client;

import co.elastic.clients.transport.BackoffPolicy;
import co.elastic.clients.transport.TransportOptions;
import co.elastic.clients.transport.http.TransportHttpClient;
import org.elasticsearch.client.ResponseException;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

public class RetryRestClientHttpClient implements TransportHttpClient {

private TransportHttpClient delegate;
private BackoffPolicy backoffPolicy;

public RetryRestClientHttpClient(TransportHttpClient delegate, BackoffPolicy backoffPolicy) {
this.delegate = delegate;
this.backoffPolicy = backoffPolicy;
}

@Override
public Response performRequest(String endpointId, @Nullable Node node, Request request,
TransportOptions options) throws IOException {
return performRequestRetry(endpointId, node, request, options, backoffPolicy.iterator());
}

public Response performRequestRetry(String endpointId, @Nullable Node node, Request request,
TransportOptions options, Iterator<Long> backoffIter) throws IOException {
try {
return delegate.performRequest(endpointId, node, request, options);
} catch (ResponseException e) {
if (e.getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded?
// synchronous retry
if (backoffIter.hasNext()) {
try {
Thread.sleep(backoffIter.next());
} catch (InterruptedException ie) {
throw e; // TODO okay with masking IE and just returning original exception?
}
System.out.println("Retrying");
return performRequestRetry(endpointId, node, request, options, backoffIter);
}
}
// error not retryable
throw e;
}
}

@Override
public CompletableFuture<Response> performRequestAsync(String endpointId, @Nullable Node node,
Request request, TransportOptions options) {
RequestFuture<Response> futureResult = new RequestFuture<>();
return performRequestAsyncRetry(endpointId, node, request, options, backoffPolicy.iterator(),
futureResult);
}

public CompletableFuture<Response> performRequestAsyncRetry(String endpointId, @Nullable Node node,
Request request,
TransportOptions options,
Iterator<Long> backoffIter,
CompletableFuture<Response> futureResult) {
CompletableFuture<Response> res = delegate.performRequestAsync(endpointId, node, request, options);

res.whenComplete((resp, e) -> {
if (e != null) {
if (e instanceof ResponseException) {
if (((ResponseException) e).getResponse().getStatusLine().getStatusCode() == 503) { // TODO list of statuses, configurable or hardcoded?
if (backoffIter.hasNext()) {
try {
Thread.sleep(backoffIter.next());
} catch (InterruptedException ie) {
// TODO okay with masking IE and just returning original exception?
futureResult.completeExceptionally(e);
}
System.out.println("Retrying");
performRequestAsyncRetry(endpointId, node, request, options, backoffIter,futureResult);
}
}
}
}
else {
futureResult.complete(resp);
}
});

return futureResult;
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.BackoffPolicy;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.Endpoint;
import co.elastic.clients.transport.TransportOptions;
Expand Down Expand Up @@ -69,10 +70,16 @@ public boolean keepResponseBodyOnException() {
return false;
}

@Override
public BackoffPolicy backoffPolicy() {
return BackoffPolicy.noBackoff();
}

@Override
public Builder toBuilder() {
return null;
}

};

public void setResult(Object result) {
Expand Down
Loading
Loading