Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PEVM-opt: parallel Txs Prepare (#176)
Browse files Browse the repository at this point in the history
sunny2022da authored Sep 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent 96c4ba0 commit 5d7dd63
Showing 1 changed file with 144 additions and 37 deletions.
181 changes: 144 additions & 37 deletions core/parallel_state_processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package core

import (
"context"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/metrics"
@@ -201,7 +202,7 @@ func (p *ParallelStateProcessor) resetState(txNum int, statedb *state.StateDB) {
p.inConfirmStage2 = false

statedb.PrepareForParallel()
p.allTxReqs = make([]*ParallelTxRequest, 0, txNum)
p.allTxReqs = make([]*ParallelTxRequest, txNum)

for _, slot := range p.slotState {
slot.pendingTxReqList = make([]*ParallelTxRequest, 0)
@@ -872,48 +873,110 @@ func (p *ParallelStateProcessor) Process(block *types.Block, statedb *state.Stat
p.commonTxs = make([]*types.Transaction, 0, txNum)
p.receipts = make([]*types.Receipt, 0, txNum)

for i, tx := range allTxs {
// can be moved it into slot for efficiency, but signer is not concurrent safe
// Parallel Execution 1.0&2.0 is for full sync mode, Nonce PreCheck is not necessary
// And since we will do out-of-order execution, the Nonce PreCheck could fail.
// We will disable it and leave it to Parallel 3.0 which is for validator mode
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
parallelNum := p.parallelNum

if txNum > parallelNum*2 && txNum >= 4 {
var wg sync.WaitGroup
errChan := make(chan error)

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
begin := 0
// first try to find latestExcludeTx, as for opBNB, they are the first consecutive txs.
for idx := 0; idx < len(allTxs); idx++ {
if txDAG != nil && txDAG.TxDep(idx).CheckFlag(types.ExcludedTxFlag) {
if err := p.transferTxs(allTxs, idx, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
return nil, nil, 0, err
}
latestExcludedTx = idx
} else {
begin = idx
break
}
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx

// Create a cancelable context
ctx, cancel := context.WithCancel(context.Background())

// Create a pool of workers
transactionsPerWorker := (len(allTxs) - begin) / parallelNum

// Create a pool of workers
for i := 0; i < parallelNum; i++ {
wg.Add(1)
go func(start, end int, signer types.Signer, blk *types.Block, sdb *state.StateDB, cfg vm.Config, usedGas *uint64) {
defer wg.Done()
for j := start; j < end; j++ {
select {
case <-ctx.Done():
return // Exit the goroutine if the context is canceled
default:
if err := p.transferTxs(allTxs, j, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
errChan <- err
cancel() // Cancel the context to stop other goroutines
return
}
}
}
}(begin+i*transactionsPerWorker, begin+(i+1)*transactionsPerWorker, signer, block, statedb, cfg, usedGas)
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
// Distribute any remaining transactions
for i := begin + parallelNum*transactionsPerWorker; i < len(allTxs); i++ {
if err := p.transferTxs(allTxs, i, signer, block, statedb, cfg, usedGas, latestExcludedTx); err != nil {
errChan <- err
cancel() // Cancel the context to stop other goroutines
}
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))

// Wait for all workers to finish and handle errors
go func() {
wg.Wait()
close(errChan)
}()

for err := range errChan {
return nil, nil, 0, err
}
p.allTxReqs = append(p.allTxReqs, txReq)
if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
latestExcludedTx = i
//
} else {
for i, tx := range allTxs {
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))
}
p.allTxReqs[i] = txReq
if txDAG != nil && txDAG.TxDep(i).CheckFlag(types.ExcludedTxFlag) {
latestExcludedTx = i
}
}
}
allTxCount := len(p.allTxReqs)
@@ -1064,6 +1127,50 @@ func (p *ParallelStateProcessor) handlePendingResultLoop() {
}
}

func (p *ParallelStateProcessor) transferTxs(txs types.Transactions, i int, signer types.Signer, block *types.Block, statedb *state.StateDB, cfg vm.Config, usedGas *uint64, latestExcludedTx int) error {
if p.allTxReqs[i] != nil {
return nil
}
tx := txs[i]
txDAG := cfg.TxDAG
msg, err := TransactionToMessage(tx, signer, block.Header().BaseFee)
if err != nil {
return fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}

// find the latestDepTx from TxDAG or latestExcludedTx
latestDepTx := -1
if dep := types.TxDependency(txDAG, i); len(dep) > 0 {
latestDepTx = int(dep[len(dep)-1])
}
if latestDepTx < latestExcludedTx {
latestDepTx = latestExcludedTx
}

// parallel start, wrap an exec message, which will be dispatched to a slot
txReq := &ParallelTxRequest{
txIndex: i,
baseStateDB: statedb,
staticSlotIndex: -1,
tx: tx,
gasLimit: block.GasLimit(), // gp.Gas().
msg: msg,
block: block,
vmConfig: cfg,
usedGas: usedGas,
curTxChan: make(chan int, 1),
runnable: 1, // 0: not runnable, 1: runnable
useDAG: txDAG != nil,
}
txReq.executedNum.Store(0)
txReq.conflictIndex.Store(-2)
if latestDepTx >= 0 {
txReq.conflictIndex.Store(int32(latestDepTx))
}
p.allTxReqs[i] = txReq
return nil
}

func applyTransactionStageExecution(msg *Message, gp *GasPool, statedb *state.ParallelStateDB, evm *vm.EVM, delayGasFee bool) (*vm.EVM, *ExecutionResult, error) {
// Create a new context to be used in the EVM environment.
txContext := NewEVMTxContext(msg)

0 comments on commit 5d7dd63

Please sign in to comment.