Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ type Config struct {
CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"`
CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"`
CommitterLagByBlocks uint64 `env:"COMMITTER_LAG_BY_BLOCKS" envDefault:"0"`
// CommitterStartBlock, when set (>0), forces the committer to start publishing
// from this block number regardless of what ClickHouse says is already committed.
// This can cause duplicate publishing if ClickHouse already contains higher blocks.
CommitterStartBlock uint64 `env:"COMMITTER_START_BLOCK" envDefault:"0"`
StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"`
StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"`
StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"`
Expand Down
14 changes: 14 additions & 0 deletions internal/committer/committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,20 @@ func getLastTrackedBlockNumberAndBlockRangesFromS3() (int64, []types.BlockRange,
}
log.Debug().Int64("max_block_number", maxBlockNumber).Msg("Retrieved max block number from ClickHouse.(-1 means nothing committed yet, start from 0)")

// Optional override: force the committer to start from a specific block number.
// We implement this by pretending ClickHouse max is (startBlock - 1), so both S3
// range scanning and live RPC polling begin at startBlock.
if config.Cfg.CommitterStartBlock > 0 {
overrideMax := int64(config.Cfg.CommitterStartBlock) - 1
if maxBlockNumber < overrideMax {
maxBlockNumber = overrideMax
log.Info().
Int64("clickhouse_max_block", maxBlockNumber).
Uint64("override_start_block", config.Cfg.CommitterStartBlock).
Msg("CommitterStartBlock override enabled; starting earlier than ClickHouse cursor")
}
}

blockRanges, err := libs.GetBlockRangesFromS3(maxBlockNumber)
if err != nil {
log.Error().Err(err).Msg("Failed to get block ranges from S3")
Expand Down
Loading