diff --git a/configs/config.go b/configs/config.go index 35f4731..2c05606 100644 --- a/configs/config.go +++ b/configs/config.go @@ -62,6 +62,10 @@ type Config struct { CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"` + // CommitterStartBlock, when set (>0), forces the committer to start publishing + // from this block number regardless of what ClickHouse says is already committed. + // This can cause duplicate publishing if ClickHouse already contains higher blocks. + CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"` StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` diff --git a/internal/committer/committer.go b/internal/committer/committer.go index 0c99808..90726bb 100644 --- a/internal/committer/committer.go +++ b/internal/committer/committer.go @@ -104,6 +104,20 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange, } log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)") + // Optional override: force the committer to start from a specific block number. + // We implement this by pretending ClickHouse max is (startBlock - 1), so both S3 + // range scanning and live RPC polling begin at startBlock. + if config.Cfg.CommitterStartBlock > 0 { + overrideMax := int64(config.Cfg.CommitterStartBlock) - 1 + if maxBlockNumber < overrideMax { + maxBlockNumber = overrideMax + log.Info(). + Int64("clickhouse_max_block", maxBlockNumber). + Uint64("override_start_block", config.Cfg.CommitterStartBlock). + Msg("CommitterStartBlock override enabled; starting earlier than ClickHouse cursor") + } + } + blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber) if err != nil { log.Error().Err(err).Msg("Failed to get block ranges from S3")