79 changes: 65 additions & 14 deletions internal/committer/reorg.go
@@ -33,20 +33,20 @@ func RunReorgValidator() {
continue
}

if endBlock == lastBlockCheck || endBlock-startBlock < 100 {
log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.")
if endBlock-startBlock < 100 {
log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.")
time.Sleep(1 * time.Minute)
continue
}

// Detect reorgs and handle them
err = detectAndHandleReorgs(startBlock, endBlock)
lastValidBlock, err := detectAndHandleReorgs(startBlock, endBlock)
if err != nil {
log.Error().Err(err).Msg("Failed to detect and handle reorgs")
time.Sleep(2 * time.Second)
continue
}
lastBlockCheck = endBlock
lastBlockCheck = lastValidBlock
}
}

@@ -91,21 +91,21 @@ func getLastValidBlock() (int64, error) {
return lastValidBlock, nil
}

func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
func detectAndHandleReorgs(startBlock int64, endBlock int64) (int64, error) {
log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock)

// Fetch block headers for the range
blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
return 0, fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
}

if len(blockHeaders) == 0 {
log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
return nil
return 0, nil
}
Comment on lines +94 to 106
⚠️ Potential issue | 🔴 Critical

Harden header handling against nils and decouple header-based reorgs from ClickHouse verification failures

Overall structure of the new multi-source reorg detection looks solid, and handling the error from SetReorgLastValidBlock is a good improvement. There are, however, a few important robustness gaps in this function:

  1. Potential panic on nil headers from GetBlockHeadersForReorgCheck

Per the provided context, GetBlockHeadersForReorgCheck can return a []*common.Block "with nils for missing entries if any". The continuity loop and lastHeaderBlock assignment assume that every element is non-nil:

  • blockHeaders[i].Number.Int64() and blockHeaders[i-1].Number.Int64() (Lines 112–115, 121, 127)
  • lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64() (Line 134)

If any entry is nil, this will panic and take down the validator. At minimum, you should defensively guard against nil entries before dereferencing:

 blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
 if err != nil {
   return 0, fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err)
 }
 
 if len(blockHeaders) == 0 {
   log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
   return 0, nil
 }
+
+ // Fail fast if any header in the range is unexpectedly nil to avoid panics.
+ for i, h := range blockHeaders {
+   if h == nil {
+     return 0, fmt.Errorf("detectAndHandleReorgs: nil header at index %d (start=%d end=%d)", i, startBlock, endBlock)
+   }
+ }
@@
- // set end to the last block if not set
- lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
+ // set end to the last block if not set
+ lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()

If nils are actually expected in normal operation, you may instead want to skip those entries or treat them as mismatches rather than returning an error, but you should still avoid dereferencing them.
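
If skipping is the desired behavior, here is a minimal sketch of the filtering step, assuming the []*common.Block slice returned by GetBlockHeadersForReorgCheck (the helper name is hypothetical):

// filterNilHeaders drops nil entries so the continuity loop and the
// lastHeaderBlock assignment never dereference a nil header. Sketch only.
func filterNilHeaders(headers []*common.Block) []*common.Block {
	out := headers[:0]
	for _, h := range headers {
		if h != nil {
			out = append(out, h)
		}
	}
	return out
}

After filtering, the len(blockHeaders) == 0 check should run again, since a range that was all nils is effectively "no headers found".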

  2. ClickHouse mismatch queries currently block header-based reorg handling

Right now any failure from GetTransactionMismatchRangeFromClickHouseV2 or GetLogsMismatchRangeFromClickHouseV2 aborts the whole function:

txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(...)
if err != nil {
    return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
}
...
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(...)
if err != nil {
    return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
}

This couples correctness to ClickHouse health even when header continuity has already detected a clear reorg range (reorgStartBlock > -1). A transient failure in either mismatch query will prevent you from handling a header-based reorg at all.

Consider treating these mismatch checks as best-effort signals and falling back to header-only behavior when they fail, for example:

 txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
 if err != nil {
-    return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
+    log.Error().Err(err).
+        Int64("start_block", startBlock).
+        Int64("end_block", endBlock).
+        Msg("detectAndHandleReorgs: transaction verification failed; continuing with other signals")
+    txStart, txEnd = -1, -1
 }
@@
 logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
 if err != nil {
-    return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
+    log.Error().Err(err).
+        Int64("start_block", startBlock).
+        Int64("end_block", endBlock).
+        Msg("detectAndHandleReorgs: logs verification failed; continuing with other signals")
+    logsStart, logsEnd = -1, -1
 }

This way:

  • If headers clearly show a reorg, you still reorg even when the ClickHouse aggregate queries are unhealthy.
  • When ClickHouse is healthy, you still get the stronger "combined" detection window via the tx/log ranges.
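
For reference, the combining step the new code performs (take the minimum start and maximum end across all non-empty ranges, where -1 marks an empty range) can be written as a small helper. This is only an illustration of the logic already in the PR, not a requested change:

// mergeRange widens (start, end) with a candidate range (s, e).
// -1 marks an empty range on either side.
func mergeRange(start, end, s, e int64) (int64, int64) {
	if s == -1 {
		return start, end
	}
	if start == -1 || s < start {
		start = s
	}
	if end == -1 || e > end {
		end = e
	}
	return start, end
}

// Usage mirroring detectAndHandleReorgs:
// finalStart, finalEnd := mergeRange(-1, -1, reorgStartBlock, reorgEndBlock)
// finalStart, finalEnd = mergeRange(finalStart, finalEnd, txStart, txEnd)
// finalStart, finalEnd = mergeRange(finalStart, finalEnd, logsStart, logsEnd)
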
  3. Semantics when len(blockHeaders) == 0

Currently the "no headers found" path returns (0, nil) and never updates Redis:

if len(blockHeaders) == 0 {
    log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
    return 0, nil
}

Given the caller only uses the returned lastValidBlock for logging, this is not a correctness bug, but it does mean:

  • last_block_check will be logged as 0 for that iteration.
  • getLastValidBlock will continue to drive the window based on the previous Redis value or the "1 day ago" fallback.

If this "no headers in range" case is expected to happen in practice (e.g., sparse data or lag), you might want to either:

  • Return an error so it is clearly visible and handled as a failure, or
  • Return a documented sentinel (e.g., startBlock-1) and handle that in the caller instead of overloading 0 (a sketch of this option follows below).
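
A minimal sketch of the sentinel option (the startBlock-1 value and the caller-side guard are hypothetical, not part of this PR):

if len(blockHeaders) == 0 {
	log.Debug().Msg("detectAndHandleReorgs: No block headers found in range")
	// Sentinel: nothing was checked, so the last known-valid block is the one
	// just before this window.
	return startBlock - 1, nil
}

and in RunReorgValidator:

lastValidBlock, err := detectAndHandleReorgs(startBlock, endBlock)
if err != nil {
	log.Error().Err(err).Msg("Failed to detect and handle reorgs")
	time.Sleep(2 * time.Second)
	continue
}
if lastValidBlock >= startBlock {
	// Only advance the watermark when the check actually covered new blocks.
	lastBlockCheck = lastValidBlock
}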

Also applies to: 108-138, 140-152, 153-183, 185-198


// finding the reorg start and end block
// 1) Block verification: find reorg range from header continuity (existing behavior)
reorgStartBlock := int64(-1)
reorgEndBlock := int64(-1)
for i := 1; i < len(blockHeaders); i++ {
@@ -131,20 +131,71 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error {
}

// set end to the last block if not set
lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64()
if reorgEndBlock == -1 {
reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64()
// No header-based end detected; default to the last header for last-valid-block tracking.
reorgEndBlock = lastHeaderBlock
}

// 2) Transaction verification: check for mismatches between block.transaction_count
// and the number of transactions stored per block in ClickHouse.
txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err)
}

// 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse.
logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock))
if err != nil {
return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err)
}

// 4) Combine all ranges:
// - If all three ranges (blocks, tx, logs) are empty, then there is no reorg.
// - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range.
finalStart := int64(-1)
finalEnd := int64(-1)

// block headers range
if reorgStartBlock > -1 {
if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil {
return err
finalStart = reorgStartBlock
finalEnd = reorgEndBlock
}

// transactions range
if txStart > -1 {
if finalStart == -1 || txStart < finalStart {
finalStart = txStart
}
if finalEnd == -1 || txEnd > finalEnd {
finalEnd = txEnd
}
}

// update last valid block. if there was no reorg, this will update to the last block
libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock)
// logs range
if logsStart > -1 {
if finalStart == -1 || logsStart < finalStart {
finalStart = logsStart
}
if finalEnd == -1 || logsEnd > finalEnd {
finalEnd = logsEnd
}
}

return nil
lastValidBlock := lastHeaderBlock
if finalStart > -1 {
// We found at least one inconsistent range; reorg from min(start) to max(end).
if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil {
return 0, err
}
lastValidBlock = finalEnd
}
err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock)
if err != nil {
return 0, fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err)
}

return lastValidBlock, nil
}

func handleReorgForRange(startBlock uint64, endBlock uint64) error {
181 changes: 167 additions & 14 deletions internal/libs/clickhouse.go
@@ -44,6 +44,17 @@ var defaultTraceFields = []string{
"reward_type", "refund_address",
}

type blockTxAggregate struct {
BlockNumber *big.Int `ch:"block_number"`
TxCount uint64 `ch:"tx_count"`
}

type blockLogAggregate struct {
BlockNumber *big.Int `ch:"block_number"`
LogCount uint64 `ch:"log_count"`
MaxLogIndex uint64 `ch:"max_log_index"`
}

// only use this for backfill or getting old data.
var ClickhouseConnV1 clickhouse.Conn

@@ -229,20 +240,6 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
Msg("skipping block because chainId is nil")
continue
}
if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) {
log.Info().
Any("transactionCount", blocksRaw[i].TransactionCount).
Any("transactionsRaw", transactionsRaw[i]).
Msg("skipping block because transactionCount does not match")
continue
}
if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 {
log.Info().
Any("logsBloom", blocksRaw[i].LogsBloom).
Any("logsRaw", logsRaw[i]).
Msg("skipping block because logsBloom is not empty and logsRaw is empty")
continue
}
blockData[i] = &common.BlockData{
Block: blocksRaw[i],
Transactions: transactionsRaw[i],
@@ -253,6 +250,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl
return blockData, nil
}

// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where the stored transaction_count in the blocks table does not match the number
// of transactions in the transactions table. It returns the minimum and maximum
// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent.
func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
if endBlockNumber < startBlockNumber {
return -1, -1, nil
}

blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
if err != nil {
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
}

// Aggregate transaction counts per block from the transactions table.
query := fmt.Sprintf(
"SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)

txAggRows, err := execQueryV2[blockTxAggregate](query)
if err != nil {
return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err)
}

txCounts := make(map[uint64]uint64, len(txAggRows))
for _, row := range txAggRows {
if row.BlockNumber == nil {
continue
}
txCounts[row.BlockNumber.Uint64()] = row.TxCount
}

var mismatchStart int64 = -1
var mismatchEnd int64 = -1

for _, block := range blocksRaw {
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
continue
}

bn := block.Number.Uint64()
expectedTxCount := block.TransactionCount
actualTxCount, hasTx := txCounts[bn]

mismatch := false
if expectedTxCount == 0 {
// Header says no transactions; ensure there are none in the table.
if hasTx && actualTxCount > 0 {
mismatch = true
}
} else {
// Header says there should be transactions.
if !hasTx || actualTxCount != expectedTxCount {
mismatch = true
}
}

if mismatch {
if mismatchStart == -1 || int64(bn) < mismatchStart {
mismatchStart = int64(bn)
}
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
mismatchEnd = int64(bn)
}
}
}

return mismatchStart, mismatchEnd, nil
}

// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range,
// where logs in the logs table are inconsistent with the block's logs_bloom:
// - logsBloom is non-empty but there are no logs for that block
// - logsBloom is empty/zero but logs exist
// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist)
// It returns the minimum and maximum block numbers that have a mismatch, or
// (-1, -1) if all blocks are consistent.
func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) {
if endBlockNumber < startBlockNumber {
return -1, -1, nil
}

blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber)
if err != nil {
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err)
}

// Aggregate log counts and max log_index per block from the logs table.
query := fmt.Sprintf(
"SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number",
config.Cfg.CommitterClickhouseDatabase,
chainId,
startBlockNumber,
endBlockNumber,
)

logAggRows, err := execQueryV2[blockLogAggregate](query)
if err != nil {
return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err)
}

logAggs := make(map[uint64]blockLogAggregate, len(logAggRows))
for _, row := range logAggRows {
if row.BlockNumber == nil {
continue
}
bn := row.BlockNumber.Uint64()
logAggs[bn] = row
}

var mismatchStart int64 = -1
var mismatchEnd int64 = -1

for _, block := range blocksRaw {
if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil {
continue
}

bn := block.Number.Uint64()
hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM
logAgg, hasLogAgg := logAggs[bn]

mismatch := false

if hasLogsBloom {
// logsBloom indicates logs should exist
if !hasLogAgg || logAgg.LogCount == 0 {
mismatch = true
} else if logAgg.MaxLogIndex+1 != logAgg.LogCount {
// log_index should be contiguous from 0..log_count-1
mismatch = true
}
} else {
// logsBloom is empty/zero; there should be no logs
if hasLogAgg && logAgg.LogCount > 0 {
mismatch = true
}
}

if mismatch {
if mismatchStart == -1 || int64(bn) < mismatchStart {
mismatchStart = int64(bn)
}
if mismatchEnd == -1 || int64(bn) > mismatchEnd {
mismatchEnd = int64(bn)
}
}
}

return mismatchStart, mismatchEnd, nil
}

func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) {
sb := startBlockNumber
length := endBlockNumber - startBlockNumber + 1