Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling SSE with 'event' entries #52

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,27 @@ Take in account that you need to use **Java 11 or greater**.

We have the following attributes to create a CleverClient object:

| Attribute | Description | Required |
| -------------------|----------------------------------------------|-----------|
| baseUrl | Api's url | mandatory |
| headers | Map of headers (name/value) | optional |
| header | Single header (alternative to headers) | optional |
| httpClient | Java HttpClient object | optional |
| requestInterceptor | Function to modify the request once is built | optional |
| endOfStream | Text used to mark the final of streams | optional |

The attribute ```endOfStream``` is required when you have endpoints sending back streams of data (Server Sent Events - SSE).
| Attribute | Description | Required |
| -------------------|--------------------------------------------------|-----------|
| baseUrl | Api's url | mandatory |
| headers | Map of headers (name/value) | optional |
| header | Single header as a name and a value | optional |
| httpClient | Java HttpClient object | optional |
| requestInterceptor | Function to modify the request once is built | optional |
| eventsToRead | List of events's name we want to read in streams | optional |
| eventToRead | An event's name we want to read in streams | optional |
| endsOfStream | List of texts used to mark the end of streams | optional |
| endOfStream | Text used to mark the end of streams | optional |

The attributes ```event(s)ToRead``` and ```end(s)OfStream``` are required when you have endpoints sending back streams of data (Server Sent Events - SSE).

Example:

```java
final var BASE_URL = "https://api.example.com";
final var HEADER_NAME = "Authorization";
final var HEADER_VALUE = "Bearer qwertyasdfghzxcvb";
final var EVENT_TO_READ = "inventory";
final var END_OF_STREAM = "[DONE]";

var httpClient = HttpClient.newBuilder()
Expand All @@ -132,6 +136,7 @@ var cleverClient = CleverClient.builder()
request.setUrl(url);
return request;
})
.eventToRead(EVENT_TO_READ)
.endOfStream(END_OF_STREAM)
.build();
```
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.github.sashirestela.cleverclient;

import java.net.http.HttpClient;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.UnaryOperator;
Expand All @@ -10,7 +12,7 @@

