From 172941e90979c332da76270d5a02414e7c36c1fe Mon Sep 17 00:00:00 2001 From: Pascal Mengelt Date: Sun, 2 Feb 2025 12:15:30 +0100 Subject: [PATCH] Adjusted Worker to support ZIO. --- .../camundala/worker/EngineContext.scala | 2 + .../main/scala/camundala/worker/Handler.scala | 4 +- .../scala/camundala/worker/WorkerDsl.scala | 37 ++--- .../camundala/worker/WorkerExecutor.scala | 112 ++++++++------ .../src/main/scala/camundala/worker/ast.scala | 49 +++--- .../main/scala/camundala/worker/exports.scala | 3 +- .../test/scala/camundala/worker/astTest.scala | 11 +- .../camundala.worker.WorkerExecutorTest.scala | 27 ++-- .../camunda7/worker/C7WorkerHandler.scala | 42 +++-- .../camunda7/worker/Camunda7Context.scala | 3 + .../camunda7/worker/CamundaHelper.scala | 79 +++++----- .../worker/ProcessVariablesExtractor.scala | 12 +- .../worker/{c8zio => c7zio}/C7Client.scala | 4 +- .../worker/{c8zio => c7zio}/C7Context.scala | 12 +- .../worker/{c8zio => c7zio}/C7Worker.scala | 32 ++-- .../{c8zio => c7zio}/C7WorkerRegistry.scala | 2 +- .../{c8zio => c7zio}/CamundaHelper.scala | 2 +- .../ProcessVariablesExtractor.scala | 2 +- .../worker/c7zio/RestApiClient.scala | 146 ++++++++++++++++++ .../worker/{c8zio => c7zio}/exports.scala | 2 +- .../oauth/OAuthPasswordFlow.scala | 2 +- .../{c8zio => c7zio}/oauth/TokenCache.scala | 2 +- .../{c8zio => c7zio}/oauth/TokenService.scala | 2 +- .../worker/{c8zio => c7zio}/C8Client.scala | 2 +- .../worker/{c8zio => c7zio}/C8Context.scala | 4 +- .../worker/{c8zio => c7zio}/C8Worker.scala | 72 ++++----- .../{c8zio => c7zio}/C8WorkerRegistry.scala | 2 +- .../newWorker/CompanyWorkerHandler.scala | 2 +- .../demos/newWorker/ExampleJob2Worker.scala | 2 +- .../demos/newWorker/ExampleJobWorker.scala | 2 +- .../demos/newWorker/TestWorker2App.scala | 2 +- .../demos/newWorker/TestWorkerApp.scala | 2 +- .../invoice/worker/ComposedWorker.scala | 1 + 33 files changed, 419 insertions(+), 261 deletions(-) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C7Client.scala (97%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C7Context.scala (74%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C7Worker.scala (83%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C7WorkerRegistry.scala (97%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/CamundaHelper.scala (99%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/ProcessVariablesExtractor.scala (98%) create mode 100644 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/RestApiClient.scala rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/exports.scala (88%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/oauth/OAuthPasswordFlow.scala (98%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/oauth/TokenCache.scala (88%) rename 04-worker-c7zio/src/main/scala/camundala/worker/{c8zio => c7zio}/oauth/TokenService.scala (98%) rename 04-worker-c8zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C8Client.scala (97%) rename 04-worker-c8zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C8Context.scala (82%) rename 04-worker-c8zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C8Worker.scala (64%) rename 04-worker-c8zio/src/main/scala/camundala/worker/{c8zio => c7zio}/C8WorkerRegistry.scala (97%) diff --git a/03-worker/src/main/scala/camundala/worker/EngineContext.scala b/03-worker/src/main/scala/camundala/worker/EngineContext.scala index 4adb7ee9..40833c1b 100644 --- a/03-worker/src/main/scala/camundala/worker/EngineContext.scala +++ b/03-worker/src/main/scala/camundala/worker/EngineContext.scala @@ -3,6 +3,8 @@ package worker import camundala.bpmn.* import camundala.domain.* +import camundala.worker.CamundalaWorkerError.ServiceError +import zio.ZIO import java.time.{LocalDate, LocalDateTime} import scala.reflect.ClassTag diff --git a/03-worker/src/main/scala/camundala/worker/Handler.scala b/03-worker/src/main/scala/camundala/worker/Handler.scala index 0a03b5da..a9cc3c9c 100644 --- a/03-worker/src/main/scala/camundala/worker/Handler.scala +++ b/03-worker/src/main/scala/camundala/worker/Handler.scala @@ -7,6 +7,8 @@ import camundala.worker.CamundalaWorkerError.* import io.circe import sttp.model.Uri.QuerySegment import sttp.model.{Method, Uri} +import zio.{IO, ZIO} + import scala.reflect.ClassTag trait WorkerHandler: @@ -14,7 +16,7 @@ trait WorkerHandler: def topic: String def applicationName: String - def registerHandler( register: => Unit): Unit = + def registerHandler(register: => Unit): Unit = val appPackageName = applicationName.replace("-", ".") val testMode = sys.env.get("WORKER_TEST_MODE").contains("true") // did not work with lazy val if testMode || getClass.getName.startsWith(appPackageName) diff --git a/03-worker/src/main/scala/camundala/worker/WorkerDsl.scala b/03-worker/src/main/scala/camundala/worker/WorkerDsl.scala index 6c25f3c2..f511bd1a 100644 --- a/03-worker/src/main/scala/camundala/worker/WorkerDsl.scala +++ b/03-worker/src/main/scala/camundala/worker/WorkerDsl.scala @@ -12,7 +12,7 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]: // needed that it can be called from CSubscriptionPostProcessor def worker: Worker[In, Out, ?] - def topic: String = worker.topic + def topic: String = worker.topic def timeout: Duration = 10.seconds def runWorkFromWorker(in: In)(using EngineRunContext): Option[Either[RunWorkError, Out]] = @@ -27,26 +27,27 @@ trait WorkerDsl[In <: Product: InOutCodec, Out <: Product: InOutCodec]: protected def errorHandled(error: CamundalaWorkerError, handledErrors: Seq[String]): Boolean = error.isMock || // if it is mocked, it is handled in the error, as it also could be a successful output handledErrors.contains(error.errorCode.toString) || handledErrors.map( - _.toLowerCase - ).contains("catchall") + _.toLowerCase + ).contains("catchall") protected def regexMatchesAll( - errorHandled: Boolean, - error: CamundalaWorkerError, - regexHandledErrors: Seq[String] - ) = + errorHandled: Boolean, + error: CamundalaWorkerError, + regexHandledErrors: Seq[String] + ) = val errorMsg = error.errorMsg.replace("\n", "") errorHandled && regexHandledErrors.forall(regex => errorMsg.matches(s".*$regex.*") ) + end regexMatchesAll protected def filteredOutput( - outputVariables: Seq[String], - allOutputs: Map[String, Any] - ): Map[String, Any] = + outputVariables: Seq[String], + allOutputs: Map[String, Any] + ): Map[String, Any] = outputVariables match case filter if filter.isEmpty => allOutputs - case filter => + case filter => allOutputs .filter: case k -> _ => filter.contains(k) @@ -143,12 +144,12 @@ trait ServiceWorkerDsl[ protected def serviceTask: ServiceTask[In, Out, ServiceIn, ServiceOut] protected def apiUri(in: In): Uri // input must be valid - so no errors // optional - protected def method: Method = Method.GET + protected def method: Method = Method.GET protected def querySegments(in: In): Seq[QuerySegmentOrParam] = Seq.empty // input must be valid - so no errors - // mocking out from outService and headers - protected def inputMapper(in: In): Option[ServiceIn] = None // input must be valid - so no errors - protected def inputHeaders(in: In): Map[String, String] = + // mocking out from outService and headers + protected def inputMapper(in: In): Option[ServiceIn] = None // input must be valid - so no errors + protected def inputHeaders(in: In): Map[String, String] = Map.empty // input must be valid - so no errors protected def outputMapper( serviceOut: ServiceResponse[ServiceOut], @@ -167,10 +168,10 @@ trait ServiceWorkerDsl[ in: In ): Either[ServiceMappingError, Out] = serviceResponse.outputBody match - case _: NoOutput => Right(serviceTask.out) + case _: NoOutput => Right(serviceTask.out) case Some(_: NoOutput) => Right(serviceTask.out) - case None => Right(serviceTask.out) - case _ => + case None => Right(serviceTask.out) + case _ => Left(ServiceMappingError(s"There is an outputMapper missing for '${getClass.getName}'.")) end defaultOutMapper diff --git a/03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala b/03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala index 489d2fd7..78dc661d 100644 --- a/03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala +++ b/03-worker/src/main/scala/camundala/worker/WorkerExecutor.scala @@ -5,6 +5,7 @@ import camundala.domain.* import camundala.worker.CamundalaWorkerError.* import camundala.bpmn.WithConfig import io.circe.syntax.* +import zio.* case class WorkerExecutor[ In <: Product: InOutCodec, @@ -13,57 +14,65 @@ case class WorkerExecutor[ ]( worker: T )(using context: EngineRunContext): + given EngineContext = context.engineContext def execute( - processVariables: Seq[Either[BadVariableError, (String, Option[Json])]] - ): Either[CamundalaWorkerError, Map[String, Any]] = - for + processVariables: Seq[IO[BadVariableError, (String, Option[Json])]] + ): IO[CamundalaWorkerError, Map[String, Any]] = + (for validatedInput <- InputValidator.validate(processVariables) - initializedOutput <- Initializer.initVariables(validatedInput)(using context.engineContext) + initializedOutput <- Initializer.initVariables(validatedInput) mockedOutput <- OutMocker.mockedOutput(validatedInput) // only run the work if it is not mocked output <- - if mockedOutput.isEmpty then WorkRunner.run(validatedInput) else Right(mockedOutput.get) - allOutputs = camundaOutputs(validatedInput, initializedOutput, output) - filteredOut = filteredOutput(allOutputs) + if mockedOutput.isEmpty then WorkRunner.run(validatedInput) + else ZIO.succeed(mockedOutput.get) + allOutputs: Map[String, Any] = camundaOutputs(validatedInput, initializedOutput, output) + filteredOut: Map[String, Any] = filteredOutput(allOutputs, context.generalVariables.outputVariables) // make MockedOutput as error if mocked - _ <- if mockedOutput.isDefined then Left(MockedOutput(filteredOut)) else Right(()) - yield filteredOut + _ <- if mockedOutput.isDefined then ZIO.fail(MockedOutput(filteredOut)) else ZIO.succeed(()) + yield filteredOut) object InputValidator: lazy val prototype = worker.in lazy val validationHandler = worker.validationHandler def validate( - inputParamsAsJson: Seq[Either[Any, (String, Option[Json])]] - ): Either[ValidatorError, In] = - val jsonResult: Either[ValidatorError, Seq[(String, Option[Json])]] = - inputParamsAsJson - .partition(_.isRight) match - case (successes, failures) if failures.isEmpty => - Right( - successes.collect { case Right(value) => value } - ) - case (_, failures) => - Left( - ValidatorError( - failures - .collect { case Left(value) => value } - .mkString("Validator Error(s):\n - ", " - ", "\n") + inputParamsAsJson: Seq[IO[Any, (String, Option[Json])]] + ): IO[ValidatorError, In] = + + val jsonResult: IO[ValidatorError, Seq[(String, Option[Json])]] = + ZIO + .partition(inputParamsAsJson)(i => i) + .flatMap: + case (failures, successes) if failures.isEmpty => + ZIO.succeed(successes.toSeq) + case (failures, _) => + ZIO.fail( + ValidatorError( + failures + .collect { case Left(value) => value } + .mkString("Validator Error(s):\n - ", " - ", "\n") + ) ) - ) - val json: Either[ValidatorError, JsonObject] = jsonResult + + val json: IO[ValidatorError, JsonObject] = jsonResult .map(_.foldLeft(JsonObject()) { case (jsonObj, jsonKey -> jsonValue) => if jsonValue.isDefined then jsonObj.add(jsonKey, jsonValue.get) else jsonObj }) - def toIn(posJsonObj: Either[ValidatorError, JsonObject]): Either[ValidatorError, In] = + + def toIn(posJsonObj: IO[ValidatorError, JsonObject]): IO[ValidatorError, In] = posJsonObj .flatMap(jsonObj => - decodeTo[In](jsonObj.asJson.deepDropNullValues.toString).left - .map(ex => ValidatorError(errorMsg = ex.errorMsg)) - .flatMap(in => validationHandler.map(h => h.validate(in)).getOrElse(Right(in))) + ZIO.fromEither(decodeTo[In](jsonObj.asJson.deepDropNullValues.toString)) + .mapError(ex => ValidatorError(errorMsg = ex.errorMsg)) + .flatMap(in => + validationHandler.map(h => ZIO.fromEither(h.validate(in))).getOrElse(ZIO.succeed( + in + )) + ) ) val in = toIn(json) @@ -74,7 +83,7 @@ case class WorkerExecutor[ jsonObj: JsonObject <- json inputVariables = jsonObj.toMap configJson: JsonObject = - inputVariables.get("inConfig").getOrElse(i.defaultConfigAsJson).asObject.get + inputVariables.getOrElse("inConfig", i.defaultConfigAsJson).asObject.get newJsonConfig = worker.inConfigVariableNames .foldLeft(configJson): case (configJson, n) => @@ -98,17 +107,19 @@ case class WorkerExecutor[ def initVariables( validatedInput: In - ): InitProcessFunction = + )(using EngineContext): IO[InitProcessError, Map[String, Any]] = worker.initProcessHandler - .map { vi => + .map: vi => vi.init(validatedInput).map(_ ++ defaultVariables) - } - .getOrElse(Right(defaultVariables)) + .map: + ZIO.fromEither + .getOrElse: + ZIO.succeed(defaultVariables) end Initializer object OutMocker: - def mockedOutput(in: In): Either[MockerError, Option[Out]] = + def mockedOutput(in: In): IO[MockerError, Option[Out]] = ( context.generalVariables.isMockedWorker(worker.topic), context.generalVariables.outputMock, @@ -122,49 +133,54 @@ case class WorkerExecutor[ worker.defaultMock(in).map(Some(_)) // otherwise it is not mocked or it is a service mock which is handled in service Worker during running case (_, None, _) => - Right(None) + ZIO.succeed(None) end mockedOutput private def decodeMock( json: Json ) = - json.as[Out] + ZIO.fromEither(json.as[Out]) .map: Some(_) - .left.map: error => + .mapError: error => MockerError(errorMsg = s"$error:\n- $json") end decodeMock end OutMocker object WorkRunner: - def run(inputObject: In): Either[RunWorkError, Out | NoOutput] = + def run(inputObject: In)(using EngineRunContext): IO[RunWorkError, Out | NoOutput] = worker.runWorkHandler - .map(_.runWork(inputObject)) - .getOrElse(Right(NoOutput())) + .map: + _.runWork(inputObject) + .map: + ZIO.fromEither + .getOrElse: + ZIO.succeed(NoOutput()) end WorkRunner private def camundaOutputs( initializedInput: In, internalVariables: Map[String, Any], output: Out | NoOutput - ): Map[String, Any] = + )(using context: EngineRunContext): Map[String, Any] = context.toEngineObject(initializedInput) ++ internalVariables ++ (output match case o: NoOutput => context.toEngineObject(o) case _ => context.toEngineObject(output.asInstanceOf[Out]) - ) + ) + private def filteredOutput( - allOutputs: Map[String, Any] + allOutputs: Map[String, Any], + outputVariables: Seq[String] ): Map[String, Any] = - val filter = context.generalVariables.outputVariables - if filter.isEmpty then + if outputVariables.isEmpty then allOutputs else allOutputs - .filter { case k -> _ => filter.contains(k) } + .filter { case k -> _ => outputVariables.contains(k) } end if end filteredOutput diff --git a/03-worker/src/main/scala/camundala/worker/ast.scala b/03-worker/src/main/scala/camundala/worker/ast.scala index f230b66f..e3ebecac 100644 --- a/03-worker/src/main/scala/camundala/worker/ast.scala +++ b/03-worker/src/main/scala/camundala/worker/ast.scala @@ -7,6 +7,7 @@ import camundala.worker.CamundalaWorkerError.* import camundala.worker.QuerySegmentOrParam.{Key, KeyValue, Value} import sttp.model.Uri.QuerySegment import sttp.model.{Method, Uri} +import zio.{IO, ZIO} case class Workers(workers: Seq[Worker[?, ?, ?]]) @@ -18,16 +19,16 @@ sealed trait Worker[ def inOutExample: InOut[In, Out, ?] def topic: String - def otherEnumInExamples: Option[Seq[In]] = inOutExample.otherEnumInExamples - lazy val in: In = inOutExample.in - lazy val out: Out = inOutExample.out + def otherEnumInExamples: Option[Seq[In]] = inOutExample.otherEnumInExamples + lazy val in: In = inOutExample.in + lazy val out: Out = inOutExample.out // handler - def validationHandler: Option[ValidationHandler[In]] = None + def validationHandler: Option[ValidationHandler[In]] = None def initProcessHandler: Option[InitProcessHandler[In]] = None // no handler for mocking - all done from the InOut Object - def runWorkHandler: Option[RunWorkHandler[In, Out]] = None + def runWorkHandler: Option[RunWorkHandler[In, Out]] = None // helper - lazy val variableNames: Seq[String] = + lazy val variableNames: Seq[String] = (in.productElementNames.toSeq ++ otherEnumInExamples .map: @@ -39,16 +40,14 @@ sealed trait Worker[ in match case i: WithConfig[?] => i.defaultConfig.productElementNames.toSeq - case _ => Seq.empty + case _ => Seq.empty - def defaultMock(in: In)(using - context: EngineRunContext - ): Either[MockerError, Out] = - Right( + def defaultMock(in: In): IO[MockerError, Out] = + ZIO.succeed( inOutExample match case e: ProcessOrExternalTask[In, Out, ?] => e.dynamicOutMock.map(_(in)).getOrElse(out) - case _ => out + case _ => out ) end defaultMock @@ -132,12 +131,10 @@ case class ServiceWorker[ ): ServiceWorker[In, Out, ServiceIn, ServiceOut] = copy(runWorkHandler = Some(handler)) - override def defaultMock(in: In)(using - context: EngineRunContext - ): Either[MockerError, Out] = - val mocked: Option[Either[MockerError, Out]] = // needed for Union Type - runWorkHandler - .map(handler => + override def defaultMock(in: In): IO[MockerError, Out] = + runWorkHandler + .map(handler => + ZIO.fromEither( handler .outputMapper( inOutExample.dynamicServiceOutMock @@ -147,12 +144,12 @@ case class ServiceWorker[ inOutExample.defaultServiceOutMock.toServiceResponse , in - ).left.map: error => - MockerError(s"Error mapping ServiceResponse to Out: $error") - ) - mocked + ) + ).mapError: error => + MockerError(s"Error mapping ServiceResponse to Out: $error") + ) .getOrElse( - Left(MockerError(s"There is no ServiceRunner defined for Worker: $topic")) + ZIO.fail(MockerError(s"There is no ServiceRunner defined for Worker: $topic")) ) end defaultMock @@ -186,7 +183,7 @@ object RunnableRequest: inputObject.productElementNames.toSeq .zip(inputObject.productIterator.toSeq) .collect { - case k -> Some(v) => k -> s"$v" + case k -> Some(v) => k -> s"$v" case k -> v if v != None => k -> s"$v" } .toMap @@ -194,8 +191,8 @@ object RunnableRequest: val segments = querySegments .collect { - case Value(v) => QuerySegment.Value(v) - case KeyValue(k, v) => QuerySegment.KeyValue(k, v) + case Value(v) => QuerySegment.Value(v) + case KeyValue(k, v) => QuerySegment.KeyValue(k, v) case Key(k) if valueMap.contains(k) => QuerySegment.KeyValue(k, valueMap(k)) } diff --git a/03-worker/src/main/scala/camundala/worker/exports.scala b/03-worker/src/main/scala/camundala/worker/exports.scala index 8255c5f9..8b6986d7 100644 --- a/03-worker/src/main/scala/camundala/worker/exports.scala +++ b/03-worker/src/main/scala/camundala/worker/exports.scala @@ -5,6 +5,7 @@ import camundala.bpmn.* import camundala.domain.* import camundala.worker.CamundalaWorkerError.* import io.circe.* +import zio.{IO, ZIO} import java.util.Date @@ -172,7 +173,7 @@ def niceClassName(clazz: Class[?]) = clazz.getName.split("""\$""").head def printTimeOnConsole(start: Date) = - val time = new Date().getTime - start.getTime + val time = new Date().getTime - start.getTime val color = if time > 1000 then Console.YELLOW_B else if time > 250 then Console.MAGENTA else Console.BLACK diff --git a/03-worker/src/test/scala/camundala/worker/astTest.scala b/03-worker/src/test/scala/camundala/worker/astTest.scala index 826dc409..2dfb8204 100644 --- a/03-worker/src/test/scala/camundala/worker/astTest.scala +++ b/03-worker/src/test/scala/camundala/worker/astTest.scala @@ -2,7 +2,8 @@ package camundala.worker import camundala.bpmn.* import camundala.domain.* -import camundala.worker.CamundalaWorkerError.MockedOutput +import camundala.worker.CamundalaWorkerError.{MockedOutput, ServiceError} +import zio.ZIO import scala.reflect.ClassTag @@ -34,8 +35,8 @@ class astTest extends munit.FunSuite, BpmnProcessDsl, BpmnServiceTaskDsl: inOutExample = proc ) - assertEquals(worker.defaultMock(In(1)), Right(Out())) - assertEquals(worker.defaultMock(In(3)), Right(Out(false))) + assertEquals(worker.defaultMock(In(1)), ZIO.succeed(Out())) + assertEquals(worker.defaultMock(In(3)), ZIO.succeed(Out(false))) } test("defaultMock ServiceTask") { @@ -69,8 +70,8 @@ class astTest extends munit.FunSuite, BpmnProcessDsl, BpmnServiceTaskDsl: ) ) - assertEquals(worker.defaultMock(In(1)), Right(Out())) - assertEquals(worker.defaultMock(In(3)), Right(Out(false))) + assertEquals(worker.defaultMock(In(1)), ZIO.succeed(Out())) + assertEquals(worker.defaultMock(In(3)), ZIO.succeed(Out(false))) } case class In(value: Int = 1) diff --git a/03-worker/src/test/scala/camundala/worker/camundala.worker.WorkerExecutorTest.scala b/03-worker/src/test/scala/camundala/worker/camundala.worker.WorkerExecutorTest.scala index 54a9a4f3..47bdc992 100644 --- a/03-worker/src/test/scala/camundala/worker/camundala.worker.WorkerExecutorTest.scala +++ b/03-worker/src/test/scala/camundala/worker/camundala.worker.WorkerExecutorTest.scala @@ -3,6 +3,7 @@ package camundala.worker import camundala.bpmn.* import camundala.domain.* import camundala.worker.CamundalaWorkerError.* +import zio.ZIO import scala.reflect.ClassTag @@ -48,30 +49,30 @@ class WorkerExecutorTest extends munit.FunSuite, BpmnProcessDsl: test("InputValidator WithConfig override InConfig"): assertEquals( executor.InputValidator.validate(Seq( - Right("requiredValue" -> None), - Right("optionalValue" -> None), - Right("aValue" -> Some(Json.fromString("ok"))), - Right("inConfig" -> Some(Json.obj("requiredValue" -> Json.fromString("aso")))) + ZIO.succeed("requiredValue" -> None), + ZIO.succeed("optionalValue" -> None), + ZIO.succeed("aValue" -> Some(Json.fromString("ok"))), + ZIO.succeed("inConfig" -> Some(Json.obj("requiredValue" -> Json.fromString("aso")))) )), - Right(In(inConfig = Some(InConfig(requiredValue = "aso")))) + ZIO.succeed(In(inConfig = Some(InConfig(requiredValue = "aso")))) ) test("InputValidator WithConfig default InConfig"): assertEquals( executor.InputValidator.validate(Seq( - Right("requiredValue" -> None), - Right("optionalValue" -> None), - Right("aValue" -> Some(Json.fromString("ok"))) + ZIO.succeed("requiredValue" -> None), + ZIO.succeed("optionalValue" -> None), + ZIO.succeed("aValue" -> Some(Json.fromString("ok"))) )), - Right(In(inConfig = Some(InConfig()))) + ZIO.succeed(In(inConfig = Some(InConfig()))) ) test("InputValidator WithConfig override InConfig in In"): assertEquals( executor.InputValidator.validate(Seq( - Right("aValue" -> Some(Json.fromString("ok"))), - Right("requiredValue" -> Some(Json.fromString("aso"))), - Right("optionalValue" -> Some(Json.fromString("nei"))) + ZIO.succeed("aValue" -> Some(Json.fromString("ok"))), + ZIO.succeed("requiredValue" -> Some(Json.fromString("aso"))), + ZIO.succeed("optionalValue" -> Some(Json.fromString("nei"))) )), - Right(In(inConfig = Some(InConfig(requiredValue = "aso", optionalValue = Some("nei"))))) + ZIO.succeed(In(inConfig = Some(InConfig(requiredValue = "aso", optionalValue = Some("nei"))))) ) test("Test optional values are null in JSON"): diff --git a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/C7WorkerHandler.scala b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/C7WorkerHandler.scala index cfb26357..4ae87cad 100644 --- a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/C7WorkerHandler.scala +++ b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/C7WorkerHandler.scala @@ -8,6 +8,7 @@ import camundala.worker.CamundalaWorkerError.* import jakarta.annotation.PostConstruct import org.camunda.bpm.client.{ExternalTaskClient, task as camunda} import org.springframework.beans.factory.annotation.{Autowired, Value} +import zio.{IO, ZIO} import java.util.Date import scala.concurrent.ExecutionContext.Implicits.global @@ -61,14 +62,13 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: val tryGeneralVariables = ProcessVariablesExtractor.extractGeneral() try (for - generalVariables <- tryGeneralVariables - context = EngineRunContext(engineContext, generalVariables) - filteredOut <- - worker.executor(using context).execute(tryProcessVariables) + generalVariables <- tryGeneralVariables + given EngineRunContext = EngineRunContext(engineContext, generalVariables) + filteredOut <- + worker.executor.execute(tryProcessVariables) yield externalTaskService.handleSuccess(filteredOut, generalVariables.manualOutMapping) // - ).left.map { ex => + ).mapError: ex => externalTaskService.handleError(ex, tryGeneralVariables) - } catch // safety net case ex: Throwable => ex.printStackTrace() @@ -95,25 +95,25 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: private[worker] def handleError( error: CamundalaWorkerError, - tryGeneralVariables: Either[BadVariableError, GeneralVariables] + tryGeneralVariables: IO[BadVariableError, GeneralVariables] ): HelperContext[Unit] = import CamundalaWorkerError.* val errorMsg = error.errorMsg.replace("\n", "") (for generalVariables <- tryGeneralVariables - errorHandled = isErrorHandled(error, generalVariables.handledErrors) + errorHandled = isErrorHandled(error, generalVariables.handledErrors) errorRegexHandled = errorHandled && generalVariables.regexHandledErrors.forall(regex => - errorMsg.matches(s".*$regex.*") - ) + errorMsg.matches(s".*$regex.*") + ) yield (errorHandled, errorRegexHandled, generalVariables)) .flatMap { case (true, true, generalVariables) => val mockedOutput = error match case error: ErrorWithOutput => error.output - case _ => Map.empty - val filtered = filteredOutput(generalVariables.outputVariables, mockedOutput) - Right( + case _ => Map.empty + val filtered = filteredOutput(generalVariables.outputVariables, mockedOutput) + ZIO.succeed( if error.isMock && !generalVariables.handledErrors.contains( error.errorCode.toString @@ -123,7 +123,7 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: else val errorVars = Map( "errorCode" -> error.errorCode, - "errorMsg" -> error.errorMsg + "errorMsg" -> error.errorMsg ) val variables = (filtered ++ errorVars).asJava logger.info(s"Handled Error: $errorVars") @@ -134,13 +134,12 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: variables ) ) - case (true, false, _) => - Left(HandledRegexNotMatchedError(error)) - case _ => - Left(error) + case (true, false, _) => + ZIO.fail(HandledRegexNotMatchedError(error)) + case _ => + ZIO.fail(error) } - .left - .map { err => + .mapError: err => logger.error(err) externalTaskService.handleFailure( summon[camunda.ExternalTask], @@ -149,7 +148,6 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: 0, 0 ) // TODO implement retry mechanism - } end handleError end extension @@ -160,7 +158,7 @@ trait C7WorkerHandler extends camunda.ExternalTaskHandler, WorkerHandler: ): Map[String, Any] = outputVariables match case filter if filter.isEmpty => allOutputs - case filter => + case filter => allOutputs .filter { case k -> _ => filter.contains(k) } diff --git a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/Camunda7Context.scala b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/Camunda7Context.scala index d5a8ccb0..0783aafb 100644 --- a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/Camunda7Context.scala +++ b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/Camunda7Context.scala @@ -3,9 +3,12 @@ package camunda7.worker import camundala.domain.* import camundala.worker.* +import camundala.worker.CamundalaWorkerError.ServiceError import org.camunda.bpm.client.variable.ClientValues import org.slf4j.{Logger, LoggerFactory} import org.springframework.context.annotation.Configuration +import zio.ZIO + import scala.reflect.ClassTag trait Camunda7Context extends EngineContext: diff --git a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/CamundaHelper.scala b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/CamundaHelper.scala index fac89f7e..b45b6a0c 100644 --- a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/CamundaHelper.scala +++ b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/CamundaHelper.scala @@ -9,6 +9,7 @@ import io.circe.Decoder.Result import org.camunda.bpm.client.task.ExternalTask import org.camunda.bpm.engine.variable.`type`.{PrimitiveValueType, ValueType} import org.camunda.bpm.engine.variable.value.TypedValue +import zio.{IO, ZIO} object CamundaHelper: @@ -22,38 +23,38 @@ object CamundaHelper: */ def variableOpt[A: InOutDecoder]( varKey: String | InputParams - )(using ExternalTask): Either[BadVariableError, Option[A]] = + )(using ExternalTask): IO[BadVariableError, Option[A]] = for maybeJson <- jsonVariableOpt(varKey) - obj <- maybeJson - .map(_.as[Option[A]]) - .getOrElse(Right(None)) - .left - .map(err => - BadVariableError( - s"Problem decoding Json to ${nameOfType[A]}: ${err.getMessage}" - ) - ) + obj <- maybeJson + .map(_.as[Option[A]]) + .map(ZIO.fromEither) + .getOrElse(ZIO.succeed(None)) + .mapError(err => + BadVariableError( + s"Problem decoding Json to ${nameOfType[A]}: ${err.getMessage}" + ) + ) yield obj def jsonVariableOpt( varKey: String | InputParams - ): HelperContext[Either[BadVariableError, Option[Json]]] = + ): HelperContext[IO[BadVariableError, Option[Json]]] = variableTypedOpt(varKey) .map { case typedValue if typedValue.getType == ValueType.NULL => - Right(None) // k -> null as Camunda Expressions need them + ZIO.succeed(None) // k -> null as Camunda Expressions need them case typedValue => extractValue(typedValue) .map(v => Some(v)) } - .getOrElse(Right(None)) + .getOrElse(ZIO.succeed(None)) // used for input variables you can define with Array of Strings or a comma-separated String // if not set it returns an empty Seq def extractSeqFromArrayOrString( varKey: String | InputParams - ): HelperContext[Either[BadVariableError, Seq[String]]] = + ): HelperContext[IO[BadVariableError, Seq[String]]] = extractSeqFromArrayOrString(varKey, Seq.empty) // used for input variables you can define with Array of Strings or a comma-separated String @@ -61,10 +62,10 @@ object CamundaHelper: def extractSeqFromArrayOrString( varKey: String | InputParams, defaultSeq: Seq[String | ErrorCodes] = Seq.empty - ): HelperContext[Either[BadVariableError, Seq[String]]] = + ): HelperContext[IO[BadVariableError, Seq[String]]] = jsonVariableOpt(varKey) .flatMap { - case Some(value) if value.isArray => + case Some(value) if value.isArray => extractFromSeq( value .as[Seq[String]] @@ -75,8 +76,8 @@ object CamundaHelper: .as[String] .map(_.split(",").toSeq) ) - case _ => - Right(defaultSeq.map(_.toString)) + case _ => + ZIO.succeed(defaultSeq.map(_.toString)) } /** Analog `variable(String vari)`. You can define a Value that is returned if there is no @@ -85,19 +86,22 @@ object CamundaHelper: def variable[A: InOutDecoder]( varKey: String | InputParams, defaultObj: A - ): HelperContext[Either[BadVariableError, A]] = + ): HelperContext[IO[BadVariableError, A]] = variableOpt[A](varKey).map(_.getOrElse(defaultObj)) /** Returns the Variable in the Bag. B if there is no Variable with that identifier. */ def variable[T: InOutDecoder]( varKey: String | InputParams - ): HelperContext[Either[BadVariableError, T]] = + ): HelperContext[IO[BadVariableError, T]] = variableOpt(varKey) .flatMap( - _.toEither( - s"The Variable '$varKey' is required! But does not exist in your Process" - ) + ZIO.fromOption(_) + .mapError(_ => + BadVariableError( + s"The Variable '$varKey' is required! But does not exist in your Process" + ) + ) ) end variable @@ -120,30 +124,29 @@ object CamundaHelper: end extension // Option - def extractValue(typedValue: TypedValue): Either[BadVariableError, Json] = + def extractValue(typedValue: TypedValue): IO[BadVariableError, Json] = typedValue.getType match case pt: PrimitiveValueType if pt.getName == "json" => val jsonStr = typedValue.getValue.toString - parser - .parse(jsonStr) - .left - .map(ex => BadVariableError(s"Input is not valid: $ex")) + ZIO.fromEither(parser + .parse(jsonStr)) + .mapError(ex => BadVariableError(s"Input is not valid: $ex")) case _: PrimitiveValueType => typedValue.getValue match - case vt: DmnValueSimple => - Right(vt.asJson) + case vt: DmnValueSimple => + ZIO.succeed(vt.asJson) case en: scala.reflect.Enum => - Right(Json.fromString(en.toString)) - case other => - Left( + ZIO.succeed(Json.fromString(en.toString)) + case other => + ZIO.fail( BadVariableError( s"Input is not valid: Unexpected PrimitiveValueType: $other" ) ) case other => - Left( + ZIO.fail( BadVariableError( s"Unexpected ValueType ${other.getName} - but is ${typedValue.getType}" ) @@ -153,15 +156,13 @@ object CamundaHelper: private def extractFromSeq( variableKeys: Result[Seq[String]] - ): HelperContext[Either[BadVariableError, Seq[String]]] = - variableKeys + ): HelperContext[IO[BadVariableError, Seq[String]]] = + ZIO.fromEither(variableKeys) .map(_.map(_.trim).filter(_.nonEmpty)) - .left - .map { error => + .mapError: error => error.printStackTrace() BadVariableError( s"Could not extract Seq for an Array or comma-separated String: ${error.getMessage}" ) - } end CamundaHelper diff --git a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/ProcessVariablesExtractor.scala b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/ProcessVariablesExtractor.scala index d61102a2..833d93b3 100644 --- a/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/ProcessVariablesExtractor.scala +++ b/04-worker-c7spring/src/main/scala/camundala/camunda7/worker/ProcessVariablesExtractor.scala @@ -7,13 +7,14 @@ import camundala.worker.* import camundala.worker.CamundalaWorkerError.BadVariableError import org.camunda.bpm.engine.variable.`type`.ValueType import org.camunda.bpm.engine.variable.value.TypedValue +import zio.{IO, ZIO} /** Validator to validate the input variables automatically. */ object ProcessVariablesExtractor: - type VariableType = HelperContext[Seq[Either[BadVariableError, (String, Option[Json])]]] - type GeneralVariableType = HelperContext[Either[BadVariableError, GeneralVariables]] + type VariableType = HelperContext[Seq[IO[BadVariableError, (String, Option[Json])]]] + type GeneralVariableType = HelperContext[IO[BadVariableError, GeneralVariables]] // gets the input variables of the process as Optional Jsons. def extract(variableNames: Seq[String]): VariableType = @@ -21,14 +22,13 @@ object ProcessVariablesExtractor: .map(k => k -> variableTypedOpt(k)) .map { case k -> Some(typedValue) if typedValue.getType == ValueType.NULL => - Right(k -> None) // k -> null as Camunda Expressions need them + ZIO.succeed(k -> None) // k -> null as Camunda Expressions need them case k -> Some(typedValue) => extractValue(typedValue) .map(v => k -> Some(v)) - .left - .map(ex => BadVariableError(s"Problem extracting Process Variable $k: ${ex.errorMsg}")) + .mapError(ex => BadVariableError(s"Problem extracting Process Variable $k: ${ex.errorMsg}")) case k -> None => - Right(k -> None) // k -> null as Camunda Expressions need them + ZIO.succeed(k -> None) // k -> null as Camunda Expressions need them } end extract diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Client.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Client.scala similarity index 97% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Client.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Client.scala index f6df9fa3..dd696db9 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Client.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Client.scala @@ -1,6 +1,6 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio -import camundala.worker.c8zio.oauth.OAuthPasswordFlow +import camundala.worker.c7zio.oauth.OAuthPasswordFlow import camundala.worker.{Slf4JLogger, WorkerLogger} import org.apache.hc.client5.http.config.RequestConfig import org.apache.hc.client5.http.impl.classic.* diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Context.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Context.scala similarity index 74% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Context.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Context.scala index 90f97b80..4420dd16 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Context.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Context.scala @@ -1,9 +1,12 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.domain.* import camundala.worker.* +import camundala.worker.CamundalaWorkerError.ServiceError import org.camunda.bpm.client.variable.ClientValues import org.slf4j.{Logger, LoggerFactory} +import zio.ZIO + import scala.reflect.ClassTag trait C7Context extends EngineContext: @@ -16,10 +19,7 @@ trait C7Context extends EngineContext: def sendRequest[ServiceIn: InOutEncoder, ServiceOut: InOutDecoder: ClassTag]( request: RunnableRequest[ServiceIn] - ): SendRequestType[ServiceOut] = ??? - // DefaultRestApiClient.sendRequest(request) + ): SendRequestType[ServiceOut] = + DefaultRestApiClient.sendRequest(request) end C7Context - - - diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Worker.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Worker.scala similarity index 83% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Worker.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Worker.scala index 4bb20f1c..ea94f8fa 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7Worker.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7Worker.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.bpmn.GeneralVariables import camundala.domain.* @@ -16,7 +16,7 @@ trait C7Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends WorkerDsl[In, Out], camunda.ExternalTaskHandler: protected def c7Context: C7Context - + def logger: WorkerLogger = Slf4JLogger.logger(getClass.getName) override def execute( @@ -53,16 +53,13 @@ trait C7Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] ProcessVariablesExtractor.extract(worker.variableNames) val tryGeneralVariables = ProcessVariablesExtractor.extractGeneral() (for - generalVariables <- tryGeneralVariables - context = EngineRunContext(c7Context, generalVariables) - filteredOut <- - ZIO.fromEither( - worker.executor(using context).execute(variablesAsEithers(tryProcessVariables)) - ) - _ <- ZIO.attempt(externalTaskService.handleSuccess( - filteredOut, - generalVariables.manualOutMapping - )) + generalVariables <- tryGeneralVariables + given EngineRunContext = EngineRunContext(c7Context, generalVariables) + filteredOut <- worker.executor.execute(tryProcessVariables) + _ <- ZIO.attempt(externalTaskService.handleSuccess( + filteredOut, + generalVariables.manualOutMapping + )) yield () // ).mapError: case ex: CamundalaWorkerError => ex @@ -72,17 +69,6 @@ trait C7Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] ex end executeWorker - private def variablesAsEithers(tryProcessVariables: Seq[IO[ - BadVariableError, - (String, Option[Json]) - ]]): Seq[Either[BadVariableError, (String, Option[Json])]] = - tryProcessVariables - .map((x: IO[BadVariableError, (String, Option[Json])]) => - Unsafe.unsafe: - implicit unsafe => // can be removed if everything is ZIO - runtime.unsafe.run(x.either).getOrThrow() - ) - extension (externalTaskService: camunda.ExternalTaskService) private def handleSuccess( diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7WorkerRegistry.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7WorkerRegistry.scala similarity index 97% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7WorkerRegistry.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7WorkerRegistry.scala index 597ea777..6bed866f 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/C7WorkerRegistry.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/C7WorkerRegistry.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.worker.WorkerRegistry import org.camunda.bpm.client.ExternalTaskClient diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/CamundaHelper.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/CamundaHelper.scala similarity index 99% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/CamundaHelper.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/CamundaHelper.scala index 061cb5dd..158dd292 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/CamundaHelper.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/CamundaHelper.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.bpmn.{*, given} import camundala.domain.* diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/ProcessVariablesExtractor.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/ProcessVariablesExtractor.scala similarity index 98% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/ProcessVariablesExtractor.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/ProcessVariablesExtractor.scala index 9fd9e1be..beed88ae 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/ProcessVariablesExtractor.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/ProcessVariablesExtractor.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.bpmn.* import camundala.domain.* diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/RestApiClient.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/RestApiClient.scala new file mode 100644 index 00000000..e1a1df12 --- /dev/null +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/RestApiClient.scala @@ -0,0 +1,146 @@ +package camundala.worker.c7zio + +import camundala.bpmn.* +import camundala.domain.* +import camundala.worker.* +import camundala.worker.CamundalaWorkerError.* +import io.circe.parser +import sttp.client3.* +import sttp.client3.circe.* +import sttp.model.Uri.QuerySegment +import sttp.model.{Header, Uri} + +import scala.util.Try +import scala.reflect.ClassTag + +trait RestApiClient: + + def sendRequest[ + ServiceIn: InOutEncoder, // body of service + ServiceOut: InOutDecoder: ClassTag // output of service + ]( + runnableRequest: RunnableRequest[ServiceIn] + ): SendRequestType[ServiceOut] = + try + for + reqWithOptBody <- requestWithOptBody(runnableRequest) + req <- auth(reqWithOptBody) + response <- sendRequest(req) + statusCode = response.code + body <- readBody(statusCode, response, req) + headers = response.headers.map(h => h.name -> h.value).toMap + out <- decodeResponse[ServiceOut](body) + yield ServiceResponse(out, headers) + catch + case ex: Throwable => + val unexpectedError = + s"""Unexpected error while sending request: ${ex.getMessage}. + | -> $runnableRequest + |""".stripMargin + ex.printStackTrace() + Left(ServiceUnexpectedError(unexpectedError)) + end sendRequest + + protected def readBody( + statusCode: StatusCode, + response: Response[Either[String, String]], + request: Request[Either[String, String], Any] + ): Either[ServiceRequestError, String] = + response.body.left + .map(body => + ServiceRequestError( + statusCode.code, + s"Non-2xx response with code $statusCode:\n$body\n\n${request.toCurl}" + ) + ) + end readBody + + // no auth per default + protected def auth( + request: Request[Either[String, String], Any] + )(using EngineRunContext): Either[ServiceAuthError, Request[Either[String, String], Any]] = + Right(request) + + protected def sendRequest( + request: Request[Either[String, String], Any] + ) = + try + Right(request.send(backend)) + catch + case ex: Throwable => + val unexpectedError = + s"""Unexpected error while sending request: ${ex.getMessage}. + | -> ${request.toCurl(Set("Authorization"))} + |""".stripMargin + ex.printStackTrace() + Left(ServiceUnexpectedError(unexpectedError)) + + protected def decodeResponse[ + ServiceOut: InOutDecoder: ClassTag // output of service + ]( + body: String + ): Either[ServiceBadBodyError, ServiceOut] = + if hasNoOutput[ServiceOut]() + then Right(NoOutput().asInstanceOf[ServiceOut]) + else + if body.isBlank then + val runtimeClass = implicitly[ClassTag[ServiceOut]].runtimeClass + runtimeClass match + case x if x == classOf[Option[?]] => + Right(None.asInstanceOf[ServiceOut]) + case other => + Left(ServiceBadBodyError( + s"There is no body in the response and the ServiceOut is neither NoOutput nor Option (Class is $other)." + )) + end match + else + parser + .decodeAccumulating[ServiceOut](body) + .toEither + .left + .map(err => ServiceBadBodyError(s"Problem creating body from response.\n$err\nBODY: $body")) + + protected def requestWithOptBody[ServiceIn: InOutEncoder]( + runnableRequest: RunnableRequest[ServiceIn] + ): Either[ServiceBadBodyError, RequestT[Identity, Either[String, String], Any]] = + val request = + requestMethod( + runnableRequest.httpMethod, + runnableRequest.apiUri, + runnableRequest.qSegments, + runnableRequest.headers + ) + Try(runnableRequest.requestBodyOpt.map(b => + request.body(b.asJson.deepDropNullValues) + ).getOrElse(request)).toEither.left + .map(err => ServiceBadBodyError(errorMsg = s"Problem creating body for request.\n$err")) + end requestWithOptBody + + private def requestMethod( + httpMethod: Method, + apiUri: Uri, + qSegments: Seq[QuerySegment], + headers: Map[String, String] + ): Request[Either[String, String], Any] = + basicRequest + .copy( + uri = apiUri.addQuerySegments(qSegments), + headers = headers.toSeq.map { case k -> v => Header(k, v) }, + method = httpMethod + ) + end requestMethod + + private[worker] def hasNoOutput[ServiceOut: ClassTag](): Boolean = + val runtimeClass = implicitly[ClassTag[ServiceOut]].runtimeClass + runtimeClass == classOf[NoOutput] + + extension (request: Request[Either[String, String], Any]) + + def addToken(token: String): RequestT[Identity, Either[String, String], Any] = + val tokenHeader = if token.startsWith("Bearer") then token else s"Bearer $token" + request.header("Authorization", tokenHeader) + + end extension +end RestApiClient + +object DefaultRestApiClient extends RestApiClient diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/exports.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/exports.scala similarity index 88% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/exports.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/exports.scala index 884ff0da..4e18643b 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/exports.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/exports.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import org.camunda.bpm.client.task.ExternalTask import sttp.client3.{HttpClientSyncBackend, Identity, SttpBackend} diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/OAuthPasswordFlow.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/OAuthPasswordFlow.scala similarity index 98% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/OAuthPasswordFlow.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/OAuthPasswordFlow.scala index 4e2cadcb..bcec5fc2 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/OAuthPasswordFlow.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/OAuthPasswordFlow.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio.oauth +package camundala.worker.c7zio.oauth import camundala.domain.InOutDecoder import camundala.worker.* diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenCache.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenCache.scala similarity index 88% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenCache.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenCache.scala index 69a5fccb..9d65bed5 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenCache.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenCache.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio.oauth +package camundala.worker.c7zio.oauth import com.github.blemale.scaffeine.{Cache, Scaffeine} diff --git a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenService.scala b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenService.scala similarity index 98% rename from 04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenService.scala rename to 04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenService.scala index 8ab6b90a..9b37c2d2 100644 --- a/04-worker-c7zio/src/main/scala/camundala/worker/c8zio/oauth/TokenService.scala +++ b/04-worker-c7zio/src/main/scala/camundala/worker/c7zio/oauth/TokenService.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio package oauth import camundala.worker.CamundalaWorkerError.ServiceAuthError diff --git a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Client.scala b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Client.scala similarity index 97% rename from 04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Client.scala rename to 04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Client.scala index 7da02d9a..6136882d 100644 --- a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Client.scala +++ b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Client.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import io.camunda.zeebe.client.ZeebeClient import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder diff --git a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Context.scala b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Context.scala similarity index 82% rename from 04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Context.scala rename to 04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Context.scala index ee834886..babe1727 100644 --- a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Context.scala +++ b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Context.scala @@ -1,7 +1,9 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.domain.* import camundala.worker.* +import camundala.worker.CamundalaWorkerError.ServiceError +import zio.ZIO import scala.reflect.ClassTag diff --git a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Worker.scala b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Worker.scala similarity index 64% rename from 04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Worker.scala rename to 04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Worker.scala index b5ea353f..dd6fd3ae 100644 --- a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8Worker.scala +++ b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8Worker.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.bpmn.GeneralVariables import camundala.domain.* @@ -13,7 +13,8 @@ import java.time import scala.jdk.CollectionConverters.* import java.util.Date -trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends WorkerDsl[In, Out], JobHandler: +trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends WorkerDsl[In, Out], + JobHandler: protected def c8Context: C8Context private lazy val runtime = Runtime.default @@ -27,19 +28,19 @@ trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends Wo def run(client: JobClient, job: ActivatedJob): ZIO[Any, Throwable, Unit] = (for - startDate <- succeed(new Date()) - json <- extractJson(job) - businessKey <- extractBusinessKey(json) - _ <- logInfo( - s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey" - ) - processVariables <- ZIO.foreach(worker.variableNames)(k => processVariable(k, json)) - generalVariables <- extractGeneralVariables(json) - context = EngineRunContext(c8Context, generalVariables) - filteredOut <- ZIO.fromEither(worker.executor(using context).execute(processVariables)) - _ <- logInfo(s"generalVariables: $generalVariables") - _ <- handleSuccess(client, job, filteredOut, generalVariables.manualOutMapping, businessKey) - _ <- + startDate <- succeed(new Date()) + json <- extractJson(job) + businessKey <- extractBusinessKey(json) + _ <- logInfo( + s"Worker: ${job.getType} (${job.getWorker}) started > $businessKey" + ) + processVariables = worker.variableNames.map(k => processVariable(k, json)) + generalVariables <- extractGeneralVariables(json) + given EngineRunContext = EngineRunContext(c8Context, generalVariables) + filteredOut <- worker.executor.execute(processVariables) + _ <- logInfo(s"generalVariables: $generalVariables") + _ <- handleSuccess(client, job, filteredOut, generalVariables.manualOutMapping, businessKey) + _ <- logInfo( s"Worker: ${job.getType} (${job.getWorker}) ended ${printTimeOnConsole(startDate)} > $businessKey" ) @@ -69,25 +70,26 @@ trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends Wo error: CamundalaWorkerError ): ZIO[Any, Throwable, Unit] = (for - _ <- logError(s"Error: ${error.causeMsg}") + _ <- logError(s"Error: ${error.causeMsg}") json <- extractJson(job) generalVariables <- extractGeneralVariables(json) - isErrorHandled = errorHandled(error, generalVariables.handledErrors) - errorRegexHandled = regexMatchesAll(isErrorHandled, error, generalVariables.regexHandledErrors) - _ <- attempt(client.newFailCommand(job) - .retries(job.getRetries - 1) - .retryBackoff(time.Duration.ofSeconds(60)) - .variables(Map("errorCode" -> error.errorCode, "errorMsg" -> error.errorMsg).asJava) - .errorMessage(error.causeMsg) - .send().join()) + isErrorHandled = errorHandled(error, generalVariables.handledErrors) + errorRegexHandled = + regexMatchesAll(isErrorHandled, error, generalVariables.regexHandledErrors) + _ <- attempt(client.newFailCommand(job) + .retries(job.getRetries - 1) + .retryBackoff(time.Duration.ofSeconds(60)) + .variables(Map("errorCode" -> error.errorCode, "errorMsg" -> error.errorMsg).asJava) + .errorMessage(error.causeMsg) + .send().join()) yield (isErrorHandled, errorRegexHandled, generalVariables)) - .flatMap : + .flatMap: case (true, true, generalVariables) => val mockedOutput = error match case error: ErrorWithOutput => error.output - case _ => Map.empty - val filtered = filteredOutput(generalVariables.outputVariables, mockedOutput) + case _ => Map.empty + val filtered = filteredOutput(generalVariables.outputVariables, mockedOutput) ZIO.attempt( if error.isMock && !generalVariables.handledErrors.contains( @@ -98,7 +100,7 @@ trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends Wo else val errorVars = Map( "errorCode" -> error.errorCode, - "errorMsg" -> error.errorMsg + "errorMsg" -> error.errorMsg ) val variables = (filtered ++ errorVars).asJava client.newFailCommand(job) @@ -108,9 +110,9 @@ trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends Wo .errorMessage(error.causeMsg) .send().join() ) - case (true, false, _) => + case (true, false, _) => ZIO.fail(HandledRegexNotMatchedError(error)) - case _ => + case _ => ZIO.fail(error) private def extractGeneralVariables(json: Json) = @@ -141,12 +143,10 @@ trait C8Worker[In <: Product: InOutCodec, Out <: Product: InOutCodec] extends Wo private def processVariable( key: String, json: Json - ): UIO[Either[BadVariableError, (String, Option[Json])]] = - ZIO.succeed( - json.hcursor.downField(key).as[Option[Json]] match - case Right(value) => Right(key -> value) - case Left(ex) => Left(BadVariableError(ex.getMessage)) - ) + ): IO[BadVariableError, (String, Option[Json])] = + json.hcursor.downField(key).as[Option[Json]] match + case Right(value) => ZIO.succeed(key -> value) + case Left(ex) => ZIO.fail(BadVariableError(ex.getMessage)) case class BusinessKey(businessKey: Option[String]) object BusinessKey: diff --git a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8WorkerRegistry.scala b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8WorkerRegistry.scala similarity index 97% rename from 04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8WorkerRegistry.scala rename to 04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8WorkerRegistry.scala index 91dd14e4..38648d46 100644 --- a/04-worker-c8zio/src/main/scala/camundala/worker/c8zio/C8WorkerRegistry.scala +++ b/04-worker-c8zio/src/main/scala/camundala/worker/c7zio/C8WorkerRegistry.scala @@ -1,4 +1,4 @@ -package camundala.worker.c8zio +package camundala.worker.c7zio import camundala.worker.WorkerRegistry import io.camunda.zeebe.client.ZeebeClient diff --git a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/CompanyWorkerHandler.scala b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/CompanyWorkerHandler.scala index b2032a25..4f51b65b 100644 --- a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/CompanyWorkerHandler.scala +++ b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/CompanyWorkerHandler.scala @@ -3,7 +3,7 @@ package camundala.examples.demos.newWorker import camundala.camunda7.worker.C7WorkerHandler import camundala.domain.* import camundala.worker.* -import camundala.worker.c8zio.{C7Context, C7Worker, C8Context, C8Worker} +import camundala.worker.c7zio.{C7Context, C7Worker, C8Context, C8Worker} import scala.reflect.ClassTag diff --git a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJob2Worker.scala b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJob2Worker.scala index ecb898b0..240dfb91 100644 --- a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJob2Worker.scala +++ b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJob2Worker.scala @@ -4,7 +4,7 @@ import camundala.bpmn.CustomTask import camundala.domain.* import camundala.examples.demos.newWorker.ExampleJob2.* import camundala.worker.CamundalaWorkerError -import camundala.worker.c8zio.C8Worker +import camundala.worker.c7zio.C8Worker object ExampleJob2Worker extends CompanyCustomWorkerDsl[In, Out]: lazy val customTask = example diff --git a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJobWorker.scala b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJobWorker.scala index 232c1715..47debc0a 100644 --- a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJobWorker.scala +++ b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/ExampleJobWorker.scala @@ -4,7 +4,7 @@ import camundala.bpmn.CustomTask import camundala.domain.* import camundala.examples.demos.newWorker.ExampleJob.* import camundala.worker.CamundalaWorkerError -import camundala.worker.c8zio.C8Worker +import camundala.worker.c7zio.C8Worker object ExampleJobWorker extends CompanyCustomWorkerDsl[In, Out]: lazy val customTask = example diff --git a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorker2App.scala b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorker2App.scala index c214df06..84aa05e6 100644 --- a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorker2App.scala +++ b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorker2App.scala @@ -1,7 +1,7 @@ package camundala.examples.demos.newWorker import camundala.worker.{WorkerApp, WorkerRegistry} -import camundala.worker.c8zio.{C7NoAuthClient, C7WorkerRegistry, C8SaasClient, C8WorkerRegistry} +import camundala.worker.c7zio.{C7NoAuthClient, C7WorkerRegistry, C8SaasClient, C8WorkerRegistry} object TestWorker2App extends CompanyWorkerApp: diff --git a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorkerApp.scala b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorkerApp.scala index 007c839a..41afed70 100644 --- a/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorkerApp.scala +++ b/05-examples/demos/03-worker/src/main/scala/camundala/examples/demos/newWorker/TestWorkerApp.scala @@ -1,7 +1,7 @@ package camundala.examples.demos.newWorker import camundala.worker.{WorkerApp, WorkerRegistry} -import camundala.worker.c8zio.{C7NoAuthClient, C7WorkerRegistry, C8SaasClient, C8WorkerRegistry} +import camundala.worker.c7zio.{C7NoAuthClient, C7WorkerRegistry, C8SaasClient, C8WorkerRegistry} trait CompanyWorkerApp extends WorkerApp: lazy val workerRegistries: Seq[WorkerRegistry[?]] = diff --git a/05-examples/invoice/03-worker/src/main/scala/camundala/examples/invoice/worker/ComposedWorker.scala b/05-examples/invoice/03-worker/src/main/scala/camundala/examples/invoice/worker/ComposedWorker.scala index d120127a..dd9ca191 100644 --- a/05-examples/invoice/03-worker/src/main/scala/camundala/examples/invoice/worker/ComposedWorker.scala +++ b/05-examples/invoice/03-worker/src/main/scala/camundala/examples/invoice/worker/ComposedWorker.scala @@ -7,6 +7,7 @@ import camundala.worker.CamundalaWorkerError.CustomError import camundala.worker.{CustomWorkerDsl, EngineRunContext} import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.annotation.Configuration +import zio.IO @Configuration class ComposedWorker