Skip to content

Commit

Permalink
Adjusted Worker to support ZIO.
Browse files Browse the repository at this point in the history
  • Loading branch information
pme123 committed Feb 2, 2025
1 parent 8bfb032 commit 172941e
Show file tree
Hide file tree
Showing 33 changed files with 419 additions and 261 deletions.
2 changes: 2 additions & 0 deletions 03-worker/src/main/scala/camundala/worker/EngineContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package worker

import camundala.bpmn.*
import camundala.domain.*
import camundala.worker.CamundalaWorkerError.ServiceError
import zio.ZIO

import java.time.{LocalDate, LocalDateTime}
import scala.reflect.ClassTag
Expand Down
4 changes: 3 additions & 1 deletion 03-worker/src/main/scala/camundala/worker/Handler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import camundala.worker.CamundalaWorkerError.*
import io.circe
import sttp.model.Uri.QuerySegment
import sttp.model.{Method, Uri}
import zio.{IO, ZIO}

import scala.reflect.ClassTag

trait WorkerHandler:
def worker: Worker[?, ?, ?]
def topic: String

def applicationName: String
def registerHandler( register: => Unit): Unit =
def registerHandler(register: => Unit): Unit =
val appPackageName = applicationName.replace("-", ".")
val testMode = sys.env.get("WORKER_TEST_MODE").contains("true") // did not work with lazy val
if testMode || getClass.getName.startsWith(appPackageName)
Expand Down
37 changes: 19 additions & 18 deletions 03-worker/src/main/scala/camundala/worker/WorkerDsl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]:

// needed that it can be called from CSubscriptionPostProcessor
def worker: Worker[In, Out, ?]
def topic: String = worker.topic
def topic: String = worker.topic
def timeout: Duration = 10.seconds

def runWorkFromWorker(in: In)(using EngineRunContext): Option[Either[RunWorkError, Out]] =
Expand All @@ -27,26 +27,27 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]:
protected def errorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]): Boolean =
error.isMock || // if it is mocked, it is handled in the error, as it also could be a successful output
handledErrors.contains(error.errorCode.toString) || handledErrors.map(
_.toLowerCase
).contains("catchall")
_.toLowerCase
).contains("catchall")

protected def regexMatchesAll(
errorHandled: Boolean,
error: CamundalaWorkerError,
regexHandledErrors: Seq[String]
) =
errorHandled: Boolean,
error: CamundalaWorkerError,
regexHandledErrors: Seq[String]
) =
val errorMsg = error.errorMsg.replace("\n", "")
errorHandled && regexHandledErrors.forall(regex =>
errorMsg.matches(s".*$regex.*")
)
end regexMatchesAll

