Skip to content

Commit 0923431

Browse files
authored
Merge pull request #56 from ergolabs/full-block-scanning-migration
Remove utxo scanning
2 parents 47b5820 + 8bfd1ce commit 0923431

File tree

7 files changed

+89
-26
lines changed

7 files changed

+89
-26
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,5 @@ target
1616
project/metals.sbt
1717

1818
config.env
19+
20+
/.bsp/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.ergoplatform.ergo.domain
2+
3+
import derevo.derive
4+
import org.ergoplatform.ergo.{BlockId, TxId}
5+
import org.ergoplatform.ergo.services.explorer.models.{Transaction => ExplorerTx}
6+
import tofu.logging.derivation.loggable
7+
8+
@derive(loggable)
9+
final case class ExtendedSettledTx(
10+
id: TxId,
11+
blockId: BlockId,
12+
inclusionHeight: Int,
13+
index: Int,
14+
globalIndex: Long,
15+
timestamp: Long,
16+
settledOutputs: List[SettledOutput],
17+
inputs: List[Input]
18+
)
19+
20+
object ExtendedSettledTx {
21+
def fromExplorer(tx: ExplorerTx): ExtendedSettledTx =
22+
ExtendedSettledTx(
23+
tx.id,
24+
tx.blockId,
25+
tx.inclusionHeight,
26+
tx.index,
27+
tx.globalIndex,
28+
tx.timestamp,
29+
tx.outputs.map(SettledOutput.fromExplorer),
30+
tx.inputs.map(Input.fromExplorer)
31+
)
32+
}

modules/dex-core/src/main/scala/org/ergoplatform/ergo/domain/SettledTransaction.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,14 @@ object SettledTransaction {
2727
tx.globalIndex,
2828
tx.timestamp
2929
)
30+
31+
def fromExtendedSettledTx(tx: ExtendedSettledTx): SettledTransaction =
32+
SettledTransaction(
33+
Transaction(tx.id, tx.inputs, tx.settledOutputs.map(_.output)),
34+
tx.blockId,
35+
tx.inclusionHeight,
36+
tx.index,
37+
tx.globalIndex,
38+
tx.timestamp
39+
)
3040
}

modules/dex-core/src/main/scala/org/ergoplatform/ergo/modules/LedgerStreaming.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package org.ergoplatform.ergo.modules
22

33
import cats.Functor
44
import derevo.derive
5-
import org.ergoplatform.ergo.domain.{Block, SettledOutput, SettledTransaction}
5+
import org.ergoplatform.ergo.domain.{Block, ExtendedSettledTx, SettledOutput, SettledTransaction}
66
import org.ergoplatform.ergo.services.explorer.ErgoExplorerStreaming
77
import tofu.higherKind.derived.representableK
88
import tofu.syntax.monadic._
@@ -25,6 +25,10 @@ trait LedgerStreaming[F[_]] {
2525
/** Get a stream of blocks at the given offset(height).
2626
*/
2727
def streamBlocks(gOffset: Long, limit: Int): F[Block]
28+
29+
/** Get a stream of transactions at the given global offset with extended output model.
30+
*/
31+
def streamExtendedTxs(gOffset: Long, limit: Int): F[ExtendedSettledTx]
2832
}
2933

3034
object LedgerStreaming {
@@ -45,5 +49,8 @@ object LedgerStreaming {
4549

4650
def streamBlocks(gOffset: Long, limit: Int): F[Block] =
4751
explorer.streamBlocks(gOffset, limit).map(Block.fromExplorer)
52+
53+
def streamExtendedTxs(gOffset: Long, limit: Int): F[ExtendedSettledTx] =
54+
explorer.streamTransactions(gOffset, limit).map(ExtendedSettledTx.fromExplorer)
4855
}
4956
}

