Skip to content

Commit

Permalink
Adding websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
sashirestela committed Jan 22, 2025
1 parent 5074c58 commit dec12b0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,29 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class JavaHttpWebSocketAdapter implements WebSocketAdapter {
public class JavaHttpWebSocketAdapter extends WebSocketAdapter {

private static final Logger logger = LoggerFactory.getLogger(JavaHttpWebSocketAdapter.class);
private HttpClient httpClient;
private WebSocket webSocket;
private Consumer<String> messageCallback;
private Action openCallback;
private BiConsumer<Integer, String> closeCallback;
private Consumer<Throwable> errorCallback;
private final StringBuilder dataBuffer = new StringBuilder();
private CompletableFuture<Void> sendFuture;
private CompletableFuture<Void> closeFuture;

public JavaHttpWebSocketAdapter(HttpClient httpClient) {
this.httpClient = httpClient;
logger.debug("Created WebSocketAdapter with custom HttpClient");
logger.debug("Created JavaHttpWebSocketAdapter");
}

public JavaHttpWebSocketAdapter() {
this.httpClient = HttpClient.newHttpClient();
logger.debug("Created WebSocketAdapter with default HttpClient");
this(HttpClient.newHttpClient());
}

@Override
@SuppressWarnings("java:S3776")
public CompletableFuture<Void> connect(String url, Map<String, String> headers) {
logger.info("Connecting to WebSocket URL: {}", url);
logger.debug("Connecting to WebSocket URL: {}", url);
logger.debug("Connection headers: {}", headers);

WebSocket.Builder builder = this.httpClient.newWebSocketBuilder();
Expand All @@ -52,7 +45,7 @@ public CompletableFuture<Void> connect(String url, Map<String, String> headers)
@Override
public void onOpen(WebSocket webSocket) {
JavaHttpWebSocketAdapter.this.webSocket = webSocket;
logger.info("WebSocket connection established");
logger.debug("WebSocket connection established");
if (openCallback != null) {
openCallback.execute();
}
Expand Down Expand Up @@ -82,7 +75,7 @@ public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean

@Override
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
logger.info("WebSocket closing with code: {}, reason: {}", statusCode, reason);
logger.debug("WebSocket closing with code: {}, reason: {}", statusCode, reason);
if (closeCallback != null) {
closeCallback.accept(statusCode, reason);
}
Expand Down Expand Up @@ -130,7 +123,7 @@ public CompletableFuture<Void> send(String message) {
@Override
public void close() {
if (webSocket != null) {
logger.info("Initiating WebSocket close");
logger.debug("Initiating WebSocket close");
closeFuture = new CompletableFuture<>();
webSocket.sendClose(WebSocket.NORMAL_CLOSURE, "Closing connection");
try {
Expand All @@ -145,28 +138,4 @@ public void close() {
}
}

@Override
public void onMessage(Consumer<String> callback) {
logger.trace("Registering message callback");
this.messageCallback = callback;
}

@Override
public void onOpen(Action callback) {
logger.trace("Registering open callback");
this.openCallback = callback;
}

@Override
public void onClose(BiConsumer<Integer, String> callback) {
logger.trace("Registering close callback");
this.closeCallback = callback;
}

@Override
public void onError(Consumer<Throwable> callback) {
logger.trace("Registering error callback");
this.errorCallback = callback;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,25 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class OkHttpWebSocketAdapter implements WebSocketAdapter {
public class OkHttpWebSocketAdapter extends WebSocketAdapter {

private static final Logger logger = LoggerFactory.getLogger(OkHttpWebSocketAdapter.class);
private OkHttpClient okHttpClient;
private WebSocket webSocket;
private Consumer<String> messageCallback;
private Action openCallback;
private BiConsumer<Integer, String> closeCallback;
private Consumer<Throwable> errorCallback;

public OkHttpWebSocketAdapter(OkHttpClient okHttpClient) {
this.okHttpClient = okHttpClient;
logger.debug("Created WebSocketAdapter with custom OkHttpClient");
logger.debug("Created OkHttpWebSocketAdapter");
}

public OkHttpWebSocketAdapter() {
this.okHttpClient = new OkHttpClient();
logger.debug("Created WebSocketAdapter with default OkHttpClient");
this(new OkHttpClient());
}

@Override
public CompletableFuture<Void> connect(String url, Map<String, String> headers) {
logger.info("Connecting to WebSocket URL: {}", url);
logger.debug("Connecting to WebSocket URL: {}", url);
logger.debug("Connection headers: {}", headers);

Request.Builder requestBuilder = new Request.Builder().url(url);
Expand All @@ -47,7 +40,7 @@ public CompletableFuture<Void> connect(String url, Map<String, String> headers)

@Override
public void onOpen(WebSocket webSocket, Response response) {
logger.info("WebSocket connection established with response code: {}", response.code());
logger.debug("WebSocket connection established with response code: {}", response.code());
if (openCallback != null) {
openCallback.execute();
}
Expand All @@ -64,7 +57,7 @@ public void onMessage(WebSocket webSocket, String text) {

@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
logger.info("WebSocket closing with code: {}, reason: {}", code, reason);
logger.debug("WebSocket closing with code: {}, reason: {}", code, reason);
if (closeCallback != null) {
closeCallback.accept(code, reason);
}
Expand Down Expand Up @@ -103,36 +96,12 @@ public CompletableFuture<Void> send(String message) {
@Override
public void close() {
if (webSocket != null) {
logger.info("Initiating WebSocket close");
logger.debug("Initiating WebSocket close");
webSocket.close(1000, "Closing connection");
okHttpClient.dispatcher().executorService().shutdown();
okHttpClient.connectionPool().evictAll();
logger.debug("WebSocket resources cleaned up");
}
}

@Override
public void onMessage(Consumer<String> callback) {
logger.trace("Registering message callback");
this.messageCallback = callback;
}

@Override
public void onOpen(Action callback) {
logger.trace("Registering open callback");
this.openCallback = callback;
}

@Override
public void onClose(BiConsumer<Integer, String> callback) {
logger.trace("Registering close callback");
this.closeCallback = callback;
}

@Override
public void onError(Consumer<Throwable> errorCallback) {
logger.trace("Registering error callback");
this.errorCallback = errorCallback;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,33 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public interface WebSocketAdapter {
public abstract class WebSocketAdapter {

CompletableFuture<Void> connect(String url, Map<String, String> headers);
protected Consumer<String> messageCallback;
protected Action openCallback;
protected BiConsumer<Integer, String> closeCallback;
protected Consumer<Throwable> errorCallback;

CompletableFuture<Void> send(String message);
public abstract CompletableFuture<Void> connect(String url, Map<String, String> headers);

void close();
public abstract CompletableFuture<Void> send(String message);

void onMessage(Consumer<String> callback);
public abstract void close();

void onOpen(Action callback);
public void onMessage(Consumer<String> callback) {
this.messageCallback = callback;
}

void onClose(BiConsumer<Integer, String> callback);
public void onOpen(Action callback) {
this.openCallback = callback;
}

void onError(Consumer<Throwable> callback);
public void onClose(BiConsumer<Integer, String> callback) {
this.closeCallback = callback;
}

public void onError(Consumer<Throwable> errorCallback) {
this.errorCallback = errorCallback;
}

}

0 comments on commit dec12b0

Please sign in to comment.