Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ signet-storage = { version = "0.0.1", path = "./crates/storage" }
signet-storage-types = { version = "0.0.1", path = "./crates/types" }

# External, in-house
signet-libmdbx = "0.4.0"
signet-libmdbx = "0.6.0"

signet-zenith = "0.16.0-rc.5"

Expand Down
2 changes: 1 addition & 1 deletion crates/hot-mdbx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bytes.workspace = true
page_size.workspace = true
parking_lot.workspace = true
signet-hot.workspace = true
signet-libmdbx = { workspace = true, features = ["read-tx-timeouts"] }
signet-libmdbx.workspace = true
sysinfo = "0.37.2"
tempfile = { workspace = true, optional = true }
thiserror.workspace = true
Expand Down
31 changes: 19 additions & 12 deletions crates/hot-mdbx/src/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ use signet_hot::{
MAX_FIXED_VAL_SIZE, MAX_KEY_SIZE,
model::{DualKeyTraverse, KvTraverse, KvTraverseMut, RawDualKeyValue, RawKeyValue, RawValue},
};
use signet_libmdbx::{RO, RW, TransactionKind};
use signet_libmdbx::{Ro, Rw, RwSync, TransactionKind, tx::WriteMarker};
use std::{
borrow::Cow,
ops::{Deref, DerefMut},
};

/// Read only Cursor.
pub type CursorRO<'a> = Cursor<'a, RO>;
pub type CursorRo<'a> = Cursor<'a, Ro>;

/// Read write cursor.
pub type CursorRW<'a> = Cursor<'a, RW>;
pub type CursorRw<'a> = Cursor<'a, Rw>;

/// Synchronized read only cursor.
pub type CursorRoSync<'a> = Cursor<'a, signet_libmdbx::RoSync>;

/// Synchronized read write cursor.
pub type CursorRwSync<'a> = Cursor<'a, RwSync>;

/// Cursor wrapper to access KV items.
///
/// The inner cursor type uses `K::Inner` which is the transaction's internal
/// pointer access type:
/// - For `RO`: `K::Inner = RoGuard`
/// - For `RW`: `K::Inner = RwUnsync`
pub struct Cursor<'a, K: TransactionKind> {
/// Inner `libmdbx` cursor.
pub(crate) inner: signet_libmdbx::Cursor<'a, K>,
Expand All @@ -30,13 +41,9 @@ pub struct Cursor<'a, K: TransactionKind> {
buf: [u8; MAX_KEY_SIZE + MAX_FIXED_VAL_SIZE],
}

impl<K: TransactionKind + std::fmt::Debug> std::fmt::Debug for Cursor<'_, K> {
impl<K: TransactionKind> std::fmt::Debug for Cursor<'_, K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Cursor")
.field("inner", &self.inner)
.field("fsi", &self.fsi)
.field("buf", &self.buf)
.finish()
f.debug_struct("Cursor").field("fsi", &self.fsi).finish_non_exhaustive()
}
}

Expand All @@ -48,7 +55,7 @@ impl<'a, K: TransactionKind> Deref for Cursor<'a, K> {
}
}

impl<'a> DerefMut for Cursor<'a, RW> {
impl<'a, K: TransactionKind> DerefMut for Cursor<'a, K> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
Expand Down Expand Up @@ -95,13 +102,13 @@ where
}
}

impl KvTraverseMut<MdbxError> for Cursor<'_, RW> {
impl<K: TransactionKind + WriteMarker> KvTraverseMut<MdbxError> for Cursor<'_, K> {
fn delete_current(&mut self) -> Result<(), MdbxError> {
self.inner.del(Default::default()).map_err(MdbxError::Mdbx)
}
}

impl Cursor<'_, RW> {
impl<K: TransactionKind + WriteMarker> Cursor<'_, K> {
/// Stores multiple contiguous fixed-size data elements in a single request.
///
/// This directly calls MDBX FFI, bypassing the transaction execution wrapper
Expand Down
60 changes: 26 additions & 34 deletions crates/hot-mdbx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@

