Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use OkHttp as an alternative Http client #94

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ protected Stream<Object> convertToStreamOfObjects(Stream<String> response, Retur
protected Stream<Object> convertToStreamOfEvents(Stream<String> response, ReturnType returnType) {
final var lineRecord = new CleverClientSSE.LineRecord();
final var events = returnType.getClassByEvent().keySet();

return response
.map(line -> {
logger.debug(RESPONSE_FORMAT, line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected Object sendAsync(RequestData request) {

@Override
public void shutdown() {
this.httpClient.executor().ifPresent(executor -> {
httpClient.executor().ifPresent(executor -> {
if (executor instanceof ExecutorService) {
((ExecutorService) executor).shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OkHttpClientAdapter extends HttpClientAdapter {

Expand Down Expand Up @@ -57,12 +60,18 @@ protected Object send(RequestData request) {
try {
var response = okHttpClient.newCall(okHttpRequest).execute();
logger.debug(RESPONSE_CODE_FORMAT, response.code());
var responseContent = getReponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
if (!returnType.isStream()) {
logger.debug(RESPONSE_FORMAT, responseContent);
if (returnType.isStream()) {
var responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
return functions.responseConverter.apply(responseContent, returnType);
} else {
try (response) {
var responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
logger.debug(RESPONSE_FORMAT, responseContent);
return functions.responseConverter.apply(responseContent, returnType);
}
}
return functions.responseConverter.apply(responseContent, returnType);
} catch (IOException e) {
throw new CleverClientException(e);
}
Expand All @@ -84,15 +93,24 @@ public void onFailure(Call call, IOException e) {
@Override
public void onResponse(Call call, Response response) throws IOException {
logger.debug(RESPONSE_CODE_FORMAT, response.code());
var responseContent = getReponseContent(response.body(), returnType);
try {
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
if (!returnType.isStream()) {
if (returnType.isStream()) {
try {
Object responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
response.close();
responseFuture.completeExceptionally(e);
}
} else {
try (response) {
Object responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
logger.debug(RESPONSE_FORMAT, responseContent);
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
responseFuture.completeExceptionally(e);
}
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
responseFuture.completeExceptionally(e);
}
}

Expand Down Expand Up @@ -168,11 +186,49 @@ private ResponseData convertToResponseData(Response response, Object responseCon
.build();
}

private Object getReponseContent(ResponseBody responseBody, ReturnType returnType) {
private Object getResponseContent(ResponseBody responseBody, ReturnType returnType) {
try {
if (returnType.isStream()) {
var reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));
return reader.lines();
BufferedReader reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));
return StreamSupport.stream(new Spliterator<String>() {

private final Iterator<String> iterator = reader.lines().iterator();
private boolean closed = false;

@Override
public boolean tryAdvance(java.util.function.Consumer<? super String> action) {
if (iterator.hasNext()) {
action.accept(iterator.next());
return true;
}
if (!closed) {
try {
reader.close();
responseBody.close();
closed = true;
} catch (Exception e) {
throw new CleverClientException(e);
}
}
return false;
}

@Override
public Spliterator<String> trySplit() {
return null;
}

@Override
public long estimateSize() {
return Long.MAX_VALUE;
}

@Override
public int characteristics() {
return Spliterator.ORDERED;
}

}, false);
} else if (returnType.isInputStream()) {
return responseBody.byteStream();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ interface HttpProcessorTest {

void setMocksForStreamWithError(Stream<String> result) throws IOException, URISyntaxException;

void testShutdown();

@BeforeAll
static void setup() {
TestSupport.setupConfigurator();
Expand Down Expand Up @@ -343,4 +345,9 @@ default void shouldExecuteDefaultMethodWhenItIsCalled() {
assertEquals(expectedValue, actualValue);
}

@Test
default void shouldShutdownWithoutExceptions() {
testShutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -146,4 +148,14 @@ public void setMocksForStreamWithError(Stream<String> result) throws IOException
when(httpResponseStream.request()).thenReturn(httpRequest);
}

@Override
public void testShutdown() {
var defaultAdapter = new JavaHttpClientAdapter();
assertDoesNotThrow(() -> defaultAdapter.shutdown());

var client = HttpClient.newBuilder().executor(Executors.newFixedThreadPool(2)).build();
var customAdapter = new JavaHttpClientAdapter(client);
assertDoesNotThrow(() -> customAdapter.shutdown());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -179,4 +180,14 @@ public void setMocksForStreamWithError(Stream<String> result) throws IOException
when(okHttpResponse.request()).thenReturn(okHttpRequest);
}

@Override
public void testShutdown() {
var defaultAdapter = new OkHttpClientAdapter();
assertDoesNotThrow(() -> defaultAdapter.shutdown());

var client = new OkHttpClient();
var customAdapter = new OkHttpClientAdapter(client);
assertDoesNotThrow(() -> customAdapter.shutdown());
}

}
Loading