Skip to content

Commit

Permalink
Handling stream closing for OkHttp
Browse files Browse the repository at this point in the history
  • Loading branch information
sashirestela committed Jan 16, 2025
1 parent dd433b8 commit 223233a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ protected Stream<Object> convertToStreamOfObjects(Stream<String> response, Retur
protected Stream<Object> convertToStreamOfEvents(Stream<String> response, ReturnType returnType) {
final var lineRecord = new CleverClientSSE.LineRecord();
final var events = returnType.getClassByEvent().keySet();

return response
.map(line -> {
logger.debug(RESPONSE_FORMAT, line);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,18 @@ protected Object send(RequestData request) {
try {
var response = okHttpClient.newCall(okHttpRequest).execute();
logger.debug(RESPONSE_CODE_FORMAT, response.code());
var responseContent = getReponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
if (!returnType.isStream()) {
logger.debug(RESPONSE_FORMAT, responseContent);
if (returnType.isStream()) {
var responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
return functions.responseConverter.apply(responseContent, returnType);
} else {
try (response) {
var responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
logger.debug(RESPONSE_FORMAT, responseContent);
return functions.responseConverter.apply(responseContent, returnType);
}
}
return functions.responseConverter.apply(responseContent, returnType);
} catch (IOException e) {
throw new CleverClientException(e);
}
Expand All @@ -84,15 +90,24 @@ public void onFailure(Call call, IOException e) {
@Override
public void onResponse(Call call, Response response) throws IOException {
logger.debug(RESPONSE_CODE_FORMAT, response.code());
var responseContent = getReponseContent(response.body(), returnType);
try {
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
if (!returnType.isStream()) {
if (returnType.isStream()) {
try {
Object responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
response.close();
responseFuture.completeExceptionally(e);
}
} else {
try (response) {
Object responseContent = getResponseContent(response.body(), returnType);
throwExceptionIfErrorIsPresent(convertToResponseData(response, responseContent));
logger.debug(RESPONSE_FORMAT, responseContent);
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
responseFuture.completeExceptionally(e);
}
responseFuture.complete(functions.responseConverter.apply(responseContent, returnType));
} catch (CleverClientException e) {
responseFuture.completeExceptionally(e);
}
}

Expand Down Expand Up @@ -168,11 +183,17 @@ private ResponseData convertToResponseData(Response response, Object responseCon
.build();
}

private Object getReponseContent(ResponseBody responseBody, ReturnType returnType) {
private Object getResponseContent(ResponseBody responseBody, ReturnType returnType) {
try {
if (returnType.isStream()) {
var reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));
return reader.lines();
BufferedReader reader = new BufferedReader(new InputStreamReader(responseBody.byteStream()));
return reader.lines().onClose(() -> {
try {
responseBody.close();
} catch (Exception e) {
throw new CleverClientException(e);
}
});
} else if (returnType.isInputStream()) {
return responseBody.byteStream();
} else {
Expand Down

0 comments on commit 223233a

Please sign in to comment.