Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/types,node/types: add cached tx hash to ktypes.Transaction, create node/types.Tx #1422

Merged
merged 2 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/types/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewBlock(height int64, prevHash, appHash, valSetHash, paramsHash Hash, stam
numTxns := len(txns)
txHashes := make([]Hash, numTxns)
for i, tx := range txns {
txHashes[i] = tx.Hash()
txHashes[i] = tx.HashCache()
}
merkelRoot := CalcMerkleRoot(txHashes)
hdr := &BlockHeader{
Expand All @@ -88,10 +88,10 @@ func (b *Block) Hash() Hash {
return b.Header.Hash()
}

func (b *Block) MerkleRoot() Hash {
func (b *Block) CalcMerkleRoot() Hash {
txHashes := make([]Hash, len(b.Txns))
for i, tx := range b.Txns {
txHashes[i] = tx.Hash()
txHashes[i] = tx.HashCache()
}
return CalcMerkleRoot(txHashes)
}
Expand Down
20 changes: 15 additions & 5 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Transaction struct {
Sender HexBytes `json:"sender"`

strictUnmarshal bool
// cachedHash *Hash // maybe maybe maybe... this would require a mutex or careful use
cachedHash *Hash // maybe maybe maybe... this would require a mutex or careful use
}

func (t *Transaction) StrictUnmarshal() {
Expand All @@ -59,6 +59,19 @@ func (t *Transaction) Hash() Hash {
return HashBytes(raw)
}

// HashCache is like Hash, but caches the hash of the transaction. If it is
// already cached, it is returned as is. Use this with caution:
// 1. it is not safe for concurrent use
// 2. the allocation and storage of the hash may potentially be undesirable
// 3. the hash is not guaranteed to be valid if the transaction is modified
func (t *Transaction) HashCache() Hash {
if t.cachedHash == nil {
hash := t.Hash()
t.cachedHash = &hash
}
return *t.cachedHash
}

// TransactionBody is the body of a transaction that gets included in the
// signature. This type implements json.Marshaler and json.Unmarshaler to ensure
// that the Fee field is represented as a string in JSON rather than a number.
Expand Down Expand Up @@ -264,10 +277,7 @@ func (t *TransactionBody) SerializeMsg(mst SignedMsgSerializationType) ([]byte,

switch mst {
case SignedMsgDirect:
msg, err := t.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("failed to serialize transaction body: %v", err)
}
msg := t.Bytes()
sigHash := HashBytes(msg) // could just be msg
return sigHash[:], nil
case SignedMsgConcat:
Expand Down
7 changes: 4 additions & 3 deletions node/block_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,9 @@ func (bp *BlockProcessor) loadNetworkParams(ctx context.Context, readTx sql.Tx)
return networkParams, nil
}

func (bp *BlockProcessor) CheckTx(ctx context.Context, tx *ktypes.Transaction, height int64, blockTime time.Time, recheck bool) error {
txHash := tx.Hash()
func (bp *BlockProcessor) CheckTx(ctx context.Context, ntx *types.Tx, height int64, blockTime time.Time, recheck bool) error {
tx := ntx.Transaction
txHash := ntx.Hash()

// If the network is halted for migration, we reject all transactions.
if bp.chainCtx.NetworkParameters.MigrationStatus == ktypes.MigrationCompleted {
Expand Down Expand Up @@ -678,7 +679,7 @@ func (bp *BlockProcessor) Commit(ctx context.Context, req *ktypes.CommitRequest)
// that consensus limits such as the maximum block size, maxVotesPerTx are met. It also adds
// validator vote transactions for events observed by the leader. This function is
// used exclusively by the leader node to prepare the proposal block.
func (bp *BlockProcessor) PrepareProposal(ctx context.Context, txs []*ktypes.Transaction) (finalTxs []*ktypes.Transaction, invalidTxs []*ktypes.Transaction, err error) {
func (bp *BlockProcessor) PrepareProposal(ctx context.Context, txs []*types.Tx) (finalTxs []*ktypes.Transaction, invalidTxs []*ktypes.Transaction, err error) {
// unmarshal and index the transactions
return bp.prepareBlockTransactions(ctx, txs)
}
Expand Down
2 changes: 1 addition & 1 deletion node/block_processor/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (bp *BlockProcessor) BlockExecutionStatus() *ktypes.BlockExecutionStatus {
func (bp *BlockProcessor) initBlockExecutionStatus(blk *ktypes.Block) []ktypes.Hash {
txIDs := make([]ktypes.Hash, len(blk.Txns))
for i, tx := range blk.Txns {
txID := tx.Hash()
txID := tx.HashCache()
txIDs[i] = txID
}
bp.statusMu.Lock()
Expand Down
7 changes: 4 additions & 3 deletions node/block_processor/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kwilteam/kwil-db/core/types"
authExt "github.com/kwilteam/kwil-db/extensions/auth"
"github.com/kwilteam/kwil-db/node/txapp"
nodetypes "github.com/kwilteam/kwil-db/node/types"
"github.com/kwilteam/kwil-db/node/types/sql"
)

Expand Down Expand Up @@ -66,15 +67,15 @@ type indexedTxn struct {
// enforces block size limits, and applies the maxVotesPerTx limit for voteID transactions.
// Additionally, it includes the ValidatorVoteBody transaction for unresolved events.
// The final transaction order is: MempoolProposerTxns, ValidatorVoteBodyTx, Other MempoolTxns (Nonce ordered, stable sorted).
func (bp *BlockProcessor) prepareBlockTransactions(ctx context.Context, txs []*types.Transaction) (finalTxs []*types.Transaction, invalidTxs []*types.Transaction, err error) {
func (bp *BlockProcessor) prepareBlockTransactions(ctx context.Context, txs []*nodetypes.Tx) (finalTxs []*types.Transaction, invalidTxs []*types.Transaction, err error) {
// Unmarshal and index the transactions.
var okTxns []*indexedTxn
invalidTxs = make([]*types.Transaction, 0, len(txs))
var i int

for is, tx := range txs {
rawTx := tx.Bytes()
okTxns = append(okTxns, &indexedTxn{i, tx, len(rawTx), types.HashBytes(rawTx), is})
okTxns = append(okTxns, &indexedTxn{i, tx.Transaction, len(rawTx), tx.Hash(), is})
i++
}

Expand Down Expand Up @@ -103,7 +104,7 @@ func (bp *BlockProcessor) prepareBlockTransactions(ctx context.Context, txs []*t
// Enfore nonce ordering and remove transactions from the unfunded accounts
for _, tx := range okTxns {
if i > 0 && tx.Body.Nonce == nonces[i-1] && bytes.Equal(tx.Sender, okTxns[i-1].Sender) {
invalidTxs = append(invalidTxs, txs[tx.is])
invalidTxs = append(invalidTxs, txs[tx.is].Transaction)
bp.log.Warn("Transaction has a duplicate nonce", "tx", tx)
continue
}
Expand Down
8 changes: 7 additions & 1 deletion node/block_processor/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kwilteam/kwil-db/core/log"
"github.com/kwilteam/kwil-db/core/types"
"github.com/kwilteam/kwil-db/node/txapp"
nodetypes "github.com/kwilteam/kwil-db/node/types"
"github.com/kwilteam/kwil-db/node/types/sql"
)

Expand Down Expand Up @@ -237,7 +238,12 @@ func TestPrepareMempoolTxns(t *testing.T) {

chainCtx.NetworkParameters.DisabledGasCosts = !tt.gas

got, invalids, err := bp.prepareBlockTransactions(ctx, tt.txs)
ntxs := make([]*nodetypes.Tx, len(tt.txs))
for i, tx := range tt.txs {
ntxs[i] = nodetypes.NewTx(tx)
}

got, invalids, err := bp.prepareBlockTransactions(ctx, ntxs)
require.NoError(t, err)

if len(got) != len(tt.want) {
Expand Down
23 changes: 12 additions & 11 deletions node/consensus/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (ce *ConsensusEngine) validateBlock(blk *ktypes.Block) error {
}

// Verify the merkle root of the block transactions
merkleRoot := blk.MerkleRoot()
merkleRoot := blk.CalcMerkleRoot() // NOTE: this expects CalcMerkleRoot to use tx.HashCache() to prepare the Transaction's internal hash cache
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateBlock is a key method that is one of the first points at which a node will begin processing a block it has received, e.g. from p2p, block store, etc. Ensuring that this method warms the internal tx hash cached in core/types.Transaction benefits many other spots downstream.

if merkleRoot != blk.Header.MerkleRoot {
return fmt.Errorf("merkleroot mismatch, expected %v, got %v", merkleRoot, blk.Header.MerkleRoot)
}
Expand Down Expand Up @@ -117,14 +117,13 @@ func (ce *ConsensusEngine) lastBlock() (int64, types.Hash, time.Time) {
// It is an error if the transaction is already in the mempool.
// It is an error if the transaction fails CheckTx.
// This method holds the mempool lock for the duration of the call.
func (ce *ConsensusEngine) QueueTx(ctx context.Context, tx *ktypes.Transaction) error {
func (ce *ConsensusEngine) QueueTx(ctx context.Context, tx *types.Tx) error {
height, _, timestamp := ce.lastBlock()

ce.mempoolMtx.Lock()
defer ce.mempoolMtx.Unlock()

txHash := tx.Hash()
have, rejected := ce.mempool.Store(txHash, tx)
Comment on lines -126 to -127
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This silly optimization of having mempool.Store take a hash and the transaction separately was silly and now obsolete.

have, rejected := ce.mempool.Store(tx)
if have {
return ktypes.ErrTxAlreadyExists
}
Expand All @@ -135,7 +134,7 @@ func (ce *ConsensusEngine) QueueTx(ctx context.Context, tx *ktypes.Transaction)
const recheck = false
err := ce.blockProcessor.CheckTx(ctx, tx, height, timestamp, recheck)
if err != nil {
ce.mempool.Remove(txHash)
ce.mempool.Remove(tx.Hash())
return err
}

Expand Down Expand Up @@ -181,9 +180,9 @@ func (ce *ConsensusEngine) lastBlockInternal() (int64, time.Time) {

// recheckTxFn creates a tx recheck function for the mempool that assumes the
// provided height and timestamp for each call.
func (ce *ConsensusEngine) recheckTxFn(height int64, timestamp time.Time) func(ctx context.Context, tx *ktypes.Transaction) error {
func (ce *ConsensusEngine) recheckTxFn(height int64, timestamp time.Time) func(ctx context.Context, tx *types.Tx) error {
// height, _, timestamp := ce.lastBlock()
return func(ctx context.Context, tx *ktypes.Transaction) error {
return func(ctx context.Context, tx *types.Tx) error {
const recheck = true
return ce.blockProcessor.CheckTx(ctx, tx, height, timestamp, recheck)
}
Expand All @@ -192,8 +191,10 @@ func (ce *ConsensusEngine) recheckTxFn(height int64, timestamp time.Time) func(c
// BroadcastTx checks the transaction with the mempool and if the verification
// is successful, broadcasts it to the network. The TxResult will be nil unless
// sync is set to 1, in which case the BroadcastTx returns only after it is
// successfully executed in a committed block.
func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (types.Hash, *ktypes.TxResult, error) {
// successfully executed in a committed block. This method is effectively
// [QueueTx] followed, by P2P broadcast of the transaction, followed by
// optionally waiting for the transaction to be mined.
func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *types.Tx, sync uint8) (types.Hash, *ktypes.TxResult, error) {
// check and store the transaction in the mempool
if err := ce.QueueTx(ctx, tx); err != nil {
return types.Hash{}, nil, err
Expand All @@ -205,7 +206,7 @@ func (ce *ConsensusEngine) BroadcastTx(ctx context.Context, tx *ktypes.Transacti
if ce.txAnnouncer != nil {
// We can't use parent context 'cause it's canceled in the caller, which
// could be the RPC request. handler. This shouldn't be CE's problem...
go ce.txAnnouncer(context.Background(), tx)
go ce.txAnnouncer(context.Background(), tx.Transaction)
}

// If sync is set to 1, wait for the transaction to be committed in a block.
Expand Down Expand Up @@ -301,7 +302,7 @@ func (ce *ConsensusEngine) commit(ctx context.Context) error {

// remove transactions from the mempool
for idx, txn := range blkProp.blk.Txns {
txHash := txn.Hash()
txHash := txn.HashCache()
ce.mempool.Remove(txHash)

txRes := ce.state.blockRes.txResults[idx]
Expand Down
8 changes: 4 additions & 4 deletions node/consensus/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type DB interface {
}

type Mempool interface {
PeekN(maxTxns, totalSizeLimit int) []types.NamedTx
PeekN(maxTxns, totalSizeLimit int) []*types.Tx
Remove(txid types.Hash)
RecheckTxs(ctx context.Context, checkFn mempool.CheckFn)
Store(types.Hash, *ktypes.Transaction) (have, rejected bool)
Store(*types.Tx) (have, rejected bool)
TxsAvailable() bool
Size() (totalBytes, numTxns int)
}
Expand All @@ -44,13 +44,13 @@ type BlockProcessor interface {
InitChain(ctx context.Context) (int64, []byte, error)
SetCallbackFns(applyBlockFn blockprocessor.BroadcastTxFn, addPeer, removePeer func(string) error)

PrepareProposal(ctx context.Context, txs []*ktypes.Transaction) (finalTxs []*ktypes.Transaction, invalidTxs []*ktypes.Transaction, err error)
PrepareProposal(ctx context.Context, txs []*types.Tx) (finalTxs []*ktypes.Transaction, invalidTxs []*ktypes.Transaction, err error)
ExecuteBlock(ctx context.Context, req *ktypes.BlockExecRequest) (*ktypes.BlockExecResult, error)
Commit(ctx context.Context, req *ktypes.CommitRequest) error
Rollback(ctx context.Context, height int64, appHash ktypes.Hash) error
Close() error

CheckTx(ctx context.Context, tx *ktypes.Transaction, height int64, blockTime time.Time, recheck bool) error
CheckTx(ctx context.Context, tx *types.Tx, height int64, blockTime time.Time, recheck bool) error

GetValidators() []*ktypes.Validator
ConsensusParams() *ktypes.NetworkParameters
Expand Down
6 changes: 1 addition & 5 deletions node/consensus/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,7 @@ func (ce *ConsensusEngine) createBlockProposal(ctx context.Context) (*blockPropo
defer ce.mempoolMtx.Unlock()

totalTxSizeLimit := ce.ConsensusParams().MaxBlockSize
nTxs := ce.mempool.PeekN(maxNumTxnsInBlock, int(totalTxSizeLimit))
txns := make([]*ktypes.Transaction, len(nTxs))
for i, ntx := range nTxs {
txns[i] = ntx.Tx
}
txns := ce.mempool.PeekN(maxNumTxnsInBlock, int(totalTxSizeLimit))

finalTxs, invalidTxs, err := ce.blockProcessor.PrepareProposal(ctx, txns)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions node/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ type ConsensusEngine interface {

Start(ctx context.Context, fns consensus.BroadcastFns, peerFns consensus.WhitelistFns) error

QueueTx(ctx context.Context, tx *ktypes.Transaction) error
BroadcastTx(ctx context.Context, tx *ktypes.Transaction, sync uint8) (ktypes.Hash, *ktypes.TxResult, error)
QueueTx(ctx context.Context, tx *types.Tx) error
BroadcastTx(ctx context.Context, tx *types.Tx, sync uint8) (ktypes.Hash, *ktypes.TxResult, error)

ConsensusParams() *ktypes.NetworkParameters
CancelBlockExecution(height int64, txIDs []types.Hash) error
Expand Down
Loading
Loading