Skip to content

Commit a4a4b58

Browse files
authored
Merge pull request #78 from delphi-hub/feature/websockets
Feature/websockets & Angular 7
2 parents 6e2ebc9 + 8bf6355 commit a4a4b58

38 files changed

+2506
-1516
lines changed

.travis.yml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
11
language: scala
22
scala:
33
- 2.12.4
4+
5+
addons:
6+
apt:
7+
sources:
8+
- google-chrome
9+
packages:
10+
- google-chrome-stable
11+
412
before_install:
513
- npm install -g @angular/cli
614
script:
7-
- 'if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then sbt ++$TRAVIS_SCALA_VERSION test; cd client && npm install && ng build --prod && cd .. ; fi'
8-
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then sbt ++$TRAVIS_SCALA_VERSION coverage test coverageReport coverageAggregate codacyCoverage; cd client && npm install && ng build --prod && cd .. ; fi'
15+
- 'if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then sbt ++$TRAVIS_SCALA_VERSION test; cd client && npm install && ng build --prod && npm run test -- --no-progress --browsers=ChromeHeadlessCI && cd .. ; fi'
16+
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then sbt ++$TRAVIS_SCALA_VERSION coverage test coverageReport coverageAggregate codacyCoverage; cd client && npm install && ng build --prod && npm run test -- --no-progress --browsers=ChromeHeadlessCI && cd .. ; fi'
917
after_success:
1018
- 'if [ "$TRAVIS_PULL_REQUEST" = "false" ]; then bash <(curl -s https://codecov.io/bash); fi'

