Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
470 changes: 453 additions & 17 deletions docker/monitornode/dashboards/cryptosim-dashboard.json

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions sei-db/ledger_db/receipt/parquet_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *parquetReceiptStore) SetReceipts(ctx sdk.Context, receipts []ReceiptRec
BlockNumber: blockNumber,
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
},
Logs: buildParquetLogRecords(txLogs, blockHash),
Logs: BuildParquetLogRecords(txLogs, blockHash),
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
})
}
Expand Down Expand Up @@ -308,7 +308,7 @@ func (s *parquetReceiptStore) replayWAL() error {
BlockNumber: blockNumber,
ReceiptBytes: parquet.CopyBytesOrEmpty(receiptBytes),
},
Logs: buildParquetLogRecords(txLogs, blockHash),
Logs: BuildParquetLogRecords(txLogs, blockHash),
}

if err := s.store.ApplyReceiptFromReplay(input); err != nil {
Expand Down Expand Up @@ -348,14 +348,14 @@ func truncateReplayWAL(w interface{ TruncateBefore(offset uint64) error }, dropO
return nil
}

func buildParquetLogRecords(logs []*ethtypes.Log, blockHash common.Hash) []parquet.LogRecord {
func BuildParquetLogRecords(logs []*ethtypes.Log, blockHash common.Hash) []parquet.LogRecord {
if len(logs) == 0 {
return nil
}

records := make([]parquet.LogRecord, 0, len(logs))
for _, lg := range logs {
topic0, topic1, topic2, topic3 := extractLogTopics(lg.Topics)
topic0, topic1, topic2, topic3 := ExtractLogTopics(lg.Topics)
rec := parquet.LogRecord{
BlockNumber: lg.BlockNumber,
TxHash: lg.TxHash[:],
Expand Down Expand Up @@ -393,7 +393,7 @@ func buildTopicsFromParquetLogResult(lr parquet.LogResult) []common.Hash {
return topicList
}

func extractLogTopics(topics []common.Hash) ([]byte, []byte, []byte, []byte) {
func ExtractLogTopics(topics []common.Hash) ([]byte, []byte, []byte, []byte) {
t0 := make([]byte, 0)
t1 := make([]byte, 0)
t2 := make([]byte, 0)
Expand Down
21 changes: 20 additions & 1 deletion sei-db/state_db/bench/cryptosim/block.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package cryptosim

import "iter"
import (
"iter"

evmtypes "github.com/sei-protocol/sei-chain/x/evm/types"
)

// A simulated block of transactions.
type block struct {
Expand All @@ -9,6 +13,9 @@ type block struct {
// The transactions in the block.
transactions []*transaction

// If receipt generation is enabled, this will contain the receipts for each transaction in the block.
reciepts []*evmtypes.Receipt

// The block number. This is not currently preserved across benchmark restarts, but otherwise monotonically
// increases as you'd expect.
blockNumber int64
Expand All @@ -32,11 +39,18 @@ func NewBlock(
blockNumber int64,
capacity int,
) *block {

var reciepts []*evmtypes.Receipt
if config.GenerateReceipts {
reciepts = make([]*evmtypes.Receipt, 0, capacity)
}

return &block{
config: config,
blockNumber: blockNumber,
transactions: make([]*transaction, 0, capacity),
metrics: metrics,
reciepts: reciepts,
}
}

Expand All @@ -56,6 +70,11 @@ func (b *block) AddTransaction(txn *transaction) {
b.transactions = append(b.transactions, txn)
}

// Adds a receipt to the block.
func (b *block) AddReceipt(receipt *evmtypes.Receipt) {
b.reciepts = append(b.reciepts, receipt)
}

// Returns the block number.
func (b *block) BlockNumber() int64 {
return b.blockNumber
Expand Down
15 changes: 15 additions & 0 deletions sei-db/state_db/bench/cryptosim/block_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ func (b *blockBuilder) buildBlock() *block {
continue
}
blk.AddTransaction(txn)

if b.config.GenerateReceipts {
receipt, err := BuildERC20TransferReceiptFromTxn(
b.dataGenerator.Rand(),
b.dataGenerator.FeeCollectionAddress(),
uint64(blk.BlockNumber()), //nolint:gosec
uint32(i), //nolint:gosec
txn,
)
if err != nil {
fmt.Printf("failed to build receipt: %v\n", err)
continue
}
blk.AddReceipt(receipt)
}
}

blk.SetBlockAccountStats(
Expand Down
8 changes: 8 additions & 0 deletions sei-db/state_db/bench/cryptosim/config/reciept-store.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"Comment": "For testing with the state store and reciept store both enabled.",
"DataDir": "data",
"MinimumNumberOfColdAccounts": 1000000,
"MinimumNumberOfDormantAccounts": 1000000,
"GenerateReceipts": true
}

72 changes: 65 additions & 7 deletions sei-db/state_db/bench/cryptosim/cryptosim.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/sei-protocol/sei-chain/sei-db/state_db/bench/wrappers"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -75,6 +76,12 @@ type CryptoSim struct {
// This is fixed after initial setup is complete, since we don't currently simulate
// the creation of new ERC20 contracts during the benchmark.
nextERC20ContractID int64

// The channel that holds blocks sent to the receipt store.
recieptsChan chan *block

// Enforces a maximum transaction rate (if enabled).
rateLimiter *rate.Limiter
}

// Creates a new cryptosim benchmark runner.
Expand Down Expand Up @@ -131,7 +138,7 @@ func NewCryptoSim(

start := time.Now()

database := NewDatabase(config, db, metrics)
database := NewDatabase(config, db, metrics, 0)

dataGenerator, err := NewDataGenerator(config, database, rand, metrics)
if err != nil {
Expand All @@ -141,6 +148,7 @@ func NewCryptoSim(
}
return nil, fmt.Errorf("failed to create data generator: %w", err)
}
database.nextBlockNumber = dataGenerator.InitialNextBlockNumber()
threadCount := int(config.ThreadsPerCore)*runtime.NumCPU() + config.ConstantThreadCount
if threadCount < 1 {
threadCount = 1
Expand All @@ -150,7 +158,23 @@ func NewCryptoSim(
executors := make([]*TransactionExecutor, threadCount)
for i := 0; i < threadCount; i++ {
executors[i] = NewTransactionExecutor(
ctx, cancel, database, dataGenerator.FeeCollectionAddress(), config.ExecutorQueueSize, metrics)
ctx, cancel, config, database, dataGenerator.FeeCollectionAddress(), config.ExecutorQueueSize, metrics)
}

var recieptsChan chan *block
if config.GenerateReceipts {
recieptsChan = make(chan *block, config.RecieptChannelCapacity)
_, err := NewRecieptStoreSimulator(ctx, config, recieptsChan, metrics)
if err != nil {
cancel()
return nil, fmt.Errorf("failed to create receipt store simulator: %w", err)
}
metrics.startReceiptChannelDepthSampling(recieptsChan, config.BackgroundMetricsScrapeInterval)
}

var rateLimiter *rate.Limiter
if config.MaxTPS > 0 {
rateLimiter = rate.NewLimiter(rate.Limit(config.MaxTPS), config.TransactionsPerBlock)
}

blockBuilder := NewBlockBuilder(ctx, config, metrics, dataGenerator)
Expand All @@ -169,6 +193,8 @@ func NewCryptoSim(
executors: executors,
metrics: metrics,
suspendChan: make(chan bool, 1),
recieptsChan: recieptsChan,
rateLimiter: rateLimiter,
}

database.SetFlushFunc(c.flushExecutors)
Expand Down Expand Up @@ -227,7 +253,7 @@ func (c *CryptoSim) setupAccounts() error {
if err != nil {
return fmt.Errorf("failed to create new account: %w", err)
}
c.database.IncrementTransactionCount(1)
c.database.IncrementTransactionCount()
finalized, err := c.database.MaybeFinalizeBlock(
c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID())
if err != nil {
Expand All @@ -249,7 +275,8 @@ func (c *CryptoSim) setupAccounts() error {
fmt.Printf("Created %s of %s accounts. \n",
int64Commas(c.dataGenerator.NextAccountID()), int64Commas(int64(requiredNumberOfAccounts)))

err := c.database.FinalizeBlock(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
err := c.database.FinalizeBlock(
c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
if err != nil {
return fmt.Errorf("failed to finalize block: %w", err)
}
Expand Down Expand Up @@ -284,7 +311,7 @@ func (c *CryptoSim) setupErc20Contracts() error {
break
}

c.database.IncrementTransactionCount(1)
c.database.IncrementTransactionCount()

_, _, err := c.dataGenerator.CreateNewErc20Contract(c.config.Erc20ContractSize, true)
if err != nil {
Expand Down Expand Up @@ -314,7 +341,10 @@ func (c *CryptoSim) setupErc20Contracts() error {
fmt.Printf("Created %s of %s simulated ERC20 contracts. \n",
int64Commas(c.dataGenerator.NextErc20ContractID()), int64Commas(int64(c.config.MinimumNumberOfErc20Contracts)))

err := c.database.FinalizeBlock(c.dataGenerator.NextAccountID(), c.dataGenerator.NextErc20ContractID(), true)
err := c.database.FinalizeBlock(
c.dataGenerator.NextAccountID(),
c.dataGenerator.NextErc20ContractID(),
true)
if err != nil {
return fmt.Errorf("failed to finalize block: %w", err)
}
Expand Down Expand Up @@ -359,19 +389,38 @@ func (c *CryptoSim) run() {
c.cancel()
return
case blk := <-c.blockBuilder.blocksChan:
c.maybeThrottle()
c.handleNextBlock(blk)
}

c.generateConsoleReport(false)
}
}

// Potentially block for a while if we are throttling the transaction rate.
func (c *CryptoSim) maybeThrottle() {
if c.config.MaxTPS == 0 {
// Throttling is disabled.
return
}

c.metrics.SetMainThreadPhase("throttling")

if err := c.rateLimiter.WaitN(c.ctx, c.config.TransactionsPerBlock); err != nil {
fmt.Printf("failed to wait for rate limit: %v\n", err)
c.cancel()
return
}
}

// Execute and finalize the next block.
func (c *CryptoSim) handleNextBlock(blk *block) {
c.mostRecentBlock = blk
c.metrics.SetMainThreadPhase("send_to_executors")

c.database.IncrementTransactionCount(blk.TransactionCount())
for i := int64(0); i < blk.TransactionCount(); i++ {
c.database.IncrementTransactionCount()
}

for txn := range blk.Iterator() {
c.executors[c.nextExecutorIndex].ScheduleForExecution(txn)
Expand All @@ -383,6 +432,15 @@ func (c *CryptoSim) handleNextBlock(blk *block) {
c.cancel()
return
}

if c.config.GenerateReceipts {
select {
case <-c.ctx.Done():
return
case c.recieptsChan <- blk:
}
}

blk.ReportBlockMetrics()
}

Expand Down
37 changes: 36 additions & 1 deletion sei-db/state_db/bench/cryptosim/cryptosim_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,30 @@ type CryptoSimConfig struct {

// The capacity of the channel that holds blocks awaiting execution.
BlockChannelCapacity int

// If true, the benchmark will generate receipts for each transaction in each block and
// feed those receipts into the receipt store.
GenerateReceipts bool

// The capacity of the channel that holds blocks sent to the receipt store.
RecieptChannelCapacity int

// If true, disables simulation of transaction execution, and writes very little to the database. This is
// potentially useful when benchmarking things other than state storage (e.g. the receipt store).
//
// Note that switching execution on after previously running with execution disabled may result in buggy behavior,
// as the benchmark will not be properly maintaining DB state when transaction execution is disabled. In order
// to switch transaction execution back on, it is necessary to delete the on-disk database and start over.
DisableTransactionExecution bool

// If greater than 0, the benchmark will throttle the transaction rate to this value, in hertz.
MaxTPS float64

// Number of recent blocks to keep before pruning parquet files. 0 disables pruning.
ReceiptKeepRecent int64

// Interval in seconds between prune checks. 0 disables pruning.
ReceiptPruneIntervalSeconds int64
}

// Returns the default configuration for the cryptosim benchmark.
Expand Down Expand Up @@ -174,6 +198,12 @@ func DefaultCryptoSimConfig() *CryptoSimConfig {
BackgroundMetricsScrapeInterval: 60,
EnableSuspension: true,
BlockChannelCapacity: 8,
GenerateReceipts: false,
RecieptChannelCapacity: 32,
DisableTransactionExecution: false,
MaxTPS: 0,
ReceiptKeepRecent: 100_000,
ReceiptPruneIntervalSeconds: 600,
}
}

Expand Down Expand Up @@ -249,7 +279,12 @@ func (c *CryptoSimConfig) Validate() error {
if c.BlockChannelCapacity < 1 {
return fmt.Errorf("BlockChannelCapacity must be at least 1 (got %d)", c.BlockChannelCapacity)
}

if c.RecieptChannelCapacity < 1 {
return fmt.Errorf("RecieptChannelCapacity must be at least 1 (got %d)", c.RecieptChannelCapacity)
}
if c.MaxTPS < 0 {
return fmt.Errorf("MaxTPS must be non-negative (got %f)", c.MaxTPS)
}
return nil
}

Expand Down
Loading
Loading