diff --git a/crates/cold-mdbx/Cargo.toml b/crates/cold-mdbx/Cargo.toml index 2ea6387..d947cb5 100644 --- a/crates/cold-mdbx/Cargo.toml +++ b/crates/cold-mdbx/Cargo.toml @@ -13,7 +13,9 @@ categories = ["database-implementations"] [dependencies] alloy.workspace = true +signet-cold = { version = "0.0.1", path = "../cold" } signet-hot.workspace = true +signet-hot-mdbx.workspace = true signet-storage-types.workspace = true thiserror.workspace = true @@ -21,6 +23,8 @@ thiserror.workspace = true signet-hot-mdbx = { workspace = true, features = ["test-utils"] } signet-libmdbx.workspace = true tempfile.workspace = true +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } [features] default = [] +test-utils = ["signet-cold/test-utils"] diff --git a/crates/cold-mdbx/src/backend.rs b/crates/cold-mdbx/src/backend.rs new file mode 100644 index 0000000..158b7b6 --- /dev/null +++ b/crates/cold-mdbx/src/backend.rs @@ -0,0 +1,580 @@ +//! MDBX backend implementation for [`ColdStorage`]. +//! +//! This module provides an MDBX-based implementation of the cold storage +//! backend. It uses the table definitions from this crate and the MDBX +//! database environment from `signet-hot-mdbx`. 
+ +use crate::{ + ColdBlockHashIndex, ColdHeaders, ColdMetadata, ColdReceipts, ColdSignetEvents, + ColdTransactions, ColdTxHashIndex, ColdZenithHeaders, MdbxColdError, MetadataKey, +}; +use alloy::{consensus::Header, primitives::BlockNumber}; +use signet_cold::{ + BlockData, BlockTag, ColdResult, ColdStorage, HeaderSpecifier, ReceiptSpecifier, + SignetEventsSpecifier, TransactionSpecifier, ZenithHeaderSpecifier, +}; +use signet_hot::{ + KeySer, MAX_KEY_SIZE, ValSer, + model::{DualTableTraverse, HotKvWrite, KvTraverse, TableTraverse}, + tables::Table, +}; +use signet_hot_mdbx::{DatabaseArguments, DatabaseEnv, DatabaseEnvKind}; +use signet_storage_types::{DbSignetEvent, DbZenithHeader, Receipt, TransactionSigned, TxLocation}; +use std::path::Path; + +/// MDBX-based cold storage backend. +/// +/// This backend stores historical blockchain data in an MDBX database. +/// It implements the [`ColdStorage`] trait for use with the cold storage +/// task runner. +#[derive(Debug)] +pub struct MdbxColdBackend { + /// The MDBX environment. + env: DatabaseEnv, +} + +impl MdbxColdBackend { + /// Open an existing MDBX cold storage database in read-only mode. + pub fn open_ro(path: &Path) -> Result { + let env = DatabaseArguments::new().open_ro(path)?; + Ok(Self { env }) + } + + /// Open or create an MDBX cold storage database in read-write mode. + pub fn open_rw(path: &Path) -> Result { + let env = DatabaseArguments::new().open_rw(path)?; + let backend = Self { env }; + backend.create_tables()?; + Ok(backend) + } + + /// Open an MDBX cold storage database with custom arguments. 
+ pub fn open( + path: &Path, + kind: DatabaseEnvKind, + args: DatabaseArguments, + ) -> Result { + let env = DatabaseEnv::open(path, kind, args)?; + let backend = Self { env }; + if kind.is_rw() { + backend.create_tables()?; + } + Ok(backend) + } + + fn create_tables(&self) -> Result<(), MdbxColdError> { + let tx = self.env.tx_rw()?; + + for (name, dual_key_size, fixed_val_size, int_key) in [ + ( + ColdHeaders::NAME, + ColdHeaders::DUAL_KEY_SIZE, + ColdHeaders::FIXED_VAL_SIZE, + ColdHeaders::INT_KEY, + ), + ( + ColdZenithHeaders::NAME, + ColdZenithHeaders::DUAL_KEY_SIZE, + ColdZenithHeaders::FIXED_VAL_SIZE, + ColdZenithHeaders::INT_KEY, + ), + ( + ColdBlockHashIndex::NAME, + ColdBlockHashIndex::DUAL_KEY_SIZE, + ColdBlockHashIndex::FIXED_VAL_SIZE, + ColdBlockHashIndex::INT_KEY, + ), + ( + ColdTxHashIndex::NAME, + ColdTxHashIndex::DUAL_KEY_SIZE, + ColdTxHashIndex::FIXED_VAL_SIZE, + ColdTxHashIndex::INT_KEY, + ), + ( + ColdMetadata::NAME, + ColdMetadata::DUAL_KEY_SIZE, + ColdMetadata::FIXED_VAL_SIZE, + ColdMetadata::INT_KEY, + ), + ( + ColdTransactions::NAME, + ColdTransactions::DUAL_KEY_SIZE, + ColdTransactions::FIXED_VAL_SIZE, + ColdTransactions::INT_KEY, + ), + ( + ColdReceipts::NAME, + ColdReceipts::DUAL_KEY_SIZE, + ColdReceipts::FIXED_VAL_SIZE, + ColdReceipts::INT_KEY, + ), + ( + ColdSignetEvents::NAME, + ColdSignetEvents::DUAL_KEY_SIZE, + ColdSignetEvents::FIXED_VAL_SIZE, + ColdSignetEvents::INT_KEY, + ), + ] { + tx.queue_raw_create(name, dual_key_size, fixed_val_size, int_key)?; + } + + tx.raw_commit()?; + Ok(()) + } + + fn resolve_tag(&self, tag: BlockTag) -> Result, MdbxColdError> { + let key = match tag { + BlockTag::Latest => MetadataKey::LatestBlock, + BlockTag::Finalized => MetadataKey::FinalizedBlock, + BlockTag::Safe => MetadataKey::SafeBlock, + BlockTag::Earliest => MetadataKey::EarliestBlock, + }; + self.get_metadata(key) + } + + fn get_metadata(&self, key: MetadataKey) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + 
Ok(TableTraverse::::exact(&mut tx.new_cursor::()?, &key)?) + } + + fn get_block_by_hash( + &self, + hash: alloy::primitives::B256, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(TableTraverse::::exact( + &mut tx.new_cursor::()?, + &hash, + )?) + } + + fn get_tx_location( + &self, + hash: alloy::primitives::B256, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(TableTraverse::::exact( + &mut tx.new_cursor::()?, + &hash, + )?) + } + + fn resolve_header_spec( + &self, + spec: HeaderSpecifier, + ) -> Result, MdbxColdError> { + match spec { + HeaderSpecifier::Number(n) => Ok(Some(n)), + HeaderSpecifier::Hash(h) => self.get_block_by_hash(h), + HeaderSpecifier::Tag(tag) => self.resolve_tag(tag), + } + } + + fn resolve_tx_spec( + &self, + spec: TransactionSpecifier, + ) -> Result, MdbxColdError> { + match spec { + TransactionSpecifier::Hash(h) => { + self.get_tx_location(h).map(|opt| opt.map(|loc| (loc.block, loc.index))) + } + TransactionSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))), + TransactionSpecifier::BlockHashAndIndex { block_hash, index } => { + self.get_block_by_hash(block_hash).map(|opt| opt.map(|b| (b, index))) + } + } + } + + fn resolve_receipt_spec( + &self, + spec: ReceiptSpecifier, + ) -> Result, MdbxColdError> { + match spec { + ReceiptSpecifier::TxHash(h) => { + self.get_tx_location(h).map(|opt| opt.map(|loc| (loc.block, loc.index))) + } + ReceiptSpecifier::BlockAndIndex { block, index } => Ok(Some((block, index))), + } + } + + fn get_header_by_number( + &self, + block_num: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(TableTraverse::::exact( + &mut tx.new_cursor::()?, + &block_num, + )?) + } + + fn get_transaction_by_location( + &self, + block: BlockNumber, + index: u64, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(DualTableTraverse::::exact_dual( + &mut tx.new_cursor::()?, + &block, + &index, + )?) 
+ } + + fn get_receipt_by_location( + &self, + block: BlockNumber, + index: u64, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(DualTableTraverse::::exact_dual( + &mut tx.new_cursor::()?, + &block, + &index, + )?) + } + + fn get_zenith_header_by_number( + &self, + block: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + Ok(TableTraverse::::exact( + &mut tx.new_cursor::()?, + &block, + )?) + } + + fn collect_transactions_in_block( + &self, + block: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + let mut cursor = tx.new_cursor::()?; + DualTableTraverse::::iter_k2(&mut cursor, &block)? + .map(|item| item.map(|(_, v)| v)) + .collect::>() + .map_err(Into::into) + } + + fn count_transactions_in_block(&self, block: BlockNumber) -> Result { + let tx = self.env.tx()?; + let mut cursor = tx.new_cursor::()?; + let mut count = 0u64; + for item in DualTableTraverse::::iter_k2(&mut cursor, &block)? { + item?; + count += 1; + } + Ok(count) + } + + fn collect_receipts_in_block(&self, block: BlockNumber) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + let mut cursor = tx.new_cursor::()?; + DualTableTraverse::::iter_k2(&mut cursor, &block)? + .map(|item| item.map(|(_, v)| v)) + .collect::>() + .map_err(Into::into) + } + + fn collect_signet_events_in_block( + &self, + block: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + let mut cursor = tx.new_cursor::()?; + DualTableTraverse::::iter_k2(&mut cursor, &block)? + .map(|item| item.map(|(_, v)| v)) + .collect::>() + .map_err(Into::into) + } + + fn collect_signet_events_in_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + let mut events = Vec::new(); + for block in start..=end { + let mut cursor = tx.new_cursor::()?; + for item in DualTableTraverse::::iter_k2(&mut cursor, &block)? 
+ { + events.push(item?.1); + } + } + Ok(events) + } + + fn collect_zenith_headers_in_range( + &self, + start: BlockNumber, + end: BlockNumber, + ) -> Result, MdbxColdError> { + let tx = self.env.tx()?; + let mut cursor = tx.new_cursor::()?; + let mut headers = Vec::new(); + + let mut key_buf = [0u8; MAX_KEY_SIZE]; + let key_bytes = start.encode_key(&mut key_buf); + + let Some((key, value)) = KvTraverse::<_>::lower_bound(&mut cursor, key_bytes)? else { + return Ok(headers); + }; + + let block_num = BlockNumber::decode_key(&key)?; + if block_num <= end { + headers.push(DbZenithHeader::decode_value(&value)?); + } + + while let Some((key, value)) = KvTraverse::<_>::read_next(&mut cursor)? { + let block_num = BlockNumber::decode_key(&key)?; + if block_num > end { + break; + } + headers.push(DbZenithHeader::decode_value(&value)?); + } + + Ok(headers) + } + + fn append_block_inner(&self, data: BlockData) -> Result<(), MdbxColdError> { + let tx = self.env.tx_rw()?; + let block = data.block_number(); + + tx.queue_put::(&block, &data.header)?; + tx.queue_put::(&data.header.hash_slow(), &block)?; + + for (idx, tx_signed) in data.transactions.iter().enumerate() { + let tx_idx = idx as u64; + tx.queue_put_dual::(&block, &tx_idx, tx_signed)?; + tx.queue_put::(tx_signed.hash(), &TxLocation::new(block, tx_idx))?; + } + + for (idx, receipt) in data.receipts.iter().enumerate() { + tx.queue_put_dual::(&block, &(idx as u64), receipt)?; + } + + for (idx, event) in data.signet_events.iter().enumerate() { + tx.queue_put_dual::(&block, &(idx as u64), event)?; + } + + if let Some(zh) = &data.zenith_header { + tx.queue_put::(&block, zh)?; + } + + // Update block bounds + let current_latest: Option = TableTraverse::::exact( + &mut tx.new_cursor::()?, + &MetadataKey::LatestBlock, + )?; + tx.queue_put::( + &MetadataKey::LatestBlock, + &current_latest.map_or(block, |prev| prev.max(block)), + )?; + + let current_earliest: Option = TableTraverse::::exact( + &mut tx.new_cursor::()?, 
+ &MetadataKey::EarliestBlock, + )?; + tx.queue_put::( + &MetadataKey::EarliestBlock, + &current_earliest.map_or(block, |prev| prev.min(block)), + )?; + + tx.raw_commit()?; + Ok(()) + } + + fn truncate_above_inner(&self, block: BlockNumber) -> Result<(), MdbxColdError> { + let tx = self.env.tx_rw()?; + + // Collect headers above the cutoff + let headers_to_remove = { + let mut cursor = tx.new_cursor::()?; + let mut headers: Vec<(BlockNumber, Header)> = Vec::new(); + + let start_block = block + 1; + let mut key_buf = [0u8; MAX_KEY_SIZE]; + let key_bytes = start_block.encode_key(&mut key_buf); + + if let Some((key, value)) = KvTraverse::<_>::lower_bound(&mut cursor, key_bytes)? { + headers.push((BlockNumber::decode_key(&key)?, Header::decode_value(&value)?)); + + while let Some((key, value)) = KvTraverse::<_>::read_next(&mut cursor)? { + headers.push((BlockNumber::decode_key(&key)?, Header::decode_value(&value)?)); + } + } + headers + }; + + if headers_to_remove.is_empty() { + return Ok(()); + } + + // Delete each block's data + for (block_num, header) in &headers_to_remove { + // Delete transaction hash indices + { + let mut tx_cursor = tx.new_cursor::()?; + for item in + DualTableTraverse::::iter_k2(&mut tx_cursor, block_num)? + { + let (_, tx_signed): (u64, TransactionSigned) = item?; + tx.queue_delete::(tx_signed.hash())?; + } + } + + tx.queue_delete::(block_num)?; + tx.queue_delete::(&header.hash_slow())?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.clear_k1_for::(block_num)?; + tx.queue_delete::(block_num)?; + } + + // Update latest block + { + let mut cursor = tx.new_cursor::()?; + match KvTraverse::<_>::last(&mut cursor)? 
{ + Some((key, _)) => { + tx.queue_put::( + &MetadataKey::LatestBlock, + &BlockNumber::decode_key(&key)?, + )?; + } + None => { + tx.queue_delete::(&MetadataKey::LatestBlock)?; + } + } + } + + tx.raw_commit()?; + Ok(()) + } +} + +impl ColdStorage for MdbxColdBackend { + async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult> { + let Some(block_num) = self.resolve_header_spec(spec)? else { + return Ok(None); + }; + Ok(self.get_header_by_number(block_num)?) + } + + async fn get_headers(&self, specs: Vec) -> ColdResult>> { + specs + .into_iter() + .map(|spec| { + self.resolve_header_spec(spec)? + .map(|n| self.get_header_by_number(n)) + .transpose() + .map(Option::flatten) + }) + .collect::>() + .map_err(Into::into) + } + + async fn get_transaction( + &self, + spec: TransactionSpecifier, + ) -> ColdResult> { + let Some((block, index)) = self.resolve_tx_spec(spec)? else { + return Ok(None); + }; + Ok(self.get_transaction_by_location(block, index)?) + } + + async fn get_transactions_in_block( + &self, + block: BlockNumber, + ) -> ColdResult> { + Ok(self.collect_transactions_in_block(block)?) + } + + async fn get_transaction_count(&self, block: BlockNumber) -> ColdResult { + Ok(self.count_transactions_in_block(block)?) + } + + async fn get_receipt(&self, spec: ReceiptSpecifier) -> ColdResult> { + let Some((block, index)) = self.resolve_receipt_spec(spec)? else { + return Ok(None); + }; + Ok(self.get_receipt_by_location(block, index)?) + } + + async fn get_receipts_in_block(&self, block: BlockNumber) -> ColdResult> { + Ok(self.collect_receipts_in_block(block)?) + } + + async fn get_signet_events( + &self, + spec: SignetEventsSpecifier, + ) -> ColdResult> { + let events = match spec { + SignetEventsSpecifier::Block(block) => self.collect_signet_events_in_block(block)?, + SignetEventsSpecifier::BlockRange { start, end } => { + self.collect_signet_events_in_range(start, end)? 
+ } + }; + Ok(events) + } + + async fn get_zenith_header( + &self, + spec: ZenithHeaderSpecifier, + ) -> ColdResult> { + let block = match spec { + ZenithHeaderSpecifier::Number(n) => n, + ZenithHeaderSpecifier::Range { start, .. } => start, + }; + Ok(self.get_zenith_header_by_number(block)?) + } + + async fn get_zenith_headers( + &self, + spec: ZenithHeaderSpecifier, + ) -> ColdResult> { + let headers = match spec { + ZenithHeaderSpecifier::Number(n) => { + self.get_zenith_header_by_number(n)?.into_iter().collect() + } + ZenithHeaderSpecifier::Range { start, end } => { + self.collect_zenith_headers_in_range(start, end)? + } + }; + Ok(headers) + } + + async fn get_latest_block(&self) -> ColdResult> { + Ok(self.get_metadata(MetadataKey::LatestBlock)?) + } + + async fn append_block(&self, data: BlockData) -> ColdResult<()> { + Ok(self.append_block_inner(data)?) + } + + async fn append_blocks(&self, data: Vec) -> ColdResult<()> { + for block_data in data { + self.append_block_inner(block_data)?; + } + Ok(()) + } + + async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> { + Ok(self.truncate_above_inner(block)?) + } +} + +#[cfg(all(test, feature = "test-utils"))] +mod tests { + use super::*; + use signet_cold::conformance::conformance; + use tempfile::tempdir; + + #[tokio::test] + async fn mdbx_backend_conformance() { + let dir = tempdir().unwrap(); + let backend = MdbxColdBackend::open_rw(dir.path()).unwrap(); + conformance(&backend).await.unwrap(); + } +} diff --git a/crates/cold-mdbx/src/error.rs b/crates/cold-mdbx/src/error.rs index 1aa5fe6..8cc29fa 100644 --- a/crates/cold-mdbx/src/error.rs +++ b/crates/cold-mdbx/src/error.rs @@ -8,4 +8,18 @@ pub enum MdbxColdError { /// A serialization/deserialization error occurred. #[error("serialization error: {0}")] Ser(#[from] DeserError), + + /// An MDBX error occurred. + #[error("mdbx error: {0}")] + Mdbx(#[from] signet_hot_mdbx::MdbxError), + + /// Database is read-only. 
+ #[error("database is read-only")] + ReadOnly, +} + +impl From for signet_cold::ColdStorageError { + fn from(error: MdbxColdError) -> Self { + Self::Backend(Box::new(error)) + } } diff --git a/crates/cold-mdbx/src/lib.rs b/crates/cold-mdbx/src/lib.rs index b34887f..0093203 100644 --- a/crates/cold-mdbx/src/lib.rs +++ b/crates/cold-mdbx/src/lib.rs @@ -1,4 +1,4 @@ -//! MDBX table definitions for cold storage. +//! MDBX table definitions and backend for cold storage. //! //! This crate provides table definitions for storing historical blockchain data //! in MDBX. It defines 8 tables: @@ -40,3 +40,8 @@ pub use tables::{ ColdBlockHashIndex, ColdHeaders, ColdMetadata, ColdReceipts, ColdSignetEvents, ColdTransactions, ColdTxHashIndex, ColdZenithHeaders, MetadataKey, }; + +mod backend; +pub use backend::MdbxColdBackend; + +pub use signet_hot_mdbx::{DatabaseArguments, DatabaseEnvKind};