From ee9fe9910dffd98dc8b43ec4baffefdf25ccf6ea Mon Sep 17 00:00:00 2001 From: jenikd Date: Tue, 9 Sep 2025 12:01:27 +0200 Subject: [PATCH 1/3] Start feeder for archives only --- gossip/service.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/gossip/service.go b/gossip/service.go index c74a86644..d69c40e53 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -517,8 +517,12 @@ func (s *Service) Start() error { if s.store.evm.CheckLiveStateHash(blockState.LastBlock.Idx, blockState.FinalizedStateRoot) != nil { return errors.New("fullsync isn't possible because state root is missing") } - // start notification feeder - s.feed.Start(s.store.evm) + + _, _, err := s.store.evm.GetArchiveBlockHeight() + if err == nil { + // start notification feeder for archive nodes + s.feed.Start(s.store.evm) + } // start blocks processor s.blockProcTasks.Start(1) From f8398cdda980925f6ad3d43a826349a0d9028572 Mon Sep 17 00:00:00 2001 From: jenikd Date: Tue, 9 Sep 2025 20:32:02 +0200 Subject: [PATCH 2/3] Cancel notification updates when feeder is not started --- gossip/service.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gossip/service.go b/gossip/service.go index d69c40e53..265a6e9cb 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -144,6 +144,10 @@ func (f *ServiceFeed) notifyAboutNewBlock( block *evmcore.EvmBlock, logs []*types.Log, ) { + // ignore updates if not started + if f.incomingUpdates == nil { + return + } f.incomingUpdates <- feedUpdate{ block: block, logs: logs, From 7eef06e1638bcaa6f79b2c3083495636f4d3206a Mon Sep 17 00:00:00 2001 From: jenikd Date: Thu, 11 Sep 2025 09:14:17 +0200 Subject: [PATCH 3/3] Check for NoArchiveError in feeder --- gossip/evmstore/statedb.go | 7 +- gossip/service.go | 31 ++++--- gossip/service_mock.go | 55 ++++++++++++ gossip/service_test.go | 178 +++++++++++++++++++++++++++++++++++++ 4 files changed, 259 insertions(+), 12 deletions(-) create mode 100644 gossip/service_mock.go create mode 100644 gossip/service_test.go diff --git a/gossip/evmstore/statedb.go b/gossip/evmstore/statedb.go index a2218d36e..42944ff2d 100644 --- a/gossip/evmstore/statedb.go +++ b/gossip/evmstore/statedb.go @@ -2,6 +2,8 @@ package evmstore import ( "fmt" + "math/big" + cc "github.com/Fantom-foundation/Carmen/go/common" carmen "github.com/Fantom-foundation/Carmen/go/state" _ "github.com/Fantom-foundation/Carmen/go/state/gostate" @@ -9,9 +11,12 @@ import ( "github.com/Fantom-foundation/lachesis-base/hash" "github.com/Fantom-foundation/lachesis-base/inter/idx" "github.com/ethereum/go-ethereum/common" - "math/big" ) +// NoArchiveError is an error returned by implementation of the State interface +// for archive operations if no archive is maintained by this implementation. +const NoArchiveError = carmen.NoArchiveError + // GetLiveStateDb obtains StateDB for block processing - the live writable state func (s *Store) GetLiveStateDb(stateRoot hash.Hash) (state.StateDB, error) { if s.liveStateDb == nil { diff --git a/gossip/service.go b/gossip/service.go index 265a6e9cb..0d0d7c206 100644 --- a/gossip/service.go +++ b/gossip/service.go @@ -73,6 +73,10 @@ type feedUpdate struct { logs []*types.Log } +type ArchiveBlockHeightSource interface { + GetArchiveBlockHeight() (uint64, bool, error) +} + func (f *ServiceFeed) SubscribeNewEpoch(ch chan<- idx.Epoch) notify.Subscription { return f.scope.Track(f.newEpoch.Subscribe(ch)) } @@ -89,7 +93,7 @@ func (f *ServiceFeed) SubscribeNewLogs(ch chan<- []*types.Log) notify.Subscripti return f.scope.Track(f.newLogs.Subscribe(ch)) } -func (f *ServiceFeed) Start(store *evmstore.Store) { +func (f *ServiceFeed) Start(store ArchiveBlockHeightSource) { incoming := make(chan feedUpdate, 1024) f.incomingUpdates = incoming stop := make(chan struct{}) @@ -122,12 +126,20 @@ func (f *ServiceFeed) Start(store *evmstore.Store) { height, empty, err := store.GetArchiveBlockHeight() if err != nil { - log.Error("failed to get archive block height", "err", err) - continue - } - if empty { - continue + // If there is no archive, set height to the last block + // and send all notifications + if errors.Is(err, evmstore.NoArchiveError) { + height = pending[len(pending)-1].block.Number.Uint64() + } else { + log.Error("failed to get archive block height", "err", err) + continue + } + } else { + if empty { + continue + } } + for _, update := range pending { if update.block.Number.Uint64() > height { break @@ -522,11 +534,8 @@ func (s *Service) Start() error { return errors.New("fullsync isn't possible because state root is missing") } - _, _, err := s.store.evm.GetArchiveBlockHeight() - if err == nil { - // start notification feeder for archive nodes - s.feed.Start(s.store.evm) - } + // start notification feeder + s.feed.Start(s.store.evm) // start blocks processor s.blockProcTasks.Start(1) diff --git a/gossip/service_mock.go b/gossip/service_mock.go new file mode 100644 index 000000000..8d40a9931 --- /dev/null +++ b/gossip/service_mock.go @@ -0,0 +1,55 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: service.go +// +// Generated by this command: +// +// mockgen -source=service.go -package=gossip -destination=service_mock.go +// + +// Package gossip is a generated GoMock package. +package gossip + +import ( + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockArchiveBlockHeightSource is a mock of ArchiveBlockHeightSource interface. +type MockArchiveBlockHeightSource struct { + ctrl *gomock.Controller + recorder *MockArchiveBlockHeightSourceMockRecorder +} + +// MockArchiveBlockHeightSourceMockRecorder is the mock recorder for MockArchiveBlockHeightSource. +type MockArchiveBlockHeightSourceMockRecorder struct { + mock *MockArchiveBlockHeightSource +} + +// NewMockArchiveBlockHeightSource creates a new mock instance. +func NewMockArchiveBlockHeightSource(ctrl *gomock.Controller) *MockArchiveBlockHeightSource { + mock := &MockArchiveBlockHeightSource{ctrl: ctrl} + mock.recorder = &MockArchiveBlockHeightSourceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockArchiveBlockHeightSource) EXPECT() *MockArchiveBlockHeightSourceMockRecorder { + return m.recorder +} + +// GetArchiveBlockHeight mocks base method. +func (m *MockArchiveBlockHeightSource) GetArchiveBlockHeight() (uint64, bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetArchiveBlockHeight") + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(bool) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetArchiveBlockHeight indicates an expected call of GetArchiveBlockHeight. +func (mr *MockArchiveBlockHeightSourceMockRecorder) GetArchiveBlockHeight() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetArchiveBlockHeight", reflect.TypeOf((*MockArchiveBlockHeightSource)(nil).GetArchiveBlockHeight)) +} diff --git a/gossip/service_test.go b/gossip/service_test.go new file mode 100644 index 000000000..cd1fb714b --- /dev/null +++ b/gossip/service_test.go @@ -0,0 +1,178 @@ +package gossip + +import ( + "fmt" + "math/big" + "testing" + "time" + + "github.com/Fantom-foundation/go-opera/evmcore" + "github.com/Fantom-foundation/go-opera/gossip/evmstore" + "go.uber.org/mock/gomock" +) + +func TestServiceFeed_SubscribeNewBlock(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockArchiveBlockHeightSource(ctrl) + + store.EXPECT().GetArchiveBlockHeight().Return(uint64(12), false, nil).AnyTimes() + + feed := ServiceFeed{} + feed.Start(store) + + consumer := make(chan evmcore.ChainHeadNotify, 1) + feed.SubscribeNewBlock(consumer) + + // There should be no signal delivered until there is a notification. + select { + case <-consumer: + t.Fatal("expected no notification to be sent") + case <-time.After(100 * time.Millisecond): + // all good + } + + feed.notifyAboutNewBlock(&evmcore.EvmBlock{ + EvmHeader: evmcore.EvmHeader{ + Number: big.NewInt(12), + }, + }, nil) + + // The notification should be delivered. + select { + case notification := <-consumer: + if notification.Block.Number.Cmp(big.NewInt(12)) != 0 { + t.Fatalf("expected block number 12, got %d", notification.Block.Number) + } + case <-time.After(100 * time.Millisecond): + t.Fatal("expected notification to be sent") + } + + feed.Stop() +} + +func TestServiceFeed_BlocksInOrder(t *testing.T) { + ctrl := gomock.NewController(t) + store := NewMockArchiveBlockHeightSource(ctrl) + + var startBlocknumber uint64 = 5 + mockBlockNumber := startBlocknumber + expectedBlockNumber := startBlocknumber + 1 + + // Increment expected block height + store.EXPECT().GetArchiveBlockHeight().DoAndReturn(func() (uint64, bool, error) { + mockBlockNumber++ + return mockBlockNumber, false, nil + }).AnyTimes() + + feed := ServiceFeed{} + feed.Start(store) + + consumer := make(chan evmcore.ChainHeadNotify, 5) + feed.SubscribeNewBlock(consumer) + + // Emit blocks + blockNumbers := []int64{8, 6, 7, 10, 9} + for _, blockNumber := range blockNumbers { + feed.notifyAboutNewBlock(&evmcore.EvmBlock{ + EvmHeader: evmcore.EvmHeader{ + Number: big.NewInt(blockNumber), + }, + }, nil) + } + + // The notification should be delivered in correct order + for expectedBlockNumber <= startBlocknumber+uint64(len(blockNumbers)) { + select { + case notification := <-consumer: + if notification.Block.Number.Cmp(big.NewInt(int64(expectedBlockNumber))) != 0 { + t.Fatalf("expected block number %d, got %d", expectedBlockNumber, notification.Block.Number) + } + expectedBlockNumber++ + + case <-time.After(3 * time.Second): + t.Fatal("expected notification should be already received") + } + } + + feed.Stop() +} + +type expectedBlockNotification struct { + blockNumber uint64 +} + +func TestServiceFeed_ArchiveState(t *testing.T) { + + tests := map[string]struct { + blockHeight uint64 + emptyArchive bool + err error + expectedNotification *expectedBlockNotification + }{ + "empty archive": { + blockHeight: 0, + emptyArchive: true, + err: nil, + expectedNotification: nil, + }, + "non-empty archive": { + blockHeight: 12, + emptyArchive: false, + err: nil, + expectedNotification: &expectedBlockNotification{blockNumber: 12}, + }, + "non-existing archive": { + blockHeight: 12, + emptyArchive: true, + err: evmstore.NoArchiveError, + expectedNotification: &expectedBlockNotification{blockNumber: 12}, + }, + "different archive error": { + blockHeight: 12, + emptyArchive: false, + err: fmt.Errorf("some other error"), + expectedNotification: nil, + }, + } + + for testName, test := range tests { + t.Run(testName, func(t *testing.T) { + + ctrl := gomock.NewController(t) + store := NewMockArchiveBlockHeightSource(ctrl) + + store.EXPECT().GetArchiveBlockHeight().Return(test.blockHeight, test.emptyArchive, test.err).AnyTimes() + + feed := ServiceFeed{} + feed.Start(store) + + consumer := make(chan evmcore.ChainHeadNotify, 1) + feed.SubscribeNewBlock(consumer) + + feed.notifyAboutNewBlock(&evmcore.EvmBlock{ + EvmHeader: evmcore.EvmHeader{ + Number: big.NewInt(int64(test.blockHeight)), + }, + }, nil) + + // The notification should be delivered. + select { + case notification := <-consumer: + if test.expectedNotification == nil { + t.Fatal("expected notification to be sent") + } else { + if notification.Block.Number.Cmp(big.NewInt(int64(test.expectedNotification.blockNumber))) != 0 { + t.Fatalf("expected block number %d, got %d", test.expectedNotification.blockNumber, notification.Block.Number) + } + } + // no notification should be received + case <-time.After(100 * time.Millisecond): + if test.expectedNotification != nil { + t.Fatal("expected no notification to be sent") + } + } + + feed.Stop() + }) + } +}