Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Malleability] Header #7100

Open
wants to merge 11 commits into
base: feature/malleability
Choose a base branch
from
1 change: 0 additions & 1 deletion cmd/bootstrap/run/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ func GenerateRootHeader(chainID flow.ChainID, parentID flow.Identifier, height u
ParentVoterIndices: nil,
ParentVoterSigData: nil,
ProposerID: flow.ZeroID,
ProposerSigData: nil,
}
}
7 changes: 6 additions & 1 deletion cmd/consensus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,10 +793,15 @@ func main() {
if !startupTime.IsZero() {
opts = append(opts, consensus.WithStartupTime(startupTime))
}
finalizedBlock, pending, err := recovery.FindLatest(node.State, node.Storage.Headers)
finalizedBlock, pendingOrig, err := recovery.FindLatest(node.State, node.Storage.Headers)
if err != nil {
return nil, err
}
// TODO(malleability, #7100) proposer signature storage
pending := make([]*flow.Proposal, 0, len(pendingOrig))
for _, p := range pendingOrig {
pending = append(pending, &flow.Proposal{Header: p, ProposerSigData: nil})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this break recovery? Vote aggregator shouldn't accept an invalid vote.

Copy link
Contributor Author

@tim-barry tim-barry Mar 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I believe it does. This is one of the main reasons that we need storage for proposer signatures, if we don't want to change how recovery works.

}

// initialize hotstuff consensus algorithm
hot, err = consensus.NewParticipant(
Expand Down
9 changes: 8 additions & 1 deletion consensus/follower.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ func NewFollower(log zerolog.Logger,
return nil, fmt.Errorf("could not initialize forks: %w", err)
}

// TODO(malleability, #7100) - proposerSigData storage
// Followers don't need proposer signature (and all headers in storage must have been already verified)
pendingProposals := make([]*flow.Proposal, 0, len(pending))
for _, p := range pending {
pendingProposals = append(pendingProposals, &flow.Proposal{Header: p, ProposerSigData: nil})
}

// recover forks internal state (inserts all pending blocks)
err = recovery.Recover(log, pending, recovery.ForksState(forks))
err = recovery.Recover(log, pendingProposals, recovery.ForksState(forks))
if err != nil {
return nil, fmt.Errorf("could not recover hotstuff follower state: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ type BlockProducer interface {
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error)
MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Proposal, error)
}
6 changes: 3 additions & 3 deletions consensus/hotstuff/blockproducer/block_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func New(safetyRules hotstuff.SafetyRules, committee hotstuff.Replicas, builder
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Proposal, error) {
// the custom functions allows us to set some custom fields on the block;
// in hotstuff, we use this for view number and signature-related fields
setHotstuffFields := func(header *flow.Header) error {
Expand All @@ -62,7 +62,7 @@ func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertifica
}

signer := newSafetyRulesConcurrencyWrapper(bp.safetyRules)
header, err := bp.builder.BuildOn(
proposal, err := bp.builder.BuildOn(
qc.BlockID,
setHotstuffFields, // never returns an error
signer.Sign, // may return model.NoVoteError, which we handle below
Expand All @@ -77,5 +77,5 @@ func (bp *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertifica
return nil, fmt.Errorf("signer has not yet completed signing")
}

return header, nil
return proposal, nil
}
2 changes: 1 addition & 1 deletion consensus/hotstuff/blockproducer/metrics_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewMetricsWrapper(builder module.Builder, metrics module.HotstuffMetrics) *
}
}

func (w BlockBuilderMetricsWrapper) BuildOn(parentID flow.Identifier, setter func(*flow.Header) error, sign func(*flow.Header) error) (*flow.Header, error) {
func (w BlockBuilderMetricsWrapper) BuildOn(parentID flow.Identifier, setter func(*flow.Header) error, sign func(*flow.Header) ([]byte, error)) (*flow.Proposal, error) {
processStart := time.Now()
header, err := w.builder.BuildOn(parentID, setter, sign)
w.metrics.PayloadProductionDuration(time.Since(processStart))
Expand Down
10 changes: 4 additions & 6 deletions consensus/hotstuff/blockproducer/safety_rules_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,21 @@ func newSafetyRulesConcurrencyWrapper(safetyRules hotstuff.SafetyRules) *safetyR
// - model.NoVoteError if it is not safe for us to vote (our proposal includes our vote)
// for this view. This can happen if we have already proposed or timed out this view.
// - generic error in case of unexpected failure
func (w *safetyRulesConcurrencyWrapper) Sign(unsignedHeader *flow.Header) error {
func (w *safetyRulesConcurrencyWrapper) Sign(unsignedHeader *flow.Header) ([]byte, error) {
if !w.signingStatus.CompareAndSwap(0, 1) { // value of `signingStatus` is something else than 0
return fmt.Errorf("signer has already commenced signing; possibly repeated signer call")
return nil, fmt.Errorf("signer has already commenced signing; possibly repeated signer call")
} // signer is now in state 1, and this thread is the only one every going to execute the following logic

// signature for own block is structurally a vote
vote, err := w.safetyRules.SignOwnProposal(model.ProposalFromFlow(unsignedHeader))
if err != nil {
return fmt.Errorf("could not sign block proposal: %w", err)
return nil, fmt.Errorf("could not sign block proposal: %w", err)
}
unsignedHeader.ProposerSigData = vote.SigData

// value of `signingStatus` is always 1, i.e. the following check always succeeds.
if !w.signingStatus.CompareAndSwap(1, 2) { // sanity check protects logic from future modifications accidentally breaking this invariant
panic("signer wrapper completed its work but encountered state other than 1") // never happens
}
return nil
return vote.SigData, nil
}

// IsSigningComplete atomically checks whether the Sign logic has concluded, and returns true only in this case.
Expand Down
2 changes: 1 addition & 1 deletion consensus/hotstuff/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ type CommunicatorConsumer interface {
// Prerequisites:
// Implementation must be concurrency safe; Non-blocking;
// and must handle repetition of the same events (with some processing overhead).
OnOwnProposal(proposal *flow.Header, targetPublicationTime time.Time)
OnOwnProposal(proposal *flow.Proposal, targetPublicationTime time.Time)
}

// FollowerConsumer consumes outbound notifications produced by consensus followers.
Expand Down
8 changes: 4 additions & 4 deletions consensus/hotstuff/eventhandler/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,14 +423,14 @@ func (e *EventHandler) proposeForNewViewIfPrimary() error {
}
return fmt.Errorf("can not make block proposal for curView %v: %w", curView, err)
}
targetPublicationTime := e.paceMaker.TargetPublicationTime(flowProposal.View, start, flowProposal.ParentID) // determine target publication time
targetPublicationTime := e.paceMaker.TargetPublicationTime(flowProposal.Header.View, start, flowProposal.Header.ParentID) // determine target publication time
log.Debug().
Uint64("block_view", flowProposal.View).
Uint64("block_view", flowProposal.Header.View).
Time("target_publication", targetPublicationTime).
Hex("block_id", logging.ID(flowProposal.ID())).
Hex("block_id", logging.ID(flowProposal.Header.ID())).
Uint64("parent_view", newestQC.View).
Hex("parent_id", newestQC.BlockID[:]).
Hex("signer", flowProposal.ProposerID[:]).
Hex("signer", flowProposal.Header.ProposerID[:]).
Msg("forwarding proposal to communicator for broadcasting")

// emit notification with own proposal (also triggers broadcast)
Expand Down
32 changes: 16 additions & 16 deletions consensus/hotstuff/eventhandler/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func NewBlockProducer(proposerID flow.Identifier) *BlockProducer {
}
}

func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Header, error) {
func (b *BlockProducer) MakeBlockProposal(view uint64, qc *flow.QuorumCertificate, lastViewTC *flow.TimeoutCertificate) (*flow.Proposal, error) {
if b.producedBlockForView[view] {
return nil, model.NewNoVoteErrorf("block already produced")
}
Expand Down Expand Up @@ -474,10 +474,10 @@ func (es *EventHandlerSuite) TestOnReceiveProposal_ProposeAfterReceivingQC() {
es.committee.leaders[es.paceMaker.CurView()] = struct{}{}

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
// it should broadcast a header as the same as current view
require.Equal(es.T(), es.paceMaker.CurView(), header.View)
require.Equal(es.T(), es.paceMaker.CurView(), proposal.Header.View)
}).Once()

// processing this proposal shouldn't trigger view change since we have already seen QC.
Expand Down Expand Up @@ -508,10 +508,10 @@ func (es *EventHandlerSuite) TestOnReceiveProposal_ProposeAfterReceivingTC() {
es.committee.leaders[es.paceMaker.CurView()] = struct{}{}

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
// it should broadcast a header as the same as current view
require.Equal(es.T(), es.paceMaker.CurView(), header.View)
require.Equal(es.T(), es.paceMaker.CurView(), proposal.Header.View)
}).Once()

// processing this proposal shouldn't trigger view change, since we have already seen QC.
Expand Down Expand Up @@ -612,10 +612,10 @@ func (es *EventHandlerSuite) TestOnReceiveQc_NextLeaderProposes() {
require.NoError(es.T(), err)

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
// it should broadcast a header as the same as endView
require.Equal(es.T(), es.endView, header.View)
require.Equal(es.T(), es.endView, proposal.Header.View)
}).Once()

