From 2a808a8bbe13307ea9e505cf94d99faed58fd978 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 17:13:06 +0545 Subject: [PATCH 1/9] reorg handles transaction/logs count --- internal/committer/reorg.go | 62 +++++++++++-- internal/libs/clickhouse.go | 167 ++++++++++++++++++++++++++++++++++++ 2 files changed, 223 insertions(+), 6 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index ae9e079..e56828e 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -105,7 +105,7 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { return nil } - // finding the reorg start and end block + // 1) Block verification: find reorg range from header continuity (existing behavior) reorgStartBlock := int64(-1) reorgEndBlock := int64(-1) for i := 1; i < len(blockHeaders); i++ { @@ -131,18 +131,68 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { } // set end to the last block if not set + lastHeaderBlock := blockHeaders[len(blockHeaders)-1].Number.Int64() if reorgEndBlock == -1 { - reorgEndBlock = blockHeaders[len(blockHeaders)-1].Number.Int64() + // No header-based end detected; default to the last header for last-valid-block tracking. + reorgEndBlock = lastHeaderBlock } + // 2) Transaction verification: check for mismatches between block.transaction_count + // and the number of transactions stored per block in ClickHouse. + txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) + if err != nil { + return fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err) + } + + // 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse. + logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) + if err != nil { + return fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err) + } + + // 4) Combine all ranges: + // - If all three ranges (blocks, tx, logs) are empty, then there is no reorg. + // - Otherwise, take min(start) and max(end) across all non-empty ranges as the final reorg range. + finalStart := int64(-1) + finalEnd := int64(-1) + + // block headers range if reorgStartBlock > -1 { - if err := handleReorgForRange(uint64(reorgStartBlock), uint64(reorgEndBlock)); err != nil { - return err + finalStart = reorgStartBlock + finalEnd = reorgEndBlock + } + + // transactions range + if txStart > -1 { + if finalStart == -1 || txStart < finalStart { + finalStart = txStart + } + if finalEnd == -1 || txEnd > finalEnd { + finalEnd = txEnd } } - // update last valid block. if there was no reorg, this will update to the last block - libs.SetReorgLastValidBlock(libs.ChainIdStr, reorgEndBlock) + // logs range + if logsStart > -1 { + if finalStart == -1 || logsStart < finalStart { + finalStart = logsStart + } + if finalEnd == -1 || logsEnd > finalEnd { + finalEnd = logsEnd + } + } + + if finalStart > -1 { + // We found at least one inconsistent range; reorg from min(start) to max(end). + if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil { + return err + } + libs.SetReorgLastValidBlock(libs.ChainIdStr, finalEnd) + } else { + // No inconsistencies across blocks, transactions, or logs; mark the last checked + // header block as the last valid block. + libs.SetReorgLastValidBlock(libs.ChainIdStr, lastHeaderBlock) + } return nil } diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 699f0dc..3aa33ef 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -44,6 +44,17 @@ var defaultTraceFields = []string{ "reward_type", "refund_address", } +type blockTxAggregate struct { + BlockNumber *big.Int `ch:"block_number"` + TxCount uint64 `ch:"tx_count"` +} + +type blockLogAggregate struct { + BlockNumber *big.Int `ch:"block_number"` + LogCount uint64 `ch:"log_count"` + MaxLogIndex uint64 `ch:"max_log_index"` +} + // only use this for backfill or getting old data. var ClickhouseConnV1 clickhouse.Conn @@ -253,6 +264,162 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl return blockData, nil } +// GetTransactionMismatchRangeFromClickHouseV2 checks, for blocks in the given range, +// where the stored transaction_count in the blocks table does not match the number +// of transactions in the transactions table. It returns the minimum and maximum +// block numbers that have a mismatch, or (-1, -1) if all blocks are consistent. +func GetTransactionMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) { + if endBlockNumber < startBlockNumber { + return -1, -1, nil + } + + blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + if err != nil { + return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load blocks: %w", err) + } + + // Aggregate transaction counts per block from the transactions table. + query := fmt.Sprintf( + "SELECT block_number, count() AS tx_count FROM %s.transactions FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number", + config.Cfg.CommitterClickhouseDatabase, + chainId, + startBlockNumber, + endBlockNumber, + ) + + txAggRows, err := execQueryV2[blockTxAggregate](query) + if err != nil { + return -1, -1, fmt.Errorf("GetTransactionMismatchRangeFromClickHouseV2: failed to load tx aggregates: %w", err) + } + + txCounts := make(map[uint64]uint64, len(txAggRows)) + for _, row := range txAggRows { + if row.BlockNumber == nil { + continue + } + txCounts[row.BlockNumber.Uint64()] = row.TxCount + } + + var mismatchStart int64 = -1 + var mismatchEnd int64 = -1 + + for _, block := range blocksRaw { + if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil { + continue + } + + bn := block.Number.Uint64() + expectedTxCount := block.TransactionCount + actualTxCount, hasTx := txCounts[bn] + + mismatch := false + if expectedTxCount == 0 { + // Header says no transactions; ensure there are none in the table. + if hasTx && actualTxCount > 0 { + mismatch = true + } + } else { + // Header says there should be transactions. + if !hasTx || actualTxCount != expectedTxCount { + mismatch = true + } + } + + if mismatch { + if mismatchStart == -1 || int64(bn) < mismatchStart { + mismatchStart = int64(bn) + } + if mismatchEnd == -1 || int64(bn) > mismatchEnd { + mismatchEnd = int64(bn) + } + } + } + + return mismatchStart, mismatchEnd, nil +} + +// GetLogsMismatchRangeFromClickHouseV2 checks, for blocks in the given range, +// where logs in the logs table are inconsistent with the block's logs_bloom: +// - logsBloom is non-empty but there are no logs for that block +// - logsBloom is empty/zero but logs exist +// - log indexes are not contiguous (count(*) != max(log_index)+1 when logs exist) +// It returns the minimum and maximum block numbers that have a mismatch, or +// (-1, -1) if all blocks are consistent. +func GetLogsMismatchRangeFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) (int64, int64, error) { + if endBlockNumber < startBlockNumber { + return -1, -1, nil + } + + blocksRaw, err := getBlocksFromV2(chainId, startBlockNumber, endBlockNumber) + if err != nil { + return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load blocks: %w", err) + } + + // Aggregate log counts and max log_index per block from the logs table. + query := fmt.Sprintf( + "SELECT block_number, count() AS log_count, max(log_index) AS max_log_index FROM %s.logs FINAL WHERE chain_id = %d AND block_number BETWEEN %d AND %d GROUP BY block_number ORDER BY block_number", + config.Cfg.CommitterClickhouseDatabase, + chainId, + startBlockNumber, + endBlockNumber, + ) + + logAggRows, err := execQueryV2[blockLogAggregate](query) + if err != nil { + return -1, -1, fmt.Errorf("GetLogsMismatchRangeFromClickHouseV2: failed to load log aggregates: %w", err) + } + + logAggs := make(map[uint64]blockLogAggregate, len(logAggRows)) + for _, row := range logAggRows { + if row.BlockNumber == nil { + continue + } + bn := row.BlockNumber.Uint64() + logAggs[bn] = row + } + + var mismatchStart int64 = -1 + var mismatchEnd int64 = -1 + + for _, block := range blocksRaw { + if block.ChainId == nil || block.ChainId.Uint64() == 0 || block.Number == nil { + continue + } + + bn := block.Number.Uint64() + hasLogsBloom := block.LogsBloom != "" && block.LogsBloom != EMPTY_LOGS_BLOOM + logAgg, hasLogAgg := logAggs[bn] + + mismatch := false + + if hasLogsBloom { + // logsBloom indicates logs should exist + if !hasLogAgg || logAgg.LogCount == 0 { + mismatch = true + } else if logAgg.MaxLogIndex+1 != logAgg.LogCount { + // log_index should be contiguous from 0..log_count-1 + mismatch = true + } + } else { + // logsBloom is empty/zero; there should be no logs + if hasLogAgg && logAgg.LogCount > 0 { + mismatch = true + } + } + + if mismatch { + if mismatchStart == -1 || int64(bn) < mismatchStart { + mismatchStart = int64(bn) + } + if mismatchEnd == -1 || int64(bn) > mismatchEnd { + mismatchEnd = int64(bn) + } + } + } + + return mismatchStart, mismatchEnd, nil +} + func getBlocksFromV2(chainId uint64, startBlockNumber uint64, endBlockNumber uint64) ([]common.Block, error) { sb := startBlockNumber length := endBlockNumber - startBlockNumber + 1 From 5a75c644968742c3bcdbdbf07d031fc8ae846238 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 17:39:41 +0545 Subject: [PATCH 2/9] minor change --- internal/committer/reorg.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index e56828e..c235544 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -182,16 +182,17 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { } } + lastValidBlock := lastHeaderBlock if finalStart > -1 { // We found at least one inconsistent range; reorg from min(start) to max(end). if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil { return err } - libs.SetReorgLastValidBlock(libs.ChainIdStr, finalEnd) - } else { - // No inconsistencies across blocks, transactions, or logs; mark the last checked - // header block as the last valid block. - libs.SetReorgLastValidBlock(libs.ChainIdStr, lastHeaderBlock) + lastValidBlock = finalEnd + } + err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock) + if err != nil { + return fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err) } return nil From a8a23b7e464087287eff254fcc8b2729cfd69023 Mon Sep 17 00:00:00 2001 From: Nischit Prasad Nhuchhe Pradhan Date: Thu, 11 Dec 2025 19:00:39 +0545 Subject: [PATCH 3/9] Np/debug reorg (#311) --- internal/committer/reorg.go | 4 ++-- internal/libs/clickhouse.go | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index c235544..9e4fc4b 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -33,8 +33,8 @@ func RunReorgValidator() { continue } - if endBlock == lastBlockCheck || endBlock-startBlock < 100 { - log.Debug().Msg("Not enough new blocks to check. Sleeping for 1 minute.") + if endBlock-startBlock < 100 { + log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.") time.Sleep(1 * time.Minute) continue } diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index 3aa33ef..d5f1076 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -243,14 +243,12 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) { log.Info(). Any("transactionCount", blocksRaw[i].TransactionCount). - Any("transactionsRaw", transactionsRaw[i]). Msg("skipping block because transactionCount does not match") continue } if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 { log.Info(). Any("logsBloom", blocksRaw[i].LogsBloom). - Any("logsRaw", logsRaw[i]). Msg("skipping block because logsBloom is not empty and logsRaw is empty") continue } From 6a68b0f078c6d6d3fd92046e28079fac69654696 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 19:31:57 +0545 Subject: [PATCH 4/9] log kafka message --- cmd/committer.go | 4 +-- internal/libs/redis.go | 2 +- internal/storage/kafka_publisher.go | 56 ++++++++--------------------- 3 files changed, 17 insertions(+), 45 deletions(-) diff --git a/cmd/committer.go b/cmd/committer.go index eef0bba..34121d3 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -32,6 +32,6 @@ func RunCommitter(cmd *cobra.Command, args []string) { committer.Init() committer.InitReorg() - go committer.RunReorgValidator() - committer.CommitStreaming() + committer.RunReorgValidator() + // committer.CommitStreaming() } diff --git a/internal/libs/redis.go b/internal/libs/redis.go index 6827282..b612c01 100644 --- a/internal/libs/redis.go +++ b/internal/libs/redis.go @@ -12,7 +12,7 @@ import ( var RedisClient *redis.Client -const RedisReorgLastValidBlock = "reorg_last_valid" +const RedisReorgLastValidBlock = "reorg_last_valid_debug" // InitRedis initializes the Redis client func InitRedis() { diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 70c1d54..d3b0ce8 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -127,24 +127,6 @@ func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, return nil } -func (p *KafkaPublisher) PublishReorg(oldData []*common.BlockData, newData []*common.BlockData) error { - chainId := newData[0].Block.ChainId.Uint64() - newHead := uint64(newData[0].Block.Number.Uint64()) - // Publish revert the revert to the new head - 1, so that the new updated block data can be re-processed - if err := p.publishBlockRevert(chainId, newHead-1); err != nil { - return fmt.Errorf("failed to revert: %v", err) - } - - if err := p.publishBlockData(oldData, true, true); err != nil { - return fmt.Errorf("failed to publish old block data: %v", err) - } - - if err := p.publishBlockData(newData, false, true); err != nil { - return fmt.Errorf("failed to publish new block data: %v", err) - } - return nil -} - func (p *KafkaPublisher) Close() error { p.mu.Lock() defer p.mu.Unlock() @@ -219,27 +201,6 @@ func (p *KafkaPublisher) publishMessages(ctx context.Context, messages []*kgo.Re return nil } -func (p *KafkaPublisher) publishBlockRevert(chainId uint64, blockNumber uint64) error { - publishStart := time.Now() - - // Prepare messages for blocks, events, transactions and traces - blockMessages := make([]*kgo.Record, 1) - - // Block message - if blockMsg, err := p.createBlockRevertMessage(chainId, blockNumber); err == nil { - blockMessages[0] = blockMsg - } else { - return fmt.Errorf("failed to create block revert message: %v", err) - } - - if err := p.publishMessages(context.Background(), blockMessages); err != nil { - return fmt.Errorf("failed to publish block revert messages: %v", err) - } - - log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) - return nil -} - func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDeleted bool, isReorg bool) error { if len(blockData) == 0 { return nil @@ -270,9 +231,10 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet return nil } - if err := p.publishMessages(context.Background(), blockMessages); err != nil { - return fmt.Errorf("failed to publish block messages: %v", err) - } + // test code, uncomment later + // if err := p.publishMessages(context.Background(), blockMessages); err != nil { + // return fmt.Errorf("failed to publish block messages: %v", err) + // } log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) return nil @@ -306,6 +268,16 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet return nil, fmt.Errorf("failed to marshal block data: %v", err) } + log.Debug(). + Uint64("chain_id", data.ChainId). + Uint64("block_number", block.Block.Number.Uint64()). + Int("tx_count", len(block.Transactions)). + Int("log_count", len(block.Logs)). + Int("trace_count", len(block.Traces)). + Bool("is_deleted", isDeleted). + Bool("is_reorg", isReorg). + Msg("KafkaPublisher Message: Block metadata") + return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) } From 9309415ccb0d2f6c40ea3216bf9215ac560e8443 Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 21:17:18 +0545 Subject: [PATCH 5/9] remove get block from clickhouse verificatino --- internal/committer/reorg.go | 13 +++++++++++++ internal/libs/clickhouse.go | 12 ------------ internal/storage/kafka_publisher.go | 1 + 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 9e4fc4b..1975b97 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -225,6 +225,19 @@ func handleReorgForRange(startBlock uint64, endBlock uint64) error { return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err) } + nonNilOldBlocks := 0 + for _, bd := range oldblockDataArray { + if bd != nil { + nonNilOldBlocks++ + } + } + log.Debug(). + Uint64("start_block", startBlock). + Uint64("end_block", endBlock). + Int("requested_old_blocks", len(oldblockDataArray)). + Int("non_nil_old_blocks", nonNilOldBlocks). + Msg("handleReorgForRange: loaded old block data from ClickHouse") + if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newblockDataArray, oldblockDataArray); err != nil { log.Error(). Err(err). diff --git a/internal/libs/clickhouse.go b/internal/libs/clickhouse.go index d5f1076..b5419d7 100644 --- a/internal/libs/clickhouse.go +++ b/internal/libs/clickhouse.go @@ -240,18 +240,6 @@ func GetBlockDataFromClickHouseV2(chainId uint64, startBlockNumber uint64, endBl Msg("skipping block because chainId is nil") continue } - if blocksRaw[i].TransactionCount != uint64(len(transactionsRaw[i])) { - log.Info(). - Any("transactionCount", blocksRaw[i].TransactionCount). - Msg("skipping block because transactionCount does not match") - continue - } - if (blocksRaw[i].LogsBloom != "" && blocksRaw[i].LogsBloom != EMPTY_LOGS_BLOOM) && len(logsRaw[i]) == 0 { - log.Info(). - Any("logsBloom", blocksRaw[i].LogsBloom). - Msg("skipping block because logsBloom is not empty and logsRaw is empty") - continue - } blockData[i] = &common.BlockData{ Block: blocksRaw[i], Transactions: transactionsRaw[i], diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index d3b0ce8..ad0dab9 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -116,6 +116,7 @@ func (p *KafkaPublisher) PublishBlockData(blockData []*common.BlockData) error { } func (p *KafkaPublisher) PublishBlockDataReorg(newBlockData []*common.BlockData, oldBlockData []*common.BlockData) error { + log.Debug().Int("new_block_data_count", len(newBlockData)).Int("old_block_data_count", len(oldBlockData)).Msg("PublishBlockDataReorg: Publishing block data reorg") if err := p.publishBlockData(oldBlockData, true, true); err != nil { return fmt.Errorf("failed to publish old block data: %v", err) } From 8ab3050064ed23db4c126dfddf415bd288c98eef Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 22:58:02 +0545 Subject: [PATCH 6/9] undo test codes --- cmd/committer.go | 4 ++-- internal/committer/reorg.go | 2 +- internal/libs/redis.go | 2 +- internal/storage/kafka_publisher.go | 7 +++---- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/cmd/committer.go b/cmd/committer.go index 34121d3..eef0bba 100644 --- a/cmd/committer.go +++ b/cmd/committer.go @@ -32,6 +32,6 @@ func RunCommitter(cmd *cobra.Command, args []string) { committer.Init() committer.InitReorg() - committer.RunReorgValidator() - // committer.CommitStreaming() + go committer.RunReorgValidator() + committer.CommitStreaming() } diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 1975b97..48963e1 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -33,7 +33,7 @@ func RunReorgValidator() { continue } - if endBlock-startBlock < 100 { + if endBlock == lastBlockCheck || endBlock-startBlock < 100 { log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.") time.Sleep(1 * time.Minute) continue diff --git a/internal/libs/redis.go b/internal/libs/redis.go index b612c01..6827282 100644 --- a/internal/libs/redis.go +++ b/internal/libs/redis.go @@ -12,7 +12,7 @@ import ( var RedisClient *redis.Client -const RedisReorgLastValidBlock = "reorg_last_valid_debug" +const RedisReorgLastValidBlock = "reorg_last_valid" // InitRedis initializes the Redis client func InitRedis() { diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index ad0dab9..65418fe 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -232,10 +232,9 @@ func (p *KafkaPublisher) publishBlockData(blockData []*common.BlockData, isDelet return nil } - // test code, uncomment later - // if err := p.publishMessages(context.Background(), blockMessages); err != nil { - // return fmt.Errorf("failed to publish block messages: %v", err) - // } + if err := p.publishMessages(context.Background(), blockMessages); err != nil { + return fmt.Errorf("failed to publish block messages: %v", err) + } log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds()) return nil From cc81319ae02e5c6e7ef9bcb7db1cc58c9a54304e Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 23:04:20 +0545 Subject: [PATCH 7/9] log updates --- internal/committer/reorg.go | 13 --------- internal/storage/kafka_publisher.go | 44 ++++++++--------------------- 2 files changed, 11 insertions(+), 46 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 48963e1..d647817 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -225,19 +225,6 @@ func handleReorgForRange(startBlock uint64, endBlock uint64) error { return fmt.Errorf("handleReorgForRange: failed to get old block data: %w", err) } - nonNilOldBlocks := 0 - for _, bd := range oldblockDataArray { - if bd != nil { - nonNilOldBlocks++ - } - } - log.Debug(). - Uint64("start_block", startBlock). - Uint64("end_block", endBlock). - Int("requested_old_blocks", len(oldblockDataArray)). - Int("non_nil_old_blocks", nonNilOldBlocks). - Msg("handleReorgForRange: loaded old block data from ClickHouse") - if err := libs.KafkaPublisherV2.PublishBlockDataReorg(newblockDataArray, oldblockDataArray); err != nil { log.Error(). Err(err). diff --git a/internal/storage/kafka_publisher.go b/internal/storage/kafka_publisher.go index 65418fe..3dabbc1 100644 --- a/internal/storage/kafka_publisher.go +++ b/internal/storage/kafka_publisher.go @@ -268,41 +268,19 @@ func (p *KafkaPublisher) createBlockDataMessage(block *common.BlockData, isDelet return nil, fmt.Errorf("failed to marshal block data: %v", err) } - log.Debug(). - Uint64("chain_id", data.ChainId). - Uint64("block_number", block.Block.Number.Uint64()). - Int("tx_count", len(block.Transactions)). - Int("log_count", len(block.Logs)). - Int("trace_count", len(block.Traces)). - Bool("is_deleted", isDeleted). - Bool("is_reorg", isReorg). - Msg("KafkaPublisher Message: Block metadata") - - return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) -} - -func (p *KafkaPublisher) createBlockRevertMessage(chainId uint64, blockNumber uint64) (*kgo.Record, error) { - timestamp := time.Now() - - data := PublishableMessageRevert{ - ChainId: chainId, - BlockNumber: blockNumber, - IsDeleted: 0, - InsertTimestamp: timestamp, - } - - msg := PublishableMessagePayload{ - Data: data, - Type: data.GetType(), - Timestamp: timestamp, - } - - msgJson, err := json.Marshal(msg) - if err != nil { - return nil, fmt.Errorf("failed to marshal block data: %v", err) + if isReorg { + log.Debug(). + Uint64("chain_id", data.ChainId). + Uint64("block_number", block.Block.Number.Uint64()). + Int("tx_count", len(block.Transactions)). + Int("log_count", len(block.Logs)). + Int("trace_count", len(block.Traces)). + Bool("is_deleted", isDeleted). + Bool("is_reorg", isReorg). + Msg("KafkaPublisher Message Reorg: Block metadata") } - return p.createRecord(data.GetType(), chainId, blockNumber, timestamp, false, false, msgJson) + return p.createRecord(data.GetType(), data.ChainId, block.Block.Number.Uint64(), timestamp, isDeleted, isReorg, msgJson) } func (p *KafkaPublisher) createRecord(msgType MessageType, chainId uint64, blockNumber uint64, timestamp time.Time, isDeleted bool, isReorg bool, msgJson []byte) (*kgo.Record, error) { From ee8e4b5f83aaff6c5e878c81bf0e9c9bdcb84d9c Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 23:37:35 +0545 Subject: [PATCH 8/9] last block check --- internal/committer/reorg.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index d647817..d2eafa7 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -40,13 +40,13 @@ func RunReorgValidator() { } // Detect reorgs and handle them - err = detectAndHandleReorgs(startBlock, endBlock) + lastValidBlock, err := detectAndHandleReorgs(startBlock, endBlock) if err != nil { log.Error().Err(err).Msg("Failed to detect and handle reorgs") time.Sleep(2 * time.Second) continue } - lastBlockCheck = endBlock + lastBlockCheck = lastValidBlock } } @@ -91,18 +91,18 @@ func getLastValidBlock() (int64, error) { return lastValidBlock, nil } -func detectAndHandleReorgs(startBlock int64, endBlock int64) error { +func detectAndHandleReorgs(startBlock int64, endBlock int64) (int64, error) { log.Debug().Msgf("Checking for reorgs from block %d to %d", startBlock, endBlock) // Fetch block headers for the range blockHeaders, err := libs.GetBlockHeadersForReorgCheck(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err) + return 0, fmt.Errorf("detectAndHandleReorgs: failed to get block headers: %w", err) } if len(blockHeaders) == 0 { log.Debug().Msg("detectAndHandleReorgs: No block headers found in range") - return nil + return 0, nil } // 1) Block verification: find reorg range from header continuity (existing behavior) @@ -141,13 +141,13 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { // and the number of transactions stored per block in ClickHouse. txStart, txEnd, err := libs.GetTransactionMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err) + return 0, fmt.Errorf("detectAndHandleReorgs: transaction verification failed: %w", err) } // 3) Logs verification: check for mismatches between logsBloom and logs stored in ClickHouse. logsStart, logsEnd, err := libs.GetLogsMismatchRangeFromClickHouseV2(libs.ChainId.Uint64(), uint64(startBlock), uint64(endBlock)) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err) + return 0, fmt.Errorf("detectAndHandleReorgs: logs verification failed: %w", err) } // 4) Combine all ranges: @@ -186,16 +186,16 @@ func detectAndHandleReorgs(startBlock int64, endBlock int64) error { if finalStart > -1 { // We found at least one inconsistent range; reorg from min(start) to max(end). if err := handleReorgForRange(uint64(finalStart), uint64(finalEnd)); err != nil { - return err + return 0, err } lastValidBlock = finalEnd } err = libs.SetReorgLastValidBlock(libs.ChainIdStr, lastValidBlock) if err != nil { - return fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err) + return 0, fmt.Errorf("detectAndHandleReorgs: failed to set last valid block: %w", err) } - return nil + return lastValidBlock, nil } func handleReorgForRange(startBlock uint64, endBlock uint64) error { From ebf125448ce794bdbd699b985806fec87e47ed3c Mon Sep 17 00:00:00 2001 From: nischit Date: Thu, 11 Dec 2025 23:39:14 +0545 Subject: [PATCH 9/9] minor change --- internal/committer/reorg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index d2eafa7..f40a426 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -33,7 +33,7 @@ func RunReorgValidator() { continue } - if endBlock == lastBlockCheck || endBlock-startBlock < 100 { + if endBlock-startBlock < 100 { log.Debug().Int64("last_block_check", lastBlockCheck).Int64("start_block", startBlock).Int64("end_block", endBlock).Msg("Not enough new blocks to check. Sleeping for 1 minute.") time.Sleep(1 * time.Minute) continue