use parking_lot::RwLock;
use signet_libmdbx::{
Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
MaxReadTransactionDuration, Mode, PageSize, RO, RW, SyncMode, ffi,
Environment, EnvironmentFlags, Geometry, Mode, Ro, RoSync, Rw, RwSync, SyncMode, ffi,
sys::{HandleSlowReadersReturnCode, PageSize},
};
use std::{
collections::HashMap,
Expand All @@ -45,7 +45,7 @@ use std::{
};

mod cursor;
pub use cursor::{Cursor, CursorRO, CursorRW};
pub use cursor::{Cursor, CursorRo, CursorRoSync, CursorRw, CursorRwSync};

mod db_info;
pub use db_info::{FixedSizeInfo, FsiCache};
Expand Down Expand Up @@ -105,8 +105,6 @@ pub struct DatabaseArguments {
/// Database geometry settings.
geometry: Geometry<Range<usize>>,

/// Maximum duration of a read transaction. If [None], the default value is used.
max_read_transaction_duration: Option<MaxReadTransactionDuration>,
/// Open environment in exclusive/monopolistic mode. If [None], the default value is used.
///
/// This can be used as a replacement for `MDB_NOLOCK`, which don't supported by MDBX. In this
Expand Down Expand Up @@ -165,7 +163,6 @@ impl DatabaseArguments {
shrink_threshold: Some(0),
page_size: Some(PageSize::Set(utils::default_page_size())),
},
max_read_transaction_duration: None,
exclusive: None,
max_readers: None,
sync_mode: SyncMode::Durable,
Expand Down Expand Up @@ -206,23 +203,6 @@ impl DatabaseArguments {
self
}

/// Set the maximum duration of a read transaction.
pub const fn max_read_transaction_duration(
&mut self,
max_read_transaction_duration: Option<MaxReadTransactionDuration>,
) {
self.max_read_transaction_duration = max_read_transaction_duration;
}

/// Set the maximum duration of a read transaction.
pub const fn with_max_read_transaction_duration(
mut self,
max_read_transaction_duration: Option<MaxReadTransactionDuration>,
) -> Self {
self.max_read_transaction_duration(max_read_transaction_duration);
self
}

/// Set the mdbx exclusive flag.
pub const fn with_exclusive(mut self, exclusive: Option<bool>) -> Self {
self.exclusive = exclusive;
Expand Down Expand Up @@ -372,28 +352,40 @@ impl DatabaseEnv {
// https://github.com/paradigmxyz/reth/blob/fa2b9b685ed9787636d962f4366caf34a9186e66/crates/storage/libmdbx-rs/mdbx-sys/libmdbx/mdbx.c#L16017.
inner_env.set_rp_augment_limit(256 * 1024);

if let Some(max_read_transaction_duration) = args.max_read_transaction_duration {
inner_env.set_max_read_transaction_duration(max_read_transaction_duration);
}

let fsi_cache = Arc::new(RwLock::new(HashMap::new()));
let env = Self { inner: inner_env.open(path)?, fsi_cache, _lock_file };

Ok(env)
}

/// Start a new read-only transaction.
fn tx(&self) -> Result<Tx<RO>, MdbxError> {
pub fn tx(&self) -> Result<Tx<Ro>, MdbxError> {
self.inner
.begin_ro_txn()
.begin_ro_unsync()
.map(|tx| Tx::new(tx, self.fsi_cache.clone()))
.map_err(MdbxError::Mdbx)
}

/// Start a new read-write transaction.
fn tx_mut(&self) -> Result<Tx<RW>, MdbxError> {
pub fn tx_rw(&self) -> Result<Tx<Rw>, MdbxError> {
self.inner
.begin_rw_unsync()
.map(|tx| Tx::new(tx, self.fsi_cache.clone()))
.map_err(MdbxError::Mdbx)
}

/// Start a new read-only synchronous transaction.
pub fn tx_sync(&self) -> Result<Tx<RoSync>, MdbxError> {
self.inner
.begin_ro_sync()
.map(|tx| Tx::new(tx, self.fsi_cache.clone()))
.map_err(MdbxError::Mdbx)
}

/// Start a new read-write synchronous transaction.
pub fn tx_rw_sync(&self) -> Result<Tx<RwSync>, MdbxError> {
self.inner
.begin_rw_txn()
.begin_rw_sync()
.map(|tx| Tx::new(tx, self.fsi_cache.clone()))
.map_err(MdbxError::Mdbx)
}
Expand All @@ -408,14 +400,14 @@ impl Deref for DatabaseEnv {
}

impl HotKv for DatabaseEnv {
type RoTx = Tx<RO>;
type RwTx = Tx<RW>;
type RoTx = Tx<Ro>;
type RwTx = Tx<Rw>;

fn reader(&self) -> Result<Self::RoTx, HotKvError> {
self.tx().map_err(HotKvError::from_err)
}

fn writer(&self) -> Result<Self::RwTx, HotKvError> {
self.tx_mut().map_err(HotKvError::from_err)
self.tx_rw().map_err(HotKvError::from_err)
}
}
Loading