// after receiving proposal build QC and deliver it to event handler
Expand Down Expand Up @@ -674,16 +674,16 @@ func (es *EventHandlerSuite) TestOnReceiveTc_NextLeaderProposes() {
es.endView++

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
// it should broadcast a header as the same as endView
require.Equal(es.T(), es.endView, header.View)
require.Equal(es.T(), es.endView, proposal.Header.View)

// proposed block should contain valid newest QC and lastViewTC
expectedNewestQC := es.paceMaker.NewestQC()
proposal := model.SignedProposalFromFlow(header)
require.Equal(es.T(), expectedNewestQC, proposal.Block.QC)
require.Equal(es.T(), es.paceMaker.LastViewTC(), proposal.LastViewTC)
hotstuffProposal := model.SignedProposalFromFlow(proposal)
require.Equal(es.T(), expectedNewestQC, hotstuffProposal.Block.QC)
require.Equal(es.T(), es.paceMaker.LastViewTC(), hotstuffProposal.LastViewTC)
}).Once()

err := es.eventhandler.OnReceiveTc(es.tc)
Expand Down Expand Up @@ -805,9 +805,9 @@ func (es *EventHandlerSuite) TestLeaderBuild100Blocks() {
es.endView++

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
ownProposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
require.Equal(es.T(), proposal.Block.View+1, header.View)
require.Equal(es.T(), proposal.Block.View+1, ownProposal.Header.View)
}).Once()
es.notifier.On("OnOwnVote", proposal.Block.BlockID, proposal.Block.View, mock.Anything, mock.Anything).Once()

