Skip to content
Draft
83 changes: 46 additions & 37 deletions beacon-chain/sync/data_column_sidecars.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func FetchDataColumnSidecars(
// set as we retrieve them.
incompleteRoots := make(map[[fieldparams.RootLength]byte]bool, blockCount)
slotsWithCommitments := make(map[primitives.Slot]bool, blockCount)
slotByRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, blockCount)
roBlockByRoot := make(map[[fieldparams.RootLength]byte]blocks.ROBlock, blockCount)
storedIndicesByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool, blockCount)

for _, roBlock := range roBlocks {
Expand All @@ -93,7 +93,7 @@ func FetchDataColumnSidecars(
slot := block.Slot()

incompleteRoots[root] = true
slotByRoot[root] = slot
roBlockByRoot[root] = roBlock
slotsWithCommitments[slot] = true

storedIndices := params.Storage.Summary(root).Stored()
Expand All @@ -118,7 +118,7 @@ func FetchDataColumnSidecars(
}

// Request direct sidecars from peers.
directSidecarsByRoot, err := requestDirectSidecarsFromPeers(params, slotByRoot, requestedIndices, slotsWithCommitments, storedIndicesByRoot, incompleteRoots)
directSidecarsByRoot, err := requestDirectSidecarsFromPeers(params, roBlockByRoot, requestedIndices, slotsWithCommitments, storedIndicesByRoot, incompleteRoots)
if err != nil {
return nil, nil, errors.Wrap(err, "request direct sidecars from peers")
}
Expand All @@ -139,7 +139,7 @@ func FetchDataColumnSidecars(
}

// Request all possible indirect sidecars from peers which are neither stored nor in `directSidecarsByRoot`
indirectSidecarsByRoot, err := requestIndirectSidecarsFromPeers(params, slotByRoot, slotsWithCommitments, storedIndicesByRoot, directSidecarsByRoot, requestedIndices, incompleteRoots)
indirectSidecarsByRoot, err := requestIndirectSidecarsFromPeers(params, roBlockByRoot, slotsWithCommitments, storedIndicesByRoot, directSidecarsByRoot, requestedIndices, incompleteRoots)
if err != nil {
return nil, nil, errors.Wrap(err, "request all sidecars from peers")
}
Expand Down Expand Up @@ -229,7 +229,7 @@ func requestSidecarsFromStorage(
// It returns a map from each root to its successfully retrieved sidecars.
func requestDirectSidecarsFromPeers(
params DataColumnSidecarsParams,
slotByRoot map[[fieldparams.RootLength]byte]primitives.Slot,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
requestedIndices map[uint64]bool,
slotsWithCommitments map[primitives.Slot]bool,
storedIndicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
Expand Down Expand Up @@ -266,7 +266,7 @@ func requestDirectSidecarsFromPeers(

initialMissingCount := computeTotalCount(missingIndicesByRoot)

indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)
indicesByRootByPeer, err := computeIndicesByRootByPeer(params.P2P, roBlockByRoot, missingIndicesByRoot, connectedPeers)
if err != nil {
return nil, errors.Wrap(err, "explore peers")
}
Expand All @@ -285,10 +285,10 @@ func requestDirectSidecarsFromPeers(
}

// Fetch the sidecars from the chosen peers.
roDataColumnsByPeer := fetchDataColumnSidecarsFromPeers(params, slotByRoot, slotsWithCommitments, indicesByRootByPeerToQuery)
roDataColumnsByPeer := fetchDataColumnSidecarsFromPeers(params, roBlockByRoot, slotsWithCommitments, indicesByRootByPeerToQuery)

// Verify the received data column sidecars.
verifiedRoDataColumnSidecars, err := verifyDataColumnSidecarsByPeer(params.P2P, params.NewVerifier, roDataColumnsByPeer)
verifiedRoDataColumnSidecars, err := verifyDataColumnSidecarsByPeer(params.P2P, params.NewVerifier, roBlockByRoot, roDataColumnsByPeer)
if err != nil {
return nil, errors.Wrap(err, "verify data columns sidecars by peer")
}
Expand All @@ -300,7 +300,7 @@ func requestDirectSidecarsFromPeers(
}

// Compute indices by root by peers with the updated missing indices and connected peers.
indicesByRootByPeer, err = computeIndicesByRootByPeer(params.P2P, slotByRoot, missingIndicesByRoot, connectedPeers)
indicesByRootByPeer, err = computeIndicesByRootByPeer(params.P2P, roBlockByRoot, missingIndicesByRoot, connectedPeers)
if err != nil {
return nil, errors.Wrap(err, "explore peers")
}
Expand All @@ -323,7 +323,7 @@ func requestDirectSidecarsFromPeers(
// - all peers are exhausted.
func requestIndirectSidecarsFromPeers(
p DataColumnSidecarsParams,
slotByRoot map[[fieldparams.RootLength]byte]primitives.Slot,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
slotsWithCommitments map[primitives.Slot]bool,
storedIndicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
alreadyAvailableByRoot map[[fieldparams.RootLength]byte][]blocks.VerifiedRODataColumn,
Expand Down Expand Up @@ -370,7 +370,7 @@ func requestIndirectSidecarsFromPeers(
}

// Compute which peers have which of the missing indices.
indicesByRootByPeer, err := computeIndicesByRootByPeer(p.P2P, slotByRoot, indicesToRetrieveByRoot, connectedPeers)
indicesByRootByPeer, err := computeIndicesByRootByPeer(p.P2P, roBlockByRoot, indicesToRetrieveByRoot, connectedPeers)
if err != nil {
return nil, errors.Wrap(err, "explore peers")
}
Expand All @@ -395,10 +395,10 @@ func requestIndirectSidecarsFromPeers(
}

// Fetch the sidecars from the chosen peers.
roDataColumnsByPeer := fetchDataColumnSidecarsFromPeers(p, slotByRoot, slotsWithCommitments, indicesByRootByPeerToQuery)
roDataColumnsByPeer := fetchDataColumnSidecarsFromPeers(p, roBlockByRoot, slotsWithCommitments, indicesByRootByPeerToQuery)

// Verify the received data column sidecars.
verifiedRoDataColumnSidecars, err := verifyDataColumnSidecarsByPeer(p.P2P, p.NewVerifier, roDataColumnsByPeer)
verifiedRoDataColumnSidecars, err := verifyDataColumnSidecarsByPeer(p.P2P, p.NewVerifier, roBlockByRoot, roDataColumnsByPeer)
if err != nil {
return nil, errors.Wrap(err, "verify data columns sidecars by peer")
}
Expand Down Expand Up @@ -436,7 +436,7 @@ func requestIndirectSidecarsFromPeers(
}

// Compute indices by root by peers with the updated missing indices and connected peers.
indicesByRootByPeer, err = computeIndicesByRootByPeer(p.P2P, slotByRoot, indicesToRetrieveByRoot, connectedPeers)
indicesByRootByPeer, err = computeIndicesByRootByPeer(p.P2P, roBlockByRoot, indicesToRetrieveByRoot, connectedPeers)
if err != nil {
return nil, errors.Wrap(err, "explore peers")
}
Expand Down Expand Up @@ -709,7 +709,7 @@ func updateResults(
// fetchDataColumnSidecarsFromPeers retrieves data column sidecars from peers.
func fetchDataColumnSidecarsFromPeers(
params DataColumnSidecarsParams,
slotByRoot map[[fieldparams.RootLength]byte]primitives.Slot,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
slotsWithCommitments map[primitives.Slot]bool,
indicesByRootByPeer map[goPeer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool,
) map[goPeer.ID][]blocks.RODataColumn {
Expand All @@ -736,7 +736,7 @@ func fetchDataColumnSidecarsFromPeers(
"totalRequestedCount": requestedCount,
})

roDataColumns, err := sendDataColumnSidecarsRequest(params, slotByRoot, slotsWithCommitments, peerID, indicesByRoot)
roDataColumns, err := sendDataColumnSidecarsRequest(params, roBlockByRoot, slotsWithCommitments, peerID, indicesByRoot)
if err != nil {
log.WithError(err).Debug("Failed to send data column sidecars request")
return
Expand All @@ -755,7 +755,7 @@ func fetchDataColumnSidecarsFromPeers(

func sendDataColumnSidecarsRequest(
params DataColumnSidecarsParams,
slotByRoot map[[fieldparams.RootLength]byte]primitives.Slot,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
slotsWithCommitments map[primitives.Slot]bool,
peerID goPeer.ID,
indicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
Expand All @@ -775,7 +775,7 @@ func sendDataColumnSidecarsRequest(
})

// Try to build a by range byRangeRequest first.
byRangeRequests, err := buildByRangeRequests(slotByRoot, slotsWithCommitments, indicesByRoot, batchSize)
byRangeRequests, err := buildByRangeRequests(roBlockByRoot, slotsWithCommitments, indicesByRoot, batchSize)
if err != nil {
return nil, errors.Wrap(err, "craft by range request")
}
Expand Down Expand Up @@ -854,7 +854,7 @@ func sendDataColumnSidecarsRequest(
// (Missing blocks or blocks without commitments do count as contiguous)
// If one of this condition is not met, returns nil.
func buildByRangeRequests(
slotByRoot map[[fieldparams.RootLength]byte]primitives.Slot,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
slotsWithCommitments map[primitives.Slot]bool,
indicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
batchSize uint64,
Expand All @@ -864,7 +864,7 @@ func buildByRangeRequests(
}

var reference map[uint64]bool
slots := make([]primitives.Slot, 0, len(slotByRoot))
slots := make([]primitives.Slot, 0, len(roBlockByRoot))
for root, indices := range indicesByRoot {
if reference == nil {
reference = indices
Expand All @@ -874,12 +874,12 @@ func buildByRangeRequests(
return nil, nil
}

slot, ok := slotByRoot[root]
roBlock, ok := roBlockByRoot[root]
if !ok {
return nil, errors.Errorf("slot not found for block root %#x", root)
}

slots = append(slots, slot)
slots = append(slots, roBlock.Block().Slot())
}

slices.Sort(slots)
Expand Down Expand Up @@ -944,6 +944,7 @@ func buildByRootRequest(indicesByRoot map[[fieldparams.RootLength]byte]map[uint6
func verifyDataColumnSidecarsByPeer(
p2p prysmP2P.P2P,
newVerifier verification.NewDataColumnsVerifier,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
roDataColumnsByPeer map[goPeer.ID][]blocks.RODataColumn,
) ([]blocks.VerifiedRODataColumn, error) {
// First optimistically verify all received data columns in a single batch.
Expand All @@ -957,7 +958,7 @@ func verifyDataColumnSidecarsByPeer(
roDataColumnSidecars = append(roDataColumnSidecars, columns...)
}

verifiedRoDataColumnSidecars, err := verifyByRootDataColumnSidecars(newVerifier, roDataColumnSidecars)
verifiedRoDataColumnSidecars, err := verifyRPCDataColumnSidecars(newVerifier, roBlockByRoot, roDataColumnSidecars)
if err == nil {
// This is the happy path where all sidecars are verified.
return verifiedRoDataColumnSidecars, nil
Expand All @@ -967,7 +968,7 @@ func verifyDataColumnSidecarsByPeer(
// Reverify peer by peer to identify faulty peer(s), reject all its sidecars, and downscore it.
verifiedRoDataColumnSidecars = make([]blocks.VerifiedRODataColumn, 0, count)
for peer, columns := range roDataColumnsByPeer {
peerVerifiedRoDataColumnSidecars, err := verifyByRootDataColumnSidecars(newVerifier, columns)
peerVerifiedRoDataColumnSidecars, err := verifyRPCDataColumnSidecars(newVerifier, roBlockByRoot, columns)
if err != nil {
// This peer has invalid sidecars.
log := log.WithError(err).WithField("peerID", peer)
Expand All @@ -982,15 +983,23 @@ func verifyDataColumnSidecarsByPeer(
return verifiedRoDataColumnSidecars, nil
}

// verifyByRootDataColumnSidecars verifies the provided read-only data columns against the
// verifyRPCDataColumnSidecars verifies the provided read-only data columns against the
// requirements for data column sidecars received via the by root request.
func verifyByRootDataColumnSidecars(newVerifier verification.NewDataColumnsVerifier, roDataColumns []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error) {
verifier := newVerifier(roDataColumns, verification.ByRootRequestDataColumnSidecarRequirements)
func verifyRPCDataColumnSidecars(
newVerifier verification.NewDataColumnsVerifier,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
roDataColumns []blocks.RODataColumn,
) ([]blocks.VerifiedRODataColumn, error) {
verifier := newVerifier(roDataColumns, verification.RPCDataColumnSidecarRequirements)

if err := verifier.ValidFields(); err != nil {
return nil, errors.Wrap(err, "valid fields")
}

if err := verifier.SidecarRootAndSignatureAligned(roBlockByRoot); err != nil {
return nil, errors.Wrap(err, "sidecar root and signature aligned")
}

if err := verifier.SidecarInclusionProven(); err != nil {
return nil, errors.Wrap(err, "sidecar inclusion proven")
}
Expand All @@ -1008,12 +1017,12 @@ func verifyByRootDataColumnSidecars(newVerifier verification.NewDataColumnsVerif
}

// computeIndicesByRootByPeer returns a peers->root->indices map only for
// root and indices given in `indicesByBlockRoot`. It also only selects peers
// root and indices given in `indicesByRoot`. It also only selects peers
// for a given root only if its head state is higher than the block slot.
func computeIndicesByRootByPeer(
p2p prysmP2P.P2P,
slotByBlockRoot map[[fieldparams.RootLength]byte]primitives.Slot,
indicesByBlockRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
roBlockByRoot map[[fieldparams.RootLength]byte]blocks.ROBlock,
indicesByRoot map[[fieldparams.RootLength]byte]map[uint64]bool,
peers map[goPeer.ID]bool,
) (map[goPeer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool, error) {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
Expand Down Expand Up @@ -1065,10 +1074,10 @@ func computeIndicesByRootByPeer(

// For each block root and its indices, find suitable peers
indicesByRootByPeer := make(map[goPeer.ID]map[[fieldparams.RootLength]byte]map[uint64]bool)
for blockRoot, indices := range indicesByBlockRoot {
blockSlot, ok := slotByBlockRoot[blockRoot]
for root, indices := range indicesByRoot {
roBlock, ok := roBlockByRoot[root]
if !ok {
return nil, errors.Errorf("slot not found for block root %#x", blockRoot)
return nil, errors.Errorf("slot not found for block root %#x", root)
}

for index := range indices {
Expand All @@ -1079,18 +1088,18 @@ func computeIndicesByRootByPeer(
return nil, errors.Errorf("head slot not found for peer %s", peer)
}

if peerHeadSlot < blockSlot {
if peerHeadSlot < roBlock.Block().Slot() {
continue
}

// Build peers->root->indices map
if _, exists := indicesByRootByPeer[peer]; !exists {
indicesByRootByPeer[peer] = make(map[[fieldparams.RootLength]byte]map[uint64]bool)
}
if _, exists := indicesByRootByPeer[peer][blockRoot]; !exists {
indicesByRootByPeer[peer][blockRoot] = make(map[uint64]bool)
if _, exists := indicesByRootByPeer[peer][root]; !exists {
indicesByRootByPeer[peer][root] = make(map[uint64]bool)
}
indicesByRootByPeer[peer][blockRoot][index] = true
indicesByRootByPeer[peer][root][index] = true
}
}
}
Expand Down
Loading
Loading