Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions crates/flashblocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,6 @@ pub use subscription::FlashblocksSubscriber;

mod traits;
pub use traits::{FlashblocksAPI, FlashblocksReceiver, PendingBlocksAPI};

mod state_builder;
pub use state_builder::{ExecutedPendingTransaction, PendingStateBuilder};
281 changes: 41 additions & 240 deletions crates/flashblocks/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,34 @@ use std::{
};

use alloy_consensus::{
Eip658Value, Header, TxReceipt,
transaction::{Recovered, SignerRecoverable, TransactionMeta},
Header,
transaction::{Recovered, SignerRecoverable},
};
use alloy_eips::BlockNumberOrTag;
use alloy_op_evm::block::receipt_builder::OpReceiptBuilder;
use alloy_primitives::{Address, B256, BlockNumber, Bytes, Sealable};
use alloy_rpc_types::{TransactionTrait, Withdrawal};
use alloy_primitives::{Address, B256, BlockNumber, Bytes};
use alloy_rpc_types::Withdrawal;
use alloy_rpc_types_engine::{ExecutionPayloadV1, ExecutionPayloadV2, ExecutionPayloadV3};
use alloy_rpc_types_eth::state::StateOverride;
use arc_swap::ArcSwapOption;
use base_flashtypes::Flashblock;
use eyre::eyre;
use op_alloy_consensus::{OpDepositReceipt, OpTxEnvelope};
use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::TransactionResponse;
use op_alloy_rpc_types::Transaction;
use rayon::prelude::*;
use reth::{
chainspec::{ChainSpecProvider, EthChainSpec},
providers::{BlockReaderIdExt, StateProviderFactory},
revm::{
DatabaseCommit, State, context::result::ResultAndState, database::StateProviderDatabase,
db::CacheDB,
},
revm::{State, database::StateProviderDatabase, db::CacheDB},
};
use reth_evm::{ConfigureEvm, Evm, eth::receipt_builder::ReceiptBuilderCtx};
use reth_evm::ConfigureEvm;
use reth_optimism_chainspec::OpHardforks;
use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes};
use reth_optimism_primitives::{OpBlock, OpPrimitives};
use reth_optimism_rpc::OpReceiptBuilder as OpRpcReceiptBuilder;
use reth_optimism_primitives::OpBlock;
use reth_primitives::RecoveredBlock;
use reth_rpc_convert::transaction::ConvertReceiptInput;
use tokio::sync::{Mutex, broadcast::Sender, mpsc::UnboundedReceiver};
use tracing::{debug, error, info, warn};

use crate::{Metrics, PendingBlocks, PendingBlocksBuilder};
use crate::{Metrics, PendingBlocks, PendingBlocksBuilder, PendingStateBuilder};

/// Messages consumed by the state processor.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -360,9 +353,10 @@ where
};

let block: OpBlock = execution_payload.try_into_block()?;
let mut l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?;
let header = block.header.clone().seal_slow();
pending_blocks_builder.with_header(header.clone());
let l1_block_info = reth_optimism_evm::extract_l1_info(&block.body)?;
let block_header = block.header.clone(); // prevents us from needing to clone the entire block
let sealed_header = block_header.clone().seal(B256::ZERO); // zero block hash for flashblocks
pending_blocks_builder.with_header(sealed_header);

let block_env_attributes = OpNextBlockEnvAttributes {
timestamp: base.timestamp,
Expand All @@ -374,18 +368,16 @@ where
};

let evm_env = evm_config.next_evm_env(&last_block_header, &block_env_attributes)?;
let mut evm = evm_config.evm_with_env(db, evm_env);

let mut cumulative_gas_used: u64 = 0;
let mut next_log_index = 0;
let evm = evm_config.evm_with_env(db, evm_env);

