Skip to content

Commit 528e8cf

Browse files
Merge pull request #54 from delphi-hub/feature/databaseIntegration
The registry now uses an SQL database for storing instance data
2 parents 5f056b6 + 63ccee2 commit 528e8cf

File tree

12 files changed

+878
-257
lines changed

12 files changed

+878
-257
lines changed

build.sbt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,12 @@ lazy val registry = (project in file(".")).
2828
dockerBaseImage := "openjdk:jre-alpine"
2929
)
3030

31+
libraryDependencies ++= List(
32+
"com.typesafe.slick" %% "slick" % "3.2.3",
33+
"com.typesafe.slick" %% "slick-hikaricp" % "3.2.3",
34+
"com.typesafe.slick" %% "slick-codegen" % "3.2.3",
35+
"mysql" % "mysql-connector-java" % "5.1.34",
36+
"org.slf4j" % "slf4j-nop" % "1.6.4"
37+
)
38+
39+
trapExit := false

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Configuration.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package de.upb.cs.swt.delphi.instanceregistry
22

33
import akka.util.Timeout
4+
45
import scala.concurrent.duration.DurationInt
56

67
class Configuration( ) {
@@ -32,6 +33,15 @@ class Configuration( ) {
3233

3334
val dockerOperationTimeout: Timeout = Timeout(20 seconds)
3435

35-
}
36+
//Database configurations
37+
val useInMemoryDB = false
38+
val databaseHost = "jdbc:mysql://localhost/"
39+
val databaseName = ""
40+
val databaseDriver = "com.mysql.jdbc.Driver"
41+
val databaseUsername = ""
42+
val databasePassword = ""
43+
44+
45+
}
3646

3747

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/Registry.scala

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem
44
import akka.stream.ActorMaterializer
55
import de.upb.cs.swt.delphi.instanceregistry.Docker._
66
import de.upb.cs.swt.delphi.instanceregistry.connection.Server
7+
import de.upb.cs.swt.delphi.instanceregistry.daos.{DatabaseInstanceDAO, DynamicInstanceDAO, InstanceDAO}
78

