Skip to content

Commit 651411c

Browse files
Merge pull request #122 from delphi-hub/feature/checklinks
Fix links state update bug
2 parents 9449e22 + ce3b7fa commit 651411c

File tree

1 file changed

+69
-41
lines changed

1 file changed

+69
-41
lines changed

src/main/scala/de/upb/cs/swt/delphi/instanceregistry/RequestHandler.scala

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import scala.concurrent.{Await, ExecutionContext, Future}
3636
import scala.language.postfixOps
3737
import scala.util.{Failure, Success, Try}
3838

39+
40+
// scalastyle:off number.of.methods
3941
class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao: InstanceDAO, connection: DockerConnection) extends AppLogging {
4042

4143

@@ -122,15 +124,15 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
122124
}
123125

124126
def getAllInstancesOfType(compType: Option[ComponentType]): List[Instance] = {
125-
if(compType.isDefined){
127+
if (compType.isDefined) {
126128
instanceDao.getInstancesOfType(compType.get)
127129
} else {
128130
instanceDao.allInstances()
129131
}
130132
}
131133

132134
def getNumberOfInstances(compType: Option[ComponentType]): Int = {
133-
if(compType.isDefined){
135+
if (compType.isDefined) {
134136
instanceDao.allInstances().count(i => i.componentType == compType.get)
135137
} else {
136138
instanceDao.allInstances().length
@@ -242,7 +244,6 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
242244
OperationResult.IdUnknown
243245
} else {
244246
val matchedInstance = instanceDao.getInstance(matchedInstanceId).get
245-
246247
//Update list of matching results
247248
instanceDao.addMatchingResult(matchedInstanceId, matchingSuccess)
248249
//Update state of matchedInstance accordingly
@@ -257,18 +258,41 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
257258

258259
//Update link state
259260
if (!matchingSuccess) {
260-
val link = InstanceLink(callerId, matchedInstanceId, LinkState.Failed)
261-
instanceDao.updateLink(link) match {
261+
262+
setActiveLinksToFailed(matchedInstanceId) match {
262263
case Success(_) =>
263-
fireLinkStateChangedEvent(link)
264264
OperationResult.Ok
265-
case Failure(_) => OperationResult.InternalError //Should not happen
265+
case Failure(_) =>
266+
// Message logged by method
267+
OperationResult.InternalError
266268
}
267269
} else {
268270
OperationResult.Ok
269271
}
272+
}
273+
}
274+
275+
276+
def setActiveLinksToFailed(failedInstanceId: Long): Try[Unit] = {
277+
278+
val linksToFailedInstance = instanceDao.getLinksTo(failedInstanceId)
279+
var errors = false
270280

281+
for (link <- linksToFailedInstance) {
282+
//Do not update outdated links
283+
if (link.linkState == LinkState.Assigned) {
284+
val newLink = InstanceLink(link.idFrom, failedInstanceId, LinkState.Failed)
285+
instanceDao.updateLink(newLink) match {
286+
case Success(_) =>
287+
fireLinkStateChangedEvent(link)
288+
case Failure(ex) =>
289+
errors = true
290+
log.warning(s"There was a failure while updating the link state ${ex.getMessage}")
291+
}
292+
}
271293
}
294+
295+
if(errors) Failure(new RuntimeException("Link updates unsuccessful")) else Success()
272296
}
273297

274298
// scalastyle:off method.length
@@ -332,6 +356,7 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
332356
Failure(ex)
333357
}
334358
}
359+
335360
// scalastyle:on method.length
336361

337362
/** *
@@ -569,6 +594,7 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
569594
OperationResult.InvalidStateForOperation
570595
}
571596
}
597+
572598
// scalastyle:on method.length cyclomatic.complexity
573599

574600
/** *
@@ -723,21 +749,21 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
723749

724750
val f: Future[(OperationResult.Value, Option[String])] =
725751
(dockerActor ? LogsMessage(instance.dockerId.get, stdErrSelected, stream = false)) (configuration.dockerOperationTimeout).map {
726-
logVal: Any =>
727-
val logResult = logVal.asInstanceOf[Try[String]]
728-
logResult match {
729-
case Success(logContent) =>
730-
(OperationResult.Ok, Some(logContent))
731-
case Failure(ex) =>
732-
log.warning(s"Failed to get logs from actor, exception: ${ex.getMessage}")
733-
(OperationResult.InternalError, None)
734-
}
752+
logVal: Any =>
753+
val logResult = logVal.asInstanceOf[Try[String]]
754+
logResult match {
755+
case Success(logContent) =>
756+
(OperationResult.Ok, Some(logContent))
757+
case Failure(ex) =>
758+
log.warning(s"Failed to get logs from actor, exception: ${ex.getMessage}")
759+
(OperationResult.InternalError, None)
760+
}
735761

736-
}.recover {
737-
case ex: Exception =>
738-
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to get logs with message: ${ex.getMessage}")
739-
(OperationResult.InternalError, None)
740-
}
762+
}.recover {
763+
case ex: Exception =>
764+
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to get logs with message: ${ex.getMessage}")
765+
(OperationResult.InternalError, None)
766+
}
741767
Await.result(f, configuration.dockerOperationTimeout.duration)
742768
}
743769
}
@@ -752,21 +778,21 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
752778

753779
val f: Future[(OperationResult.Value, Option[Publisher[Message]])] =
754780
(dockerActor ? LogsMessage(instance.dockerId.get, stdErrSelected, stream = true)) (configuration.dockerOperationTimeout).map {
755-
publisherVal: Any =>
756-
val publisherResult = publisherVal.asInstanceOf[Try[Publisher[Message]]]
757-
publisherResult match {
758-
case Success(publisher) =>
759-
(OperationResult.Ok, Some(publisher))
760-
case Failure(ex) =>
761-
log.warning(s"Failed to stream logs from actor, exception: ${ex.getMessage}")
762-
(OperationResult.InternalError, None)
763-
}
781+
publisherVal: Any =>
782+
val publisherResult = publisherVal.asInstanceOf[Try[Publisher[Message]]]
783+
publisherResult match {
784+
case Success(publisher) =>
785+
(OperationResult.Ok, Some(publisher))
786+
case Failure(ex) =>
787+
log.warning(s"Failed to stream logs from actor, exception: ${ex.getMessage}")
788+
(OperationResult.InternalError, None)
789+
}
764790

765-
}.recover {
766-
case ex: Exception =>
767-
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to stream logs with message: ${ex.getMessage}")
768-
(OperationResult.InternalError, None)
769-
}
791+
}.recover {
792+
case ex: Exception =>
793+
fireDockerOperationErrorEvent(Some(instance), errorMessage = s"Failed to stream logs with message: ${ex.getMessage}")
794+
(OperationResult.InternalError, None)
795+
}
770796
Await.result(f, configuration.dockerOperationTimeout.duration)
771797
}
772798
}
@@ -839,6 +865,7 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
839865
}
840866
}
841867
}
868+
842869
// scalastyle:on method.length cyclomatic.complexity
843870

844871
/**
@@ -925,12 +952,12 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
925952
* Handles a call to /command. container id and command must be present,
926953
* Will run the command into the container with provide parameters
927954
*
928-
* @param id container id the command will run on
929-
* @param command the command to run
930-
* Format is a single character [a-Z] or ctrl-<@value> where <v@alue> is one of: a-z, @, [, , or _
931-
* @param privileged runs the process with extended privileges
932-
* @param user A string value specifying the user, and optionally, group to run the process inside the container,
933-
* Format is one of: "user", "user:group", "uid", or "uid:gid".
955+
* @param id container id the command will run on
956+
* @param command the command to run
957+
* Format is a single character [a-Z] or ctrl-<@value> where <v@alue> is one of: a-z, @, [, , or _
958+
* @param privileged runs the process with extended privileges
959+
* @param user A string value specifying the user, and optionally, group to run the process inside the container,
960+
* Format is one of: "user", "user:group", "uid", or "uid:gid".
934961
* @return
935962
*/
936963
def handleCommand(id: Long, command: String, privileged: Option[Boolean], user: Option[String]): OperationResult.Value = {
@@ -1082,6 +1109,7 @@ class RequestHandler(configuration: Configuration, authDao: AuthDAO, instanceDao
10821109
instanceDao.addEventFor(link.idTo, event)
10831110
}
10841111

1112+
10851113
private def countConsecutivePositiveMatchingResults(id: Long): Int = {
10861114
if (!instanceDao.hasInstance(id) || instanceDao.getMatchingResultsFor(id).get.isEmpty) {
10871115
0

0 commit comments

Comments
 (0)