From a74fdae384a74c3e2bb9bbc35300d21f69569137 Mon Sep 17 00:00:00 2001 From: syntrust Date: Fri, 26 Dec 2025 18:28:41 +0800 Subject: [PATCH] fix delay --- ethstorage/eth/polling_client.go | 14 ++----- ethstorage/node/node.go | 68 +++++++++++++++++++++----------- 2 files changed, 50 insertions(+), 32 deletions(-) diff --git a/ethstorage/eth/polling_client.go b/ethstorage/eth/polling_client.go index 95169cb7..07906217 100644 --- a/ethstorage/eth/polling_client.go +++ b/ethstorage/eth/polling_client.go @@ -17,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/rpc" "golang.org/x/mod/semver" ) @@ -167,7 +168,7 @@ func (w *PollingClient) pollHeads() { // after the pollRate elapses. head, err := w.queryHeader() if err != nil { - w.lg.Info("Error getting latest header", "err", err) + w.lg.Warn("Error getting latest header", "err", err) reqPollAfter() continue } @@ -177,7 +178,7 @@ func (w *PollingClient) pollHeads() { continue } - w.lg.Trace("Notifying subscribers of new head", "head", head.Hash()) + w.lg.Trace("Notifying subscribers of new head", "height", head.Number.String(), "head", head.Hash()) w.currHead = head w.mtx.RLock() for _, sub := range w.subs { @@ -195,14 +196,7 @@ func (w *PollingClient) pollHeads() { func (w *PollingClient) getLatestHeader() (*types.Header, error) { ctx, cancel := context.WithTimeout(w.ctx, 5*time.Second) defer cancel() - latest, err := w.BlockNumber(ctx) - if err != nil { - w.lg.Error("Failed to get latest block number", "err", err) - return nil, err - } - // The latest blockhash could be empty - number := new(big.Int).SetUint64(latest - 1) - return w.HeaderByNumber(ctx, number) + return w.HeaderByNumber(ctx, big.NewInt(rpc.LatestBlockNumber.Int64())) } func (w *PollingClient) reqPoll() { diff --git a/ethstorage/node/node.go b/ethstorage/node/node.go index 4f20bd49..25874da4 100644 --- a/ethstorage/node/node.go +++ b/ethstorage/node/node.go @@ -8,6 +8,7 @@ import ( "encoding/json" "fmt" "io" + "math/big" "net/http" "strings" "time" @@ -149,7 +150,7 @@ func (n *EsNode) initL2(ctx context.Context, cfg *Config) error { } func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { - client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, cfg.L1.L1BlockTime, n.lg) + client, err := eth.Dial(cfg.L1.L1NodeAddr, cfg.Storage.L1Contract, 1, n.lg) if err != nil { return fmt.Errorf("failed to create L1 source: %w", err) } @@ -168,7 +169,7 @@ func (n *EsNode) initL1(ctx context.Context, cfg *Config) error { return fmt.Errorf("no L1 beacon or DA URL provided") } if cfg.RandaoSourceURL != "" { - rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, cfg.L1.L1BlockTime, n.lg) + rc, err := eth.DialRandaoSource(ctx, cfg.RandaoSourceURL, cfg.L1.L1NodeAddr, 1, n.lg) if err != nil { return fmt.Errorf("failed to create randao source: %w", err) } @@ -193,26 +194,26 @@ func (n *EsNode) startL1(cfg *Config) { } n.lg.Error("L1 heads subscription error", "err", err) }() - - // Keep subscribed to the randao heads, which helps miner to get proper random seeds - n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { - if err != nil { - n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) - } - if n.randaoSource != nil { - return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) - } else { - return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewRandaoSourceHead) - } - }) - go func() { - err, ok := <-n.randaoHeadsSub.Err() - if !ok { - return - } - n.lg.Error("Randao heads subscription error", "err", err) - }() - + if n.miner != nil { + // Keep subscribed to the randao heads, which helps miner to get proper random seeds + n.randaoHeadsSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) { + if err != nil { + n.lg.Warn("Resubscribing after failed randao head subscription", "err", err) + } + if n.randaoSource != nil { + return eth.WatchHeadChanges(n.resourcesCtx, n.randaoSource, n.OnNewRandaoSourceHead) + } else { + return eth.WatchHeadChanges(n.resourcesCtx, n.l1Source, n.OnNewL1HeadDelayed) + } + }) + go func() { + err, ok := <-n.randaoHeadsSub.Err() + if !ok { + return + } + n.lg.Error("Randao heads subscription error", "err", err) + }() + } // Poll for the safe L1 block and finalized block, // which only change once per epoch at most and may be delayed. n.l1SafeSub = eth.PollBlockChanges(n.resourcesCtx, n.lg, n.l1Source, n.OnNewL1Safe, ethRPC.SafeBlockNumber, @@ -471,6 +472,29 @@ func (n *EsNode) OnNewL1Head(ctx context.Context, sig eth.L1BlockRef) { } } +func (n *EsNode) OnNewL1HeadDelayed(ctx context.Context, sig eth.L1BlockRef) { + if n.miner != nil { + // Use the second latest block to mine because blockhash() for the latest could be empty from the contract + header, err := n.l1Source.HeaderByNumber(ctx, new(big.Int).SetUint64(sig.Number-1)) + if err != nil { + n.lg.Warn("Failed to get randao source head for miner", "blockNumber", header.Number.Uint64(), "err", err) + return + } + select { + case n.miner.ChainHeadCh <- eth.L1BlockRef{ + Hash: header.Hash(), + Number: header.Number.Uint64(), + ParentHash: header.ParentHash, + Time: header.Time, + MixDigest: header.MixDigest, + }: + n.lg.Debug("OnNewRandaoSourceHead sent to miner", "blockNumber", header.Number.Uint64()) + default: + // Channel is full, skipping + } + } +} + func (n *EsNode) OnNewRandaoSourceHead(ctx context.Context, sig eth.L1BlockRef) { n.lg.Debug("OnNewRandaoSourceHead", "blockNumber", sig.Number) if n.miner != nil {