Expand Down Expand Up @@ -897,10 +897,10 @@ func (es *EventHandlerSuite) TestCreateProposal_SanityChecks() {
es.committee.leaders[tc.View+1] = struct{}{}

es.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(es.T(), ok)
// we need to make sure that produced proposal contains only QC even if there is TC for previous view as well
require.Nil(es.T(), header.LastViewTC)
require.Nil(es.T(), proposal.Header.LastViewTC)
}).Once()

err := es.eventhandler.OnReceiveTc(tc)
Expand Down
5 changes: 3 additions & 2 deletions consensus/hotstuff/eventloop/event_loop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ func TestReadyDoneWithStartTime(t *testing.T) {
unittest.RequireCloseBefore(t, eventLoop.Ready(), 100*time.Millisecond, "event loop not started")

parentBlock := unittest.BlockHeaderFixture()
block := unittest.BlockHeaderWithParentFixture(parentBlock)
eventLoop.SubmitProposal(model.SignedProposalFromFlow(block))
header := unittest.BlockHeaderWithParentFixture(parentBlock)
proposal := unittest.ProposalFromHeader(header)
eventLoop.SubmitProposal(model.SignedProposalFromFlow(proposal))

unittest.RequireCloseBefore(t, done, startTimeDuration+100*time.Millisecond, "proposal wasn't received")
cancel()
Expand Down
13 changes: 7 additions & 6 deletions consensus/hotstuff/helper/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,14 @@ func WithLastViewTC(lastViewTC *flow.TimeoutCertificate) func(*model.Proposal) {
}
}

