node/consensus: priority lock for CE business to preempt tx influx
jchappelow committed Mar 3, 2025
1 parent 73e7cc2 commit 9b8ade1
Showing 5 changed files with 245 additions and 5 deletions.
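
In short: the sync.Mutex guarding the mempool becomes a PriorityLockQueue, so the consensus engine's own work (commit, proposal building, mempool rechecks) preempts the steady influx of QueueTx callers instead of queueing behind them. Below is a minimal sketch of the pattern, assuming it sits next to lock.go in package consensus; toyEngine, queueTx, and commitBlock are illustrative names, not the real ConsensusEngine methods (the actual call sites are in the diffs that follow).

package consensus

// toyEngine stands in for ConsensusEngine; only the locking pattern is shown.
type toyEngine struct {
	mempoolMtx PriorityLockQueue // was sync.Mutex before this commit
}

// queueTx mirrors QueueTx: the tx influx waits at the back of the line.
func (e *toyEngine) queueTx() {
	e.mempoolMtx.Lock()
	defer e.mempoolMtx.Unlock()
	// admit the tx to the mempool
}

// commitBlock mirrors commit and the recheck/proposal paths: consensus work
// claims the front of the line so a tx influx cannot delay it.
func (e *toyEngine) commitBlock() {
	e.mempoolMtx.PriorityLock()
	defer e.mempoolMtx.Unlock()
	// apply the block, update accounts, prune the mempool
}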
8 changes: 7 additions & 1 deletion node/consensus/block.go
@@ -120,6 +120,12 @@ func (ce *ConsensusEngine) lastBlock() (int64, types.Hash, time.Time) {
func (ce *ConsensusEngine) QueueTx(ctx context.Context, tx *types.Tx) error {
height, _, timestamp := ce.lastBlock()

// Contention on mempoolMtx is high between here and commit(), and we do not
// want to delay commit() under any circumstances.

// commit can claim priority at any time, while QueueTx must wait in the
// queue if commit is already running.

ce.mempoolMtx.Lock()
defer ce.mempoolMtx.Unlock()

@@ -276,7 +282,7 @@ func (ce *ConsensusEngine) executeBlock(ctx context.Context, blkProp *blockPropo
// Commit method commits the block to the blockstore and postgres database.
// It also updates the txIndexer and mempool with the transactions in the block.
func (ce *ConsensusEngine) commit(ctx context.Context) error {
- ce.mempoolMtx.Lock()
+ ce.mempoolMtx.PriorityLock()
defer ce.mempoolMtx.Unlock()

if ce.state.blockRes == nil {
4 changes: 2 additions & 2 deletions node/consensus/engine.go
@@ -118,7 +118,7 @@ type ConsensusEngine struct {
// QueueTx (external) take this lock to ensure that no new txs are added to
the mempool while the block is being committed, i.e. while the accounts are
// being updated.
- mempoolMtx sync.Mutex
+ mempoolMtx PriorityLockQueue
// mempoolReady indicates to the consensus engine that it has enough txs to propose a block.
// CE can adjust its wait times based on this flag.
// This flag tracks if the mempool filled enough between the commit and
@@ -1081,7 +1081,7 @@ func (ce *ConsensusEngine) resetBlockProp(ctx context.Context, height int64, txI

// recheck txs in the mempool, if we have deleted any txs from the mempool
if len(txIDs) > 0 {
- ce.mempoolMtx.Lock()
+ ce.mempoolMtx.PriorityLock()
ce.mempool.RecheckTxs(ctx, ce.recheckTxFn(ce.lastBlockInternal()))
ce.mempoolMtx.Unlock()
}
4 changes: 2 additions & 2 deletions node/consensus/leader.go
@@ -172,7 +172,7 @@ func (ce *ConsensusEngine) proposeBlock(ctx context.Context) error {
}

// Recheck the transactions in the mempool
- ce.mempoolMtx.Lock()
+ ce.mempoolMtx.PriorityLock()
ce.mempool.RecheckTxs(ctx, ce.recheckTxFn(ce.lastBlockInternal()))
ce.mempoolMtx.Unlock()

@@ -217,7 +217,7 @@ func (ce *ConsensusEngine) proposeBlock(ctx context.Context) error {
// This method orders the transactions in nonce order, performs basic gas and
// balance checks, and enforces the block size limits.
func (ce *ConsensusEngine) createBlockProposal(ctx context.Context) (*blockProposal, error) {
- ce.mempoolMtx.Lock()
+ ce.mempoolMtx.PriorityLock()
defer ce.mempoolMtx.Unlock()

totalTxSizeLimit := ce.ConsensusParams().MaxBlockSize
71 changes: 71 additions & 0 deletions node/consensus/lock.go
@@ -0,0 +1,71 @@
package consensus

import "sync"

type PriorityLockQueue struct {
mtx sync.Mutex
active bool
queue []chan struct{}
}

type queueFunc func(q []chan struct{}, c chan struct{}) []chan struct{}

func appendTo[E any](q []E, c E) []E {
return append(q, c)
}

func prependTo[E any](q []E, c E) []E {
if cap(q) > len(q) { // with extra capacity, shift in-place to avoid realloc
q = append(q, c) // grow length by one within the existing capacity (no realloc)
copy(q[1:], q) // shift elements right by one; copy handles the overlap
q[0] = c // insert at front
} else {
q = append([]E{c}, q...)
}
return q
}

func (pl *PriorityLockQueue) lock(qf queueFunc) {
pl.mtx.Lock()
if !pl.active {
pl.active = true
pl.mtx.Unlock()
return
}

ch := make(chan struct{})
pl.queue = qf(pl.queue, ch)
pl.mtx.Unlock()

<-ch // wait
}

func (pl *PriorityLockQueue) Lock() {
pl.lock(appendTo) // back of the line
}

func (pl *PriorityLockQueue) PriorityLock() {
pl.lock(prependTo) // jump the line
// NOTE: this is intended for only one serial caller to PriorityLock, like
// commit(), not multiple. If there is another PriorityLock caller before
// the first unblocks, the second one will take the front of the line.
}

func (pl *PriorityLockQueue) Unlock() {
pl.mtx.Lock()

if len(pl.queue) == 0 {
pl.active = false
pl.mtx.Unlock()
return
}

// Wake up the next in line; ownership passes directly to that waiter.
ch := pl.queue[0]
pl.queue = pl.queue[1:]
// pl.active = true // already true; the lock is handed straight to the waiter

pl.mtx.Unlock()

close(ch)
}
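
The in-place branch of prependTo is the subtlest part of this file: append grows the slice by one slot within its existing capacity, copy then shifts every element one position to the right (Go's built-in copy handles overlapping slices), and the new element lands at index 0. Here is a standalone walk-through with hypothetical integer values; prependTo is copied from above so the snippet runs on its own.

package main

import "fmt"

// prependTo is copied from node/consensus/lock.go above.
func prependTo[E any](q []E, c E) []E {
	if cap(q) > len(q) { // with extra capacity, shift in-place to avoid realloc
		q = append(q, c) // grow length by one within the existing capacity
		copy(q[1:], q)   // shift elements right by one; copy handles the overlap
		q[0] = c         // insert at front
	} else {
		q = append([]E{c}, q...) // no spare room: allocate a new backing array
	}
	return q
}

func main() {
	q := make([]int, 0, 4) // spare capacity forces the in-place branch
	q = append(q, 1, 2, 3) // len 3, cap 4
	q = prependTo(q, 0)    // in-place shift
	fmt.Println(q)         // [0 1 2 3]
	q = prependTo(q, -1)   // cap exhausted: falls back to reallocating
	fmt.Println(q)         // [-1 0 1 2 3]
}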
163 changes: 163 additions & 0 deletions node/consensus/lock_test.go
@@ -0,0 +1,163 @@
package consensus_test

import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/kwilteam/kwil-db/core/utils/random"
"github.com/kwilteam/kwil-db/node/consensus"
)

func TestPriorityLockQueue(t *testing.T) {
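// Demonstration-style smoke test: it prints the order in which the regular
// and priority callers acquire the lock and makes no assertions.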
pl := consensus.PriorityLockQueue{}

// Simulate queue calls
for i := 1; i <= 3; i++ {
go func(id int) {
pl.Lock()
fmt.Printf("Queue %d acquired lock\n", id)
time.Sleep(1 * time.Second)
fmt.Printf("Queue %d released lock\n", id)
pl.Unlock()
}(i)
runtime.Gosched()
}

time.Sleep(500 * time.Millisecond) // Allow queue to start

// Commit takes priority
go func() {
pl.PriorityLock()
fmt.Println("Commit acquired lock")
time.Sleep(2 * time.Second)
fmt.Println("Commit released lock")
pl.Unlock()
}()

time.Sleep(5 * time.Second) // Wait for all routines to finish
}

func BenchmarkRegularLocks(b *testing.B) {
const numWorkers = 100 // Fixed worker count
var pl consensus.PriorityLockQueue
var wg sync.WaitGroup

b.ResetTimer()
work := make(chan struct{}, numWorkers)

// Create fixed worker pool
for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
for range work { // Consume work
pl.Lock()
time.Sleep(10 * time.Microsecond) // Simulate work
pl.Unlock()
}
}()
}

// Feed the workers `b.N` operations
for range b.N {
work <- struct{}{}
}
close(work) // Signal workers to exit
wg.Wait()
}

func BenchmarkPriorityLocks(b *testing.B) {
const numWorkers = 100 // Fixed worker count
var pl consensus.PriorityLockQueue
var wg sync.WaitGroup

b.ResetTimer()
work := make(chan struct{}, numWorkers)

for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
for range work {
pl.PriorityLock()
time.Sleep(10 * time.Microsecond)
pl.Unlock()
}
}()
}

for range b.N {
work <- struct{}{}
}
close(work)
wg.Wait()
}

func BenchmarkMutexLocks(b *testing.B) {
const numWorkers = 100 // Fixed worker count
var pl sync.Mutex
var wg sync.WaitGroup

b.ResetTimer()
work := make(chan struct{}, numWorkers)

for range numWorkers {
wg.Add(1)
go func() {
defer wg.Done()
for range work {
pl.Lock()
time.Sleep(10 * time.Microsecond)
pl.Unlock()
}
}()
}

for range b.N {
work <- struct{}{}
}
close(work)
wg.Wait()
}

func BenchmarkMixedPriorityLocks(b *testing.B) {
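// Interleaves roughly 5% PriorityLock acquisitions with regular Lock
// acquisitions across 40 workers and logs the average wait time for each.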
const numWorkers = 40 // Fixed worker count
var pl consensus.PriorityLockQueue
var wg sync.WaitGroup

var lt, pt atomic.Int64
var np, nl atomic.Int64

b.ResetTimer()
for range numWorkers { // Fixed number of goroutines
wg.Add(1)
go func() {
defer wg.Done()
for range max(1, b.N/numWorkers) { // Spread iterations across workers
t0 := time.Now()
priority := random.Source.Uint64()%20 == 0
if priority {
pl.PriorityLock()
pt.Add(int64(time.Since(t0)))
np.Add(1)
} else {
pl.Lock()
lt.Add(int64(time.Since(t0)))
nl.Add(1)
}
time.Sleep(time.Microsecond) // Simulate work
pl.Unlock()
}
}()
}

wg.Wait()

if nl.Load() > 0 && np.Load() > 0 {
b.Logf("avg wait: Lock %v, PriorityLock %v", time.Duration(lt.Load()/nl.Load()), time.Duration(pt.Load()/np.Load()))
}
}
