Skip to content

Commit

Permalink
Merge pull request #60 from sashirestela/51-add-event-class-for-sse
Browse files Browse the repository at this point in the history
Add Event class for handling SSE
  • Loading branch information
sashirestela authored Mar 22, 2024
2 parents dbdd0de + d016bdb commit 3337bac
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 64 deletions.
28 changes: 15 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Library that makes it easy to use the Java HttpClient to perform http operations
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=sashirestela_cleverclient&metric=alert_status)](https://sonarcloud.io/summary/new_code?id=sashirestela_cleverclient)
[![codecov](https://codecov.io/gh/sashirestela/cleverclient/graph/badge.svg?token=PEYAFW3EWD)](https://codecov.io/gh/sashirestela/cleverclient)
![Maven Central](https://img.shields.io/maven-central/v/io.github.sashirestela/cleverclient)
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/sashirestela/cleverclient/maven.yml)
![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/sashirestela/cleverclient/build_java_maven.yml)

### Table of Contents
- [Description](#-description)
Expand Down Expand Up @@ -162,7 +162,7 @@ var cleverClient = CleverClient.builder()
| PUT | Method | PUT endpoint's url | optional | One |
| DELETE | Method | DELETE endpoint's url | optional | One |
| Multipart | Method | (None) | none | One |
| StreamType | Method | type class and events array | mandatory both | Many |
| StreamType | Method | class type and events array | mandatory both | Many |
| Path | Parameter | Path parameter name in url | mandatory | One |
| Query | Parameter | Query parameter name in url | mandatory | One |
| Query | Parameter | (None for Pojos) | none | One |
Expand All @@ -172,15 +172,15 @@ var cleverClient = CleverClient.builder()
* ```Header``` Used to include more headers (pairs of name and value) at interface or method level. It is possible to have multiple Header annotations for the same target.
* ```GET, POST, PUT, DELETE``` are used to mark the typical http methods (endpoints).
* ```Multipart``` is used to mark an endpoint with a multipart/form-data request. This is required when you need to upload files.
* ```StreamType``` is used with methods whose return type is Stream of Objects. Commonly you will use more than one to indicate what class (type) is related to what events (array of Strings).
* ```StreamType``` is used with methods whose return type is Stream of [Event](./src/main/java/io/github/sashirestela/cleverclient/Event.java). Commonly you will use more than one to indicate what class (type) is related to what events (array of Strings).
* ```Path``` is used to replace the path parameter name in url with the matched method parameter's value.
* ```Query``` is used to add a query parameter to the url in the way: [?]queryValue=parameterValue[&...] for scalar parameters. Also it can be used for POJOs using its properties and values.
* ```Body``` is used to mark a method parameter as the endpoint's payload request, so the request will be application/json at least the endpoint is annotated with Multipart.
* Check the above [Description's example](#-description) or the [Test](https://github.com/sashirestela/cleverclient/tree/main/src/test/java/io/github/sashirestela/cleverclient) folder to see more of these interface annotations in action.

### Supported Response Types

The reponse types are determined from the method responses. We don't need any annotation for that. We have six response types: [Stream](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html) of elements, [List](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/List.html) of elements, [Generic](https://docs.oracle.com/javase/tutorial/java/generics/types.html) type, Custom type, [Binary](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/InputStream.html) type, [String](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/String.html) type and Stream of [Object](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Object.html), and all of them can be asynchronous or synchronous. For async responses you have to use the Java class [CompletableFuture](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html).
The reponse types are determined from the method responses. We don't need any annotation for that. We have six response types: [Stream](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/stream/Stream.html) of elements, [List](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/List.html) of elements, [Generic](https://docs.oracle.com/javase/tutorial/java/generics/types.html) type, Custom type, [Binary](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/io/InputStream.html) type, [String](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/String.html) type and Stream of [Event](./src/main/java/io/github/sashirestela/cleverclient/Event.java), and all of them can be asynchronous or synchronous. For async responses you have to use the Java class [CompletableFuture](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/CompletableFuture.html).

| Method's Response Type | Sync/Async | Description |
|------------------------------------|------------|-----------------------------|
Expand All @@ -196,12 +196,14 @@ The reponse types are determined from the method responses. We don't need any an
| InputStream | Sync | Binary type |
| CompletableFuture\<String> | Async | String type |
| String | Sync | String type |
| CompletableFuture<Stream\<Object>> | Async | SSE (*) as Stream of Object |
| Stream\<Object> | Sync | SSE (*) as Stream of Object |
| CompletableFuture<Stream\<Event>> | Async | SSE (*) as Stream of Event |
| Stream\<Event> | Sync | SSE (*) as Stream of Event |

(*) SSE: Server Sent Events

```CompletableFuture<Stream<Object>>``` and ```Stream<Object>``` are used for handling SSE with multiple events and classes.
* ```CompletableFuture<Stream<T>>``` and ```Stream<T>``` are used for handling SSE without events and data of the class ```T``` only.
* ```CompletableFuture<Stream<Event>>``` and ```Stream<Event>``` are used for handling SSE with multiple events and data of different classes.
* The [Event](./src/main/java/io/github/sashirestela/cleverclient/Event.java) class will bring the event name and the data object.

### Interface Default Methods

Expand All @@ -211,6 +213,12 @@ You can create interface default methods to execute special requirements such as
@Resource("/v1/chat/completions")
interface Completions {

@POST
Stream<ChatResponse> createSyncStreamBasic(@Body ChatRequest chatRequest);

@POST
CompletableFuture<Stream<ChatResponse>> createAsyncStreamBasic(@Body ChatRequest chatRequest);

default Stream<ChatResponse> createSyncStream(ChatRequest chatRequest) {
var request = chatRequest.withStream(true);
return createSyncStreamBasic(request);
Expand All @@ -221,12 +229,6 @@ interface Completions {
return createAsyncStreamBasic(request);
}

@POST
Stream<ChatResponse> createSyncStreamBasic(@Body ChatRequest chatRequest);

@POST
CompletableFuture<Stream<ChatResponse>> createAsyncStreamBasic(@Body ChatRequest chatRequest);

}
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>io.github.sashirestela</groupId>
<artifactId>cleverclient</artifactId>
<version>1.3.0</version>
<version>1.3.1</version>
<packaging>jar</packaging>

<name>cleverclient</name>
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/github/sashirestela/cleverclient/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.github.sashirestela.cleverclient;

import lombok.Builder;
import lombok.Value;

@Value
@Builder
public class Event {

String name;
Object data;

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.sashirestela.cleverclient.sender;

import io.github.sashirestela.cleverclient.Event;
import io.github.sashirestela.cleverclient.support.CleverClientSSE;
import io.github.sashirestela.cleverclient.support.CleverClientSSE.LineRecord;
import io.github.sashirestela.cleverclient.support.ReturnType;
Expand All @@ -10,7 +11,7 @@
import java.net.http.HttpResponse.BodyHandlers;
import java.util.stream.Stream;

public class HttpAsyncStreamObjectSender extends HttpSender {
public class HttpAsyncStreamEventSender extends HttpSender {

@Override
public Object sendRequest(HttpClient httpClient, HttpRequest httpRequest, ReturnType returnType) {
Expand All @@ -22,17 +23,20 @@ public Object sendRequest(HttpClient httpClient, HttpRequest httpRequest, Return
throwExceptionIfErrorIsPresent(response, Stream.class);

final var lineRecord = new LineRecord();
final var eventsWithHeader = returnType.getClassByEvent().keySet();
final var events = returnType.getClassByEvent().keySet();

return response.body()
.map(line -> {
logger.debug("Response : {}", line);
lineRecord.updateWith(line);
return new CleverClientSSE(lineRecord, eventsWithHeader);
return new CleverClientSSE(lineRecord, events);
})
.filter(CleverClientSSE::isActualData)
.map(item -> JsonUtil.jsonToObject(item.getActualData(),
returnType.getClassByEvent().get(item.getMatchedEvent())));
.map(item -> Event.builder()
.name(item.getMatchedEvent())
.data(JsonUtil.jsonToObject(item.getActualData(),
returnType.getClassByEvent().get(item.getMatchedEvent())))
.build());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public class HttpSenderFactory {

private HttpSenderFactory() {
sendersMap = new EnumMap<>(Category.class);
sendersMap.put(Category.ASYNC_STREAM_OBJECT, HttpAsyncStreamObjectSender::new);
sendersMap.put(Category.ASYNC_STREAM_EVENT, HttpAsyncStreamEventSender::new);
sendersMap.put(Category.ASYNC_STREAM, HttpAsyncStreamSender::new);
sendersMap.put(Category.ASYNC_LIST, HttpAsyncListSender::new);
sendersMap.put(Category.ASYNC_GENERIC, HttpAsyncGenericSender::new);
sendersMap.put(Category.ASYNC_CUSTOM, HttpAsyncCustomSender::new);
sendersMap.put(Category.ASYNC_BINARY, HttpAsyncBinarySender::new);
sendersMap.put(Category.ASYNC_PLAIN_TEXT, HttpAsyncPlainTextSender::new);
sendersMap.put(Category.SYNC_STREAM_OBJECT, HttpSyncStreamObjectSender::new);
sendersMap.put(Category.SYNC_STREAM_EVENT, HttpSyncStreamEventSender::new);
sendersMap.put(Category.SYNC_STREAM, HttpSyncStreamSender::new);
sendersMap.put(Category.SYNC_LIST, HttpSyncListSender::new);
sendersMap.put(Category.SYNC_GENERIC, HttpSyncGenericSender::new);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.sashirestela.cleverclient.sender;

import io.github.sashirestela.cleverclient.Event;
import io.github.sashirestela.cleverclient.support.CleverClientException;
import io.github.sashirestela.cleverclient.support.CleverClientSSE;
import io.github.sashirestela.cleverclient.support.CleverClientSSE.LineRecord;
Expand All @@ -12,7 +13,7 @@
import java.net.http.HttpResponse.BodyHandlers;
import java.util.stream.Stream;

public class HttpSyncStreamObjectSender extends HttpSender {
public class HttpSyncStreamEventSender extends HttpSender {

@Override
public Object sendRequest(HttpClient httpClient, HttpRequest httpRequest, ReturnType returnType) {
Expand All @@ -23,17 +24,20 @@ public Object sendRequest(HttpClient httpClient, HttpRequest httpRequest, Return
throwExceptionIfErrorIsPresent(httpResponse, Stream.class);

final var lineRecord = new LineRecord();
final var eventsWithHeader = returnType.getClassByEvent().keySet();
final var events = returnType.getClassByEvent().keySet();

return httpResponse.body()
.map(line -> {
logger.debug("Response : {}", line);
lineRecord.updateWith(line);
return new CleverClientSSE(lineRecord, eventsWithHeader);
return new CleverClientSSE(lineRecord, events);
})
.filter(CleverClientSSE::isActualData)
.map(item -> JsonUtil.jsonToObject(item.getActualData(),
returnType.getClassByEvent().get(item.getMatchedEvent())));
.map(item -> Event.builder()
.name(item.getMatchedEvent())
.data(JsonUtil.jsonToObject(item.getActualData(),
returnType.getClassByEvent().get(item.getMatchedEvent())))
.build());

} catch (IOException | InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,42 @@

public class CleverClientSSE {

public static final String EVENT_HEADER = "event: ";
private static final String EVENT_HEADER = "event: ";
private static final String DATA_HEADER = "data: ";
private static final String SEPARATOR = "";

private LineRecord lineRecord;
private List<String> endsOfStream;
private Set<String> eventsWithHeader;
private Set<String> events;

public CleverClientSSE(LineRecord lineRecord) {
this.lineRecord = lineRecord;
this.endsOfStream = Configurator.one().getEndsOfStream();
this.eventsWithHeader = Set.of(SEPARATOR);
this.events = Set.of(SEPARATOR);
}

public CleverClientSSE(LineRecord lineRecord, Set<String> eventsWithHeader) {
public CleverClientSSE(LineRecord lineRecord, Set<String> events) {
this.lineRecord = lineRecord;
this.endsOfStream = Configurator.one().getEndsOfStream();
this.eventsWithHeader = eventsWithHeader;
this.events = events;
}

public boolean isActualData() {
return eventsWithHeader.contains(lineRecord.previous()) && lineRecord.current().startsWith(DATA_HEADER)
return isMatchedEvent() && lineRecord.current().startsWith(DATA_HEADER)
&& endsOfStream.stream().anyMatch(eos -> !lineRecord.current().contains(eos));
}

public String getActualData() {
return lineRecord.current().replace(DATA_HEADER, "").strip();
}

private boolean isMatchedEvent() {
return events.stream()
.anyMatch(ev -> lineRecord.previous().equals((ev.equals(SEPARATOR) ? SEPARATOR : EVENT_HEADER + ev)));
}

public String getMatchedEvent() {
return eventsWithHeader.contains(lineRecord.previous()) ? lineRecord.previous() : null;
return isMatchedEvent() ? lineRecord.previous().replace(EVENT_HEADER, "").strip() : null;
}

public static class LineRecord {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ReturnType {
private static final String LIST = "java.util.List";
private static final String INPUTSTREAM = "java.io.InputStream";
private static final String STRING = "java.lang.String";
private static final String OBJECT = "java.lang.Object";
private static final String EVENT = "io.github.sashirestela.cleverclient.Event";

private static final String REGEX = "[<>]";
private static final String JAVA_PCK = "java";
Expand All @@ -38,6 +38,10 @@ public ReturnType(String fullClassName) {

public ReturnType(Method method) {
this(method.getGenericReturnType().getTypeName());
setClassByEventIfExists(method);
}

private void setClassByEventIfExists(Method method) {
if (method.isAnnotationPresent(StreamType.List.class)) {
this.classByEvent = calculateClassByEvent(
method.getDeclaredAnnotationsByType(StreamType.List.class)[0].value());
Expand All @@ -51,7 +55,7 @@ private Map<String, Class<?>> calculateClassByEvent(StreamType[] streamTypeList)
Map<String, Class<?>> map = new ConcurrentHashMap<>();
Arrays.stream(streamTypeList).forEach(streamType -> {
Arrays.stream(streamType.events())
.forEach(event -> map.put(CleverClientSSE.EVENT_HEADER + event, streamType.type()));
.forEach(event -> map.put(event, streamType.type()));
});
return map;
}
Expand Down Expand Up @@ -96,8 +100,8 @@ public Category category() {

private Category asyncCategory() {
if (isStream()) {
if (isObject()) {
return Category.ASYNC_STREAM_OBJECT;
if (isEvent()) {
return Category.ASYNC_STREAM_EVENT;
} else {
return Category.ASYNC_STREAM;
}
Expand All @@ -118,8 +122,8 @@ private Category asyncCategory() {

private Category syncCategory() {
if (isStream()) {
if (isObject()) {
return Category.SYNC_STREAM_OBJECT;
if (isEvent()) {
return Category.SYNC_STREAM_EVENT;
} else {
return Category.SYNC_STREAM;
}
Expand Down Expand Up @@ -156,7 +160,7 @@ private boolean isGeneric() {
}

private boolean isCustom() {
return !isInputStream() && !isString() && !isObject() && (size == 1 || (size == 2 && isAsync()));
return !isInputStream() && !isString() && !isEvent() && (size == 1 || (size == 2 && isAsync()));
}

private boolean isBinary() {
Expand All @@ -175,19 +179,19 @@ private boolean isString() {
return STRING.equals(returnTypeArray[lastIndex]);
}

private boolean isObject() {
return OBJECT.equals(returnTypeArray[lastIndex]);
private boolean isEvent() {
return EVENT.equals(returnTypeArray[lastIndex]);
}

public enum Category {
ASYNC_STREAM_OBJECT,
ASYNC_STREAM_EVENT,
ASYNC_STREAM,
ASYNC_LIST,
ASYNC_GENERIC,
ASYNC_CUSTOM,
ASYNC_BINARY,
ASYNC_PLAIN_TEXT,
SYNC_STREAM_OBJECT,
SYNC_STREAM_EVENT,
SYNC_STREAM,
SYNC_LIST,
SYNC_GENERIC,
Expand Down
Loading

0 comments on commit 3337bac

Please sign in to comment.