Skip to content

Commit ea27a96

Browse files
committed
backlog service
1 parent 0923431 commit ea27a96

File tree

23 files changed

+808
-70
lines changed

23 files changed

+808
-70
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ lazy val ammExecutor = utils
158158
)
159159
.settings(nativePackagerSettings("amm-executor"))
160160
.enablePlugins(JavaAppPackaging, UniversalPlugin, DockerPlugin)
161-
.dependsOn(Seq(core, http).map(_ % allConfigDependency): _*)
161+
.dependsOn(Seq(core, http, cache).map(_ % allConfigDependency): _*)
162162

163163
lazy val poolResolver = utils
164164
.mkModule("pool-resolver", "PoolResolver")

modules/amm-executor/src/main/resources/application.conf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ rotation.retry-delay = 120s
22

33
exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy"
44

5-
execution.order-lifetime = 300s
5+
backlogConfig.order-lifetime = 300s
6+
backlogConfig.order-execution-time = 180s
7+
backlogConfig.suspended-probability = 10
68

79
monetary.miner-fee = 2000000
810
monetary.min-dex-fee = 1000000

modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/App.scala

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,17 @@ import fs2.kafka.RecordDeserializer
66
import fs2.kafka.serde._
77
import org.ergoplatform.ErgoAddressEncoder
88
import org.ergoplatform.common.EnvApp
9+
import org.ergoplatform.common.cache.{Cache, MakeRedisTransaction, Redis}
910
import org.ergoplatform.common.streaming._
1011
import org.ergoplatform.dex.configs.ConsumerConfig
11-
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
12+
import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId}
1213
import org.ergoplatform.dex.executor.amm.config.ConfigBundle
1314
import org.ergoplatform.dex.executor.amm.context.AppContext
1415
import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter}
1516
import org.ergoplatform.dex.executor.amm.processes.Executor
16-
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools
17-
import org.ergoplatform.dex.executor.amm.services.Execution
18-
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries}
17+
import org.ergoplatform.dex.executor.amm.repositories.{CFMMOrders, CFMMPools}
18+
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
19+
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMHistConsumer, CFMMProducerRetries}
1920
import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM}
2021
import org.ergoplatform.ergo.modules.ErgoNetwork
2122
import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming}
@@ -27,6 +28,7 @@ import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend
2728
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
2829
import tofu.WithRun
2930
import tofu.fs2Instances._
31+
import tofu.generate.GenRandom
3032
import tofu.lift.IsoK
3133
import tofu.syntax.unlift._
3234
import zio.interop.catz._
@@ -40,17 +42,22 @@ object App extends EnvApp[AppContext] {
4042
appF.run(ctx) as ExitCode.success
4143
}.orDie
4244

45+
implicit val mtx: MakeRedisTransaction[RunF] = MakeRedisTransaction.make[RunF]
46+
4347
private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] =
4448
for {
4549
blocker <- Blocker[InitF]
4650
configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker))
51+
implicit0(genRand: GenRandom[RunF]) <- Resource.eval(GenRandom.instance[InitF, RunF]())
4752
ctx = AppContext.init(configs)
4853
implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx)
4954
implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix)
5055
implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) =
5156
makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders)
5257
implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) =
5358
makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders)
59+
implicit0(ammHistCons: CFMMHistConsumer[StreamF, RunF]) =
60+
makeConsumer[OrderId, Option[EvaluatedCFMMOrder.Any]](configs.consumers.cfmmHistory)
5461
implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) =
5562
makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry)
5663
implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) =
@@ -66,8 +73,13 @@ object App extends EnvApp[AppContext] {
6673
implicit0(t2tInt: CFMMInterpreter[T2T_CFMM, RunF]) <- Resource.eval(T2TCFMMInterpreter.make[InitF, RunF])
6774
implicit0(n2tInt: CFMMInterpreter[N2T_CFMM, RunF]) <- Resource.eval(N2TCFMMInterpreter.make[InitF, RunF])
6875
implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF]
69-
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
70-
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
76+
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
77+
implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis)
78+
implicit0(cache: Cache[RunF]) <- Resource.eval(Cache.make[InitF, RunF])
79+
implicit0(cfmmOrders: CFMMOrders[RunF]) <- Resource.eval[InitF, CFMMOrders[RunF]](CFMMOrders.make[InitF, RunF])
80+
implicit0(cfmmBacklog: CFMMBacklog[RunF]) <-
81+
Resource.eval[InitF, CFMMBacklog[RunF]](CFMMBacklog.make[InitF, RunF])
82+
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
7183
} yield executor -> ctx
7284

