Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 41 additions & 28 deletions src/main/java/com/ge/predix/eventhub/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class Client implements AutoCloseable {
private Timer healthCheckTimer;
private Thread healthThread;
private CountDownLatch timerStart;
private final StreamObserver<HealthCheckResponse> healthCheckObserver;

protected EventHubLogger ehLogger;
protected PublishClient publishClient;
Expand Down Expand Up @@ -110,26 +111,34 @@ default void onMessage(Object o){


/**
* Creates a new client from the given configuration
* Creates a new client with default health check observer from the given configuration
*
* @param configuration Configuration that has Event Hub details and preferences
*/
public Client(EventHubConfiguration configuration) {
this(configuration, null);
}

/**
* Creates a new client from the given configuration with specified health check observer
*
* @param configuration Configuration that has Event Hub details and preferences
* @param healthCheckObserver health check observer implementation to use. If {@code null}, uses default implementation
*/
public Client(EventHubConfiguration configuration, StreamObserver<HealthCheckResponse> healthCheckObserver) {
this.ehLogger = new EventHubLogger(this.getClass(), configuration);
this.configuration = configuration;
ehLogger.log(Level.INFO, "starting EventHub client");

this.healthCheckObserver = healthCheckObserver != null ? healthCheckObserver : new DefaultHealthCheckObserver() ;
JSONObject mandatoryClientLog = new JSONObject();
mandatoryClientLog.put("sdk_version", sdkVersionString);
mandatoryClientLog.put("zoneID", this.configuration.getZoneID());
mandatoryClientLog.put("runtimeID", this.configuration.getLoggerConfiguration().getRuntimeId());
System.out.println(mandatoryClientLog);

buildChannel();
startHealthChecker();
initSubscribeClient();
initPublishClient();

}

/**
Expand Down Expand Up @@ -187,29 +196,6 @@ private void startHealthChecker() {
MSG_KEY, "starting health checker");
final HealthGrpc.HealthStub healthStub = HealthGrpc.newStub(originChannel);
final HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(healthCheckUrl).build();
final StreamObserver<HealthCheckResponse> observer = new StreamObserver<HealthCheckResponse>() {
public void onNext(HealthCheckResponse healthCheckResponse) {
ehLogger.log( Level.FINEST,
CLIENT_CHANNEL_MSG,
MSG_KEY, "received health check response" + healthCheckResponse);
}

public void onError(Throwable throwable) {
// Extract the Error Status from the cause throwable via
// io.grpc.Status.fromThrowable(throwable) , this helps for debugging
ehLogger.log( Level.WARNING,
CLIENT_CHANNEL_ERROR,
MSG_KEY, "error in health check",
EXCEPTION_KEY,io.grpc.Status.fromThrowable(throwable)
);
}

public void onCompleted() {
ehLogger.log( Level.FINE,
CLIENT_CHANNEL_MSG,
MSG_KEY, "health check channel complete");
}
};

final TimerTask checkHealth = new TimerTask() {
@Override
Expand All @@ -218,7 +204,7 @@ public void run() {
CLIENT_CHANNEL_MSG,
MSG_KEY, "pinging event hub for health check");
try {
healthStub.check(request, observer);
healthStub.check(request, healthCheckObserver);
}
catch(Exception e){
ehLogger.log( Level.SEVERE,
Expand Down Expand Up @@ -688,4 +674,31 @@ public String toString(){
"configuration", configuration.toString()
).toString();
}

/**
* Default health check observer implementation
*/
protected class DefaultHealthCheckObserver implements StreamObserver<HealthCheckResponse> {
public void onNext(HealthCheckResponse healthCheckResponse) {
ehLogger.log( Level.FINEST,
CLIENT_CHANNEL_MSG,
MSG_KEY, "received health check response" + healthCheckResponse);
}

public void onError(Throwable throwable) {
// Extract the Error Status from the cause throwable via
// io.grpc.Status.fromThrowable(throwable) , this helps for debugging
ehLogger.log( Level.WARNING,
CLIENT_CHANNEL_ERROR,
MSG_KEY, "error in health check",
EXCEPTION_KEY, Status.fromThrowable(throwable)
);
}

public void onCompleted() {
ehLogger.log( Level.FINE,
CLIENT_CHANNEL_MSG,
MSG_KEY, "health check channel complete");
}
}
}