modules/markets-index/src/main/scala/org/ergoplatform/dex/index/App.scala

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import fs2.kafka.serde._
77
import org.ergoplatform.ErgoAddressEncoder
88
import org.ergoplatform.common.EnvApp
99
import org.ergoplatform.common.cache.{MakeRedisTransaction, Redis}
10-
import org.ergoplatform.common.db.{doobieLogging, PostgresTransactor}
10+
import org.ergoplatform.common.db.{PostgresTransactor, doobieLogging}
1111
import org.ergoplatform.common.streaming.{Consumer, MakeKafkaConsumer, Producer}
1212
import org.ergoplatform.dex.configs.ConsumerConfig
1313
import org.ergoplatform.dex.domain.amm.{CFMMPool, EvaluatedCFMMOrder, OrderId, PoolId}
@@ -18,8 +18,7 @@ import org.ergoplatform.dex.index.processes.{BlockIndexing, HistoryIndexing, Loc
1818
import org.ergoplatform.dex.index.repositories.RepoBundle
1919
import org.ergoplatform.dex.index.streaming.{BlocksConsumer, CFMMHistConsumer, CFMMPoolsConsumer, LqLocksConsumer}
2020
import org.ergoplatform.dex.tracker.handlers._
21-
import org.ergoplatform.dex.tracker.processes.LedgerTracker.TrackerMode
22-
import org.ergoplatform.dex.tracker.processes.{BlockTracker, LedgerTracker, TxTracker}
21+
import org.ergoplatform.dex.tracker.processes.{BlockTracker, TxTracker}
2322
import org.ergoplatform.dex.tracker.repositories.TrackerCache
2423
import org.ergoplatform.ergo.BlockId
2524
import org.ergoplatform.ergo.domain.Block
@@ -86,25 +85,23 @@ object App extends EnvApp[ConfigBundle] {
8685
implicit0(node: ErgoNode[RunF]) <- Resource.eval(ErgoNode.make[InitF, RunF])
8786
implicit0(network: ErgoNetwork[RunF]) = ErgoNetwork.make[RunF]
8887
implicit0(ledger: LedgerStreaming[StreamF]) = LedgerStreaming.make[StreamF, RunF]
89-
cfmmPoolsHandler <- Resource.eval(SettledCFMMPoolsHandler.make[InitF, StreamF, RunF])
90-
cfmmHistoryHandler <- Resource.eval(CFMMHistoryHandler.make[InitF, StreamF, RunF])
91-
lqLocksHandler <- Resource.eval(LiquidityLocksHandler.make[InitF, StreamF, RunF])
88+
cfmmPoolsHandler <- Resource.eval(SettledCFMMPoolsHandler.make[InitF, StreamF, RunF]).map(liftSettledOutputs[StreamF])
89+
lqLocksHandler <- Resource.eval(LiquidityLocksHandler.make[InitF, StreamF, RunF]).map(liftOutputs[StreamF])
90+
cfmmHistoryHandler <- Resource.eval(CFMMHistoryHandler.make[InitF, StreamF, RunF]).map(liftSettledTx[StreamF])
9291
blockHandler <- Resource.eval(BlockHistoryHandler.make[InitF, StreamF, RunF])
9392
implicit0(redis: Redis.Plain[RunF]) <- Redis.make[InitF, RunF](configs.redis)
9493
implicit0(cache: TrackerCache[RunF]) <- Resource.eval(TrackerCache.make[InitF, RunF])
9594
blockTracker <- Resource.eval(BlockTracker.make[InitF, StreamF, RunF](blockHandler))
96-
utxoTracker <-
97-
Resource.eval(
98-
LedgerTracker.make[InitF, StreamF, RunF](TrackerMode.Historical, cfmmPoolsHandler, lift(lqLocksHandler))
99-
)
100-
txTracker <- Resource.eval(TxTracker.make[InitF, StreamF, RunF](cfmmHistoryHandler))
101-
implicit0(repos: RepoBundle[xa.DB]) <- Resource.eval(RepoBundle.make[InitF, xa.DB])
102-
historyIndexer <- Resource.eval(HistoryIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
103-
poolsIndexer <- Resource.eval(PoolsIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
104-
locksIndexer <- Resource.eval(LocksIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
105-
blocksIndexer <- Resource.eval(BlockIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
95+
txTracker <- Resource.eval(
96+
TxTracker.make[InitF, StreamF, RunF](cfmmHistoryHandler, lqLocksHandler, cfmmPoolsHandler)
97+
)
98+
implicit0(repos: RepoBundle[xa.DB]) <- Resource.eval(RepoBundle.make[InitF, xa.DB])
99+
historyIndexer <- Resource.eval(HistoryIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
100+
poolsIndexer <- Resource.eval(PoolsIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
101+
locksIndexer <- Resource.eval(LocksIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
102+
blocksIndexer <- Resource.eval(BlockIndexing.make[InitF, StreamF, RunF, xa.DB, Chunk])
106103
processes =
107-
utxoTracker.run :: txTracker.run :: blockTracker.run :: poolsIndexer.run :: historyIndexer.run :: locksIndexer.run :: blocksIndexer.run :: Nil
104+
txTracker.run :: blockTracker.run :: poolsIndexer.run :: historyIndexer.run :: locksIndexer.run :: blocksIndexer.run :: Nil
108105
} yield (processes, configs)
109106
// format:on
110107

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,30 @@
11
package org.ergoplatform.dex.tracker
22

3-
import cats.Functor
4-
import org.ergoplatform.ergo.domain.{Output, Block, SettledOutput, SettledTransaction, Transaction}
3+
import cats.{Functor, Monad}
4+
import org.ergoplatform.ergo.domain.{Block, ExtendedSettledTx, Output, SettledOutput, SettledTransaction, Transaction}
5+
import tofu.streams.Emits
56
import tofu.syntax.monadic._
7+
import tofu.syntax.streams.emits.emits
68

79
package object handlers {
810
type BoxHandler[F[_]] = F[Output] => F[Unit]
911
type SettledBoxHandler[F[_]] = F[SettledOutput] => F[Unit]
1012
type TxHandler[F[_]] = F[Transaction] => F[Unit]
1113
type SettledTxHandler[F[_]] = F[SettledTransaction] => F[Unit]
1214
type SettledBlockHandler[F[_]] = F[Block] => F[Unit]
15+
type ExtendedTxHandler[F[_]] = F[ExtendedSettledTx] => F[Unit]
1316

1417
def lift[F[_]: Functor](bh: BoxHandler[F]): SettledBoxHandler[F] = fa => bh(fa.map(_.output))
18+
19+
def liftSettledTx[F[_]: Monad](f: F[SettledTransaction] => F[Unit]): ExtendedTxHandler[F] =
20+
stream =>
21+
f(stream.map(SettledTransaction.fromExtendedSettledTx))
22+
23+
def liftOutputs[F[_]: Monad: Emits](f: BoxHandler[F]): ExtendedTxHandler[F] =
24+
stream =>
25+
f(stream.map(_.settledOutputs.map(_.output)).flatMap(emits(_)))
26+
27+
def liftSettledOutputs[F[_]: Monad: Emits](f: SettledBoxHandler[F]): ExtendedTxHandler[F] =
28+
stream =>
29+
f(stream.map(_.settledOutputs).flatMap(emits(_)))
1530
}

modules/utxo-tracker/src/main/scala/org/ergoplatform/dex/tracker/processes/TxTracker.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@ package org.ergoplatform.dex.tracker.processes
33
import cats.{Defer, Functor, Monad, MonoidK}
44
import derevo.derive
55
import org.ergoplatform.dex.tracker.configs.TxTrackerConfig
6-
import org.ergoplatform.dex.tracker.handlers.{SettledTxHandler, TxHandler}
6+
import org.ergoplatform.dex.tracker.handlers.ExtendedTxHandler
77
import org.ergoplatform.dex.tracker.repositories.TrackerCache
8+
import org.ergoplatform.ergo.domain.ExtendedSettledTx
89
import org.ergoplatform.ergo.modules.{ErgoNetwork, LedgerStreaming}
9-
import org.ergoplatform.ergo.services.explorer.ErgoExplorerStreaming
1010
import tofu.Catches
1111
import tofu.higherKind.derived.representableK
1212
import tofu.logging.{Logging, Logs}
@@ -29,7 +29,7 @@ object TxTracker {
2929
I[_]: Functor,
3030
F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: Catches: TxTrackerConfig.Has,
3131
G[_]: Monad
32-
](handlers: SettledTxHandler[F]*)(implicit
32+
](handlers: ExtendedTxHandler[F]*)(implicit
3333
cache: TrackerCache[G],
3434
ledger: LedgerStreaming[F],
3535
network: ErgoNetwork[G],
@@ -42,7 +42,7 @@ object TxTracker {
4242
final class StreamingTxTracker[
4343
F[_]: Monad: Evals[*[_], G]: ParFlatten: Pace: Defer: MonoidK: Catches,
4444
G[_]: Monad: Logging
45-
](conf: TxTrackerConfig, handlers: List[SettledTxHandler[F]])(implicit
45+
](conf: TxTrackerConfig, handlers: List[ExtendedTxHandler[F]])(implicit
4646
cache: TrackerCache[G],
4747
ledger: LedgerStreaming[F],
4848
network: ErgoNetwork[G]
@@ -59,10 +59,10 @@ object TxTracker {
5959
val scan =
6060
eval(info"Requesting TX batch {offset=$offset, maxOffset=$maxOffset, batchSize=${conf.batchSize} ..") >>
6161
ledger
62-
.streamTxs(offset, conf.batchSize)
62+
.streamExtendedTxs(offset, conf.batchSize)
6363
.evalTap(tx => trace"Scanning TX $tx")
6464
.flatTap(tx => emits(handlers.map(_(tx.pure[F]))).parFlattenUnbounded)
65-
.evalMap(tx => cache.setLastScannedTxOffset(tx.globalIndex))
65+
.void
6666
val finalizeOffset = eval(cache.setLastScannedTxOffset(nextOffset))
6767
val pause =
6868
eval(info"Upper limit {maxOffset=$maxOffset} was reached. Retrying in ${conf.retryDelay.toSeconds}s") >>

0 commit comments

Comments
 (0)