Skip to content

Commit 312a7b1

Browse files
authored
Merge pull request #30 from delphi-hub/feature/eventStreaming
Introduced event system
2 parents 3b30246 + 5309d72 commit 312a7b1

File tree

10 files changed

+582
-84
lines changed

10 files changed

+582
-84
lines changed

OpenAPISpecification.yaml

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ paths:
6767
operationId: deleteInstance
6868
parameters:
6969
- in: query
70-
name: InstanceID
70+
name: Id
7171
description: The ID of the instance to be deregistered.
7272
required: true
7373
type: integer
@@ -185,7 +185,7 @@ paths:
185185
operationId: matchInstance
186186
parameters:
187187
- in: query
188-
name: InstanceID
188+
name: Id
189189
description: The ID of the instance that the sender was matched to.
190190
required: true
191191
type: integer
@@ -202,6 +202,30 @@ paths:
202202
description: Invalid ID supplied
203203
'404':
204204
description: No match found
205+
/eventList:
206+
get:
207+
tags:
208+
- Basic Operations
209+
summary: Gets the list of events associated to the specified instance
210+
description: >-
211+
This command retrieves a list of events that are associated to the instance with the specified id.
212+
operationId: eventList
213+
parameters:
214+
- name: Id
215+
in: query
216+
description: Id of the instance
217+
required: true
218+
type: integer
219+
format: int64
220+
responses:
221+
'200':
222+
description: List of events for the specified instance
223+
schema:
224+
type: array
225+
items:
226+
$ref: '#/definitions/Event'
227+
'404':
228+
description: Instance not found
205229
/deploy:
206230
post:
207231
tags:
@@ -272,8 +296,10 @@ paths:
272296
- Docker Operations
273297
summary: Reports the manual stop of an instances to the registry
274298
description: >-
275-
This command informs the registry about an instance that was stopped manually, meaning not via calling /stop on the instance registry. This is only applicable to instances
276-
running inside a docker container, as non-container instances would deregister themselves when stopped.
299+
This command informs the registry about an instance that was stopped
300+
manually, meaning not via calling /stop on the instance registry. This
301+
is only applicable to instances running inside a docker container, as
302+
non-container instances would deregister themselves when stopped.
277303
operationId: reportStop
278304
parameters:
279305
- in: query
@@ -470,6 +496,23 @@ paths:
470496
'500':
471497
description: Internal server error
472498
definitions:
499+
Event:
500+
type: object
501+
required:
502+
- eventType
503+
- payload
504+
properties:
505+
eventType:
506+
type: string
507+
description: Valid types for events
508+
example: NumbersChangedEvent
509+
enum:
510+
- NumbersChangedEvent
511+
- InstanceAddedEvent
512+
- InstanceRemovedEvent
513+
- StateChangedEvent
514+
payload:
515+
type: object
473516
Instance:
474517
type: object
475518
required:

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
1919
libraryDependencies += "org.parboiled" %% "parboiled" % "2.1.4"
2020

2121

22+
2223
lazy val registry = (project in file(".")).
2324
enablePlugins(JavaAppPackaging).
2425
enablePlugins(DockerPlugin).

src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
akka.http.server.websocket.periodic-keep-alive-max-idle = 10 seconds

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ import akka.pattern.ask
55
import akka.util.Timeout
66
import de.upb.cs.swt.delphi.instanceregistry.Docker.DockerActor._
77
import de.upb.cs.swt.delphi.instanceregistry.Docker.{DockerActor, DockerConnection}
8+
import akka.stream.scaladsl.{Keep, Sink, Source}
9+
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
810
import de.upb.cs.swt.delphi.instanceregistry.daos.{DynamicInstanceDAO, InstanceDAO}
911
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState}
10-
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.{Instance, InstanceEnums}
12+
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model._
1113

1214
import scala.concurrent.duration._
1315
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -17,13 +19,17 @@ import scala.util.{Failure, Success, Try}
1719
class RequestHandler(configuration: Configuration, connection: DockerConnection) extends AppLogging {
1820

1921

22+
2023
implicit val system: ActorSystem = Registry.system
24+
implicit val materializer : Materializer = ActorMaterializer()
2125
implicit val ec: ExecutionContext = system.dispatcher
2226

23-
val dockerActor: ActorRef = system.actorOf(DockerActor.props(connection))
24-
2527
private[instanceregistry] val instanceDao: InstanceDAO = new DynamicInstanceDAO(configuration)
2628

29+
val (eventActor, eventPublisher) = Source.actorRef[RegistryEvent](0, OverflowStrategy.dropNew)
30+
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
31+
.run()
32+
val dockerActor: ActorRef = system.actorOf(DockerActor.props(connection))
2733

2834
def initialize(): Unit = {
2935
log.info("Initializing request handler...")
@@ -35,7 +41,8 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
3541
log.info("Done initializing request handler.")
3642
}
3743

38-
def shutdown(): Unit = {
44+
def shutdown() : Unit = {
45+
eventActor ! PoisonPill
3946
instanceDao.shutdown()
4047
}
4148

@@ -56,7 +63,10 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
5663
dockerId = None, instanceState = InstanceState.Running)
5764

5865
instanceDao.addInstance(newInstance) match {
59-
case Success(_) => Success(newID)
66+
case Success(_) =>
67+
fireNumbersChangedEvent(newInstance.componentType)
68+
fireInstanceAddedEvent(newInstance)
69+
Success(newID)
6070
case Failure(x) => Failure(x)
6171
}
6272
}
@@ -74,7 +84,10 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
7484
} else if (isInstanceDockerContainer(instanceId)) {
7585
OperationResult.IsDockerContainer
7686
} else {
87+
val instanceToRemove = instanceDao.getInstance(instanceId).get
88+
fireInstanceRemovedEvent(instanceToRemove)
7789
instanceDao.removeInstance(instanceId)
90+
fireNumbersChangedEvent(instanceToRemove.componentType)
7891
OperationResult.Ok
7992
}
8093
}
@@ -87,6 +100,10 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
87100
instanceDao.allInstances().count(i => i.componentType == compType)
88101
}
89102

