diff --git a/op-node/cmd/main.go b/op-node/cmd/main.go
index b82b3f6babce2..613b5b102cfe1 100644
--- a/op-node/cmd/main.go
+++ b/op-node/cmd/main.go
@@ -18,6 +18,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/flags"
 	"github.com/ethereum-optimism/optimism/op-node/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/node"
+	"github.com/ethereum-optimism/optimism/op-node/params"
 	"github.com/ethereum-optimism/optimism/op-node/version"
 	opservice "github.com/ethereum-optimism/optimism/op-service"
 	"github.com/ethereum-optimism/optimism/op-service/cliapp"
@@ -94,6 +95,11 @@ func RollupNodeMain(ctx *cli.Context, closeApp context.CancelCauseFunc) (cliapp.
 		cfg.Rollup.LogDescription(log, chaincfg.L2ChainIDToNetworkDisplayName)
 	}
 
+	// CHANGE(thedevbirb): assess whether we're in chain replication mode at startup.
+	if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
+		params.BopReplay = true
+	}
+
 	n, err := node.New(ctx.Context, cfg, log, VersionWithMeta, m)
 	if err != nil {
 		return nil, fmt.Errorf("unable to create the rollup node: %w", err)
diff --git a/op-node/node/node.go b/op-node/node/node.go
index 8b0d8eb055a31..8a5033959da53 100644
--- a/op-node/node/node.go
+++ b/op-node/node/node.go
@@ -4,12 +4,13 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/ethereum/go-ethereum/common"
 	"io"
 	gosync "sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/ethereum/go-ethereum/common"
+
 	"github.com/hashicorp/go-multierror"
 	"github.com/libp2p/go-libp2p/core/peer"
 
@@ -22,6 +23,7 @@ import (
 	"github.com/ethereum-optimism/optimism/op-node/metrics"
 	"github.com/ethereum-optimism/optimism/op-node/node/safedb"
 	"github.com/ethereum-optimism/optimism/op-node/p2p"
+	"github.com/ethereum-optimism/optimism/op-node/params"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/conductor"
 	"github.com/ethereum-optimism/optimism/op-node/rollup/driver"
@@ -248,7 +250,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
 	}
 
 	// Initially fetch the current gateway + n gateways into the future
-	err = n.registrySource.FetchNextNGateways(ctx, 2, 3)
+	err = n.registrySource.FetchNextNGateways(ctx, 6, 3)
 	if err != nil {
 		return fmt.Errorf("failed to fetch initial gateways: %w", err)
 	}
@@ -259,7 +261,7 @@ func (n *OpNode) initRegistry(ctx context.Context, cfg *Config) error {
 			fetchCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
 			defer cancel()
 
-			if err := n.registrySource.FetchNextNGateways(fetchCtx, 2, 3); err != nil {
+			if err := n.registrySource.FetchNextNGateways(fetchCtx, 6, 3); err != nil {
 				n.log.Warn("registry fetch error", "err", err)
 			}
 			time.Sleep(time.Second)
@@ -620,6 +622,10 @@ func (n *OpNode) onEvent(ev event.Event) bool {
 }
 
 func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
+	// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
+	if params.BopReplay {
+		return
+	}
 	n.tracer.OnNewL1Head(ctx, sig)
 
 	if n.l2Driver == nil {
@@ -634,6 +640,10 @@ func (n *OpNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) {
 }
 
 func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
+	// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
+	if params.BopReplay {
+		return
+	}
 	if n.l2Driver == nil {
 		return
 	}
@@ -646,6 +656,10 @@ func (n *OpNode) OnNewL1Safe(ctx context.Context, sig eth.L1BlockRef) {
 }
 
 func (n *OpNode) OnNewL1Finalized(ctx context.Context, sig eth.L1BlockRef) {
+	// CHANGE(thedevbirb): allow chain replication without deviation due to L1 state.
+	if params.BopReplay {
+		return
+	}
 	if n.l2Driver == nil {
 		return
 	}
@@ -688,7 +702,6 @@ func (n *OpNode) PublishNewFrag(ctx context.Context, from peer.ID, frag *eth.Sig
 }
 
 func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.SignedSeal) error {
-
 	n.tracer.OnPublishSealFrag(ctx, from, seal)
 
 	// publish to p2p, if we are running p2p at all
@@ -704,7 +717,6 @@ func (n *OpNode) PublishSealFrag(ctx context.Context, from peer.ID, seal *eth.Si
 }
 
 func (n *OpNode) PublishEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) error {
-
 	n.tracer.OnPublishEnv(ctx, from, env)
 
 	// publish to p2p, if we are running p2p at all
@@ -782,7 +794,8 @@ func (n *OpNode) OnEnv(ctx context.Context, from peer.ID, env *eth.SignedEnv) er
 }
 
 func (n *OpNode) RequestL2Range(ctx context.Context, start, end eth.L2BlockRef) error {
-	if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() {
+	// CHANGE(thedevbirb): for chain replication, skip sending p2p sync requests, which may block the event loop.
+	if p2pNode := n.getP2PNodeIfEnabled(); p2pNode != nil && p2pNode.AltSyncEnabled() && !params.BopReplay {
 		if unixTimeStale(start.Time, 12*time.Hour) {
 			n.log.Debug(
 				"ignoring request to sync L2 range, timestamp is too old for p2p",
diff --git a/op-node/p2p/gossip.go b/op-node/p2p/gossip.go
index 64d8fdbf9c53e..117e15ad4ac1e 100644
--- a/op-node/p2p/gossip.go
+++ b/op-node/p2p/gossip.go
@@ -7,10 +7,11 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"github.com/ethereum/go-ethereum/crypto"
 	"sync"
 	"time"
 
+	"github.com/ethereum/go-ethereum/crypto"
+
 	"github.com/golang/snappy"
 	lru "github.com/hashicorp/golang-lru/v2"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -21,6 +22,7 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
 
+	"github.com/ethereum-optimism/optimism/op-node/params"
 	"github.com/ethereum-optimism/optimism/op-node/rollup"
 	"github.com/ethereum-optimism/optimism/op-service/eth"
 	opsigner "github.com/ethereum-optimism/optimism/op-service/signer"
@@ -49,8 +51,10 @@ const (
 
 // Message domains, the msg id function uncompresses to keep data monomorphic,
 // but invalid compressed data will need a unique different id.
-var MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
-var MessageDomainValidSnappy = [4]byte{1, 0, 0, 0}
+var (
+	MessageDomainInvalidSnappy = [4]byte{0, 0, 0, 0}
+	MessageDomainValidSnappy   = [4]byte{1, 0, 0, 0}
+)
 
 type GossipSetupConfigurables interface {
 	PeerScoringParams() *ScoringParams
@@ -404,7 +408,6 @@ func verifyGatewaySignature(log log.Logger, signatureBytes []byte, messageBytes
 }
 
 func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRuntimeConfig, blockVersion eth.BlockVersion) pubsub.ValidatorEx {
-
 	// Seen block hashes per block height
 	// uint64 -> *seenBlocks
 	blockHeightLRU, err := lru.New[uint64, *seenBlocks](1000)
@@ -472,10 +475,13 @@ func BuildBlocksValidator(log log.Logger, cfg *rollup.Config, runCfg GossipRunti
 		// rounding down to seconds is fine here.
 		now := uint64(time.Now().Unix())
 
-		// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
-		if uint64(payload.Timestamp) < now-60 {
-			log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
-			return pubsub.ValidationReject
+		// CHANGE(thedevbirb): for chain replication, allow old blocks.
+		if !params.BopReplay {
+			// [REJECT] if the `payload.timestamp` is older than 60 seconds in the past
+			if uint64(payload.Timestamp) < now-60 {
+				log.Warn("payload is too old", "timestamp", uint64(payload.Timestamp))
+				return pubsub.ValidationReject
+			}
 		}
 
 		// [REJECT] if the `payload.timestamp` is more than 5 seconds into the future
@@ -703,7 +709,7 @@ type publisher struct {
 var _ GossipOut = (*publisher)(nil)
 
 func combinePeers(allPeers ...[]peer.ID) []peer.ID {
-	var seen = make(map[peer.ID]bool)
+	seen := make(map[peer.ID]bool)
 	var res []peer.ID
 	for _, peers := range allPeers {
 		for _, p := range peers {
@@ -933,7 +939,6 @@ func newNewFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log
 		validator,
 		pubsub.WithValidatorTimeout(3*time.Second),
 		pubsub.WithValidatorConcurrency(4))
-
 	if err != nil {
 		return nil, fmt.Errorf("failed to register gossip topic: %w", err)
 	}
@@ -971,7 +976,6 @@ func sealFragFragTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, l
 		validator,
 		pubsub.WithValidatorTimeout(3*time.Second),
 		pubsub.WithValidatorConcurrency(4))
-
 	if err != nil {
 		return nil, fmt.Errorf("failed to register gossip topic: %w", err)
 	}
@@ -1009,7 +1013,6 @@ func newEnvTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log log
 		validator,
 		pubsub.WithValidatorTimeout(3*time.Second),
 		pubsub.WithValidatorConcurrency(4))
-
 	if err != nil {
 		return nil, fmt.Errorf("failed to register gossip topic: %w", err)
 	}
@@ -1047,7 +1050,6 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
 		validator,
 		pubsub.WithValidatorTimeout(3*time.Second),
 		pubsub.WithValidatorConcurrency(4))
-
 	if err != nil {
 		return nil, fmt.Errorf("failed to register gossip topic: %w", err)
 	}
@@ -1080,8 +1082,10 @@ func newBlockTopic(ctx context.Context, topicId string, ps *pubsub.PubSub, log l
 	}, nil
 }
 
-type TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
-type MessageHandler func(ctx context.Context, from peer.ID, msg any) error
+type (
+	TopicSubscriber func(ctx context.Context, sub *pubsub.Subscription)
+	MessageHandler  func(ctx context.Context, from peer.ID, msg any) error
+)
 
 func NewFragHandler(onNewFrag func(ctx context.Context, from peer.ID, msg *eth.SignedNewFrag) error) MessageHandler {
 	return func(ctx context.Context, from peer.ID, msg any) error {
diff --git a/op-node/params/globals.go b/op-node/params/globals.go
new file mode 100644
index 0000000000000..e34d0252543cf
--- /dev/null
+++ b/op-node/params/globals.go
@@ -0,0 +1,6 @@
+package params
+
+// CHANGE(thedevbirb): A global variable, set only at node startup, that records
+// whether the node is running in chain replication mode, in which case some
+// syncing and safety functionality is disabled.
+var BopReplay = false
diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go
index fa2f04dba8e2d..3e5ef10eb987f 100644
--- a/op-node/rollup/derive/pipeline.go
+++ b/op-node/rollup/derive/pipeline.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"os"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -174,6 +175,11 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
 		}
 	}()
 
+	// CHANGE(thedevbirb): for chain replication we must skip deriving the chain from L1 data.
+	if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
+		return nil, io.EOF
+	}
+
 	// if any stages need to be reset, do that first.
 	if dp.resetting < len(dp.stages) {
 		if !dp.engineIsReset {
diff --git a/op-node/rollup/driver/state.go b/op-node/rollup/driver/state.go
index d4787d886b954..b4fb76afff067 100644
--- a/op-node/rollup/driver/state.go
+++ b/op-node/rollup/driver/state.go
@@ -449,7 +449,8 @@ func (s *SyncDeriver) SyncStep() {
 			return false
 		} else {
 			s.Emitter.Emit(rollup.CriticalErrorEvent{
-				Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err)})
+				Err: fmt.Errorf("unexpected error on SyncStep event Drain: %w", err),
+			})
 			return false
 		}
 	}
diff --git a/op-node/rollup/sync/config.go b/op-node/rollup/sync/config.go
index 4e36092aaaf3c..51b34061d32fb 100644
--- a/op-node/rollup/sync/config.go
+++ b/op-node/rollup/sync/config.go
@@ -22,8 +22,10 @@ const (
 	ELSyncString string = "execution-layer"
 )
 
-var Modes = []Mode{CLSync, ELSync}
-var ModeStrings = []string{CLSyncString, ELSyncString}
+var (
+	Modes       = []Mode{CLSync, ELSync}
+	ModeStrings = []string{CLSyncString, ELSyncString}
+)
 
 func StringToMode(s string) (Mode, error) {
 	switch strings.ToLower(s) {
diff --git a/op-node/service.go b/op-node/service.go
index fac52eb43eedc..ef6b12ef4144a 100644
--- a/op-node/service.go
+++ b/op-node/service.go
@@ -85,7 +85,7 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) {
 	conductorRPCEndpoint := ctx.String(flags.ConductorRpcFlag.Name)
 	cfg := &node.Config{
 		L1:       l1Endpoint,
-		Registry:  registryEndpoint,
+		Registry: registryEndpoint,
 		L2:       l2Endpoint,
 		Rollup:   *rollupConfig,
 		Driver:   *driverConfig,
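
Note on usage: replay mode is keyed on the presence of the BOP_REPLAY environment
variable, not its value; os.LookupEnv reports ok even when the variable is set to
an empty string, so e.g. `BOP_REPLAY=1 op-node ...` enables it and unsetting the
variable disables it. Below is a minimal, self-contained sketch of the startup
check this diff introduces; the standalone package and the Println are
illustrative only, the real code sets op-node/params.BopReplay instead:

    package main

    import (
        "fmt"
        "os"
    )

    // bopReplay mirrors op-node/params.BopReplay: written once at startup,
    // only read afterwards.
    var bopReplay = false

    func main() {
        // Presence check: BOP_REPLAY="" still enables replay mode.
        if _, ok := os.LookupEnv("BOP_REPLAY"); ok {
            bopReplay = true
        }
        fmt.Println("replay mode:", bopReplay)
    }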