// SignedProposalToFlow turns a block proposal into a flow header.
// SignedProposalToFlow turns a HotStuff block proposal into a flow block proposal.
//
// CAUTION: This function is only suitable for TESTING purposes ONLY.
// In the conversion from `flow.Header` to HoStuff's `model.Block` we loose information
// In the conversion from `flow.Header` to HotStuff's `model.Block` we lose information
// (e.g. `ChainID` and `Height` are not included in `model.Block`) and hence the conversion
// is *not reversible*. This is on purpose, because we wanted to only expose data to
// HotStuff that HotStuff really needs.
func SignedProposalToFlow(proposal *model.SignedProposal) *flow.Header {

func SignedProposalToFlow(proposal *model.SignedProposal) *flow.Proposal {
block := proposal.Block
header := &flow.Header{
ParentID: block.QC.BlockID,
Expand All @@ -121,9 +120,11 @@ func SignedProposalToFlow(proposal *model.SignedProposal) *flow.Header {
ParentVoterIndices: block.QC.SignerIndices,
ParentVoterSigData: block.QC.SigData,
ProposerID: block.ProposerID,
ProposerSigData: proposal.SigData,
LastViewTC: proposal.LastViewTC,
}

return header
return &flow.Proposal{
Header: header,
ProposerSigData: proposal.SigData,
}
}
16 changes: 8 additions & 8 deletions consensus/hotstuff/integration/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,25 @@ func Connect(t *testing.T, instances []*Instance) {
*sender.notifier = *NewMockedCommunicatorConsumer()
sender.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(
func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(t, ok)

// sender should always have the parent
sender.updatingBlocks.RLock()
_, exists := sender.headers[header.ParentID]
_, exists := sender.headers[proposal.Header.ParentID]
sender.updatingBlocks.RUnlock()
if !exists {
t.Fatalf("parent for proposal not found (sender: %x, parent: %x)", sender.localID, header.ParentID)
t.Fatalf("parent for proposal not found (sender: %x, parent: %x)", sender.localID, proposal.Header.ParentID)
}

// convert into proposal immediately
proposal := model.SignedProposalFromFlow(header)
hotstuffProposal := model.SignedProposalFromFlow(proposal)

// store locally and loop back to engine for processing
sender.ProcessBlock(proposal)
sender.ProcessBlock(hotstuffProposal)

// check if we should block the outgoing proposal
if sender.blockPropOut(proposal) {
if sender.blockPropOut(hotstuffProposal) {
return
}

Expand All @@ -56,11 +56,11 @@ func Connect(t *testing.T, instances []*Instance) {
}

// check if we should block the incoming proposal
if receiver.blockPropIn(proposal) {
if receiver.blockPropIn(hotstuffProposal) {
continue
}

receiver.ProcessBlock(proposal)
receiver.ProcessBlock(hotstuffProposal)
}
},
)
Expand Down
25 changes: 15 additions & 10 deletions consensus/hotstuff/integration/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func NewInstance(t *testing.T, options ...Option) *Instance {

// program the builder module behaviour
in.builder.On("BuildOn", mock.Anything, mock.Anything, mock.Anything).Return(
func(parentID flow.Identifier, setter func(*flow.Header) error, sign func(*flow.Header) error) *flow.Header {
func(parentID flow.Identifier, setter func(*flow.Header) error, sign func(*flow.Header) ([]byte, error)) *flow.Proposal {
in.updatingBlocks.Lock()
defer in.updatingBlocks.Unlock()

Expand All @@ -207,11 +207,16 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
Timestamp: time.Now().UTC(),
}
require.NoError(t, setter(header))
require.NoError(t, sign(header))
sig, err := sign(header)
require.NoError(t, err)
proposal := &flow.Proposal{
Header: header,
ProposerSigData: sig,
}
in.headers[header.ID()] = header
return header
return proposal
},
func(parentID flow.Identifier, _ func(*flow.Header) error, _ func(*flow.Header) error) error {
func(parentID flow.Identifier, _ func(*flow.Header) error, _ func(*flow.Header) ([]byte, error)) error {
in.updatingBlocks.RLock()
_, ok := in.headers[parentID]
in.updatingBlocks.RUnlock()
Expand Down Expand Up @@ -281,23 +286,23 @@ func NewInstance(t *testing.T, options ...Option) *Instance {
// program the hotstuff communicator behaviour
in.notifier.On("OnOwnProposal", mock.Anything, mock.Anything).Run(
func(args mock.Arguments) {
header, ok := args[0].(*flow.Header)
proposal, ok := args[0].(*flow.Proposal)
require.True(t, ok)

// sender should always have the parent
in.updatingBlocks.RLock()
_, exists := in.headers[header.ParentID]
_, exists := in.headers[proposal.Header.ParentID]
in.updatingBlocks.RUnlock()

if !exists {
t.Fatalf("parent for proposal not found parent: %x", header.ParentID)
t.Fatalf("parent for proposal not found parent: %x", proposal.Header.ParentID)
}

// convert into proposal immediately
proposal := model.SignedProposalFromFlow(header)
hotstuffProposal := model.SignedProposalFromFlow(proposal)

// store locally and loop back to engine for processing
in.ProcessBlock(proposal)
in.ProcessBlock(hotstuffProposal)
},
)
in.notifier.On("OnOwnTimeout", mock.Anything).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -637,7 +642,7 @@ func (in *Instance) ProcessBlock(proposal *model.SignedProposal) {
if parentExists {
next := proposal
for next != nil {
in.headers[next.Block.BlockID] = helper.SignedProposalToFlow(next)
in.headers[next.Block.BlockID] = helper.SignedProposalToFlow(next).Header

in.queue <- next
// keep processing the pending blocks
Expand Down
Loading
Loading