103+
def getEventList(id: Long) : Try[List[RegistryEvent]] = {
104+
instanceDao.getEventsFor(id)
105+
}
106+
90107
def getMatchingInstanceOfType(compType: ComponentType): Try[Instance] = {
91108
log.info(s"Trying to match to instance of type $compType ...")
92109
getNumberOfInstances(compType) match {
@@ -148,8 +165,10 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
148165
instanceDao.addMatchingResult(id, result)
149166
if (result && instance.instanceState == InstanceState.NotReachable) {
150167
instanceDao.setStateFor(instance.id.get, InstanceState.Running)
168+
fireStateChangedEvent(instanceDao.getInstance(id).get)
151169
} else if (!result && instance.instanceState == InstanceState.Running) {
152170
instanceDao.setStateFor(instance.id.get, InstanceState.NotReachable)
171+
fireStateChangedEvent(instanceDao.getInstance(id).get)
153172
}
154173
log.info(s"Applied matching result $result to instance with id $id.")
155174
OperationResult.Ok
@@ -169,17 +188,20 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
169188
deployResult match {
170189
case Failure(ex) =>
171190
log.error(s"Failed to deploy container, docker host not reachable.")
191+
fireDockerOperationErrorEvent(None, s"Deploy failed with message: ${ex.getMessage}")
172192
Failure(new RuntimeException(s"Failed to deploy container, docker host not reachable (${ex.getMessage})."))
173193
case Success((dockerId, host, port)) =>
174194
val normalizedHost = host.substring(1,host.length - 1)
175195
log.info(s"Deployed new container with id $dockerId, host $normalizedHost and port $port.")
176196

177-
val newInstance = Instance(Some(newId), normalizedHost, port, name.getOrElse(s"Generic $componentType"), componentType, Some(dockerId), InstanceState.Stopped)
197+
val newInstance = Instance(Some(newId), normalizedHost, port, name.getOrElse(s"Generic $componentType"), componentType, Some(dockerId), InstanceState.Deploying)
178198
log.info(s"Registering instance $newInstance....")
179199

180200
instanceDao.addInstance(newInstance) match {
181201
case Success(_) =>
182202
log.info("Successfully registered.")
203+
fireInstanceAddedEvent(newInstance)
204+
fireNumbersChangedEvent(newInstance.componentType)
183205
Success(newId)
184206
case Failure(x) =>
185207
log.info(s"Failed to register. Exception: $x")
@@ -213,6 +235,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
213235
instanceDao.setStateFor(instance.id.get, InstanceState.Running)
214236
}
215237
log.info(s"Instance with id $id has reported start.")
238+
fireStateChangedEvent(instanceDao.getInstance(id).get)
216239
OperationResult.Ok
217240
}
218241
}
@@ -245,6 +268,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
245268
instanceDao.setStateFor(instance.id.get, InstanceState.NotReachable)
246269
}
247270
log.info(s"Instance with id $id has reported stop.")
271+
fireStateChangedEvent(instanceDao.getInstance(id).get)
248272
OperationResult.Ok
249273
}
250274
}
@@ -277,6 +301,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
277301
instanceDao.setStateFor(instance.id.get, InstanceState.Failed)
278302
}
279303
log.info(s"Instance with id $id has reported failure.")
304+
fireStateChangedEvent(instanceDao.getInstance(id).get)
280305
OperationResult.Ok
281306
}
282307

@@ -303,9 +328,11 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
303328
(dockerActor ? pause(instance.dockerId.get)).map{
304329
_ => log.info(s"Instance $id paused.")
305330
instanceDao.setStateFor(instance.id.get, InstanceState.Paused)
331+
fireStateChangedEvent(instanceDao.getInstance(id).get)
306332
}.recover {
307333
case ex: Exception =>
308334
log.warning(s"Failed to pause container with id $id. Message is: ${ex.getMessage}")
335+
fireDockerOperationErrorEvent(Some(instance), s"Pause failed with message: ${ex.getMessage}")
309336
}
310337

