Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ lazy val commonSettings = List(
scalacOptions ++= commonScalacOptions,
scalaVersion := "2.12.15",
organization := "org.ergoplatform",
version := "1.0.1-M3",
version := "1.2.0-M1",
resolvers ++= Seq(
Resolver.sonatypeRepo("public"),
Resolver.sonatypeRepo("snapshots"),
Expand Down
11 changes: 3 additions & 8 deletions modules/amm-executor/src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
rotation.retry-delay = 120s

exchange.reward-address = "9gCigPc9cZNRhKgbgdmTkVxo1ZKgw79G8DvLjCcYWAvEF3XRUKy"

execution.order-lifetime = 300s
Expand All @@ -18,12 +16,9 @@ consumers.unconfirmed-orders.group-id = "ergo"
consumers.unconfirmed-orders.client-id = "ergo"
consumers.unconfirmed-orders.topic-id = "dex.amm.cfmm.unconfirmed.orders"

consumers.orders-retry.group-id = "ergo"
consumers.orders-retry.client-id = "ergo-retry"
consumers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry"

producers.orders-retry.topic-id = "dex.amm.cfmm.orders.retry"
producers.orders-retry.parallelism = 3
consumers.evaluated-orders.group-id = "ergo"
consumers.evaluated-orders.client-id = "ergo"
consumers.evaluated-orders.topic-id = "dex.cfmm.history.orders"

kafka.bootstrap-servers = ["kafka1:9092"]

Expand Down
Original file line number Diff line number Diff line change
@@ -1,63 +1,58 @@
package org.ergoplatform.dex.executor.amm

import cats.Id
import cats.effect.{Blocker, Resource}
import fs2.kafka.RecordDeserializer
import fs2.kafka.serde._
import org.ergoplatform.ErgoAddressEncoder
import org.ergoplatform.common.EnvApp
import org.ergoplatform.common.streaming._
import org.ergoplatform.dex.configs.ConsumerConfig
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import org.ergoplatform.dex.domain.amm.{CFMMOrder, EvaluatedCFMMOrder, OrderId}
import org.ergoplatform.dex.executor.amm.config.ConfigBundle
import org.ergoplatform.dex.executor.amm.context.AppContext
import org.ergoplatform.dex.executor.amm.interpreters.{CFMMInterpreter, N2TCFMMInterpreter, T2TCFMMInterpreter}
import org.ergoplatform.dex.executor.amm.processes.Executor
import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog
import org.ergoplatform.dex.executor.amm.processes.{BacklogCleaner, Executor, Registerer}
import org.ergoplatform.dex.executor.amm.repositories.CFMMPools
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.{CFMMCircuit, CFMMConsumerIn, CFMMConsumerRetries, CFMMProducerRetries}
import org.ergoplatform.dex.executor.amm.streaming.{CFMMOrders, CFMMOrdersGen, EvaluatedCFMMOrders}
import org.ergoplatform.dex.protocol.amm.AMMType.{CFMMType, N2T_CFMM, T2T_CFMM}
import org.ergoplatform.ergo.modules.ErgoNetwork
import org.ergoplatform.ergo.services.explorer.{ErgoExplorer, ErgoExplorerStreaming}
import org.ergoplatform.ergo.services.node.ErgoNode
import org.ergoplatform.ergo.state.{Confirmed, Unconfirmed}
import sttp.capabilities.fs2.Fs2Streams
import sttp.client3.SttpBackend
import sttp.client3.asynchttpclient.cats.AsyncHttpClientCatsBackend
import sttp.client3.asynchttpclient.fs2.AsyncHttpClientFs2Backend
import tofu.WithRun
import tofu.fs2Instances._
import tofu.lift.IsoK
import tofu.syntax.unlift._
import zio.interop.catz._
import zio.{ExitCode, URIO, ZEnv}

object App extends EnvApp[AppContext] {

def run(args: List[String]): URIO[ZEnv, ExitCode] =
init(args.headOption).use { case (executor, ctx) =>
val appF = executor.run.compile.drain
init(args.headOption).use { case (executor, registerer, cleaner, ctx) =>
val appF = fs2.Stream(executor.run, registerer.run, cleaner.run).parJoinUnbounded.compile.drain
appF.run(ctx) as ExitCode.success
}.orDie

private def init(configPathOpt: Option[String]): Resource[InitF, (Executor[StreamF], AppContext)] =
private def init(configPathOpt: Option[String]) =
for {
blocker <- Blocker[InitF]
configs <- Resource.eval(ConfigBundle.load[InitF](configPathOpt, blocker))
ctx = AppContext.init(configs)
implicit0(isoKRun: IsoK[RunF, InitF]) = isoKRunByContext(ctx)
implicit0(e: ErgoAddressEncoder) = ErgoAddressEncoder(configs.protocol.networkType.prefix)
implicit0(confirmedOrders: CFMMConsumerIn[StreamF, RunF, Confirmed]) =
implicit0(confirmedOrders: CFMMOrdersGen[StreamF, RunF, Confirmed]) =
makeConsumer[OrderId, Confirmed[CFMMOrder]](configs.consumers.confirmedOrders)
implicit0(unconfirmedOrders: CFMMConsumerIn[StreamF, RunF, Unconfirmed]) =
implicit0(unconfirmedOrders: CFMMOrdersGen[StreamF, RunF, Unconfirmed]) =
makeConsumer[OrderId, Unconfirmed[CFMMOrder]](configs.consumers.unconfirmedOrders)
implicit0(consumerRetries: CFMMConsumerRetries[StreamF, RunF]) =
makeConsumer[OrderId, Delayed[CFMMOrder]](configs.consumers.ordersRetry)
implicit0(orders: CFMMConsumerIn[StreamF, RunF, Id]) =
implicit0(orders: CFMMOrders[StreamF, RunF]) =
Consumer.combine2(confirmedOrders, unconfirmedOrders)(_.entity, _.entity)
implicit0(producerRetries: CFMMProducerRetries[StreamF]) <-
Producer.make[InitF, StreamF, RunF, OrderId, Delayed[CFMMOrder]](configs.producers.ordersRetry)
implicit0(consumer: CFMMCircuit[StreamF, RunF]) = StreamingCircuit.make[StreamF, RunF, OrderId, CFMMOrder]
implicit0(evaluatedOrders: EvaluatedCFMMOrders[StreamF, RunF]) =
makeConsumer[OrderId, EvaluatedCFMMOrder.Any](configs.consumers.evaluatedOrders)
implicit0(backlog: CFMMBacklog[RunF]) <- Resource.eval(CFMMBacklog.make[InitF, RunF])
implicit0(backend: SttpBackend[RunF, Fs2Streams[RunF]]) <- makeBackend(ctx, blocker)
implicit0(explorer: ErgoExplorer[RunF]) = ErgoExplorerStreaming.make[StreamF, RunF]
implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF])
Expand All @@ -68,7 +63,9 @@ object App extends EnvApp[AppContext] {
implicit0(interpreter: CFMMInterpreter[CFMMType, RunF]) = CFMMInterpreter.make[RunF]
implicit0(execution: Execution[RunF]) <- Resource.eval(Execution.make[InitF, RunF])
executor <- Resource.eval(Executor.make[InitF, StreamF, RunF])
} yield executor -> ctx
registerer <- Resource.eval(Registerer.make[InitF, StreamF, RunF])
cleaner <- Resource.eval(BacklogCleaner.make[InitF, StreamF, RunF])
} yield (executor, registerer, cleaner, ctx)

private def makeBackend(
ctx: AppContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import tofu.optics.macros.{promote, ClassyOptics}
@derive(pureconfigReader, loggable)
@ClassyOptics
final case class ConfigBundle(
@promote rotation: RotationConfig,
@promote exchange: ExchangeConfig,
@promote execution: ExecutionConfig,
@promote monetary: MonetaryConfig,
@promote protocol: ProtocolConfig,
consumers: Consumers,
producers: Producers,
@promote kafka: KafkaConfig,
@promote network: NetworkConfig,
@promote resolver: ResolverConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ import tofu.logging.derivation.loggable
final case class Consumers(
confirmedOrders: ConsumerConfig,
unconfirmedOrders: ConsumerConfig,
ordersRetry: ConsumerConfig
evaluatedOrders: ConsumerConfig
)
Original file line number Diff line number Diff line change
@@ -1,18 +1,86 @@
package org.ergoplatform.dex.executor.amm.modules

import cats.Monad
import cats.effect.{Sync, Timer}
import cats.effect.concurrent.Ref
import org.ergoplatform.dex.domain.amm.{CFMMOrder, OrderId}
import tofu.concurrent.MakeRef
import tofu.generate.GenRandom
import tofu.syntax.monadic._

import scala.collection.immutable.{HashSet, TreeSet}
import scala.concurrent.duration.DurationInt

trait CFMMBacklog[F[_]] {

/** Put an order to the backlog.
*/
def put(order: CFMMOrder): F[Unit]

/** Get candidate order for execution. Blocks until an order is available.
def putLowPriority(order: CFMMOrder): F[Unit]

/** Pop a candidate order for execution. Blocks until an order is available.
*/
def get: F[CFMMOrder]
def pop: F[CFMMOrder]

/** Put an order from the backlog.
*/
def drop(id: OrderId): F[Unit]
def drop(id: OrderId): F[Boolean]
}

object CFMMBacklog {

private val PollInterval = 200.millis
private val PriorityThreshold = 9
private val PrioritySpace = 99

def make[I[_]: Sync, F[_]: Sync: Timer](implicit makeRef: MakeRef[I, F]): I[CFMMBacklog[F]] =
for {
implicit0(rnd: GenRandom[F]) <- GenRandom.instance[I, F]()
candidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder])
lpCandidatesR <- makeRef.refOf(TreeSet.empty[CFMMOrder])
survivorsR <- makeRef.refOf(HashSet.empty[OrderId])
} yield new EphemeralCFMMBacklog(candidatesR, lpCandidatesR, survivorsR)

// In-memory orders backlog.
// Note: Not thread safe.
final class EphemeralCFMMBacklog[F[_]: Monad: GenRandom](
candidatesR: Ref[F, TreeSet[CFMMOrder]],
lowPriorityCandidatesR: Ref[F, TreeSet[CFMMOrder]],
survivorsR: Ref[F, HashSet[OrderId]]
)(implicit T: Timer[F])
extends CFMMBacklog[F] {

def put(order: CFMMOrder): F[Unit] =
candidatesR.update(_ + order) >> survivorsR.update(_ + order.id)

def putLowPriority(order: CFMMOrder): F[Unit] =
lowPriorityCandidatesR.update(_ + order) >> survivorsR.update(_ + order.id)

def pop: F[CFMMOrder] = {
def tryPop: F[CFMMOrder] =
for {
rnd <- GenRandom.nextInt(PrioritySpace)
lpc <- lowPriorityCandidatesR.get.map(_.headOption)
maybeWinner <- lpc match {
case Some(c) if rnd <= PriorityThreshold => Left(c).pure
case _ => candidatesR.get.map(xs => Right(xs.headOption))
}
winner <- maybeWinner match {
case Right(Some(order)) => candidatesR.update(_ - order) as order
case Left(order) => lowPriorityCandidatesR.update(_ - order) as order
case _ => T.sleep(PollInterval) >> tryPop
}
} yield winner
for {
c <- tryPop
res <- survivorsR.get
.map(_.contains(c.id))
.ifM(survivorsR.update(_ - c.id) as c, pop)
} yield res
}

def drop(id: OrderId): F[Boolean] =
survivorsR.get.map(_.contains(id)).ifM(survivorsR.update(_ - id) as true, false.pure)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package org.ergoplatform.dex.executor.amm.processes

import cats.{Functor, Monad}
import derevo.derive
import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog
import org.ergoplatform.dex.executor.amm.streaming.EvaluatedCFMMOrders
import tofu.Catches
import tofu.higherKind.derived.representableK
import tofu.logging.{Logging, Logs}
import tofu.streams.Evals
import tofu.syntax.logging._
import tofu.syntax.monadic._
import tofu.syntax.handle._
import tofu.syntax.streams.all._

@derive(representableK)
trait BacklogCleaner[F[_]] {

def run: F[Unit]
}

object BacklogCleaner {

def make[
I[_]: Functor,
F[_]: Monad: Evals[*[_], G]: Catches,
G[_]: Monad
](implicit
orders: EvaluatedCFMMOrders[F, G],
backlog: CFMMBacklog[G],
logs: Logs[I, G]
): I[BacklogCleaner[F]] =
logs.forService[BacklogCleaner[F]].map(implicit l => new Live[F, G])

final private class Live[
F[_]: Monad: Evals[*[_], G]: Catches,
G[_]: Monad: Logging
](implicit
orders: EvaluatedCFMMOrders[F, G],
backlog: CFMMBacklog[G]
) extends BacklogCleaner[F] {

def run: F[Unit] =
orders.stream
.evalTap(rec => backlog.drop(rec.message.order.id).ifM(debug"Order ${rec.message} is evicted", unit[G]))
.evalMap(_.commit)
.handleWith[Throwable](e => eval(warnCause"BacklogCleaner failed. Restarting .." (e)) >> run)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package org.ergoplatform.dex.executor.amm.processes

import cats.effect.Clock
import cats.syntax.option._
import cats.{Functor, Monad}
import cats.{Defer, Functor, Monad, SemigroupK}
import derevo.derive
import mouse.any._
import org.ergoplatform.common.TraceId
import org.ergoplatform.common.streaming.syntax._
import org.ergoplatform.dex.domain.amm.CFMMOrder
import org.ergoplatform.dex.executor.amm.config.ExecutionConfig
import org.ergoplatform.dex.executor.amm.modules.CFMMBacklog
import org.ergoplatform.dex.executor.amm.services.Execution
import org.ergoplatform.dex.executor.amm.streaming.CFMMCircuit
import org.ergoplatform.ergo.services.explorer.TxSubmissionErrorParser
Expand All @@ -34,10 +35,10 @@ object Executor {

def make[
I[_]: Functor,
F[_]: Monad: Evals[*[_], G]: ExecutionConfig.Has,
F[_]: Monad: SemigroupK: Defer: Evals[*[_], G]: ExecutionConfig.Has,
G[_]: Monad: TraceId.Local: Clock: Catches
](implicit
orders: CFMMCircuit[F, G],
backlog: CFMMBacklog[G],
service: Execution[G],
logs: Logs[I, G]
): I[Executor[F]] =
Expand All @@ -48,34 +49,32 @@ object Executor {
}

final private class Live[
F[_]: Monad: Evals[*[_], G],
F[_]: Monad: SemigroupK: Defer: Evals[*[_], G],
G[_]: Monad: Logging: TraceId.Local: Clock: Catches
](conf: ExecutionConfig)(implicit
orders: CFMMCircuit[F, G],
backlog: CFMMBacklog[G],
service: Execution[G],
errParser: TxSubmissionErrorParser
) extends Executor[F] {

def run: F[Unit] =
orders.stream
.evalMap { rec =>
eval(backlog.pop).repeat
.evalMap { order =>
service
.executeAttempt(rec.message)
.executeAttempt(order)
.handleWith[Throwable](e => warnCause"Order execution failed fatally" (e) as none[CFMMOrder])
.local(_ => TraceId.fromString(rec.message.id.value))
.tupleLeft(rec)
.local(_ => TraceId.fromString(order.id.value))
.tupleLeft(order)
}
.flatTap {
case (_, None) => unit[F]
.evalMap {
case (_, None) => unit[G]
case (_, Some(order)) =>
eval(now.millis) >>= {
now.millis >>= {
case ts if ts - order.timestamp < conf.orderLifetime.toMillis =>
eval(warn"Failed to execute $order. Going to retry.") >>
orders.retry((order.id -> order).pure[F])
warn"Failed to execute $order. Going to retry." >> backlog.put(order)
case _ =>
eval(warn"Failed to execute $order. Order expired.")
warn"Failed to execute $order. Order expired."
}
}
.evalMap { case (rec, _) => rec.commit }
}
}
Loading