89
import scala.concurrent.ExecutionContext
910
import scala.language.postfixOps
@@ -15,14 +16,24 @@ object Registry extends AppLogging {
1516

1617

1718
val configuration = new Configuration()
18-
val requestHandler = new RequestHandler(configuration, DockerConnection.fromEnvironment())
1919

20+
private val dao : InstanceDAO = {
21+
if (configuration.useInMemoryDB) {
22+
new DynamicInstanceDAO(configuration)
23+
} else {
24+
new DatabaseInstanceDAO(configuration)
25+
}
26+
}
2027

21-
def main(args: Array[String]): Unit = {
28+
private val requestHandler = new RequestHandler(configuration, dao, DockerConnection.fromEnvironment())
29+
30+
private val server: Server = new Server(requestHandler)
2231

32+
33+
def main(args: Array[String]): Unit = {
2334
requestHandler.initialize()
2435
log.info("Starting server ...")
25-
Server.startServer(configuration.bindHost, configuration.bindPort)
36+
server.startServer(configuration.bindHost, configuration.bindPort)
2637
log.info("Shutting down ...")
2738
requestHandler.shutdown()
2839
system.terminate()

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala

Lines changed: 67 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import de.upb.cs.swt.delphi.instanceregistry.Docker.{ContainerAlreadyStoppedExce
1010
import akka.stream.scaladsl.{Keep, Sink, Source}
1111
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
1212
import de.upb.cs.swt.delphi.instanceregistry.connection.RestClient
13-
import de.upb.cs.swt.delphi.instanceregistry.daos.{DynamicInstanceDAO, InstanceDAO}
13+
import de.upb.cs.swt.delphi.instanceregistry.daos.InstanceDAO
1414
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.InstanceEnums.{ComponentType, InstanceState}
1515
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model.LinkEnums.LinkState
1616
import de.upb.cs.swt.delphi.instanceregistry.io.swagger.client.model._
@@ -20,15 +20,14 @@ import scala.concurrent.{Await, ExecutionContext, Future}
2020
import scala.language.postfixOps
2121
import scala.util.{Failure, Success, Try}
2222

23-
class RequestHandler(configuration: Configuration, connection: DockerConnection) extends AppLogging {
23+
class RequestHandler(configuration: Configuration, instanceDao: InstanceDAO, connection: DockerConnection) extends AppLogging {
2424

2525

2626

2727
implicit val system: ActorSystem = Registry.system
2828
implicit val materializer : Materializer = ActorMaterializer()
2929
implicit val ec: ExecutionContext = system.dispatcher
3030

31-
private[instanceregistry] val instanceDao: InstanceDAO = new DynamicInstanceDAO(configuration)
3231

3332
val (eventActor, eventPublisher) = Source.actorRef[RegistryEvent](10, OverflowStrategy.dropNew)
3433
.toMat(Sink.asPublisher(fanout = true))(Keep.both)
@@ -67,20 +66,18 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
6766
* @return Newly assigned ID if successful
6867
*/
6968
def handleRegister(instance: Instance): Try[Long] = {
70-
val newID = generateNextId()
7169

72-
log.info(s"Assigned new id $newID to registering instance with name ${instance.name}.")
73-
74-
val newInstance = Instance(id = Some(newID), name = instance.name, host = instance.host,
70+
val noIdInstance = Instance(id = None, name = instance.name, host = instance.host,
7571
portNumber = instance.portNumber, componentType = instance.componentType,
7672
dockerId = None, instanceState = InstanceState.Running, labels = instance.labels,
7773
linksTo = List.empty[InstanceLink], linksFrom = List.empty[InstanceLink])
7874

79-
instanceDao.addInstance(newInstance) match {
80-
case Success(_) =>
81-
fireNumbersChangedEvent(newInstance.componentType)
82-
fireInstanceAddedEvent(newInstance)
83-
Success(newID)
75+
instanceDao.addInstance(noIdInstance) match {
76+
case Success(id) =>
77+
fireNumbersChangedEvent(instanceDao.getInstance(id).get.componentType)
78+
fireInstanceAddedEvent(instanceDao.getInstance(id).get)
79+
log.info(s"Assigned new id $id to registering instance with name ${instance.name}.")
80+
Success(id)
8481
case Failure(x) => Failure(x)
8582
}
8683
}
@@ -243,48 +240,68 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
243240
}
244241

245242
def handleDeploy(componentType: ComponentType, name: Option[String]): Try[Long] = {
246-
val newId = generateNextId()
247-
248243
log.info(s"Deploying container of type $componentType")
244+
val instance = Instance(None,
245+
"",
246+
-1L,
247+
name.getOrElse(s"Generic $componentType"),
248+
componentType,
249+
None,
250+
InstanceState.Deploying,
251+
List.empty[String],
252+
List.empty[InstanceLink],
253+
List.empty[InstanceLink]
254+
)
255+
256+
instanceDao.addInstance(instance) match {
257+
case Success(id) =>
258+
implicit val timeout: Timeout = configuration.dockerOperationTimeout
249259

250-
implicit val timeout: Timeout = configuration.dockerOperationTimeout
251-
252-
val future: Future[Any] = dockerActor ? create(componentType, newId)
253-
val deployResult = Await.result(future, timeout.duration).asInstanceOf[Try[(String, String, Int)]]
254-
255-
deployResult match {
256-
case Failure(ex) =>
257-
log.error(s"Failed to deploy container, docker host not reachable.")
258-
fireDockerOperationErrorEvent(None, s"Deploy failed with message: ${ex.getMessage}")
259-
Failure(new RuntimeException(s"Failed to deploy container, docker host not reachable (${ex.getMessage})."))
260-
case Success((dockerId, host, port)) =>
261-
val normalizedHost = host.substring(1,host.length - 1)
262-
log.info(s"Deployed new container with id $dockerId, host $normalizedHost and port $port.")
263-
264-
val newInstance = Instance(Some(newId),
265-
normalizedHost,
266-
port,
267-
name.getOrElse(s"Generic $componentType"),
268-
componentType,
269-
Some(dockerId),
270-
InstanceState.Deploying,
271-
List.empty[String],
272-
List.empty[InstanceLink],
273-
List.empty[InstanceLink]
274-
)
275-
log.info(s"Registering instance $newInstance....")
276-
277-
instanceDao.addInstance(newInstance) match {
278-
case Success(_) =>
279-
log.info("Successfully registered.")
280-
fireInstanceAddedEvent(newInstance)
281-
fireNumbersChangedEvent(newInstance.componentType)
282-
Success(newId)
283-
case Failure(x) =>
284-
log.info(s"Failed to register. Exception: $x")
285-
Failure(x)
260+
val future: Future[Any] = dockerActor ? create(componentType, id)
261+
val deployResult = Await.result(future, timeout.duration).asInstanceOf[Try[(String, String, Int)]]
262+
263+
deployResult match {
264+
case Failure(ex) =>
265+
log.error(s"Failed to deploy container, docker host not reachable.")
266+
instanceDao.removeInstance(id)
267+
fireDockerOperationErrorEvent(None, s"Deploy failed with message: ${ex.getMessage}")
268+
Failure(new RuntimeException(s"Failed to deploy container, docker host not reachable (${ex.getMessage})."))
269+
case Success((dockerId, host, port)) =>
270+
val normalizedHost = host.substring(1, host.length - 1)
271+
log.info(s"Deployed new container with id $dockerId, host $normalizedHost and port $port.")
272+
273+
val newInstance = Instance(Some(id),
274+
normalizedHost,
275+
port,
276+
name.getOrElse(s"Generic $componentType"),
277+
componentType,
278+
Some(dockerId),
279+
InstanceState.Deploying,
280+
List.empty[String],
281+
List.empty[InstanceLink],
282+
List.empty[InstanceLink]
283+
)
284+
285+
instanceDao.updateInstance(newInstance) match {
286+
case Success(_) =>
287+
log.info("Successfully registered.")
288+
fireInstanceAddedEvent(newInstance)
289+
fireNumbersChangedEvent(newInstance.componentType)
290+
Success(id)
291+
case Failure(x) =>
292+
log.info(s"Failed to register. Exception: $x")
293+
Failure(x)
294+
}
286295
}
296+
case Failure(ex) =>
297+
Failure(ex)
287298
}
299+
300+
301+
302+
303+
304+
288305
}
289306

290307
/** *
@@ -962,14 +979,6 @@ class RequestHandler(configuration: Configuration, connection: DockerConnection)
962979

963980
}
964981

965-
private def generateNextId(): Long = {
966-
if (instanceDao.allInstances().isEmpty) {
967-
0L
968-
} else {
969-
(instanceDao.allInstances().map(i => i.id.getOrElse(0L)) max) + 1L
970-
}
971-
}
972-
973982
private def assignmentAllowed(instanceType: ComponentType) : Boolean = {
974983
instanceType == ComponentType.Crawler || instanceType == ComponentType.WebApi || instanceType == ComponentType.WebApp
975984
}

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/connection/Server.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import scala.util.{Failure, Success}
2020
/**
2121
* Web server configuration for Instance Registry API.
2222
*/
23-
object Server extends HttpApp
23+
class Server (handler: RequestHandler) extends HttpApp
2424
with InstanceJsonSupport
2525
with EventJsonSupport
2626
with InstanceLinkJsonSupport
@@ -30,8 +30,6 @@ object Server extends HttpApp
3030
implicit val materializer : ActorMaterializer = ActorMaterializer()
3131
implicit val ec : ExecutionContext = system.dispatcher
3232

33-
private val handler : RequestHandler = Registry.requestHandler
34-
3533
//Routes that map http endpoints to methods in this object
3634
override def routes : server.Route =
3735
/****************BASIC OPERATIONS****************/

0 commit comments

Comments
 (0)