311338
OperationResult.Ok
@@ -336,9 +363,11 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
336363
(dockerActor ? unpause(instance.dockerId.get)).map{
337364
_ => log.info(s"Instance $id resumed.")
338365
instanceDao.setStateFor(instance.id.get, InstanceState.Running)
366+
fireStateChangedEvent(instanceDao.getInstance(id).get)
339367
}.recover {
340368
case ex: Exception =>
341369
log.warning(s"Failed to resume container with id $id. Message is: ${ex.getMessage}")
370+
fireDockerOperationErrorEvent(Some(instance), s"Resume failed with message: ${ex.getMessage}")
342371
}
343372

344373
OperationResult.Ok
@@ -371,9 +400,11 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
371400
(dockerActor ? stop(instance.dockerId.get)).map{
372401
_ => log.info(s"Instance $id stopped.")
373402
instanceDao.setStateFor(instance.id.get, InstanceState.Stopped)
403+
fireStateChangedEvent(instance)
374404
}.recover {
375405
case ex: Exception =>
376406
log.warning(s"Failed to stop container with id $id. Message is: ${ex.getMessage}")
407+
fireDockerOperationErrorEvent(Some(instance), s"Stop failed with message: ${ex.getMessage}")
377408
}
378409

379410
OperationResult.Ok
@@ -405,6 +436,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
405436
}.recover {
406437
case ex: Exception =>
407438
log.warning(s"Failed to start container with id $id. Message is: ${ex.getMessage}")
439+
fireDockerOperationErrorEvent(Some(instance), s"Start failed with message: ${ex.getMessage}")
408440
}
409441

410442
OperationResult.Ok
@@ -436,15 +468,18 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
436468

437469
(dockerActor ? delete(instance.dockerId.get)).map{
438470
_ => log.info(s"Container for instance $id deleted.")
439-
instanceDao.setStateFor(instance.id.get, InstanceState.Stopped)
440471
}.recover {
441472
case ex: Exception =>
442473
log.warning(s"Failed to delete container for instance with id $id. Message is: ${ex.getMessage}")
474+
fireDockerOperationErrorEvent(Some(instance), s"Delete failed with message: ${ex.getMessage}")
443475
}
444476

445477
//Delete data either way
446478
instanceDao.removeInstance(id) match {
447-
case Success(_) => OperationResult.Ok
479+
case Success(_) =>
480+
fireNumbersChangedEvent(instance.componentType)
481+
fireInstanceRemovedEvent(instance)
482+
OperationResult.Ok
448483
case Failure(_) => OperationResult.InternalError
449484
}
450485
} else {
@@ -462,7 +497,7 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
462497
instanceDao.getInstance(id)
463498
}
464499

465-
def instanceHasState(id: Long, state: InstanceEnums.State): Boolean = {
500+
def instanceHasState(id: Long, state: InstanceState): Boolean = {
466501
instanceDao.getInstance(id) match {
467502
case Some(instance) => instance.instanceState == state
468503
case None => false
@@ -473,6 +508,36 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
473508
instanceDao.getDockerIdFor(id).isSuccess
474509
}
475510

511+
private def fireNumbersChangedEvent(componentType: ComponentType): Unit = {
512+
val newNumber = instanceDao.getInstancesOfType(componentType).size
513+
eventActor ! RegistryEventFactory.createNumbersChangedEvent(componentType, newNumber)
514+
}
515+
516+
private def fireInstanceAddedEvent(addedInstance: Instance): Unit = {
517+
val event = RegistryEventFactory.createInstanceAddedEvent(addedInstance)
518+
eventActor ! event
519+
instanceDao.addEventFor(addedInstance.id.get, event)
520+
}
521+
522+
private def fireInstanceRemovedEvent(removedInstance: Instance): Unit = {
523+
//Do not add removed event, instance will not be present in DAO anymore
524+
eventActor ! RegistryEventFactory.createInstanceRemovedEvent(removedInstance)
525+
}
526+
527+
private def fireStateChangedEvent(updatedInstance: Instance): Unit = {
528+
val event = RegistryEventFactory.createStateChangedEvent(updatedInstance)
529+
eventActor ! event
530+
instanceDao.addEventFor(updatedInstance.id.get, event)
531+
}
532+
533+
private def fireDockerOperationErrorEvent(affectedInstance: Option[Instance], errorMessage: String): Unit = {
534+
val event = RegistryEventFactory.createDockerOperationErrorEvent(affectedInstance, errorMessage)
535+
eventActor ! event
536+
if(affectedInstance.isDefined){
537+
instanceDao.addEventFor(affectedInstance.get.id.get, event)
538+
}
539+
}
540+
476541
private def countConsecutivePositiveMatchingResults(id: Long): Int = {
477542
if (!instanceDao.hasInstance(id) || instanceDao.getMatchingResultsFor(id).get.isEmpty) {
478543
0

0 commit comments

Comments
 (0)