protected def filteredOutput(
outputVariables: Seq[String],
allOutputs: Map[String, Any]
): Map[String, Any] =
outputVariables: Seq[String],
allOutputs: Map[String, Any]
): Map[String, Any] =
outputVariables match
case filter if filter.isEmpty => allOutputs
case filter =>
case filter =>
allOutputs
.filter:
case k -> _ => filter.contains(k)
Expand Down Expand Up @@ -143,12 +144,12 @@ trait ServiceWorkerDsl[
protected def serviceTask: ServiceTask[In, Out, ServiceIn, ServiceOut]
protected def apiUri(in: In): Uri // input must be valid - so no errors
// optional
protected def method: Method = Method.GET
protected def method: Method = Method.GET
protected def querySegments(in: In): Seq[QuerySegmentOrParam] =
Seq.empty // input must be valid - so no errors
// mocking out from outService and headers
protected def inputMapper(in: In): Option[ServiceIn] = None // input must be valid - so no errors
protected def inputHeaders(in: In): Map[String, String] =
// mocking out from outService and headers
protected def inputMapper(in: In): Option[ServiceIn] = None // input must be valid - so no errors
protected def inputHeaders(in: In): Map[String, String] =
Map.empty // input must be valid - so no errors
protected def outputMapper(
serviceOut: ServiceResponse[ServiceOut],
Expand All @@ -167,10 +168,10 @@ trait ServiceWorkerDsl[
in: In
): Either[ServiceMappingError, Out] =
serviceResponse.outputBody match
case _: NoOutput => Right(serviceTask.out)
case _: NoOutput => Right(serviceTask.out)
case Some(_: NoOutput) => Right(serviceTask.out)
case None => Right(serviceTask.out)
case _ =>
case None => Right(serviceTask.out)
case _ =>
Left(ServiceMappingError(s"There is an outputMapper missing for '${getClass.getName}'."))
end defaultOutMapper

Expand Down
112 changes: 64 additions & 48 deletions 03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import camundala.domain.*
import camundala.worker.CamundalaWorkerError.*
import camundala.bpmn.WithConfig
import io.circe.syntax.*
import zio.*

case class WorkerExecutor[
In <: Product: InOutCodec,
Expand All @@ -13,57 +14,65 @@ case class WorkerExecutor[
](
worker: T
)(using context: EngineRunContext):
given EngineContext = context.engineContext

def execute(
processVariables: Seq[Either[BadVariableError, (String, Option[Json])]]
): Either[CamundalaWorkerError, Map[String, Any]] =
for
processVariables: Seq[IO[BadVariableError, (String, Option[Json])]]
): IO[CamundalaWorkerError, Map[String, Any]] =
(for
validatedInput <- InputValidator.validate(processVariables)
initializedOutput <- Initializer.initVariables(validatedInput)(using context.engineContext)
initializedOutput <- Initializer.initVariables(validatedInput)
mockedOutput <- OutMocker.mockedOutput(validatedInput)
// only run the work if it is not mocked
output <-
if mockedOutput.isEmpty then WorkRunner.run(validatedInput) else Right(mockedOutput.get)
allOutputs = camundaOutputs(validatedInput, initializedOutput, output)
filteredOut = filteredOutput(allOutputs)
if mockedOutput.isEmpty then WorkRunner.run(validatedInput)
else ZIO.succeed(mockedOutput.get)
allOutputs: Map[String, Any] = camundaOutputs(validatedInput, initializedOutput, output)
filteredOut: Map[String, Any] = filteredOutput(allOutputs, context.generalVariables.outputVariables)
// make MockedOutput as error if mocked
_ <- if mockedOutput.isDefined then Left(MockedOutput(filteredOut)) else Right(())
yield filteredOut
_ <- if mockedOutput.isDefined then ZIO.fail(MockedOutput(filteredOut)) else ZIO.succeed(())
yield filteredOut)

object InputValidator:
lazy val prototype = worker.in
lazy val validationHandler = worker.validationHandler

def validate(
inputParamsAsJson: Seq[Either[Any, (String, Option[Json])]]
): Either[ValidatorError, In] =
val jsonResult: Either[ValidatorError, Seq[(String, Option[Json])]] =
inputParamsAsJson
.partition(_.isRight) match
case (successes, failures) if failures.isEmpty =>
Right(
successes.collect { case Right(value) => value }
)
case (_, failures) =>
Left(
ValidatorError(
failures
.collect { case Left(value) => value }
.mkString("Validator Error(s):\n - ", " - ", "\n")
inputParamsAsJson: Seq[IO[Any, (String, Option[Json])]]
): IO[ValidatorError, In] =

val jsonResult: IO[ValidatorError, Seq[(String, Option[Json])]] =
ZIO
.partition(inputParamsAsJson)(i => i)
.flatMap:
case (failures, successes) if failures.isEmpty =>
ZIO.succeed(successes.toSeq)
case (failures, _) =>
ZIO.fail(
ValidatorError(
failures
.collect { case Left(value) => value }
.mkString("Validator Error(s):\n - ", " - ", "\n")
)
)
)
val json: Either[ValidatorError, JsonObject] = jsonResult

val json: IO[ValidatorError, JsonObject] = jsonResult
.map(_.foldLeft(JsonObject()) { case (jsonObj, jsonKey -> jsonValue) =>
if jsonValue.isDefined
then jsonObj.add(jsonKey, jsonValue.get)
else jsonObj
})
def toIn(posJsonObj: Either[ValidatorError, JsonObject]): Either[ValidatorError, In] =

def toIn(posJsonObj: IO[ValidatorError, JsonObject]): IO[ValidatorError, In] =
posJsonObj
.flatMap(jsonObj =>
decodeTo[In](jsonObj.asJson.deepDropNullValues.toString).left
.map(ex => ValidatorError(errorMsg = ex.errorMsg))
.flatMap(in => validationHandler.map(h => h.validate(in)).getOrElse(Right(in)))
ZIO.fromEither(decodeTo[In](jsonObj.asJson.deepDropNullValues.toString))
.mapError(ex => ValidatorError(errorMsg = ex.errorMsg))
.flatMap(in =>
validationHandler.map(h => ZIO.fromEither(h.validate(in))).getOrElse(ZIO.succeed(
in
))
)
)

val in = toIn(json)
Expand All @@ -74,7 +83,7 @@ case class WorkerExecutor[
jsonObj: JsonObject <- json
inputVariables = jsonObj.toMap
configJson: JsonObject =
inputVariables.get("inConfig").getOrElse(i.defaultConfigAsJson).asObject.get
inputVariables.getOrElse("inConfig", i.defaultConfigAsJson).asObject.get
newJsonConfig = worker.inConfigVariableNames
.foldLeft(configJson):
case (configJson, n) =>
Expand All @@ -98,17 +107,19 @@ case class WorkerExecutor[

def initVariables(
validatedInput: In
): InitProcessFunction =
)(using EngineContext): IO[InitProcessError, Map[String, Any]] =
worker.initProcessHandler
.map { vi =>
.map: vi =>
vi.init(validatedInput).map(_ ++ defaultVariables)
}
.getOrElse(Right(defaultVariables))
.map:
ZIO.fromEither
.getOrElse:
ZIO.succeed(defaultVariables)
end Initializer

object OutMocker:

def mockedOutput(in: In): Either[MockerError, Option[Out]] =
def mockedOutput(in: In): IO[MockerError, Option[Out]] =
(
context.generalVariables.isMockedWorker(worker.topic),
context.generalVariables.outputMock,
Expand All @@ -122,49 +133,54 @@ case class WorkerExecutor[
worker.defaultMock(in).map(Some(_))
// otherwise it is not mocked or it is a service mock which is handled in service Worker during running
case (_, None, _) =>
Right(None)
ZIO.succeed(None)
end mockedOutput

private def decodeMock(
json: Json
) =
json.as[Out]
ZIO.fromEither(json.as[Out])
.map:
Some(_)
.left.map: error =>
.mapError: error =>
MockerError(errorMsg = s"$error:\n- $json")
end decodeMock

end OutMocker

object WorkRunner:
def run(inputObject: In): Either[RunWorkError, Out | NoOutput] =
def run(inputObject: In)(using EngineRunContext): IO[RunWorkError, Out | NoOutput] =
worker.runWorkHandler
.map(_.runWork(inputObject))
.getOrElse(Right(NoOutput()))
.map:
_.runWork(inputObject)
.map:
ZIO.fromEither
.getOrElse:
ZIO.succeed(NoOutput())
end WorkRunner

private def camundaOutputs(
initializedInput: In,
internalVariables: Map[String, Any],
output: Out | NoOutput
): Map[String, Any] =
)(using context: EngineRunContext): Map[String, Any] =
context.toEngineObject(initializedInput) ++ internalVariables ++
(output match
case o: NoOutput =>
context.toEngineObject(o)
case _ =>
context.toEngineObject(output.asInstanceOf[Out])
)
)

private def filteredOutput(
allOutputs: Map[String, Any]
allOutputs: Map[String, Any],
outputVariables: Seq[String]
): Map[String, Any] =
val filter = context.generalVariables.outputVariables
if filter.isEmpty then
if outputVariables.isEmpty then
allOutputs
else
allOutputs
.filter { case k -> _ => filter.contains(k) }
.filter { case k -> _ => outputVariables.contains(k) }
end if
end filteredOutput

Expand Down
Loading

0 comments on commit 172941e

Please sign in to comment.