7385
private def makeBackend(
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.ergoplatform.dex.executor.amm.config
2+
3+
import derevo.derive
4+
import derevo.pureconfig.pureconfigReader
5+
import tofu.Context
6+
import tofu.logging.derivation.loggable
7+
8+
import scala.concurrent.duration.FiniteDuration
9+
10+
@derive(pureconfigReader, loggable)
11+
final case class BacklogConfig(
12+
orderLifetime: FiniteDuration,
13+
orderExecutionTime: FiniteDuration,
14+
suspendedOrdersExecutionProbabilityPercent: Int
15+
)
16+
17+
object BacklogConfig extends Context.Companion[BacklogConfig]

modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ConfigBundle.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package org.ergoplatform.dex.executor.amm.config
22

33
import derevo.derive
44
import derevo.pureconfig.pureconfigReader
5+
import org.ergoplatform.common.cache.RedisConfig
56
import org.ergoplatform.common.streaming.RotationConfig
67
import org.ergoplatform.dex.configs._
78
import tofu.Context
89
import tofu.logging.derivation.loggable
9-
import tofu.optics.macros.{promote, ClassyOptics}
10+
import tofu.optics.macros.{ClassyOptics, promote}
1011

1112
@derive(pureconfigReader, loggable)
1213
@ClassyOptics
@@ -16,7 +17,9 @@ final case class ConfigBundle(
1617
@promote execution: ExecutionConfig,
1718
@promote monetary: MonetaryConfig,
1819
@promote protocol: ProtocolConfig,
20+
@promote backlogConfig: BacklogConfig,
1921
consumers: Consumers,
22+
redis: RedisConfig,
2023
producers: Producers,
2124
@promote kafka: KafkaConfig,
2225
@promote network: NetworkConfig,

modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/Consumers.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import tofu.logging.derivation.loggable
88
@derive(pureconfigReader, loggable)
99
final case class Consumers(
1010
confirmedOrders: ConsumerConfig,
11+
cfmmHistory: ConsumerConfig,
1112
unconfirmedOrders: ConsumerConfig,
1213
ordersRetry: ConsumerConfig
1314
)

modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/config/ExecutionConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ import tofu.logging.derivation.loggable
88
import scala.concurrent.duration.FiniteDuration
99

1010
@derive(pureconfigReader, loggable)
11-
final case class ExecutionConfig(orderLifetime: FiniteDuration)
11+
final case class ExecutionConfig(order: FiniteDuration)
1212

1313
object ExecutionConfig extends Context.Companion[ExecutionConfig]

modules/amm-executor/src/main/scala/org/ergoplatform/dex/executor/amm/modules/CFMMBacklog.scala

Lines changed: 0 additions & 18 deletions
This file was deleted.
Lines changed: 45 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
package org.ergoplatform.dex.executor.amm.processes
22

3-
import cats.effect.Clock
3+
import cats.effect.{Clock, Timer}
44
import cats.syntax.option._
5-
import cats.{Functor, Monad}
5+
import cats.syntax.traverse._
6+
import cats.{Defer, Functor, Monad, SemigroupK}
67
import derevo.derive
78
import mouse.any._
89
import org.ergoplatform.common.TraceId
910
import org.ergoplatform.common.streaming.syntax._
1011
import org.ergoplatform.dex.domain.amm.CFMMOrder
1112
import org.ergoplatform.dex.executor.amm.config.ExecutionConfig
12-
import org.ergoplatform.dex.executor.amm.services.Execution
13-
import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit
13+
import org.ergoplatform.dex.executor.amm.services.{CFMMBacklog, Execution}
14+
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMHistConsumer}
1415
import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser
1516
import tofu.Catches
1617
import tofu.higherKind.derived.representableK
1718
import tofu.logging.{Logging, Logs}
18-
import tofu.streams.Evals
19+
import tofu.streams.{Evals, ParFlatten}
1920
import tofu.syntax.context._
2021
import tofu.syntax.embed._
2122
import tofu.syntax.handle._
@@ -34,10 +35,12 @@ object Executor {
3435

3536
def make[
3637
I[_]: Functor,
37-
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has,
38-
G[_]: Monad: TraceId.Local: Clock: Catches
38+
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has: Defer: SemigroupK: ParFlatten,
39+
G[_]: Monad: TraceId.Local: Clock: Catches: Timer
3940
](implicit
4041
orders: CFMMCircuit[F, G],
42+
executedOrders: CFMMHistConsumer[F, G],
43+
cfmmBacklog: CFMMBacklog[G],
4144
service: Execution[G],
4245
logs: Logs[I, G]
4346
): I[Executor[F]] =
@@ -48,34 +51,50 @@ object Executor {
4851
}
4952

5053
final private class Live[
51-
F[_]: Monad: Evals[*[_], G],
52-
G[_]: Monad: Logging: TraceId.Local: Clock: Catches
54+
F[_]: Monad: Evals[*[_], G]: Defer: SemigroupK: ParFlatten,
55+
G[_]: Monad: Logging: Catches: Timer
5356
](conf: ExecutionConfig)(implicit
5457
orders: CFMMCircuit[F, G],
58+
executedOrders: CFMMHistConsumer[F, G],
59+
backlog: CFMMBacklog[G],
5560
service: Execution[G],
5661
errParser: TxSubmissionErrorParser
5762
) extends Executor[F] {
5863

5964
def run: F[Unit] =
65+
emits(
66+
List(
67+
addToBacklog,
68+
executeOrders,
69+
dropExecuted
70+
)
71+
).parFlattenUnbounded
72+
73+
def addToBacklog: F[Unit] =
6074
orders.stream
61-
.evalMap { rec =>
62-
service
63-
.executeAttempt(rec.message)
64-
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
65-
.local(_ => TraceId.fromString(rec.message.id.value))
66-
.tupleLeft(rec)
75+
.evalTap(orderRec => backlog.put(orderRec.message))
76+
.evalMap(_.commit)
77+
78+
def dropExecuted: F[Unit] =
79+
executedOrders.stream
80+
.evalTap { rec =>
81+
rec.message.traverse(order => backlog.drop(order.order.id))
6782
}
68-
.flatTap {
69-
case (_, None) => unit[F]
70-
case (_, Some(order)) =>
71-
eval(now.millis) >>= {
72-
case ts if ts - order.timestamp < conf.orderLifetime.toMillis =>
73-
eval(warn"Failed to execute $order. Going to retry.") >>
74-
orders.retry((order.id -> order).pure[F])
75-
case _ =>
76-
eval(warn"Failed to execute $order. Order expired.")
77-
}
83+
.evalMap(_.commit)
84+
85+
def executeOrders: F[Unit] =
86+
eval(backlog.get).evalMap {
87+
case Some(order) => executeOrder(order)
88+
case None => trace"No orders to execute. Going to wait for" >> Timer[G].sleep(conf.order)
89+
}.repeat
90+
91+
private def executeOrder(order: CFMMOrder): G[Unit] =
92+
service
93+
.executeAttempt(order)
94+
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
95+
.flatMap {
96+
case Some(order) => backlog.suspend(order)
97+
case None => backlog.checkLater(order)
7898
}
79-
.evalMap { case (rec, _) => rec.commit }
8099
}
81100
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package org.ergoplatform.dex.executor.amm.repositories
2+
3+
import cats.{FlatMap, Functor}
4+
import derevo.derive
5+
import org.ergoplatform.common.cache.Cache
6+
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
7+
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools.{CFMMPoolsTracing, Live}
8+
import tofu.higherKind.Mid
9+
import tofu.higherKind.derived.representableK
10+
import tofu.logging.{Logging, Logs}
11+
import tofu.syntax.logging._
12+
import tofu.syntax.monadic._
13+
14+
@derive(representableK)
15+
trait CFMMOrders[F[_]] {
16+
17+
def put(order: CFMMOrder): F[Unit]
18+
19+
def exists(orderId: OrderId): F[Boolean]
20+
21+
def drop(orderId: OrderId): F[Unit]
22+
23+
def get(orderId: OrderId): F[Option[CFMMOrder]]
24+
25+
def getAll: F[List[CFMMOrder]]
26+
}
27+
28+
object CFMMOrders {
29+
30+
def make[I[_]: Functor, F[_]: FlatMap](implicit logs: Logs[I, F], cache: Cache[F]): I[CFMMOrders[F]] =
31+
logs.forService[CFMMOrders[F]].map { implicit logging =>
32+
new CFMMOrdersTracingMid[F] attach new Live[F](cache)
33+
}
34+
35+
final private class Live[F[_]](cache: Cache[F]) extends CFMMOrders[F] {
36+
37+
def put(order: CFMMOrder): F[Unit] = cache.set(order.id, order)
38+
39+
def exists(orderId: OrderId): F[Boolean] = cache.exists(orderId)
40+
41+
def drop(orderId: OrderId): F[Unit] = cache.del(orderId)
42+
43+
def get(orderId: OrderId): F[Option[CFMMOrder]] = cache.get[OrderId, CFMMOrder](orderId)
44+
45+
def getAll: F[List[CFMMOrder]] = cache.getAll
46+
}
47+
48+
final private class CFMMOrdersTracingMid[F[_]: FlatMap: Logging] extends CFMMOrders[Mid[F, *]] {
49+
50+
def put(order: CFMMOrder): Mid[F, Unit] = for {
51+
_ <- trace"put(order=$order)"
52+
r <- _
53+
_ <- trace"put(order=$order) -> $r"
54+
} yield r
55+
56+
def exists(orderId: OrderId): Mid[F, Boolean] = for {
57+
_ <- trace"exists(orderId=$orderId)"
58+
r <- _
59+
_ <- trace"exists(orderId=$orderId) -> $r"
60+
} yield r
61+
62+
def drop(orderId: OrderId): Mid[F, Unit] = for {
63+
_ <- trace"drop(orderId=$orderId)"
64+
r <- _
65+
_ <- trace"drop(orderId=$orderId) -> $r"
66+
} yield r
67+
68+
def get(orderId: OrderId): Mid[F, Option[CFMMOrder]] = for {
69+
_ <- trace"checkLater(order=$orderId)"
70+
r <- _
71+
_ <- trace"checkLater(order=$orderId) -> $r"
72+
} yield r
73+
74+
def getAll: Mid[F, List[CFMMOrder]] = for {
75+
_ <- trace"getAll()"
76+
r <- _
77+
_ <- trace"getAll() -> length: ${r.length}"
78+
} yield r
79+
}
80+
}

0 commit comments

Comments
 (0)