Merge pull request #67 from bfncs/refactor/simplify
Refactor: remove telescopic builders
SvenKube authored Oct 1, 2021
2 parents 6b291c4 + ea78fca commit af8318d
Showing 14 changed files with 136 additions and 223 deletions.
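For context, the "telescopic builders" removed here are chains of curried, single-method interfaces (such as ProcessingPipeline and Run) that forced callers to supply each dependency through a fluent .on(...).withHandler(...).from(...).to(...) sequence; they are replaced by ordinary static factory methods that accept every dependency up front and return an Akka Streams RunnableGraph. The sketch below contrasts the two styles in miniature; the interface and method names are illustrative stand-ins, not the project's actual types.

    import java.util.function.Function;

    public class TelescopicVsDirect {

      // Old style: a "telescopic" builder expressed as curried single-method interfaces.
      // Every dependency gets its own step, and nothing runs until the last step is called.
      interface NeedsSource {
        NeedsSink from(String source);
      }

      interface NeedsSink {
        Runnable to(String sink);
      }

      static NeedsSource pipeline(Function<String, String> handler) {
        return source -> sink -> () -> System.out.println(handler.apply(source) + " -> " + sink);
      }

      // New style: one factory method that takes all dependencies at once.
      static Runnable pipeline(Function<String, String> handler, String source, String sink) {
        return () -> System.out.println(handler.apply(source) + " -> " + sink);
      }

      public static void main(String[] args) {
        // Telescopic call chain (the style this commit removes):
        pipeline(String::toUpperCase).from("events").to("kafka").run();

        // Direct call (the style this commit introduces):
        pipeline(String::toUpperCase, "events", "kafka").run();
      }
    }

The file-by-file diff below applies the same shape of change to the real pipeline code.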
30 changes: 12 additions & 18 deletions src/main/java/io/retel/ariproxy/Main.java
@@ -22,7 +22,6 @@
import io.retel.ariproxy.boundary.callcontext.api.ReportHealth;
import io.retel.ariproxy.boundary.commandsandresponses.AriCommandResponseKafkaProcessor;
import io.retel.ariproxy.boundary.events.WebsocketMessageToProducerRecordTranslator;
import io.retel.ariproxy.boundary.processingpipeline.Run;
import io.retel.ariproxy.health.HealthService;
import io.retel.ariproxy.health.KafkaConnectionCheck;
import io.retel.ariproxy.health.KafkaConnectionCheck.ReportKafkaConnectionHealth;
@@ -140,14 +139,14 @@ private static void runAriCommandResponseProcessor(
final Sink<ProducerRecord<String, String>, NotUsed> sink =
Producer.plainSink(producerSettings).mapMaterializedValue(done -> NotUsed.getInstance());

AriCommandResponseKafkaProcessor.commandResponseProcessing()
.on(system)
.withHandler(requestAndContext -> Http.get(system).singleRequest(requestAndContext._1))
.withCallContextProvider(callContextProvider)
.withMetricsService(metricsService)
.from(source)
.to(sink)
.run();
AriCommandResponseKafkaProcessor.commandResponseProcessing(
system,
requestAndContext -> Http.get(system).singleRequest(requestAndContext._1),
callContextProvider,
metricsService,
source,
sink)
.run(system);
}

private static void runAriEventProcessor(
@@ -175,17 +174,12 @@ private static void runAriEventProcessor(
final Sink<ProducerRecord<String, String>, NotUsed> sink =
Producer.plainSink(producerSettings).mapMaterializedValue(done -> NotUsed.getInstance());

final Run processingPipeline =
WebsocketMessageToProducerRecordTranslator.eventProcessing()
.on(system)
.withHandler(applicationReplacedHandler)
.withCallContextProvider(callContextProvider)
.withMetricsService(metricsService)
.from(source)
.to(sink);
final RunnableGraph<NotUsed> processingPipeline =
WebsocketMessageToProducerRecordTranslator.eventProcessing(
system, callContextProvider, metricsService, source, sink, applicationReplacedHandler);

try {
processingPipeline.run();
processingPipeline.run(system);
system.log().debug("Successfully started ari event processor.");
} catch (Exception e) {
system.log().error("Failed to start ari event processor: ", e);
src/main/java/io/retel/ariproxy/boundary/commandsandresponses/AriCommandResponseKafkaProcessor.java
@@ -11,9 +11,12 @@
import akka.http.javadsl.model.headers.HttpCredentials;
import akka.japi.function.Function;
import akka.japi.function.Procedure;
import akka.stream.*;
import akka.stream.ActorAttributes;
import akka.stream.Attributes;
import akka.stream.Supervision;
import akka.stream.Supervision.Directive;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
@@ -28,7 +31,6 @@
import com.typesafe.config.ConfigFactory;
import io.retel.ariproxy.boundary.callcontext.api.CallContextProviderMessage;
import io.retel.ariproxy.boundary.commandsandresponses.auxiliary.*;
import io.retel.ariproxy.boundary.processingpipeline.ProcessingPipeline;
import io.retel.ariproxy.metrics.MetricsServiceMessage;
import io.retel.ariproxy.metrics.StopCallSetupTimer;
import io.vavr.Tuple;
@@ -69,26 +71,8 @@ public class AriCommandResponseKafkaProcessor {
private static final ObjectWriter genericWriter = mapper.writer();
private static final ObjectReader genericReader = mapper.reader();

public static ProcessingPipeline<ConsumerRecord<String, String>, CommandResponseHandler>
commandResponseProcessing() {
return system ->
commandResponseHandler ->
callContextProvider ->
metricsService ->
source ->
sink ->
() ->
run(
system,
commandResponseHandler,
callContextProvider,
metricsService,
source,
sink);
}

private static void run(
final akka.actor.typed.ActorSystem<?> system,
public static RunnableGraph<NotUsed> commandResponseProcessing(
final ActorSystem<?> system,
final CommandResponseHandler commandResponseHandler,
final ActorRef<CallContextProviderMessage> callContextProvider,
final ActorRef<MetricsServiceMessage> metricsService,
@@ -112,52 +96,54 @@ private static void run(
final String restUser = restConfig.getString(USER);
final String restPassword = restConfig.getString(PASSWORD);

source
    .log(">>> ARI COMMAND", ConsumerRecord::value)
    .withAttributes(LOG_LEVELS)
    .map(AriCommandResponseKafkaProcessor::unmarshallAriCommandEnvelope)
    .map(
        msgEnvelope -> {
          AriCommandResponseProcessing.registerCallContext(
                  callContextProvider,
                  msgEnvelope.getCallContext(),
                  msgEnvelope.getAriCommand(),
                  system)
              .onFailure(
                  error -> {
                    throw new IllegalStateException(error);
                  });
          return new CallContextAndCommandRequestContext(
              msgEnvelope.getCallContext(),
              msgEnvelope.getCommandId(),
              msgEnvelope.getAriCommand());
        })
    .map(
        context ->
            Tuple.of(
                toHttpRequest(context.getAriCommand(), restUri, restUser, restPassword),
                context))
    .mapAsync(
        1,
        requestAndContext ->
            commandResponseHandler
                .apply(requestAndContext)
                .handle(
                    (response, error) -> {
                      return Tuple.of(
                          handleErrorInHTTPResponse(response, error), requestAndContext._2);
                    }))
    .wireTap(Sink.foreach(gatherMetrics(metricsService, stasisApp)))
    .mapAsync(1, rawHttpResponseAndContext -> toAriResponse(rawHttpResponseAndContext, system))
    .map(
        ariResponseAndContext ->
            envelopeAriResponseToProducerRecord(
                commandsTopic, eventsAndResponsesTopic, ariResponseAndContext))
    .log(">>> ARI RESPONSE", ProducerRecord::value)
    .withAttributes(LOG_LEVELS)
    .toMat(sink, Keep.none())
    .withAttributes(ActorAttributes.withSupervisionStrategy(decider))
    .run(system);

final RunnableGraph<NotUsed> notUsedRunnableGraph =
    source
        .log(">>> ARI COMMAND", ConsumerRecord::value)
        .withAttributes(LOG_LEVELS)
        .map(AriCommandResponseKafkaProcessor::unmarshallAriCommandEnvelope)
        .map(
            msgEnvelope -> {
              AriCommandResponseProcessing.registerCallContext(
                      callContextProvider,
                      msgEnvelope.getCallContext(),
                      msgEnvelope.getAriCommand(),
                      system)
                  .onFailure(
                      error -> {
                        throw new IllegalStateException(error);
                      });
              return new CallContextAndCommandRequestContext(
                  msgEnvelope.getCallContext(),
                  msgEnvelope.getCommandId(),
                  msgEnvelope.getAriCommand());
            })
        .map(
            context ->
                Tuple.of(
                    toHttpRequest(context.getAriCommand(), restUri, restUser, restPassword),
                    context))
        .mapAsync(
            1,
            requestAndContext ->
                commandResponseHandler
                    .apply(requestAndContext)
                    .handle(
                        (response, error) -> {
                          return Tuple.of(
                              handleErrorInHTTPResponse(response, error), requestAndContext._2);
                        }))
        .wireTap(Sink.foreach(gatherMetrics(metricsService, stasisApp)))
        .mapAsync(
            1, rawHttpResponseAndContext -> toAriResponse(rawHttpResponseAndContext, system))
        .map(
            ariResponseAndContext ->
                envelopeAriResponseToProducerRecord(
                    commandsTopic, eventsAndResponsesTopic, ariResponseAndContext))
        .log(">>> ARI RESPONSE", ProducerRecord::value)
        .withAttributes(LOG_LEVELS)
        .toMat(sink, Keep.none())
        .withAttributes(ActorAttributes.withSupervisionStrategy(decider));
return notUsedRunnableGraph;
}

private static HttpResponse handleErrorInHTTPResponse(HttpResponse response, Throwable error) {
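
The net effect in this file is that commandResponseProcessing no longer materializes the stream itself: it assembles a blueprint with toMat(sink, Keep.none()) and hands the resulting RunnableGraph<NotUsed> back to the caller, which decides when and on which actor system to run it (as Main.java now does). Below is a self-contained sketch of that blueprint-vs-run split, using illustrative sources and sinks rather than the project's Kafka ones.

    import akka.NotUsed;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import java.util.Arrays;

    public class BlueprintVsRun {

      // Assemble the stream as a blueprint; nothing is materialized here.
      static RunnableGraph<NotUsed> uppercasePipeline(
          final Source<String, NotUsed> source, final Sink<String, NotUsed> sink) {
        return source.map(String::toUpperCase).toMat(sink, Keep.none());
      }

      public static void main(String[] args) {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "demo");

        // mapMaterializedValue(... -> NotUsed) mirrors how the real code adapts Producer.plainSink.
        final Sink<String, NotUsed> sink =
            Sink.<String>foreach(System.out::println)
                .mapMaterializedValue(done -> NotUsed.getInstance());

        final RunnableGraph<NotUsed> graph =
            uppercasePipeline(Source.from(Arrays.asList("a", "b", "c")), sink);

        // The caller owns materialization: a typed ActorSystem works as the materializer.
        // (In a real application, terminate the system once the stream's work is done.)
        graph.run(system);
      }
    }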
@@ -15,15 +15,13 @@
import io.retel.ariproxy.boundary.commandsandresponses.auxiliary.AriMessageType;
import io.retel.ariproxy.boundary.commandsandresponses.auxiliary.AriResource;
import io.retel.ariproxy.metrics.IncreaseCounter;
import io.retel.ariproxy.metrics.MetricsServiceMessage;
import io.retel.ariproxy.metrics.StartCallSetupTimer;
import io.vavr.collection.List;
import io.vavr.collection.Seq;
import io.vavr.control.Option;
import io.vavr.control.Try;
import java.time.Duration;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
@@ -179,8 +177,3 @@ public static Option<String> getValueFromMessageByPath(Message message, String p
.toOption();
}
}

@FunctionalInterface
interface MetricsGatherer {
MetricsServiceMessage withCallContextSupplier(Supplier<String> callContextSupplier);
}
src/main/java/io/retel/ariproxy/boundary/events/MetricsGatherer.java (new file)
@@ -0,0 +1,9 @@
package io.retel.ariproxy.boundary.events;

import io.retel.ariproxy.metrics.MetricsServiceMessage;
import java.util.function.Supplier;

@FunctionalInterface
interface MetricsGatherer {
MetricsServiceMessage withCallContextSupplier(Supplier<String> callContextSupplier);
}
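
Because the extracted MetricsGatherer is a @FunctionalInterface with a single abstract method, call sites can keep providing it as a plain lambda that builds a metrics message from a lazily supplied call context. A minimal sketch follows, with a hypothetical StartCallSetupTimer stand-in (only the name is borrowed from the imports above; the project's real message constructors may differ).

    import java.util.function.Supplier;

    public class MetricsGathererSketch {

      // Stand-in for io.retel.ariproxy.metrics.MetricsServiceMessage.
      interface MetricsServiceMessage {}

      // Hypothetical message type used only for this illustration.
      static final class StartCallSetupTimer implements MetricsServiceMessage {
        final String callContext;

        StartCallSetupTimer(final String callContext) {
          this.callContext = callContext;
        }

        @Override
        public String toString() {
          return "StartCallSetupTimer(" + callContext + ")";
        }
      }

      // Mirrors the interface added in this commit.
      @FunctionalInterface
      interface MetricsGatherer {
        MetricsServiceMessage withCallContextSupplier(Supplier<String> callContextSupplier);
      }

      public static void main(String[] args) {
        // A single-abstract-method interface means the implementation is just a lambda;
        // the Supplier defers call-context resolution until the message is actually built.
        final MetricsGatherer gatherer =
            callContextSupplier -> new StartCallSetupTimer(callContextSupplier.get());

        System.out.println(gatherer.withCallContextSupplier(() -> "call-context-42"));
      }
    }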
src/main/java/io/retel/ariproxy/boundary/events/WebsocketMessageToProducerRecordTranslator.java
@@ -9,14 +9,14 @@
import akka.http.javadsl.model.ws.Message;
import akka.japi.function.Function;
import akka.stream.*;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.retel.ariproxy.boundary.callcontext.api.CallContextProviderMessage;
import io.retel.ariproxy.boundary.callcontext.api.ProviderPolicy;
import io.retel.ariproxy.boundary.commandsandresponses.auxiliary.AriMessageType;
import io.retel.ariproxy.boundary.processingpipeline.ProcessingPipeline;
import io.retel.ariproxy.metrics.MetricsServiceMessage;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -31,30 +31,13 @@ public class WebsocketMessageToProducerRecordTranslator {
private static final Attributes LOG_LEVELS =
Attributes.createLogLevels(Logging.InfoLevel(), Logging.InfoLevel(), Logging.ErrorLevel());

public static ProcessingPipeline<Message, Runnable> eventProcessing() {
return system ->
applicationReplacedHandler ->
callContextProvider ->
metricsService ->
source ->
sink ->
() ->
run(
system,
callContextProvider,
metricsService,
source,
sink,
applicationReplacedHandler);
}

private static void run(
ActorSystem<?> system,
ActorRef<CallContextProviderMessage> callContextProvider,
ActorRef<MetricsServiceMessage> metricsService,
Source<Message, NotUsed> source,
Sink<ProducerRecord<String, String>, NotUsed> sink,
Runnable applicationReplacedHandler) {
public static RunnableGraph<NotUsed> eventProcessing(
final ActorSystem<?> system,
final ActorRef<CallContextProviderMessage> callContextProvider,
final ActorRef<MetricsServiceMessage> metricsService,
final Source<Message, NotUsed> source,
final Sink<ProducerRecord<String, String>, NotUsed> sink,
final Runnable applicationReplacedHandler) {
final Function<Throwable, Supervision.Directive> decider =
t -> {
system.log().error("WebsocketMessageToProducerRecordTranslator stream failed", t);
Expand All @@ -65,7 +48,7 @@ private static void run(
final String commandsTopic = kafkaConfig.getString(COMMANDS_TOPIC);
final String eventsAndResponsesTopic = kafkaConfig.getString(EVENTS_AND_RESPONSES_TOPIC);

source
return source
// .throttle(4 * 13, Duration.ofSeconds(1)) // Note: We die right now for calls/s >= 4.8
.wireTap(
Sink.foreach(msg -> gatherMetrics(msg, metricsService, callContextProvider, system)))
@@ -82,8 +65,7 @@ private static void run(
.log(">>> ARI EVENT", ProducerRecord::value)
.withAttributes(LOG_LEVELS)
.to(sink)
.withAttributes(ActorAttributes.withSupervisionStrategy(decider))
.run(system);
.withAttributes(ActorAttributes.withSupervisionStrategy(decider));
}

private static void gatherMetrics(
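
Both processors keep the same failure handling after the refactor: a Function<Throwable, Supervision.Directive> decider is attached to the finished graph via ActorAttributes.withSupervisionStrategy(...), so a failing element is handled according to the returned directive instead of simply unwinding the stream. Below is a minimal, self-contained sketch of that mechanism using a resuming decider on an illustrative pipeline (not the project's Kafka/websocket stages).

    import akka.NotUsed;
    import akka.actor.typed.ActorSystem;
    import akka.actor.typed.javadsl.Behaviors;
    import akka.japi.function.Function;
    import akka.stream.ActorAttributes;
    import akka.stream.Supervision;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;
    import java.util.Arrays;

    public class SupervisionSketch {

      public static void main(String[] args) {
        final ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "supervision-demo");

        // Decide per failure whether the stream resumes, restarts, or stops.
        final Function<Throwable, Supervision.Directive> decider =
            t -> {
              system.log().error("element failed, resuming stream", t);
              return Supervision.resume(); // drop the offending element, keep the stream alive
            };

        final RunnableGraph<NotUsed> graph =
            Source.from(Arrays.asList("1", "2", "oops", "4"))
                .map(Integer::parseInt) // "oops" throws NumberFormatException
                .toMat(Sink.<Integer>foreach(n -> System.out.println("got " + n)), Keep.none())
                .withAttributes(ActorAttributes.withSupervisionStrategy(decider));

        graph.run(system); // prints 1, 2 and 4; the failing element is dropped
      }
    }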

Seven further changed files were deleted by this commit (contents not shown).
