diff --git a/crates/flashblocks/src/lib.rs b/crates/flashblocks/src/lib.rs index 12a36fe2..b9bec3e5 100644 --- a/crates/flashblocks/src/lib.rs +++ b/crates/flashblocks/src/lib.rs @@ -20,3 +20,6 @@ pub use subscription::FlashblocksSubscriber; mod traits; pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI}; + +mod state_builder; +pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder}; diff --git a/crates/flashblocks/src/processor.rs b/crates/flashblocks/src/processor.rs index 97b52adb..5524383d 100644 --- a/crates/flashblocks/src/processor.rs +++ b/crates/flashblocks/src/processor.rs @@ -7,41 +7,34 @@ use std::{ }; use alloy_consensus::{ - Eip658Value, Header, TxReceipt, - transaction::{Recovered, SignerRecoverable, TransactionMeta}, + Header, + transaction::{Recovered, SignerRecoverable}, }; use alloy_eips::BlockNumberOrTag; -use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; -use alloy_primitives::{Address, B256, BlockNumber, Bytes, Sealable}; -use alloy_rpc_types::{TransactionTrait, Withdrawal}; +use alloy_primitives::{Address, B256, BlockNumber, Bytes}; +use alloy_rpc_types::Withdrawal; use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3}; use alloy_rpc_types_eth::state::StateOverride; use arc_swap::ArcSwapOption; use base_flashtypes::Flashblock; use eyre::eyre; -use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; +use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::TransactionResponse; -use op_alloy_rpc_types::Transaction; use rayon::prelude::*; use reth::{ chainspec::{ChainSpecProvider, EthChainSpec}, providers::{BlockReaderIdExt, StateProviderFactory}, - revm::{ - DatabaseCommit, State, context::result::ResultAndState, database::StateProviderDatabase, - db::CacheDB, - }, + revm::{State, database::StateProviderDatabase, db::CacheDB}, }; -use reth_evm::{ConfigureEvm, Evm, eth::receipt_builder::ReceiptBuilderCtx}; +use reth_evm::ConfigureEvm; use reth_optimism_chainspec::OpHardforks; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; -use reth_optimism_primitives::{OpBlock, OpPrimitives}; -use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; +use reth_optimism_primitives::OpBlock; use reth_primitives::RecoveredBlock; -use reth_rpc_convert::transaction::ConvertReceiptInput; use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver}; use tracing::{debug, error, info, warn}; -use crate::{Metrics, PendingBlocks, PendingBlocksBuilder}; +use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder}; /// Messages consumed by the state processor. #[derive(Debug, Clone)] @@ -360,9 +353,10 @@ where }; let block: OpBlock = execution_payload.try_into_block()?; - let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; - let header = block.header.clone().seal_slow(); - pending_blocks_builder.with_header(header.clone()); + let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?; + let block_header = block.header.clone(); // prevents us from needing to clone the entire block + let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks + pending_blocks_builder.with_header(sealed_header); let block_env_attributes = OpNextBlockEnvAttributes { timestamp: base.timestamp, @@ -374,18 +368,16 @@ where }; let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?; - let mut evm = evm_config.evm_with_env(db, evm_env); - - let mut cumulative_gas_used: u64 = 0; - let mut next_log_index = 0; + let evm = evm_config.evm_with_env(db, evm_env); // Parallel sender recovery - batch all ECDSA operations upfront let recovery_start = Instant::now(); - let txs_with_senders: Vec<(&OpTxEnvelope, Address)> = block + let txs_with_senders: Vec<(OpTxEnvelope, Address)> = block .body .transactions .par_iter() - .map(|tx| -> eyre::Result<(&OpTxEnvelope, Address)> { + .cloned() + .map(|tx| -> eyre::Result<(OpTxEnvelope, Address)> { let tx_hash = tx.tx_hash(); let sender = match prev_pending_blocks .as_ref() @@ -399,236 +391,45 @@ where .collect::>()?; self.metrics.sender_recovery_duration.record(recovery_start.elapsed()); + let mut pending_state_builder = PendingStateBuilder::new( + self.client.chain_spec(), + evm, + block, + prev_pending_blocks.clone(), + l1_block_info, + state_overrides, + *evm_config.block_executor_factory().receipt_builder(), + ); + for (idx, (transaction, sender)) in txs_with_senders.into_iter().enumerate() { let tx_hash = transaction.tx_hash(); pending_blocks_builder.with_transaction_sender(tx_hash, sender); pending_blocks_builder.increment_nonce(sender); - let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender); - - let effective_gas_price = if transaction.is_deposit() { - 0 - } else { - block - .base_fee_per_gas - .map(|base_fee| { - transaction.effective_tip_per_gas(base_fee).unwrap_or_default() - + base_fee as u128 - }) - .unwrap_or_else(|| transaction.max_fee_per_gas()) - }; - - // Check if we have all the data we need (receipt + state) - let cached_data = prev_pending_blocks.as_ref().and_then(|p| { - let receipt = p.get_receipt(tx_hash)?; - let state = p.get_transaction_state(&tx_hash)?; - Some((receipt, state)) - }); - - // If cached, we can fill out pending block data using previous execution results - // If not cached, we need to execute the transaction and build pending block data from scratch - // The `pending_blocks_builder.with*` calls should fill out the same data in both cases - // We also need to update the cumulative gas used and next log index in both cases - if let Some((receipt, state)) = cached_data { - let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { - let deposit_receipt = receipt - .inner - .inner - .as_deposit_receipt() - .ok_or(eyre!("deposit transaction, non deposit receipt"))?; - - (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) - } else { - (None, None) - }; - - let envelope = recovered_transaction.clone().convert::(); - let rpc_txn = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: envelope, - block_hash: Some(header.hash()), - block_number: Some(base.block_number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - - pending_blocks_builder.with_transaction(rpc_txn); - pending_blocks_builder.with_receipt(tx_hash, receipt.clone()); - - for (address, account) in state.iter() { - if account.is_touched() { - pending_blocks_builder - .with_account_balance(*address, account.info.balance); - } - } - pending_blocks_builder.with_transaction_state(tx_hash, state); + let recovered_transaction = Recovered::new_unchecked(transaction, sender); - cumulative_gas_used = cumulative_gas_used - .checked_add(receipt.inner.gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - next_log_index += receipt.inner.logs().len(); - } else { - let envelope = recovered_transaction.clone().convert::(); - - match evm.transact(recovered_transaction.clone()) { - Ok(ResultAndState { state, result }) => { - let gas_used = result.gas_used(); - for (addr, acc) in &state { - if acc.is_touched() { - pending_blocks_builder - .with_account_balance(*addr, acc.info.balance); - } - - let existing_override = state_overrides.entry(*addr).or_default(); - existing_override.balance = Some(acc.info.balance); - existing_override.nonce = Some(acc.info.nonce); - existing_override.code = - acc.info.code.clone().map(|code| code.bytes()); - - let existing = - existing_override.state_diff.get_or_insert(Default::default()); - let changed_slots = acc.storage.iter().map(|(&key, slot)| { - (B256::from(key), B256::from(slot.present_value)) - }); - - existing.extend(changed_slots); - } + let executed_transaction = + pending_state_builder.execute_transaction(idx, recovered_transaction)?; - cumulative_gas_used = cumulative_gas_used - .checked_add(gas_used) - .ok_or(eyre!("cumulative gas used overflow"))?; - - let receipt_builder = - evm_config.block_executor_factory().receipt_builder(); - - let is_canyon_active = self - .client - .chain_spec() - .is_canyon_active_at_timestamp(block.timestamp); - - let is_regolith_active = self - .client - .chain_spec() - .is_regolith_active_at_timestamp(block.timestamp); - - let receipt = match receipt_builder.build_receipt(ReceiptBuilderCtx { - tx: &recovered_transaction, - evm: &evm, - result, - state: &state, - cumulative_gas_used, - }) { - Ok(receipt) => receipt, - Err(ctx) => { - // This is a deposit transaction, so build the receipt from the context - let receipt = alloy_consensus::Receipt { - status: Eip658Value::Eip658(ctx.result.is_success()), - cumulative_gas_used: ctx.cumulative_gas_used, - logs: ctx.result.into_logs(), - }; - - let deposit_nonce = (is_regolith_active - && transaction.is_deposit()) - .then(|| { - evm.db_mut() - .load_account(recovered_transaction.signer()) - .map(|acc| acc.info.nonce) - }) - .transpose() - .map_err(|_| { - eyre!("failed to load cache account for depositor") - })?; - - receipt_builder.build_deposit_receipt(OpDepositReceipt { - inner: receipt, - deposit_nonce, - deposit_receipt_version: is_canyon_active.then_some(1), - }) - } - }; - - let meta = TransactionMeta { - tx_hash, - index: idx as u64, - block_hash: header.hash(), - block_number: block.number, - base_fee: block.base_fee_per_gas, - excess_blob_gas: block.excess_blob_gas, - timestamp: block.timestamp, - }; - - let input: ConvertReceiptInput<'_, OpPrimitives> = - ConvertReceiptInput { - receipt: receipt.clone(), - tx: Recovered::new_unchecked(transaction, sender), - gas_used, - next_log_index, - meta, - }; - - let op_receipt = OpRpcReceiptBuilder::new( - self.client.chain_spec().as_ref(), - input, - &mut l1_block_info, - ) - .unwrap() - .build(); - next_log_index += receipt.logs().len(); - - let (deposit_receipt_version, deposit_nonce) = - if transaction.is_deposit() { - let deposit_receipt = - op_receipt.inner.inner.as_deposit_receipt().ok_or( - eyre!("deposit transaction, non deposit receipt"), - )?; - - ( - deposit_receipt.deposit_receipt_version, - deposit_receipt.deposit_nonce, - ) - } else { - (None, None) - }; - - let rpc_txn = Transaction { - inner: alloy_rpc_types_eth::Transaction { - inner: envelope, - block_hash: Some(header.hash()), - block_number: Some(base.block_number), - transaction_index: Some(idx as u64), - effective_gas_price: Some(effective_gas_price), - }, - deposit_nonce, - deposit_receipt_version, - }; - - pending_blocks_builder.with_transaction(rpc_txn); - pending_blocks_builder.with_receipt(tx_hash, op_receipt); - pending_blocks_builder.with_transaction_state(tx_hash, state.clone()); - evm.db_mut().commit(state); - } - Err(e) => { - return Err(eyre!( - "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", - e, - tx_hash, - sender - )); - } + for (address, account) in executed_transaction.state.iter() { + if account.is_touched() { + pending_blocks_builder.with_account_balance(*address, account.info.balance); } } + + pending_blocks_builder.with_transaction(executed_transaction.rpc_transaction); + pending_blocks_builder.with_receipt(tx_hash, executed_transaction.receipt); + pending_blocks_builder.with_transaction_state(tx_hash, executed_transaction.state); } - db = evm.into_db(); - last_block_header = block.header.clone(); + (db, state_overrides) = pending_state_builder.into_db_and_state_overrides(); + last_block_header = block_header; } - pending_blocks_builder.with_db_cache(db.cache); pending_blocks_builder.with_state_overrides(state_overrides); + pending_blocks_builder.with_db_cache(db.cache); + Ok(Some(Arc::new(pending_blocks_builder.build()?))) } diff --git a/crates/flashblocks/src/state_builder.rs b/crates/flashblocks/src/state_builder.rs new file mode 100644 index 00000000..3505bf67 --- /dev/null +++ b/crates/flashblocks/src/state_builder.rs @@ -0,0 +1,299 @@ +use std::sync::Arc; + +use alloy_consensus::{ + Block, Eip658Value, Header, TxReceipt, + transaction::{Recovered, TransactionMeta}, +}; +use alloy_op_evm::block::receipt_builder::OpReceiptBuilder; +use alloy_primitives::B256; +use alloy_rpc_types::TransactionTrait; +use alloy_rpc_types_eth::state::StateOverride; +use eyre::eyre; +use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope}; +use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; +use reth::revm::{Database, DatabaseCommit, context::result::ResultAndState, state::EvmState}; +use reth_evm::{ + Evm, FromRecoveredTx, eth::receipt_builder::ReceiptBuilderCtx, op_revm::L1BlockInfo, +}; +use reth_optimism_chainspec::OpHardforks; +use reth_optimism_evm::OpRethReceiptBuilder; +use reth_optimism_primitives::OpPrimitives; +use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder; +use reth_rpc_convert::transaction::ConvertReceiptInput; + +use crate::PendingBlocks; + +/// Represents the result of executing or fetching a cached pending transaction. +#[derive(Debug, Clone)] +pub struct ExecutedPendingTransaction { + /// The RPC transaction. + pub rpc_transaction: Transaction, + /// The receipt of the transaction. + pub receipt: OpTransactionReceipt, + /// The updated EVM state. + pub state: EvmState, +} + +/// Executes or fetches cached values for transactions in a flashblock. +#[derive(Debug)] +pub struct PendingStateBuilder { + cumulative_gas_used: u64, + next_log_index: usize, + + evm: E, + pending_block: Block, + l1_block_info: L1BlockInfo, + chain_spec: ChainSpec, + receipt_builder: OpRethReceiptBuilder, + + prev_pending_blocks: Option>, + state_overrides: StateOverride, +} + +impl PendingStateBuilder +where + E: Evm, + DB: Database + DatabaseCommit, + E::Tx: FromRecoveredTx, + ChainSpec: OpHardforks, +{ + /// Creates a new pending state builder. + pub const fn new( + chain_spec: ChainSpec, + evm: E, + pending_block: Block, + prev_pending_blocks: Option>, + l1_block_info: L1BlockInfo, + state_overrides: StateOverride, + receipt_builder: OpRethReceiptBuilder, + ) -> Self { + Self { + pending_block, + evm, + cumulative_gas_used: 0, + next_log_index: 0, + prev_pending_blocks, + l1_block_info, + state_overrides, + chain_spec, + receipt_builder, + } + } + + /// Consumes the builder and returns the database and state overrides. + pub fn into_db_and_state_overrides(self) -> (DB, StateOverride) { + (self.evm.into_db(), self.state_overrides) + } + + /// Executes a single transaction and updates internal state. + /// Should be called in order for each transaction. + pub fn execute_transaction( + &mut self, + idx: usize, + transaction: Recovered, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + let effective_gas_price = if transaction.is_deposit() { + 0 + } else { + self.pending_block + .base_fee_per_gas + .map(|base_fee| { + transaction.effective_tip_per_gas(base_fee).unwrap_or_default() + + base_fee as u128 + }) + .unwrap_or_else(|| transaction.max_fee_per_gas()) + }; + + // Check if we have all the data we need (receipt + state) + let cached_data = self.prev_pending_blocks.as_ref().and_then(|p| { + let receipt = p.get_receipt(tx_hash)?; + let state = p.get_transaction_state(&tx_hash)?; + Some((receipt, state)) + }); + + // If cached, we can fill out pending block data using previous execution results + // If not cached, we need to execute the transaction and build pending block data from scratch + if let Some((receipt, state)) = cached_data { + self.execute_with_cached_data(transaction, receipt, state, idx, effective_gas_price) + } else { + self.execute_with_evm(transaction, idx, effective_gas_price) + } + } + + /// Builds transaction result from cached receipt and state data. + fn execute_with_cached_data( + &mut self, + transaction: Recovered, + receipt: OpTransactionReceipt, + state: EvmState, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(receipt.inner.gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + self.next_log_index += receipt.inner.logs().len(); + + Ok(ExecutedPendingTransaction { rpc_transaction, receipt, state }) + } + + /// Executes the transaction through the EVM and builds the result from scratch. + fn execute_with_evm( + &mut self, + transaction: Recovered, + idx: usize, + effective_gas_price: u128, + ) -> eyre::Result { + let tx_hash = transaction.tx_hash(); + + match self.evm.transact(&transaction) { + Ok(ResultAndState { state, result }) => { + let gas_used = result.gas_used(); + for (addr, acc) in &state { + let existing_override = self.state_overrides.entry(*addr).or_default(); + existing_override.balance = Some(acc.info.balance); + existing_override.nonce = Some(acc.info.nonce); + existing_override.code = acc.info.code.clone().map(|code| code.bytes()); + + let existing = existing_override.state_diff.get_or_insert(Default::default()); + let changed_slots = acc + .storage + .iter() + .map(|(&key, slot)| (B256::from(key), B256::from(slot.present_value))); + + existing.extend(changed_slots); + } + + self.cumulative_gas_used = self + .cumulative_gas_used + .checked_add(gas_used) + .ok_or(eyre!("cumulative gas used overflow"))?; + + let is_canyon_active = + self.chain_spec.is_canyon_active_at_timestamp(self.pending_block.timestamp); + + let is_regolith_active = + self.chain_spec.is_regolith_active_at_timestamp(self.pending_block.timestamp); + + let receipt = match self.receipt_builder.build_receipt(ReceiptBuilderCtx { + tx: &transaction, + evm: &mut self.evm, + result, + state: &state, + cumulative_gas_used: self.cumulative_gas_used, + }) { + Ok(receipt) => receipt, + Err(ctx) => { + // This is a deposit transaction, so build the receipt from the context + let receipt = alloy_consensus::Receipt { + status: Eip658Value::Eip658(ctx.result.is_success()), + cumulative_gas_used: ctx.cumulative_gas_used, + logs: ctx.result.into_logs(), + }; + + let deposit_nonce = (is_regolith_active && transaction.is_deposit()) + .then(|| { + self.evm + .db_mut() + .basic(transaction.signer()) + .map(|acc| acc.unwrap_or_default().nonce) + }) + .transpose() + .map_err(|_| eyre!("failed to load cache account for depositor"))?; + + self.receipt_builder.build_deposit_receipt(OpDepositReceipt { + inner: receipt, + deposit_nonce, + deposit_receipt_version: is_canyon_active.then_some(1), + }) + } + }; + + let meta = TransactionMeta { + tx_hash, + index: idx as u64, + block_hash: B256::ZERO, // block hash is not available yet for flashblocks + block_number: self.pending_block.number, + base_fee: self.pending_block.base_fee_per_gas, + excess_blob_gas: self.pending_block.excess_blob_gas, + timestamp: self.pending_block.timestamp, + }; + + let sender = transaction.signer(); + let input: ConvertReceiptInput<'_, OpPrimitives> = ConvertReceiptInput { + receipt: receipt.clone(), + tx: Recovered::new_unchecked(&transaction, sender), + gas_used, + next_log_index: self.next_log_index, + meta, + }; + + let op_receipt = + OpRpcReceiptBuilder::new(&self.chain_spec, input, &mut self.l1_block_info) + .unwrap() + .build(); + self.next_log_index += receipt.logs().len(); + + let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() { + let deposit_receipt = op_receipt + .inner + .inner + .as_deposit_receipt() + .ok_or(eyre!("deposit transaction, non deposit receipt"))?; + + (deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce) + } else { + (None, None) + }; + + let rpc_transaction = Transaction { + inner: alloy_rpc_types_eth::Transaction { + inner: transaction, + block_hash: None, + block_number: Some(self.pending_block.number), + transaction_index: Some(idx as u64), + effective_gas_price: Some(effective_gas_price), + }, + deposit_nonce, + deposit_receipt_version, + }; + self.evm.db_mut().commit(state.clone()); + + Ok(ExecutedPendingTransaction { rpc_transaction, receipt: op_receipt, state }) + } + Err(e) => Err(eyre!( + "failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}", + e, + tx_hash, + transaction.signer() + )), + } + } +}