Skip to content

Commit

Permalink
Adding websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashirestela committed Jan 22, 2025
1 parent c47a83a commit b41d8d5
Show file tree
Hide file tree
Showing 9 changed files with 738 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.github.sashirestela.cleverclient.example;

import io.github.sashirestela.cleverclient.websocket.JavaHttpWebSocketAdapter;
import io.github.sashirestela.cleverclient.websocket.WebSocketAdapter;

import java.util.Map;

public class WebSocketExample {

protected WebSocketAdapter webSocketAdapter;

public WebSocketExample() {
this.webSocketAdapter = new JavaHttpWebSocketAdapter();
}

public void run() {
final String BASE_URL = "wss://s13970.blr1.piesocket.com/v3/1?api_key=" + System.getenv("PIESOCKET_API_KEY")
+ "&notify_self=1";

webSocketAdapter.onOpen(() -> System.out.println("Connected"));
webSocketAdapter.onMessage(message -> System.out.println("Received: " + message));
webSocketAdapter.onClose((code, message) -> System.out.println("Closed"));

webSocketAdapter.connect(BASE_URL, Map.of()).join();
webSocketAdapter.send("Hello World!").join();
webSocketAdapter.send("Welcome to the Jungle!").join();
webSocketAdapter.close();
}

public static void main(String[] args) {
var example = new WebSocketExample();
example.run();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.sashirestela.cleverclient.example;

import io.github.sashirestela.cleverclient.websocket.OkHttpWebSocketAdapter;

public class WebSocketExampleOkHttp extends WebSocketExample {

public WebSocketExampleOkHttp() {
this.webSocketAdapter = new OkHttpWebSocketAdapter();
}

public static void main(String[] args) {
var example = new WebSocketExampleOkHttp();
example.run();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.github.sashirestela.cleverclient.http.HttpResponseData;
import io.github.sashirestela.cleverclient.support.Configurator;
import io.github.sashirestela.cleverclient.util.CommonUtil;
import io.github.sashirestela.cleverclient.websocket.WebSocketAdapter;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -37,6 +38,7 @@ public class CleverClient {
private final UnaryOperator<HttpRequestData> requestInterceptor;
private final UnaryOperator<HttpResponseData> responseInterceptor;
private final HttpClientAdapter clientAdapter;
private final WebSocketAdapter webSockewAdapter;
private final HttpProcessor httpProcessor;

/**
Expand All @@ -49,6 +51,8 @@ public class CleverClient {
* @param responseInterceptor Function to modify the response once it has been received.
* @param clientAdapter Component to call http services. If none is passed the
* JavaHttpClientAdapter will be used. Optional.
* @param webSocketAdapter Component to do web socket interactions. If none is passed the
* JavaHttpWebSocketAdapter will be used. Optional.
* @param endsOfStream Texts used to mark the final of streams when handling server sent
* events (SSE). Optional.
* @param objectMapper Provides Json conversions either to and from objects. Optional.
Expand All @@ -57,8 +61,8 @@ public class CleverClient {
@SuppressWarnings("java:S107")
public CleverClient(@NonNull String baseUrl, @Singular Map<String, String> headers, Consumer<Object> bodyInspector,
UnaryOperator<HttpRequestData> requestInterceptor, UnaryOperator<HttpResponseData> responseInterceptor,
HttpClientAdapter clientAdapter, @Singular("endOfStream") List<String> endsOfStream,
ObjectMapper objectMapper) {
HttpClientAdapter clientAdapter, WebSocketAdapter webSocketAdapter,
@Singular("endOfStream") List<String> endsOfStream, ObjectMapper objectMapper) {
this.baseUrl = baseUrl;
this.headers = Optional.ofNullable(headers).orElse(Map.of());
this.bodyInspector = bodyInspector;
Expand All @@ -67,6 +71,7 @@ public CleverClient(@NonNull String baseUrl, @Singular Map<String, String> heade
this.clientAdapter = Optional.ofNullable(clientAdapter).orElse(new JavaHttpClientAdapter());
this.clientAdapter.setRequestInterceptor(this.requestInterceptor);
this.clientAdapter.setResponseInterceptor(this.responseInterceptor);
this.webSockewAdapter = webSocketAdapter;

this.httpProcessor = HttpProcessor.builder()
.baseUrl(this.baseUrl)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.github.sashirestela.cleverclient.websocket;

@FunctionalInterface
public interface Action {

void execute();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package io.github.sashirestela.cleverclient.websocket;

import io.github.sashirestela.cleverclient.support.CleverClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class JavaHttpWebSocketAdapter implements WebSocketAdapter {

private static final Logger logger = LoggerFactory.getLogger(JavaHttpWebSocketAdapter.class);
private HttpClient httpClient;
private WebSocket webSocket;
private Consumer<String> messageCallback;
private Action openCallback;
private BiConsumer<Integer, String> closeCallback;
private Consumer<Throwable> errorCallback;
private final StringBuilder dataBuffer = new StringBuilder();
private CompletableFuture<Void> sendFuture;
private CompletableFuture<Void> closeFuture;

public JavaHttpWebSocketAdapter(HttpClient httpClient) {
this.httpClient = httpClient;
logger.debug("Created WebSocketAdapter with custom HttpClient");
}

public JavaHttpWebSocketAdapter() {
this.httpClient = HttpClient.newHttpClient();
logger.debug("Created WebSocketAdapter with default HttpClient");
}

@Override
@SuppressWarnings("java:S3776")
public CompletableFuture<Void> connect(String url, Map<String, String> headers) {
logger.info("Connecting to WebSocket URL: {}", url);
logger.debug("Connection headers: {}", headers);

WebSocket.Builder builder = this.httpClient.newWebSocketBuilder();
headers.forEach(builder::header);

CompletableFuture<Void> connectFuture = new CompletableFuture<>();

builder.buildAsync(URI.create(url), new WebSocket.Listener() {

@Override
public void onOpen(WebSocket webSocket) {
JavaHttpWebSocketAdapter.this.webSocket = webSocket;
logger.info("WebSocket connection established");
if (openCallback != null) {
openCallback.execute();
}
connectFuture.complete(null);
webSocket.request(1);
}

@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
logger.trace("Received text data chunk, last={}", last);
dataBuffer.append(data);
if (last) {
if (messageCallback != null) {
String completeMessage = dataBuffer.toString();
logger.debug("Received message: {}", completeMessage);
messageCallback.accept(completeMessage);
}
dataBuffer.setLength(0);
if (sendFuture != null) {
sendFuture.complete(null);
sendFuture = null;
}
}
webSocket.request(1);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
logger.info("WebSocket closing with code: {}, reason: {}", statusCode, reason);
if (closeCallback != null) {
closeCallback.accept(statusCode, reason);
}
if (closeFuture != null) {
closeFuture.complete(null);
}
return CompletableFuture.completedFuture(null);
}

@Override
public void onError(WebSocket webSocket, Throwable error) {
logger.error("WebSocket error occurred", error);
if (errorCallback != null) {
errorCallback.accept(error);
}
if (sendFuture != null) {
sendFuture.completeExceptionally(error);
}
if (closeFuture != null) {
closeFuture.completeExceptionally(error);
}
connectFuture.completeExceptionally(error);
}

});

return connectFuture;
}

@Override
public CompletableFuture<Void> send(String message) {
if (webSocket == null) {
logger.error("Attempt to send message before WebSocket connection is established");
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new CleverClientException("WebSocket is not connected"));
return future;
}

logger.debug("Sending message: {}", message);
sendFuture = new CompletableFuture<>();
webSocket.sendText(message, true);
return sendFuture;
}

@Override
public void close() {
if (webSocket != null) {
logger.info("Initiating WebSocket close");
closeFuture = new CompletableFuture<>();
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "Closing connection");
try {
closeFuture.join();
logger.debug("WebSocket close completed normally");
} catch (Exception e) {
logger.error("Error during WebSocket close", e);
if (errorCallback != null) {
errorCallback.accept(e);
}
}
}
}

@Override
public void onMessage(Consumer<String> callback) {
logger.trace("Registering message callback");
this.messageCallback = callback;
}

@Override
public void onOpen(Action callback) {
logger.trace("Registering open callback");
this.openCallback = callback;
}

@Override
public void onClose(BiConsumer<Integer, String> callback) {
logger.trace("Registering close callback");
this.closeCallback = callback;
}

@Override
public void onError(Consumer<Throwable> callback) {
logger.trace("Registering error callback");
this.errorCallback = callback;
}

}
Loading

0 comments on commit b41d8d5

Please sign in to comment.