app/actors/ClientSocketActor.scala

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package actors
2+
import akka.actor._
3+
import actors.PublishSocketMessageActor.{AddOutActor, PublishMessage, StopMessage}
4+
import models.EventEnums.EventType
5+
import models.{EventJsonSupport, SocketMessage}
6+
import play.api.Logger
7+
import spray.json._
8+
9+
import scala.collection.mutable.HashSet
10+
import play.api.libs.json._
11+
import play.api.libs.functional.syntax._
12+
13+
object ClientSocketActor {
14+
def props(out: ActorRef, publisher: ActorRef): Props = Props(new ClientSocketActor(out, publisher))
15+
}
16+
17+
class ClientSocketActor(out: ActorRef, publisher: ActorRef) extends Actor with EventJsonSupport {
18+
19+
val myEvents: HashSet[EventType] = HashSet.empty[EventType]
20+
21+
implicit val messageReads: Reads[SocketMessage] = ((JsPath \ "event").read[EventType] and
22+
(JsPath \ "payload").readNullable[String])(SocketMessage.apply _)
23+
implicit val messageWrites: Writes[SocketMessage] = Json.writes[SocketMessage]
24+
25+
override def preStart() {
26+
Logger.debug("pre start called in client" + self)
27+
out ! "successfully registered"
28+
}
29+
30+
override def postStop() {
31+
Logger.debug("post stop called in client" + self)
32+
publisher ! StopMessage(self)
33+
}
34+
35+
def receive: PartialFunction[Any, Unit] = {
36+
case msg: String =>
37+
val json = Json.parse(msg)
38+
val result = json.validate[SocketMessage]
39+
result.fold(
40+
errors => {Logger.error("error parsing message to json" + msg + " with error " + errors)},
41+
socketMsg => {
42+
Logger.debug("successfully parsed socket message" + socketMsg)
43+
if (socketMsg.event == EventType.Heartbeat) {
44+
out ! "Heartbeat"
45+
} else {
46+
publisher ! AddOutActor(self, socketMsg.event)
47+
}
48+
}
49+
)
50+
51+
case SocketMessage(event, payload) =>
52+
Logger.debug("received socket message in client" + SocketMessage)
53+
if (!myEvents.contains(event)) {
54+
myEvents += event
55+
publisher ! AddOutActor(self, event)
56+
}
57+
58+
case PublishMessage(msg) =>
59+
Logger.debug("received publish message in client" + self)
60+
out ! msg.toJson(eventFormat).toString()
61+
}
62+
63+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package actors
2+
import akka.actor._
3+
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
4+
import actors.PublishSocketMessageActor.{AddOutActor, PublishMessage, StopMessage}
5+
import akka.http.scaladsl.Http
6+
import akka.stream.Materializer
7+
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
8+
import models.EventEnums.EventType
9+
import models.{EventJsonSupport, RegistryEvent}
10+
import play.api.Logger
11+
import spray.json._
12+
13+
import scala.collection.mutable
14+
import scala.collection.mutable.ListBuffer
15+
import scala.concurrent.Promise
16+
17+
object PublishSocketMessageActor {
18+
def props(irBasePath: String, mat: Materializer, actorSys: ActorSystem):Props = Props(new PublishSocketMessageActor(irBasePath, mat, actorSys))
19+
final case class AddOutActor(out: ActorRef, event: EventType)
20+
final case class PublishMessage(msg: RegistryEvent)
21+
final case class StopMessage(toStop: ActorRef)
22+
}
23+
24+
class PublishSocketMessageActor(irBasePath: String, mat: Materializer, actorSys: ActorSystem) extends Actor with EventJsonSupport {
25+
26+
val eventActorMap: mutable.HashMap[EventType, ListBuffer[ActorRef]] = new mutable.HashMap[EventType, ListBuffer[ActorRef]]()
27+
28+
override def preStart() {
29+
30+
Logger.debug("pre start called in publisher" + self)
31+
val flow: Flow[Message, Message, Promise[Option[Message]]] =
32+
Flow.fromSinkAndSourceMat(
33+
Sink.foreach[Message]{ msg =>
34+
self ! msg},
35+
Source(List(TextMessage("one"), TextMessage("two")))
36+
.concatMat(Source.maybe[Message])(Keep.right))(Keep.right)
37+
38+
39+
Http()(actorSys).singleWebSocketRequest(
40+
WebSocketRequest("ws://" + irBasePath + "/events"),
41+
flow)(mat)
42+
43+
44+
}
45+
46+
override def postStop() {
47+
Logger.debug("post stop called in publisher" + self)
48+
}
49+
50+
def receive: PartialFunction[Any, Unit] = {
51+
52+
case StopMessage(toStop) =>
53+
Logger.debug("stop received" + toStop)
54+
for ((k, v) <- eventActorMap) v -= toStop
55+
56+
case AddOutActor(out, event) =>
57+
Logger.debug("received add out actor" + out)
58+
if (!eventActorMap.contains(event)){
59+
eventActorMap += (event -> new ListBuffer[ActorRef]())
60+
}
61+
eventActorMap(event) += out
62+
63+
64+
case TextMessage.Strict(msg) =>
65+
Logger.debug("received something " + msg)
66+
val registryEvent = msg.parseJson.convertTo[RegistryEvent](eventFormat)
67+
self ! PublishMessage(registryEvent)
68+
69+
case PublishMessage(msg) =>
70+
Logger.debug("publish message called with message" + msg)
71+
if(eventActorMap.contains(msg.eventType)){
72+
73+
val list = eventActorMap(msg.eventType)
74+
list.foreach(actor => {
75+
Logger.debug("sending message to actor" + actor)
76+
actor ! PublishMessage(msg)
77+
})
78+
}
79+
}
80+
81+
}

app/controllers/InstanceRegistryController.scala

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818

1919
package controllers
2020

21-
import akka.actor.ActorSystem
21+
import akka.actor.{ActorRef, ActorSystem}
2222
import javax.inject.Inject
2323
import play.api.Configuration
2424

2525
import scala.concurrent.ExecutionContext
2626
import play.api.libs.concurrent.CustomExecutionContext
2727
import play.api.libs.ws.WSClient
28-
import play.api.mvc.{Action, AnyContent, BaseController, ControllerComponents}
28+
import akka.stream.Materializer
29+
import play.api.libs.streams.ActorFlow
30+
import actors.{ClientSocketActor, PublishSocketMessageActor}
31+
import play.api.mvc._
2932

3033

3134
trait MyExecutionContext extends ExecutionContext
@@ -35,7 +38,7 @@ trait MyExecutionContext extends ExecutionContext
3538
* which should be used to handle client connections.
3639
* @param system
3740
*/
38-
class MyExecutionContextImpl @Inject()(system: ActorSystem)
41+
class MyExecutionContextImpl @Inject()(implicit system: ActorSystem)
3942
extends CustomExecutionContext(system, "my.executor") with MyExecutionContext
4043

4144
/**
@@ -44,11 +47,19 @@ class MyExecutionContextImpl @Inject()(system: ActorSystem)
4447
* @param controllerComponents
4548
* @param ws
4649
*/
47-
class InstanceRegistryController @Inject()(myExecutionContext: MyExecutionContext,
50+
51+
52+
class InstanceRegistryController @Inject()(implicit system: ActorSystem, mat: Materializer, myExecutionContext: MyExecutionContext,
4853
val controllerComponents: ControllerComponents,
4954
ws: WSClient, config: Configuration)
5055
extends BaseController {
56+
57+
58+
lazy val pubActor: Option[ActorRef] = Some(system.actorOf(PublishSocketMessageActor.props(instanceRegistryBasePath, mat, system), "publish-actor"))
59+
5160
val instanceRegistryUri = config.get[String]("app.instanceRegistryUri")
61+
val instanceRegistryBasePath = config.get[String]("app.instanceRegistryBasePath")
62+
5263
def instances(componentType: String): Action[AnyContent] = Action.async {
5364

5465
ws.url(instanceRegistryUri + "/instances").addQueryStringParameters("ComponentType" -> componentType).get().map { response =>
@@ -58,6 +69,15 @@ class InstanceRegistryController @Inject()(myExecutionContext: MyExecutionContex
5869
}(myExecutionContext)
5970
}
6071

72+
def socket: WebSocket = WebSocket.accept[String, String] {
73+
request => {
74+
ActorFlow.actorRef { out =>
75+
ClientSocketActor.props(out, pubActor.get)
76+
}
77+
}
78+
}
79+
80+
6181
def numberOfInstances(componentType: String) : Action[AnyContent] = Action.async {
6282
// TODO: handle what should happen if the instance registry is not reachable.
6383
// TODO: create constants for the urls
@@ -71,4 +91,4 @@ class InstanceRegistryController @Inject()(myExecutionContext: MyExecutionContex
7191
}(myExecutionContext)
7292
}
7393

74-
}
94+
}

0 commit comments

Comments
 (0)