From da36deb9f88c4c4f4d61a342ff88bbc0546e5f0e Mon Sep 17 00:00:00 2001 From: Ahmed Mohamed Date: Fri, 3 Oct 2025 12:05:32 +1000 Subject: [PATCH 1/3] Update go-eth2-client. --- go.mod | 2 +- go.sum | 4 ++-- main.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index d0d222e..6f1d4b8 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.22.0 toolchain go1.23.2 require ( - github.com/attestantio/go-eth2-client v0.24.0 + github.com/attestantio/go-eth2-client v0.27.1 github.com/mitchellh/go-homedir v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.20.5 diff --git a/go.sum b/go.sum index 0cdfc42..78cb9c9 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/attestantio/go-eth2-client v0.24.0 h1:lGVbcnhlBwRglt1Zs56JOCgXVyLWKFZOmZN8jKhE7Ws= -github.com/attestantio/go-eth2-client v0.24.0/go.mod h1:/KTLN3WuH1xrJL7ZZrpBoWM1xCCihnFbzequD5L+83o= +github.com/attestantio/go-eth2-client v0.27.1 h1:g7bm+gG/p+gfzYdEuxuAepVWYb8EO+2KojV5/Lo2BxM= +github.com/attestantio/go-eth2-client v0.27.1/go.mod h1:fvULSL9WtNskkOB4i+Yyr6BKpNHXvmpGZj9969fCrfY= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= diff --git a/main.go b/main.go index 7896835..4e9b7da 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ import ( ) // ReleaseVersion is the release version for the code. -var ReleaseVersion = "0.5.1" +var ReleaseVersion = "0.5.2" func main() { os.Exit(main2()) From ccdbe34076e1bc322dd509ec4e7c3edcd69ce49d Mon Sep 17 00:00:00 2001 From: Ahmed Mohamed Date: Fri, 3 Oct 2025 13:39:50 +1000 Subject: [PATCH 2/3] Migrate to new Events API. --- services/attestations/events/service.go | 53 ++++++++++++-------- services/blocks/events/service.go | 65 ++++++++++++------------- services/heads/events/service.go | 65 ++++++++++++------------- 3 files changed, 95 insertions(+), 88 deletions(-) diff --git a/services/attestations/events/service.go b/services/attestations/events/service.go index 149b724..cdadab9 100644 --- a/services/attestations/events/service.go +++ b/services/attestations/events/service.go @@ -22,7 +22,7 @@ import ( consensusclient "github.com/attestantio/go-eth2-client" "github.com/attestantio/go-eth2-client/api" - apiv1 "github.com/attestantio/go-eth2-client/api/v1" + "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/pkg/errors" bitfield "github.com/prysmaticlabs/go-bitfield" @@ -89,26 +89,39 @@ func (s *Service) monitorEvents(ctx context.Context, eventsProvider consensusclient.EventsProvider, nodeVersionProvider consensusclient.NodeVersionProvider, ) error { - if err := eventsProvider.Events(ctx, []string{"attestation"}, func(event *apiv1.Event) { - data, isData := event.Data.(*phase0.Attestation) - if !isData { - log.Error().Msg("Event data not of expected type") - return - } - delay := time.Since(s.chainTime.StartOfSlot(data.Data.Slot)) - if delay.Seconds() < 0 || delay.Seconds() > 12 { - log.Trace().Uint64("slot", uint64(data.Data.Slot)).Stringer("delay", delay).Msg("Delay out of range, ignoring") - return - } - monitorEventProcessed(delay) + if err := eventsProvider.Events(ctx, &api.EventsOpts{ + AttestationHandler: func(ctx context.Context, event *spec.VersionedAttestation) { + data, err := event.Data() + if err != nil { + log.Error().Err(err).Msg("Failed to get attestation data") + return + } - // We treat attestations differently depending on if they are individual or aggregate. - validators := data.AggregationBits.Count() - if validators == 1 { - s.handleAttestation(ctx, address, data, delay) - } else { - s.handleAggregateAttestation(ctx, nodeVersionProvider, data, delay) - } + delay := time.Since(s.chainTime.StartOfSlot(data.Slot)) + if delay.Seconds() < 0 || delay.Seconds() > 12 { + log.Trace().Uint64("slot", uint64(data.Slot)).Stringer("delay", delay).Msg("Delay out of range, ignoring") + return + } + monitorEventProcessed(delay) + + // We treat attestations differently depending on if they are individual or aggregate. + aggregationBits, err := event.AggregationBits() + if err != nil { + log.Error().Err(err).Msg("Failed to get attestation aggregation bits") + return + } + attestation := &phase0.Attestation{ + AggregationBits: aggregationBits, + Data: data, + } + + validators := aggregationBits.Count() + if validators == 1 { + s.handleAttestation(ctx, address, attestation, delay) + } else { + s.handleAggregateAttestation(ctx, nodeVersionProvider, attestation, delay) + } + }, }); err != nil { return errors.Wrap(err, "failed to create events provider") } diff --git a/services/blocks/events/service.go b/services/blocks/events/service.go index 16408c3..23d57c4 100644 --- a/services/blocks/events/service.go +++ b/services/blocks/events/service.go @@ -72,40 +72,37 @@ func (s *Service) monitorEvents(ctx context.Context, eventsProvider consensusclient.EventsProvider, nodeVersionProvider consensusclient.NodeVersionProvider, ) error { - if err := eventsProvider.Events(ctx, []string{"block"}, func(event *apiv1.Event) { - data, isData := event.Data.(*apiv1.BlockEvent) - if !isData { - log.Error().Msg("Event data not of expected type") - return - } - delay := time.Since(s.chainTime.StartOfSlot(data.Slot)) - - // Ensure the node is synced. - syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) - if err != nil { - log.Error().Err(err).Msg("Failed to ascertain if node is syncing") - return - } - if syncingResponse.Data.IsSyncing { - log.Debug().Msg("Node is syncing, not sending information") - } - - monitorEventProcessed(delay) - - nodeVersionResponse, err := nodeVersionProvider.NodeVersion(ctx, &api.NodeVersionOpts{}) - if err != nil { - log.Error().Err(err).Msg("Failed to obtain node version") - return - } - - // Build and send the data. - body := fmt.Sprintf( - `{"source":"%s","method":"block event","slot":"%d","delay_ms":"%d"}`, - nodeVersionResponse.Data, - data.Slot, - int(delay.Milliseconds()), - ) - s.submitter.SubmitBlockDelay(ctx, body) + if err := eventsProvider.Events(ctx, &api.EventsOpts{ + BlockHandler: func(ctx context.Context, event *apiv1.BlockEvent) { + delay := time.Since(s.chainTime.StartOfSlot(event.Slot)) + + // Ensure the node is synced. + syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) + if err != nil { + log.Error().Err(err).Msg("Failed to ascertain if node is syncing") + return + } + if syncingResponse.Data.IsSyncing { + log.Debug().Msg("Node is syncing, not sending information") + } + + monitorEventProcessed(delay) + + nodeVersionResponse, err := nodeVersionProvider.NodeVersion(ctx, &api.NodeVersionOpts{}) + if err != nil { + log.Error().Err(err).Msg("Failed to obtain node version") + return + } + + // Build and send the data. + body := fmt.Sprintf( + `{"source":"%s","method":"block event","slot":"%d","delay_ms":"%d"}`, + nodeVersionResponse.Data, + event.Slot, + int(delay.Milliseconds()), + ) + s.submitter.SubmitBlockDelay(ctx, body) + }, }); err != nil { return errors.Wrap(err, "failed to create events provider") } diff --git a/services/heads/events/service.go b/services/heads/events/service.go index fdbeddf..04e84b1 100644 --- a/services/heads/events/service.go +++ b/services/heads/events/service.go @@ -72,40 +72,37 @@ func (s *Service) monitorEvents(ctx context.Context, eventsProvider consensusclient.EventsProvider, nodeVersionProvider consensusclient.NodeVersionProvider, ) error { - if err := eventsProvider.Events(ctx, []string{"head"}, func(event *apiv1.Event) { - data, isData := event.Data.(*apiv1.HeadEvent) - if !isData { - log.Error().Msg("Event data not of expected type") - return - } - delay := time.Since(s.chainTime.StartOfSlot(data.Slot)) - - // Ensure the node is synced. - syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) - if err != nil { - log.Error().Err(err).Msg("Failed to ascertain if node is syncing") - return - } - if syncingResponse.Data.IsSyncing { - log.Debug().Msg("Node is syncing, not sending information") - } - - monitorEventProcessed(delay) - - nodeVersionResponse, err := nodeVersionProvider.NodeVersion(ctx, &api.NodeVersionOpts{}) - if err != nil { - log.Error().Err(err).Msg("Failed to obtain node version") - return - } - - // Build and send the data. - body := fmt.Sprintf( - `{"source":"%s","method":"head event","slot":"%d","delay_ms":"%d"}`, - nodeVersionResponse.Data, - data.Slot, - int(delay.Milliseconds()), - ) - s.submitter.SubmitHeadDelay(ctx, body) + if err := eventsProvider.Events(ctx, &api.EventsOpts{ + HeadHandler: func(ctx context.Context, event *apiv1.HeadEvent) { + delay := time.Since(s.chainTime.StartOfSlot(event.Slot)) + + // Ensure the node is synced. + syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) + if err != nil { + log.Error().Err(err).Msg("Failed to ascertain if node is syncing") + return + } + if syncingResponse.Data.IsSyncing { + log.Debug().Msg("Node is syncing, not sending information") + } + + monitorEventProcessed(delay) + + nodeVersionResponse, err := nodeVersionProvider.NodeVersion(ctx, &api.NodeVersionOpts{}) + if err != nil { + log.Error().Err(err).Msg("Failed to obtain node version") + return + } + + // Build and send the data. + body := fmt.Sprintf( + `{"source":"%s","method":"head event","slot":"%d","delay_ms":"%d"}`, + nodeVersionResponse.Data, + event.Slot, + int(delay.Milliseconds()), + ) + s.submitter.SubmitHeadDelay(ctx, body) + }, }); err != nil { return errors.Wrap(err, "failed to create events provider") } From 84477b5783b6e4ca8e82060a9031feab4ff97b1d Mon Sep 17 00:00:00 2001 From: Ahmed Mohamed Date: Fri, 3 Oct 2025 14:08:25 +1000 Subject: [PATCH 3/3] Linting. --- .golangci.yml | 2 +- services/blocks/events/service.go | 7 ++++++- services/heads/events/service.go | 7 ++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 6fe5271..8ab8d3a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -81,7 +81,7 @@ run: # output configuration options -output: +# output: # Format: colored-line-number|line-number|json|tab|checkstyle|code-climate|junit-xml|github-actions # # Multiple can be specified by separating them by comma, output can be provided diff --git a/services/blocks/events/service.go b/services/blocks/events/service.go index 23d57c4..c369a45 100644 --- a/services/blocks/events/service.go +++ b/services/blocks/events/service.go @@ -77,7 +77,12 @@ func (s *Service) monitorEvents(ctx context.Context, delay := time.Since(s.chainTime.StartOfSlot(event.Slot)) // Ensure the node is synced. - syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) + syncingProvider, ok := eventsProvider.(consensusclient.NodeSyncingProvider) + if !ok { + log.Error().Msg("Node syncing provider not supported") + return + } + syncingResponse, err := syncingProvider.NodeSyncing(ctx, &api.NodeSyncingOpts{}) if err != nil { log.Error().Err(err).Msg("Failed to ascertain if node is syncing") return diff --git a/services/heads/events/service.go b/services/heads/events/service.go index 04e84b1..80eae36 100644 --- a/services/heads/events/service.go +++ b/services/heads/events/service.go @@ -77,7 +77,12 @@ func (s *Service) monitorEvents(ctx context.Context, delay := time.Since(s.chainTime.StartOfSlot(event.Slot)) // Ensure the node is synced. - syncingResponse, err := eventsProvider.(consensusclient.NodeSyncingProvider).NodeSyncing(ctx, &api.NodeSyncingOpts{}) + syncingProvider, ok := eventsProvider.(consensusclient.NodeSyncingProvider) + if !ok { + log.Error().Msg("Node syncing provider not supported") + return + } + syncingResponse, err := syncingProvider.NodeSyncing(ctx, &api.NodeSyncingOpts{}) if err != nil { log.Error().Err(err).Msg("Failed to ascertain if node is syncing") return