// Parallel sender recovery - batch all ECDSA operations upfront
let recovery_start = Instant::now();
let txs_with_senders: Vec<(&OpTxEnvelope, Address)> = block
let txs_with_senders: Vec<(OpTxEnvelope, Address)> = block
.body
.transactions
.par_iter()
.map(|tx| -> eyre::Result<(&OpTxEnvelope, Address)> {
.cloned()
.map(|tx| -> eyre::Result<(OpTxEnvelope, Address)> {
let tx_hash = tx.tx_hash();
let sender = match prev_pending_blocks
.as_ref()
Expand All @@ -399,236 +391,45 @@ where
.collect::<eyre::Result<_>>()?;
self.metrics.sender_recovery_duration.record(recovery_start.elapsed());

let mut pending_state_builder = PendingStateBuilder::new(
self.client.chain_spec(),
evm,
block,
prev_pending_blocks.clone(),
l1_block_info,
state_overrides,
*evm_config.block_executor_factory().receipt_builder(),
);

for (idx, (transaction, sender)) in txs_with_senders.into_iter().enumerate() {
let tx_hash = transaction.tx_hash();

pending_blocks_builder.with_transaction_sender(tx_hash, sender);
pending_blocks_builder.increment_nonce(sender);

let recovered_transaction = Recovered::new_unchecked(transaction.clone(), sender);

let effective_gas_price = if transaction.is_deposit() {
0
} else {
block
.base_fee_per_gas
.map(|base_fee| {
transaction.effective_tip_per_gas(base_fee).unwrap_or_default()
+ base_fee as u128
})
.unwrap_or_else(|| transaction.max_fee_per_gas())
};

// Check if we have all the data we need (receipt + state)
let cached_data = prev_pending_blocks.as_ref().and_then(|p| {
let receipt = p.get_receipt(tx_hash)?;
let state = p.get_transaction_state(&tx_hash)?;
Some((receipt, state))
});

// If cached, we can fill out pending block data using previous execution results
// If not cached, we need to execute the transaction and build pending block data from scratch
// The `pending_blocks_builder.with*` calls should fill out the same data in both cases
// We also need to update the cumulative gas used and next log index in both cases
if let Some((receipt, state)) = cached_data {
let (deposit_receipt_version, deposit_nonce) = if transaction.is_deposit() {
let deposit_receipt = receipt
.inner
.inner
.as_deposit_receipt()
.ok_or(eyre!("deposit transaction, non deposit receipt"))?;

(deposit_receipt.deposit_receipt_version, deposit_receipt.deposit_nonce)
} else {
(None, None)
};

let envelope = recovered_transaction.clone().convert::<OpTxEnvelope>();
let rpc_txn = Transaction {
inner: alloy_rpc_types_eth::Transaction {
inner: envelope,
block_hash: Some(header.hash()),
block_number: Some(base.block_number),
transaction_index: Some(idx as u64),
effective_gas_price: Some(effective_gas_price),
},
deposit_nonce,
deposit_receipt_version,
};

pending_blocks_builder.with_transaction(rpc_txn);
pending_blocks_builder.with_receipt(tx_hash, receipt.clone());

for (address, account) in state.iter() {
if account.is_touched() {
pending_blocks_builder
.with_account_balance(*address, account.info.balance);
}
}
pending_blocks_builder.with_transaction_state(tx_hash, state);
let recovered_transaction = Recovered::new_unchecked(transaction, sender);

cumulative_gas_used = cumulative_gas_used
.checked_add(receipt.inner.gas_used)
.ok_or(eyre!("cumulative gas used overflow"))?;
next_log_index += receipt.inner.logs().len();
} else {
let envelope = recovered_transaction.clone().convert::<OpTxEnvelope>();

match evm.transact(recovered_transaction.clone()) {
Ok(ResultAndState { state, result }) => {
let gas_used = result.gas_used();
for (addr, acc) in &state {
if acc.is_touched() {
pending_blocks_builder
.with_account_balance(*addr, acc.info.balance);
}

let existing_override = state_overrides.entry(*addr).or_default();
existing_override.balance = Some(acc.info.balance);
existing_override.nonce = Some(acc.info.nonce);
existing_override.code =
acc.info.code.clone().map(|code| code.bytes());

let existing =
existing_override.state_diff.get_or_insert(Default::default());
let changed_slots = acc.storage.iter().map(|(&key, slot)| {
(B256::from(key), B256::from(slot.present_value))
});

existing.extend(changed_slots);
}
let executed_transaction =
pending_state_builder.execute_transaction(idx, recovered_transaction)?;

cumulative_gas_used = cumulative_gas_used
.checked_add(gas_used)
.ok_or(eyre!("cumulative gas used overflow"))?;

let receipt_builder =
evm_config.block_executor_factory().receipt_builder();

let is_canyon_active = self
.client
.chain_spec()
.is_canyon_active_at_timestamp(block.timestamp);

let is_regolith_active = self
.client
.chain_spec()
.is_regolith_active_at_timestamp(block.timestamp);

let receipt = match receipt_builder.build_receipt(ReceiptBuilderCtx {
tx: &recovered_transaction,
evm: &evm,
result,
state: &state,
cumulative_gas_used,
}) {
Ok(receipt) => receipt,
Err(ctx) => {
// This is a deposit transaction, so build the receipt from the context
let receipt = alloy_consensus::Receipt {
status: Eip658Value::Eip658(ctx.result.is_success()),
cumulative_gas_used: ctx.cumulative_gas_used,
logs: ctx.result.into_logs(),
};

let deposit_nonce = (is_regolith_active
&& transaction.is_deposit())
.then(|| {
evm.db_mut()
.load_account(recovered_transaction.signer())
.map(|acc| acc.info.nonce)
})
.transpose()
.map_err(|_| {
eyre!("failed to load cache account for depositor")
})?;

receipt_builder.build_deposit_receipt(OpDepositReceipt {
inner: receipt,
deposit_nonce,
deposit_receipt_version: is_canyon_active.then_some(1),
})
}
};

let meta = TransactionMeta {
tx_hash,
index: idx as u64,
block_hash: header.hash(),
block_number: block.number,
base_fee: block.base_fee_per_gas,
excess_blob_gas: block.excess_blob_gas,
timestamp: block.timestamp,
};

let input: ConvertReceiptInput<'_, OpPrimitives> =
ConvertReceiptInput {
receipt: receipt.clone(),
tx: Recovered::new_unchecked(transaction, sender),
gas_used,
next_log_index,
meta,
};

let op_receipt = OpRpcReceiptBuilder::new(
self.client.chain_spec().as_ref(),
input,
&mut l1_block_info,
)
.unwrap()
.build();
next_log_index += receipt.logs().len();

let (deposit_receipt_version, deposit_nonce) =
if transaction.is_deposit() {
let deposit_receipt =
op_receipt.inner.inner.as_deposit_receipt().ok_or(
eyre!("deposit transaction, non deposit receipt"),
)?;

(
deposit_receipt.deposit_receipt_version,
deposit_receipt.deposit_nonce,
)
} else {
(None, None)
};

let rpc_txn = Transaction {
inner: alloy_rpc_types_eth::Transaction {
inner: envelope,
block_hash: Some(header.hash()),
block_number: Some(base.block_number),
transaction_index: Some(idx as u64),
effective_gas_price: Some(effective_gas_price),
},
deposit_nonce,
deposit_receipt_version,
};

pending_blocks_builder.with_transaction(rpc_txn);
pending_blocks_builder.with_receipt(tx_hash, op_receipt);
pending_blocks_builder.with_transaction_state(tx_hash, state.clone());
evm.db_mut().commit(state);
}
Err(e) => {
return Err(eyre!(
"failed to execute transaction: {:?} tx_hash: {:?} sender: {:?}",
e,
tx_hash,
sender
));
}
for (address, account) in executed_transaction.state.iter() {
if account.is_touched() {
pending_blocks_builder.with_account_balance(*address, account.info.balance);
}
}

pending_blocks_builder.with_transaction(executed_transaction.rpc_transaction);
pending_blocks_builder.with_receipt(tx_hash, executed_transaction.receipt);
pending_blocks_builder.with_transaction_state(tx_hash, executed_transaction.state);
}

db = evm.into_db();
last_block_header = block.header.clone();
(db, state_overrides) = pending_state_builder.into_db_and_state_overrides();
last_block_header = block_header;
}

pending_blocks_builder.with_db_cache(db.cache);
pending_blocks_builder.with_state_overrides(state_overrides);
pending_blocks_builder.with_db_cache(db.cache);

Ok(Some(Arc::new(pending_blocks_builder.build()?)))
}

Expand Down
Loading