Skip to content

Commit

Permalink
Handle incoming chunks on the P2P layer
Browse files Browse the repository at this point in the history
- Add hooks for the new topic "beacon_block_chunk"
- Add protos for the new messages
- Handle the sync package logic of signature validation
- Create native types for the chunks to be handled in the sync package

Still TODO:
- Handle chunks for pending blocks.
- Handle nodes with incoming chunks, avoid verifying signature twice
- Decode the block and send it to the blockchain package
  • Loading branch information
potuz committed Jan 20, 2025
1 parent e634aea commit 665b04f
Show file tree
Hide file tree
Showing 18 changed files with 1,124 additions and 9 deletions.
17 changes: 17 additions & 0 deletions beacon-chain/core/chunks/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = ["signature.go"],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/chunks",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//network/forks:go_default_library",
"//time/slots:go_default_library",
],
)
53 changes: 53 additions & 0 deletions beacon-chain/core/chunks/signature.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package chunks

import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)

// VerifyChunkSignature verifies the proposer signature of a beacon block chunk.
func VerifyChunkSignature(beaconState state.ReadOnlyBeaconState,
proposerIndex primitives.ValidatorIndex,
sig []byte,
rootFunc func() ([32]byte, error)) error {
currentEpoch := slots.ToEpoch(beaconState.Slot())
domain, err := signing.Domain(beaconState.Fork(), currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(proposerIndex)
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
return signing.VerifyBlockSigningRoot(proposerPubKey, sig, domain, rootFunc)
}

// VerifyChunkSignatureUsingCurrentFork verifies the proposer signature of a beacon block chunk. This differs
// from the above method by not using fork data from the state and instead retrieving it
// via the respective epoch.
func VerifyChunkSignatureUsingCurrentFork(beaconState state.ReadOnlyBeaconState, chunk interfaces.ReadOnlyBeaconBlockChunk) error {
currentEpoch := slots.ToEpoch(chunk.Slot())
fork, err := forks.Fork(currentEpoch)
if err != nil {
return err
}
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBeaconProposer, beaconState.GenesisValidatorsRoot())
if err != nil {
return err
}
proposer, err := beaconState.ValidatorAtIndex(chunk.ProposerIndex())
if err != nil {
return err
}
proposerPubKey := proposer.PublicKey
sig := chunk.Signature()
return signing.VerifyBlockSigningRoot(proposerPubKey, sig[:], domain, func() ([32]byte, error) {
return chunk.HeaderRoot(), nil
})
}
4 changes: 4 additions & 0 deletions beacon-chain/p2p/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
GossipBlsToExecutionChangeMessage = "bls_to_execution_change"
// GossipBlobSidecarMessage is the name for the blob sidecar message type.
GossipBlobSidecarMessage = "blob_sidecar"
// GossipBlockChunkMessage is the name for the block chunk message type.
GossipBlockChunkMessage = "beacon_block_chunk"
// Topic Formats
//
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
Expand All @@ -52,4 +54,6 @@ const (
BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
// BlobSubnetTopicFormat is the topic format for the blob subnet.
BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d"
// RLNCTopicFormat is the topic format for the RLNC subnet.
RLNCTopicFormat = GossipProtocolAndDigest + GossipBlockChunkMessage
)
3 changes: 3 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_library(
"subscriber_beacon_aggregate_proof.go",
"subscriber_beacon_attestation.go",
"subscriber_beacon_blocks.go",
"subscriber_beacon_blocks_chunks.go",
"subscriber_blob_sidecar.go",
"subscriber_bls_to_execution_change.go",
"subscriber_handlers.go",
Expand Down Expand Up @@ -66,6 +67,7 @@ go_library(
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/chunks:go_default_library",
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/block:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library",
Expand Down Expand Up @@ -100,6 +102,7 @@ go_library(
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/chunks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",
Expand Down
21 changes: 15 additions & 6 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,21 @@ func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64

// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe(
p2p.BlockSubnetTopicFormat,
s.validateBeaconBlockPubSub,
s.beaconBlockSubscriber,
digest,
)
if features.Get().UseRLNC {
s.subscribe(
p2p.RLNCTopicFormat,
s.validateBeaconBlockChunkPubSub,
s.beaconBlockChunkSubscriber,
digest,
)
} else {
s.subscribe(
p2p.BlockSubnetTopicFormat,
s.validateBeaconBlockPubSub,
s.beaconBlockSubscriber,
digest,
)
}
s.subscribe(
p2p.AggregateAndProofSubnetTopicFormat,
s.validateAggregateAndProof,
Expand Down
124 changes: 124 additions & 0 deletions beacon-chain/sync/subscriber_beacon_blocks_chunks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package sync

import (
"context"
"fmt"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
core_chunks "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/chunks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/chunks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
)

func (s *Service) beaconBlockChunkSubscriber(ctx context.Context, msg proto.Message) error {
chunk, err := chunks.NewBlockChunk(msg)
if err != nil {
return err
}
if chunk.IsNil() {
return chunks.ErrNilObject
}
// TODO: verify if we have the full block, decode and send it to the blockchain package.
return nil
}

func (s *Service) validateBeaconBlockChunkPubSub(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
if pid == s.cfg.p2p.PeerID() {
return pubsub.ValidationAccept, nil
}
if s.cfg.initialSync.Syncing() {
return pubsub.ValidationIgnore, nil
}
ctx, span := trace.StartSpan(ctx, "sync.validateBeaconBlockChunkPubSub")
defer span.End()

m, err := s.decodePubsubMessage(msg)
if err != nil {
tracing.AnnotateError(span, err)
return pubsub.ValidationReject, errors.Wrap(err, "Could not decode message")
}

// It's fine to use the same lock for both block and chunk validation
s.validateBlockLock.Lock()
defer s.validateBlockLock.Unlock()

chunk, ok := m.(interfaces.ReadOnlyBeaconBlockChunk)
if !ok {
return pubsub.ValidationReject, errors.New("msg is not ReadOnlyBeaconBlockChunk")
}

if chunk.IsNil() {
return pubsub.ValidationReject, errors.New("chunk is nil")
}

// Check if parent is a bad block and then reject the chunk.
if s.hasBadBlock(chunk.ParentRoot()) {
err := fmt.Errorf("received chunk that has an invalid parent %#x", chunk.ParentRoot())
log.WithError(err).Debug("Received block with an invalid parent")
return pubsub.ValidationReject, err
}

// Be lenient in handling early blocks. Instead of discarding blocks arriving later than
// MAXIMUM_GOSSIP_CLOCK_DISPARITY in future, we tolerate blocks arriving at max two slots
// earlier (SECONDS_PER_SLOT * 2 seconds). Queue such blocks and process them at the right slot.
genesisTime := uint64(s.cfg.clock.GenesisTime().Unix())
if err := slots.VerifyTime(genesisTime, chunk.Slot(), earlyBlockProcessingTolerance); err != nil {
log.WithError(err).Debug("Ignored chunk: could not verify slot time")
return pubsub.ValidationIgnore, nil
}

cp := s.cfg.chain.FinalizedCheckpt()
startSlot, err := slots.EpochStart(cp.Epoch)
if err != nil {
log.WithError(err).Debug("Ignored block: could not calculate epoch start slot")
return pubsub.ValidationIgnore, nil
}
if startSlot >= chunk.Slot() {
err := fmt.Errorf("finalized slot %d greater or equal to block slot %d", startSlot, chunk.Slot())
log.Debug(err)
return pubsub.ValidationIgnore, err
}

if !s.cfg.chain.HasBlock(ctx, chunk.ParentRoot()) {
// TODO: implement pending chunk storage
return pubsub.ValidationIgnore, err
}

err = s.validateBeaconBlockChunk(ctx, chunk)
if err != nil {
log.WithError(err).Debug("Could not validate beacon block chunk")
return pubsub.ValidationReject, err
}

logFields := logrus.Fields{
"chunkSlot": chunk.Slot(),
"proposerIndex": chunk.ProposerIndex(),
"parentRoot": chunk.ParentRoot(),
}
log.WithFields(logFields).Debug("Received block chunk")
return pubsub.ValidationAccept, nil
}

func (s *Service) validateBeaconBlockChunk(ctx context.Context, chunk interfaces.ReadOnlyBeaconBlockChunk) error {
if !s.cfg.chain.InForkchoice(chunk.ParentRoot()) {
return blockchain.ErrNotDescendantOfFinalized
}

parentState, err := s.cfg.stateGen.StateByRoot(ctx, chunk.ParentRoot())
if err != nil {
return err
}

if err := core_chunks.VerifyChunkSignatureUsingCurrentFork(parentState, chunk); err != nil {
return err
}
return nil
}
3 changes: 3 additions & 0 deletions changelog/potuz_rlnc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
### Added

- Use random linear network coding for block propagation
4 changes: 2 additions & 2 deletions config/features/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ var (
Usage: "Enables an experimental attestation pool design.",
}
useRLNC = &cli.BoolFlag{
Name: "use-rlnc",
Usage: "Experimental: enables the use of random linear network coding for gossiping."
Name: "use-rlnc",
Usage: "Experimental: enables the use of random linear network coding for gossiping.",
Hidden: true,
}
)
Expand Down
19 changes: 19 additions & 0 deletions consensus-types/chunks/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@prysm//tools/go:def.bzl", "go_library")

go_library(
name = "go_default_library",
srcs = [
"beacon_block_chunk.go",
"error.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/consensus-types/chunks",
visibility = ["//visibility:public"],
deps = [
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
],
)
Loading

0 comments on commit 665b04f

Please sign in to comment.