import io.github.sashirestela.cleverclient.http.HttpProcessor;
import io.github.sashirestela.cleverclient.http.HttpRequestData;
import io.github.sashirestela.cleverclient.support.CleverClientSSE;
import io.github.sashirestela.cleverclient.support.Configurator;
import io.github.sashirestela.cleverclient.util.CommonUtil;
import lombok.Builder;
import lombok.Getter;
Expand All @@ -26,7 +28,6 @@
public class CleverClient {
private static final Logger logger = LoggerFactory.getLogger(CleverClient.class);

@NonNull
private final String baseUrl;
private final Map<String, String> headers;
private final HttpClient httpClient;
Expand All @@ -48,18 +49,22 @@ public class CleverClient {
*/
@Builder
public CleverClient(@NonNull String baseUrl, @Singular Map<String, String> headers, HttpClient httpClient,
UnaryOperator<HttpRequestData> requestInterceptor, String endOfStream) {
UnaryOperator<HttpRequestData> requestInterceptor, @Singular("eventToRead") List<String> eventsToRead,
@Singular("endOfStream") List<String> endsOfStream) {
this.baseUrl = baseUrl;
this.headers = Optional.ofNullable(headers).orElse(Map.of());
this.httpClient = Optional.ofNullable(httpClient).orElse(HttpClient.newHttpClient());
this.requestInterceptor = requestInterceptor;
CleverClientSSE.setEndOfStream(endOfStream);
this.httpProcessor = HttpProcessor.builder()
.baseUrl(this.baseUrl)
.headers(CommonUtil.mapToListOfString(this.headers))
.httpClient(this.httpClient)
.requestInterceptor(this.requestInterceptor)
.build();
Configurator.builder()
.eventsToRead(Optional.ofNullable(eventsToRead).orElse(Arrays.asList()))
.endsOfStream(Optional.ofNullable(endsOfStream).orElse(Arrays.asList()))
.build();
logger.debug("CleverClient has been created.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ public <S, T> Object sendRequest(HttpClient httpClient, HttpRequest httpRequest,

throwExceptionIfErrorIsPresent(response, Stream.class);

final var record = new CleverClientSSE.LineRecord();

return response.body()
.peek(rawData -> logger.debug("Response : {}", rawData))
.map(CleverClientSSE::new)
.peek(line -> logger.debug("Response : {}", line))
.peek(line -> record.updateWith(line))
.map(line -> new CleverClientSSE(record))
.filter(CleverClientSSE::isActualData)
.map(event -> JsonUtil.jsonToObject(event.getActualData(), responseClass));
.map(item -> JsonUtil.jsonToObject(item.getActualData(), responseClass));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ public <S, T> Object sendRequest(HttpClient httpClient, HttpRequest httpRequest,

throwExceptionIfErrorIsPresent(httpResponse, Stream.class);

final var record = new CleverClientSSE.LineRecord();

return httpResponse.body()
.peek(rawData -> logger.debug("Response : {}", rawData))
.map(CleverClientSSE::new)
.peek(line -> logger.debug("Response : {}", line))
.peek(line -> record.updateWith(line))
.map(line -> new CleverClientSSE(record))
.filter(CleverClientSSE::isActualData)
.map(event -> JsonUtil.jsonToObject(event.getActualData(), responseClass));
.map(item -> JsonUtil.jsonToObject(item.getActualData(), responseClass));

} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

public class CleverClientException extends RuntimeException {

public CleverClientException(String message) {
super(message);
}

public CleverClientException(String message, Object... parameters) {
super(MessageFormat.format(message, Arrays.copyOfRange(parameters, 0, parameters.length - 1)),
(Throwable) parameters[parameters.length - 1]);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,72 @@
package io.github.sashirestela.cleverclient.support;

import java.util.List;
import java.util.stream.Collectors;

public class CleverClientSSE {

private static final String EVENT_HEADER = "event: ";
private static final String DATA_HEADER = "data: ";
private static final String SEPARATOR = "";

private static List<String> linesToCheck = null;

private static String endOfStream = null;
private LineRecord record;
private List<String> eventsToRead;
private List<String> endsOfStream;

private String rawData;
public CleverClientSSE(LineRecord record) {
this.record = record;
this.eventsToRead = Configurator.one().getEventsToRead();
this.endsOfStream = Configurator.one().getEndsOfStream();

public CleverClientSSE(String rawData) {
this.rawData = rawData;
if (linesToCheck == null) {
linesToCheck = this.eventsToRead.stream().filter(etr -> !etr.isEmpty()).map(etr -> (EVENT_HEADER + etr))
.collect(Collectors.toList());
linesToCheck.add(SEPARATOR);
}
}

public String getRawData() {
return rawData;
public LineRecord getRecord() {
return record;
}

public boolean isActualData() {
return rawData.startsWith(DATA_HEADER) && (endOfStream == null || !rawData.contains(endOfStream));
return linesToCheck.contains(record.previous()) && record.current().startsWith(DATA_HEADER)
&& endsOfStream.stream().anyMatch(eos -> !record.current().contains(eos));
}

public String getActualData() {
return rawData.replace(DATA_HEADER, "").strip();
return record.current().replace(DATA_HEADER, "").strip();
}

public static void setEndOfStream(String endOfStream) {
CleverClientSSE.endOfStream = endOfStream;
public static class LineRecord {

private String currentLine;
private String previousLine;

public LineRecord(String previousLine, String currentLine) {
this.previousLine = previousLine;
this.currentLine = currentLine;
}

public LineRecord() {
this("", "");
}

public void updateWith(String line) {
this.previousLine = this.currentLine;
this.currentLine = line;
}

public String current() {
return this.currentLine;
}

public String previous() {
return this.previousLine;
}

}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.github.sashirestela.cleverclient.support;

import java.util.List;

import lombok.Builder;
import lombok.Getter;
import lombok.Singular;

@Getter
public class Configurator {

private static Configurator configurator;

private List<String> eventsToRead;
private List<String> endsOfStream;

private Configurator() {
}

@Builder
public Configurator(@Singular("eventToRead") List<String> eventsToRead,
@Singular("endOfStream") List<String> endsOfStream) {
if (configurator != null) {
return;
}
configurator = new Configurator();
configurator.eventsToRead = eventsToRead;
configurator.endsOfStream = endsOfStream;
}

public static Configurator one() {
if (configurator == null) {
throw new CleverClientException("You have to call Configurator.builder() first.");
}
return configurator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,49 @@

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

import io.github.sashirestela.cleverclient.support.CleverClientSSE.LineRecord;

class CleverClientSSETest {

@Test
void shouldReturnExpectedValueWhenRawDataHasDifferentValues() {
Object[][] testData = {
{ new CleverClientSSE("data: This is the actual data."), true },
{ new CleverClientSSE("data : This is the actual data."), false },
{ new CleverClientSSE("\n"), false },
{ new CleverClientSSE(""), false }
};
for (Object[] data : testData) {
CleverClientSSE event = (CleverClientSSE) data[0];
boolean actualCondition = event.isActualData();
boolean expectedCondition = (boolean) data[1];
assertEquals(expectedCondition, actualCondition);
}
@BeforeAll
static void setup() {
Configurator.builder()
.eventToRead("process")
.endOfStream("END")
.build();
}

@Test
void shouldReturnExpectedValueWhenRawDataHasDifferentValuesAndAEndOfStreamIsSetted() {
CleverClientSSE.setEndOfStream("END");
void shouldReturnExpectedValueWhenRawDataHasDifferentValues() {
Object[][] testData = {
{ new CleverClientSSE("data: This is the actual data."), true },
{ new CleverClientSSE("data: END"), false }
{ new CleverClientSSE(new LineRecord("event: process", "data: This is the actual data.")), true },
{ new CleverClientSSE(new LineRecord("", "data: This is the actual data.")), true },
{ new CleverClientSSE(new LineRecord("event: other", "data: This is the actual data.")), false },
{ new CleverClientSSE(new LineRecord("event: process", "data : This is the actual data.")), false },
{ new CleverClientSSE(new LineRecord("", "data : This is the actual data.")), false },
{ new CleverClientSSE(new LineRecord("", "\n")), false },
{ new CleverClientSSE(new LineRecord("", "")), false },
{ new CleverClientSSE(new LineRecord("event: process", "data: END")), false },
{ new CleverClientSSE(new LineRecord("", "data: END")), false }
};
for (Object[] data : testData) {
CleverClientSSE event = (CleverClientSSE) data[0];
boolean actualCondition = event.isActualData();
boolean expectedCondition = (boolean) data[1];
var event = (CleverClientSSE) data[0];
var actualCondition = event.isActualData();
var expectedCondition = (boolean) data[1];
assertEquals(expectedCondition, actualCondition);
}
}

@Test
@SuppressWarnings("unused")
void shouldReturnTheActualDataWhenRawDataMeetsConditions() {
CleverClientSSE event = new CleverClientSSE("data: This is the actual data. ");
String rawData = event.getRawData();
String actualData = event.getActualData();
String expectedData = "This is the actual data.";
CleverClientSSE event = new CleverClientSSE(new LineRecord("event: process", "data: This is the actual data. "));
var rawData = event.getRecord();
var actualData = event.getActualData();
var expectedData = "This is the actual data.";
assertEquals(expectedData, actualData);
}
}
Loading