Skip to content

Commit 2cbcf4f

Browse files
committed
simplified startup
1 parent 4dd16ca commit 2cbcf4f

File tree

5 files changed

+31
-63
lines changed

5 files changed

+31
-63
lines changed

duva/src/adapters/loggers/disk_based.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,10 +414,7 @@ impl TWriteAheadLog for FileOpLogs {
414414
}
415415

416416
/// Replays all existing operations in the op_logs, invoking a callback for each.
417-
fn replay<F>(&mut self, mut replay_handler: F) -> Result<()>
418-
where
419-
F: FnMut(WriteOperation) + Send,
420-
{
417+
fn replay(&mut self, mut replay_handler: &mut dyn FnMut(WriteOperation)) -> Result<()> {
421418
// Replay all segments in order with streaming to avoid loading everything into memory
422419
for segment in self.segments.iter_mut().chain(std::iter::once(&mut self.active_segment)) {
423420
if segment.lookups.is_empty() {
@@ -610,7 +607,7 @@ mod tests {
610607
let mut op_logs = FileOpLogs::new(&path)?;
611608
let mut ops = Vec::new();
612609

613-
op_logs.replay(|op| {
610+
op_logs.replay(&mut |op| {
614611
ops.push(op);
615612
})?;
616613

@@ -657,7 +654,7 @@ mod tests {
657654

658655
assert!(
659656
op_logs
660-
.replay(|op| {
657+
.replay(&mut |op| {
661658
ops.push(op);
662659
})
663660
.is_err()
@@ -775,7 +772,7 @@ mod tests {
775772

776773
// Verify we can read operations from both segments
777774
let mut ops = Vec::new();
778-
op_logs.replay(|op| ops.push(op))?;
775+
op_logs.replay(&mut |op| ops.push(op))?;
779776
assert_eq!(ops.len(), 101);
780777

781778
Ok(())

duva/src/adapters/loggers/memory_based.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,7 @@ impl TWriteAheadLog for MemoryOpLogs {
1414
Ok(())
1515
}
1616

17-
fn replay<F>(&mut self, mut f: F) -> Result<()>
18-
where
19-
F: FnMut(WriteOperation) + Send,
20-
{
17+
fn replay(&mut self, f: &mut dyn FnMut(WriteOperation)) -> Result<()> {
2118
for op in self.writer.iter() {
2219
f(op.clone());
2320
}

duva/src/adapters/loggers/mod.rs

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,15 @@ mod memory_based;
55
pub mod op_logs;
66

77
trait TWriteAheadLog: Send + Sync + 'static {
8-
/// Append one or more `WriteOperation`s to the log.
98
fn write_many(&mut self, ops: Vec<WriteOperation>) -> anyhow::Result<()>;
109

11-
/// Retrieve logs that fall between the current 'commit' index and target 'log' index.
12-
/// This is NOT async as it is expected to be infallible and an in-memory operation.
1310
fn range(&self, start_exclusive: u64, end_inclusive: u64) -> Vec<WriteOperation>;
1411

15-
/// Replays all logged operations from the beginning of the WAL, calling the provided callback `f` for each operation.
16-
/// The callback `f(WriteOperation)` receives each operation in the order it was appended.
17-
fn replay<F>(&mut self, f: F) -> anyhow::Result<()>
18-
where
19-
F: FnMut(WriteOperation) + Send;
12+
fn replay(&mut self, f: &mut dyn FnMut(WriteOperation)) -> anyhow::Result<()>;
2013

21-
/// Retrieves the log at a given index.
2214
fn read_at(&mut self, at: u64) -> Option<WriteOperation>;
2315

24-
/// Returns true if there are no logs. Otherwise, returns false.
2516
fn is_empty(&self) -> bool;
2617

27-
/// Truncate logs that are positioned after `log_index`.
2818
fn truncate_after(&mut self, log_index: u64);
2919
}

duva/src/adapters/loggers/op_logs.rs

Lines changed: 20 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -17,60 +17,48 @@ impl OperationLogs {
1717
pub fn new_ondisk() -> Self {
1818
Self::OnDisk(FileOpLogs::new(ENV.dir.clone()).unwrap())
1919
}
20-
/// Append one or more `WriteOperation`s to the log.
21-
pub(crate) fn write_many(&mut self, ops: Vec<WriteOperation>) -> anyhow::Result<()> {
20+
fn get_mut(&mut self) -> &mut dyn TWriteAheadLog {
21+
match self {
22+
OperationLogs::Memory(m) => m as &mut dyn TWriteAheadLog,
23+
OperationLogs::OnDisk(f) => f as &mut dyn TWriteAheadLog,
24+
}
25+
}
26+
fn get(&self) -> &dyn TWriteAheadLog {
2227
match self {
23-
OperationLogs::Memory(memory_op_logs) => memory_op_logs.write_many(ops),
24-
OperationLogs::OnDisk(file_op_logs) => file_op_logs.write_many(ops),
28+
OperationLogs::Memory(m) => m as &dyn TWriteAheadLog,
29+
OperationLogs::OnDisk(f) => f as &dyn TWriteAheadLog,
2530
}
2631
}
2732

33+
/// Append one or more `WriteOperation`s to the log.
34+
pub(crate) fn write_many(&mut self, ops: Vec<WriteOperation>) -> anyhow::Result<()> {
35+
self.get_mut().write_many(ops)
36+
}
37+
2838
/// Retrieve logs that fall between the current 'commit' index and target 'log' index.
2939
/// This is NOT async as it is expected to be infallible and an in-memory operation.
3040
pub(crate) fn range(&self, start_exclusive: u64, end_inclusive: u64) -> Vec<WriteOperation> {
31-
match self {
32-
OperationLogs::Memory(memory_op_logs) => {
33-
memory_op_logs.range(start_exclusive, end_inclusive)
34-
},
35-
OperationLogs::OnDisk(file_op_logs) => {
36-
file_op_logs.range(start_exclusive, end_inclusive)
37-
},
38-
}
41+
self.get().range(start_exclusive, end_inclusive)
3942
}
4043

4144
/// Replays all logged operations from the beginning of the WAL, calling the provided callback `f` for each operation.
4245
/// The callback `f(WriteOperation)` receives each operation in the order it was appended.
43-
pub(crate) fn replay<F>(&mut self, f: F) -> anyhow::Result<()>
44-
where
45-
F: FnMut(WriteOperation) + Send,
46-
{
47-
match self {
48-
OperationLogs::Memory(memory_op_logs) => memory_op_logs.replay(f),
49-
OperationLogs::OnDisk(file_op_logs) => file_op_logs.replay(f),
50-
}
46+
pub(crate) fn replay(&mut self, f: &mut dyn FnMut(WriteOperation)) -> anyhow::Result<()> {
47+
self.get_mut().replay(f)
5148
}
5249

5350
/// Retrieves the log at a given index.
5451
pub(crate) fn read_at(&mut self, at: u64) -> Option<WriteOperation> {
55-
match self {
56-
OperationLogs::Memory(memory_op_logs) => memory_op_logs.read_at(at),
57-
OperationLogs::OnDisk(file_op_logs) => file_op_logs.read_at(at),
58-
}
52+
self.get_mut().read_at(at)
5953
}
6054

6155
/// Returns true if there are no logs. Otherwise, returns false.
6256
pub(crate) fn is_empty(&self) -> bool {
63-
match self {
64-
OperationLogs::Memory(memory_op_logs) => memory_op_logs.is_empty(),
65-
OperationLogs::OnDisk(file_op_logs) => file_op_logs.is_empty(),
66-
}
57+
self.get().is_empty()
6758
}
6859

6960
/// Truncate logs that are positioned after `log_index`.
7061
pub(crate) fn truncate_after(&mut self, log_index: u64) {
71-
match self {
72-
OperationLogs::Memory(memory_op_logs) => memory_op_logs.truncate_after(log_index),
73-
OperationLogs::OnDisk(file_op_logs) => file_op_logs.truncate_after(log_index),
74-
}
62+
self.get_mut().truncate_after(log_index)
7563
}
7664
}

duva/src/main.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,9 @@ use duva::{ENV, Environment, StartUpFacade, adapters::loggers::op_logs::Operatio
44
async fn main() -> anyhow::Result<()> {
55
let topology_writer = Environment::open_topology_file(ENV.tpp.clone()).await;
66

7-
if ENV.append_only {
8-
let local_aof = OperationLogs::new_ondisk();
9-
let start_up_runner = StartUpFacade::new(local_aof, topology_writer);
10-
start_up_runner.run().await
11-
} else {
12-
let in_memory_aof = OperationLogs::new_inmemory();
13-
let start_up_runner = StartUpFacade::new(in_memory_aof, topology_writer);
14-
start_up_runner.run().await
15-
}
7+
let op_logs =
8+
if ENV.append_only { OperationLogs::new_ondisk() } else { OperationLogs::new_inmemory() };
9+
10+
let start_up_runner = StartUpFacade::new(op_logs, topology_writer);
11+
start_up_runner.run().await
1612
}

0 commit comments

Comments
 (0)