From 53f1895f07ce177d7a55b4908ede80183a97350e Mon Sep 17 00:00:00 2001 From: nischit Date: Tue, 23 Dec 2025 00:00:19 +0545 Subject: [PATCH 1/2] committer start block --- cmd/committer.go | 12 ++++++++++++ configs/config.go | 4 ++++ internal/committer/committer.go | 14 ++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/cmd/committer.go b/cmd/committer.go index eef0bba..75beec3 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -7,9 +7,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" "github.com/spf13/cobra" + config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/committer" ) +var committerStartBlock uint64 + var committerCmd = &cobra.Command{ Use: "committer", Short: "run committer", @@ -17,9 +20,18 @@ var committerCmd = &cobra.Command{ Run: RunCommitter, } +func init() { + committerCmd.Flags().Uint64Var(&committerStartBlock, "start-block", 0, "start committing from this block number (overrides ClickHouse max block when > 0)") +} + func RunCommitter(cmd *cobra.Command, args []string) { fmt.Println("running committer") + if committerStartBlock > 0 { + config.Cfg.CommitterStartBlock = committerStartBlock + log.Info().Uint64("start_block", committerStartBlock).Msg("Committer start block override enabled") + } + // Start Prometheus metrics server log.Info().Msg("Starting Metrics Server on port 2112") go func() { diff --git a/configs/config.go b/configs/config.go index 35f4731..2c05606 100644 --- a/configs/config.go +++ b/configs/config.go @@ -62,6 +62,10 @@ type Config struct { CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"` + // CommitterStartBlock, when set (>0), forces the committer to start publishing + // from this block number regardless of what ClickHouse says is already committed. + // This can cause duplicate publishing if ClickHouse already contains higher blocks. + CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"` StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 0c99808..90726bb 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -104,6 +104,20 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange, } log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)") + // Optional override: force the committer to start from a specific block number. + // We implement this by pretending ClickHouse max is (startBlock - 1), so both S3 + // range scanning and live RPC polling begin at startBlock. + if config.Cfg.CommitterStartBlock > 0 { + overrideMax := int64(config.Cfg.CommitterStartBlock) - 1 + if maxBlockNumber < overrideMax { + maxBlockNumber = overrideMax + log.Info(). + Int64("clickhouse_max_block", maxBlockNumber). + Uint64("override_start_block", config.Cfg.CommitterStartBlock). + Msg("CommitterStartBlock override enabled; starting earlier than ClickHouse cursor") + } + } + blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber) if err != nil { log.Error().Err(err).Msg("Failed to get block ranges from S3") From 857101013aef0af66e37bebfd832f3be3774e176 Mon Sep 17 00:00:00 2001 From: nischit Date: Tue, 23 Dec 2025 00:01:59 +0545 Subject: [PATCH 2/2] minor change --- cmd/committer.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/cmd/committer.go b/cmd/committer.go index 75beec3..eef0bba 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -7,12 +7,9 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/rs/zerolog/log" "github.com/spf13/cobra" - config "github.com/thirdweb-dev/indexer/configs" "github.com/thirdweb-dev/indexer/internal/committer" ) -var committerStartBlock uint64 - var committerCmd = &cobra.Command{ Use: "committer", Short: "run committer", @@ -20,18 +17,9 @@ var committerCmd = &cobra.Command{ Run: RunCommitter, } -func init() { - committerCmd.Flags().Uint64Var(&committerStartBlock, "start-block", 0, "start committing from this block number (overrides ClickHouse max block when > 0)") -} - func RunCommitter(cmd *cobra.Command, args []string) { fmt.Println("running committer") - if committerStartBlock > 0 { - config.Cfg.CommitterStartBlock = committerStartBlock - log.Info().Uint64("start_block", committerStartBlock).Msg("Committer start block override enabled") - } - // Start Prometheus metrics server log.Info().Msg("Starting Metrics Server on port 2112") go func() {