diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index ae9e079..f40a426 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -33,20 +33,20 @@ func RunReorgValidator() { continue } - if endBlock == lastBlockCheck || endBlock-startBlock < 100 { - log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.") + if endBlock-startBlock < 100 { + log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.") time.Sleep(1 * time.Minute) continue } // Detect reorgs and handle them - err = detectAndHandleReorgs(startBlock, endBlock) + lastValidBlock, err := detectAndHandleReorgs(startBlock, endBlock) if err != nil { log.Error().Err(err).Msg("Failed to detect and handle reorgs") time.Sleep(2 * time.Second) continue } - lastBlockCheck = endBlock + lastBlockCheck = lastValidBlock } } @@ -91,21 +91,21 @@ func getLastValidBlock() (int64, error) { return lastValidBlock, nil } -func detectAndHandleReorgs(startBlock int64, endBlock int64) error { +func detectAndHandleReorgs(startBlock int64, endBlock int64) (int64, error) { log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock) // Fetch block headers for the range blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err) + return 0, fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err) } if len(blockHeaders) == 0 { log.Debug().Msg("detectAndHandleReorgs: No block headers found in range") - return nil + return 0, nil } - // finding the reorg start and end block + // 1) Block verification: find reorg range from header continuity (existing behavior) reorgStartBlock := int64(-1) reorgEndBlock := int64(-1) for i := 1; i < len(blockHeaders); i++ { @@ -131,20 +131,71 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { } // set end to the last block if not set + lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64() if reorgEndBlock == -1 { - reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64() + // No header-based end detected; default to the last header for last-valid-block tracking. + reorgEndBlock = lastHeaderBlock + } + + // 2) Transaction verification: check for mismatches between block.transaction_count + // and the number of transactions stored per block in ClickHouse. + txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) + if err != nil { + return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err) } + // 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse. + logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) + if err != nil { + return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err) + } + + // 4) Combine all ranges: + // - If all three ranges (blocks, tx, logs) are empty, then there is no reorg. + // - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range. 
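+ //
+ // Illustrative example (hypothetical numbers): if the header check flags
+ // blocks 105-110 and the transaction check flags 102-104 while the logs
+ // check is clean, the combined reorg range becomes 102-110.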
+ finalStart := int64(-1) + finalEnd := int64(-1) + + // block headers range if reorgStartBlock > -1 { - if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { - return err + finalStart = reorgStartBlock + finalEnd = reorgEndBlock + } + + // transactions range + if txStart > -1 { + if finalStart == -1 || txStart < finalStart { + finalStart = txStart + } + if finalEnd == -1 || txEnd > finalEnd { + finalEnd = txEnd } } - // update last valid block. if there was no reorg, this will update to the last block - libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock) + // logs range + if logsStart > -1 { + if finalStart == -1 || logsStart < finalStart { + finalStart = logsStart + } + if finalEnd == -1 || logsEnd > finalEnd { + finalEnd = logsEnd + } + } - return nil + lastValidBlock := lastHeaderBlock + if finalStart > -1 { + // We found at least one inconsistent range; reorg from min(start) to max(end). + if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil { + return 0, err + } + lastValidBlock = finalEnd + } + err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock) + if err != nil { + return 0, fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err) + } + + return lastValidBlock, nil } func handleReorgForRange(startBlock uint64, endBlock uint64) error { diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 699f0dc..b5419d7 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -44,6 +44,17 @@ var defaultTraceFields = []string{ "reward_type", "refund_address", } +type blockTxAggregate struct { + BlockNumber *big.Int `ch:"block_number"` + TxCount uint64 `ch:"tx_count"` +} + +type blockLogAggregate struct { + BlockNumber *big.Int `ch:"block_number"` + LogCount uint64 `ch:"log_count"` + MaxLogIndex uint64 `ch:"max_log_index"` +} + // only use this for backfill or getting old data. var ClickhouseConnV1 clickhouse.Conn @@ -229,20 +240,6 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl Msg("skipping block because chainId is nil") continue } - if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) { - log.Info(). - Any("transactionCount", blocksRaw[i].TransactionCount). - Any("transactionsRaw", transactionsRaw[i]). - Msg("skipping block because transactionCount does not match") - continue - } - if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 { - log.Info(). - Any("logsBloom", blocksRaw[i].LogsBloom). - Any("logsRaw", logsRaw[i]). - Msg("skipping block because logsBloom is not empty and logsRaw is empty") - continue - } blockData[i] = &common.BlockData{ Block: blocksRaw[i], Transactions: transactionsRaw[i], @@ -253,6 +250,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } +// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range, +// where the stored transaction_count in the blocks table does not match the number +// of transactions in the transactions table. It returns the minimum and maximum +// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent. 
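+// Both bounds are inclusive block numbers; callers that combine several such
+// ranges (as detectAndHandleReorgs does) take min(start) across the starts and
+// max(end) across the ends.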
+func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) { + if endBlockNumber < startBlockNumber { + return -1, -1, nil + } + + blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + if err != nil { + return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err) + } + + // Aggregate transaction counts per block from the transactions table. + query := fmt.Sprintf( + "SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number", + config.Cfg.CommitterClickhouseDatabase, + chainId, + startBlockNumber, + endBlockNumber, + ) + + txAggRows, err := execQueryV2[blockTxAggregate](query) + if err != nil { + return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err) + } + + txCounts := make(map[uint64]uint64, len(txAggRows)) + for _, row := range txAggRows { + if row.BlockNumber == nil { + continue + } + txCounts[row.BlockNumber.Uint64()] = row.TxCount + } + + var mismatchStart int64 = -1 + var mismatchEnd int64 = -1 + + for _, block := range blocksRaw { + if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil { + continue + } + + bn := block.Number.Uint64() + expectedTxCount := block.TransactionCount + actualTxCount, hasTx := txCounts[bn] + + mismatch := false + if expectedTxCount == 0 { + // Header says no transactions; ensure there are none in the table. + if hasTx && actualTxCount > 0 { + mismatch = true + } + } else { + // Header says there should be transactions. + if !hasTx || actualTxCount != expectedTxCount { + mismatch = true + } + } + + if mismatch { + if mismatchStart == -1 || int64(bn) < mismatchStart { + mismatchStart = int64(bn) + } + if mismatchEnd == -1 || int64(bn) > mismatchEnd { + mismatchEnd = int64(bn) + } + } + } + + return mismatchStart, mismatchEnd, nil +} + +// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range, +// where logs in the logs table are inconsistent with the block's logs_bloom: +// - logsBloom is non-empty but there are no logs for that block +// - logsBloom is empty/zero but logs exist +// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist) +// It returns the minimum and maximum block numbers that have a mismatch, or +// (-1, -1) if all blocks are consistent. +func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) { + if endBlockNumber < startBlockNumber { + return -1, -1, nil + } + + blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + if err != nil { + return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err) + } + + // Aggregate log counts and max log_index per block from the logs table. 
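+ // Note: FINAL deduplicates rows at read time before aggregating; this assumes
+ // a ReplacingMergeTree-style engine, so rows re-inserted by an earlier reorg
+ // are not double counted in log_count or max_log_index.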
+ query := fmt.Sprintf( + "SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number", + config.Cfg.CommitterClickhouseDatabase, + chainId, + startBlockNumber, + endBlockNumber, + ) + + logAggRows, err := execQueryV2[blockLogAggregate](query) + if err != nil { + return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err) + } + + logAggs := make(map[uint64]blockLogAggregate, len(logAggRows)) + for _, row := range logAggRows { + if row.BlockNumber == nil { + continue + } + bn := row.BlockNumber.Uint64() + logAggs[bn] = row + } + + var mismatchStart int64 = -1 + var mismatchEnd int64 = -1 + + for _, block := range blocksRaw { + if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil { + continue + } + + bn := block.Number.Uint64() + hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM + logAgg, hasLogAgg := logAggs[bn] + + mismatch := false + + if hasLogsBloom { + // logsBloom indicates logs should exist + if !hasLogAgg || logAgg.LogCount == 0 { + mismatch = true + } else if logAgg.MaxLogIndex+1 != logAgg.LogCount { + // log_index should be contiguous from 0..log_count-1 + mismatch = true + } + } else { + // logsBloom is empty/zero; there should be no logs + if hasLogAgg && logAgg.LogCount > 0 { + mismatch = true + } + } + + if mismatch { + if mismatchStart == -1 || int64(bn) < mismatchStart { + mismatchStart = int64(bn) + } + if mismatchEnd == -1 || int64(bn) > mismatchEnd { + mismatchEnd = int64(bn) + } + } + } + + return mismatchStart, mismatchEnd, nil +} + func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 70c1d54..3dabbc1 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -116,6 +116,7 @@ func (p *KafkaPublisher) PublishBlockData(blockData []*common.BlockData) error { } func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, oldBlockData []*common.BlockData) error { + log.Debug().Int("new_block_data_count", len(newBlockData)).Int("old_block_data_count", len(oldBlockData)).Msg("PublishBlockDataReorg: Publishing block data reorg") if err := p.publishBlockData(oldBlockData, true, true); err != nil { return fmt.Errorf("failed to publish old block data: %v", err) } @@ -127,24 +128,6 @@ func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, return nil } -func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error { - chainId := newData[0].Block.ChainId.Uint64() - newHead := uint64(newData[0].Block.Number.Uint64()) - // Publish revert the revert to the new head - 1, so that the new updated block data can be re-processed - if err := p.publishBlockRevert(chainId, newHead-1); err != nil { - return fmt.Errorf("failed to revert: %v", err) - } - - if err := p.publishBlockData(oldData, true, true); err != nil { - return fmt.Errorf("failed to publish old block data: %v", err) - } - - if err := p.publishBlockData(newData, false, true); err != nil { - return fmt.Errorf("failed to publish new block data: %v", err) - } - return nil -} - func (p *KafkaPublisher) Close() error { p.mu.Lock() defer p.mu.Unlock() @@ -219,27 +202,6 @@ 
func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re return nil } -func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) error { - publishStart := time.Now() - - // Prepare messages for blocks, events, transactions and traces - blockMessages := make([]*kgo.Record, 1) - - // Block message - if blockMsg, err := p.createBlockRevertMessage(chainId, blockNumber); err == nil { - blockMessages[0] = blockMsg - } else { - return fmt.Errorf("failed to create block revert message: %v", err) - } - - if err := p.publishMessages(context.Background(), blockMessages); err != nil { - return fmt.Errorf("failed to publish block revert messages: %v", err) - } - - log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) - return nil -} - func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool, isReorg bool) error { if len(blockData) == 0 { return nil @@ -306,31 +268,19 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet return nil, fmt.Errorf("failed to marshal block data: %v", err) } - return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) -} - -func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) { - timestamp := time.Now() - - data := PublishableMessageRevert{ - ChainId: chainId, - BlockNumber: blockNumber, - IsDeleted: 0, - InsertTimestamp: timestamp, + if isReorg { + log.Debug(). + Uint64("chain_id", data.ChainId). + Uint64("block_number", block.Block.Number.Uint64()). + Int("tx_count", len(block.Transactions)). + Int("log_count", len(block.Logs)). + Int("trace_count", len(block.Traces)). + Bool("is_deleted", isDeleted). + Bool("is_reorg", isReorg). + Msg("KafkaPublisher Message Reorg: Block metadata") } - msg := PublishableMessagePayload{ - Data: data, - Type: data.GetType(), - Timestamp: timestamp, - } - - msgJson, err := json.Marshal(msg) - if err != nil { - return nil, fmt.Errorf("failed to marshal block data: %v", err) - } - - return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, false, msgJson) + return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) } func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isDeleted bool, isReorg bool, msgJson []byte) (*kgo.Record, error) {