Skip to content

Commit 4e60ff0

Browse files
committed
connection offset
1 parent b8732c3 commit 4e60ff0

File tree

14 files changed

+84
-78
lines changed

14 files changed

+84
-78
lines changed

duva-client/src/broker/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Broker {
102102
} else {
103103
context.callback(ServerResponse::Err {
104104
reason: "Failed to route command. Try again after ttl time".to_string(),
105-
request_id: 0,
105+
conn_offset: 0,
106106
})
107107
};
108108
},
@@ -292,7 +292,10 @@ impl Broker {
292292
// ! otherwise, server will not be able to process the next command
293293

294294
match res {
295-
ServerResponse::ReadRes { res: QueryIO::BulkString(..), request_id } => {
295+
ServerResponse::ReadRes {
296+
res: QueryIO::BulkString(..),
297+
conn_offset: request_id,
298+
} => {
296299
connection.request_id = connection.request_id.max(*request_id);
297300
},
298301
ServerResponse::WriteRes { res: QueryIO::BulkString(..), log_index, .. } => {

duva-client/src/broker/node_connections.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ impl NodeConnection {
9797
}
9898

9999
pub(crate) async fn send(&self, client_action: ClientAction) -> anyhow::Result<()> {
100-
let session_request = SessionRequest { request_id: self.request_id, action: client_action };
100+
let session_request =
101+
SessionRequest { conn_offset: self.request_id, action: client_action };
101102
self.writer
102103
.send(MsgToServer::Command(bincode::encode_to_vec(session_request, SERDE_CONFIG)?))
103104
.await

duva-client/src/command.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl CommandQueue {
3535

3636
let result = context
3737
.get_result()
38-
.unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), request_id: 0 });
38+
.unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), conn_offset: 0 });
3939
context.callback(result);
4040
}
4141
}
@@ -84,19 +84,21 @@ impl InputContext {
8484
ClientAction::NonMutating(Keys { pattern: _ } | MGet { keys: _ }) => {
8585
let mut init = QueryIO::Array(Vec::with_capacity(iterator.len()));
8686

87-
while let Some(ServerResponse::ReadRes { res, request_id }) = iterator.next() {
87+
while let Some(ServerResponse::ReadRes { res, conn_offset: request_id }) =
88+
iterator.next()
89+
{
8890
init = init.merge(res)?;
8991
highest_req_id = highest_req_id.max(request_id);
9092
}
91-
Ok(ServerResponse::ReadRes { res: init, request_id: highest_req_id })
93+
Ok(ServerResponse::ReadRes { res: init, conn_offset: highest_req_id })
9294
},
9395

9496
ClientAction::NonMutating(Exists { keys: _ }) => {
9597
let mut count = 0;
9698

9799
while let Some(ServerResponse::ReadRes {
98100
res: QueryIO::BulkString(byte),
99-
request_id,
101+
conn_offset: request_id,
100102
}) = iterator.next()
101103
{
102104
let num = String::from_utf8(byte.to_vec())
@@ -109,15 +111,15 @@ impl InputContext {
109111

110112
Ok(ServerResponse::ReadRes {
111113
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
112-
request_id: highest_req_id,
114+
conn_offset: highest_req_id,
113115
})
114116
},
115117
ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
116118
let mut count = 0;
117119

118120
while let Some(ServerResponse::WriteRes {
119121
res: QueryIO::BulkString(value),
120-
request_id,
122+
conn_offset: request_id,
121123
..
122124
}) = iterator.next()
123125
{
@@ -128,7 +130,7 @@ impl InputContext {
128130
Ok(ServerResponse::WriteRes {
129131
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
130132
log_index: 0, // TODO
131-
request_id: highest_req_id,
133+
conn_offset: highest_req_id,
132134
})
133135
},
134136
_ => iterator.next().ok_or(anyhow::anyhow!("Expected exactly one result")),

duva/src/domains/cluster_actors/actor.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::ClusterCommand;
2-
use super::ConsensusClientResponse;
3-
use super::ConsensusReq;
2+
use super::ConsensusRequest;
3+
use super::ConsensusResponse;
44
use super::LazyOption;
55
use super::hash_ring::HashRing;
66
pub mod client_sessions;
@@ -325,9 +325,9 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
325325
}
326326
}
327327

328-
pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusReq) {
328+
pub(crate) async fn leader_req_consensus(&mut self, req: ConsensusRequest) {
329329
if !self.replication.is_leader() {
330-
req.callback.send(ConsensusClientResponse::Result {
330+
req.callback.send(ConsensusResponse::Result {
331331
res: Err(anyhow::anyhow!("Write given to follower")),
332332
log_index: self.replication.last_log_index(),
333333
});
@@ -337,7 +337,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
337337
if self.client_sessions.is_processed(&req.conn_offset) {
338338
// mapping between early returned values to client result
339339
let key = req.entry.all_keys().into_iter().map(String::from).collect();
340-
req.callback.send(ConsensusClientResponse::AlreadyProcessed {
340+
req.callback.send(ConsensusResponse::AlreadyProcessed {
341341
key,
342342
// TODO : remove unwrap
343343
request_id: req.conn_offset.unwrap().offset,
@@ -358,22 +358,22 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
358358
// To notify client's of what keys have been moved.
359359
// ! Still, client won't know where the key has been moved. The assumption here is client SHOULD have correct hashring information.
360360
let moved_keys = replids.except(&self.log_state().replid).join(" ");
361-
req.callback.send(ConsensusClientResponse::Result {
361+
req.callback.send(ConsensusResponse::Result {
362362
res: Err(anyhow::anyhow!("Moved! {moved_keys}")),
363363
log_index: self.replication.last_log_index(),
364364
})
365365
},
366366
Err(err) => {
367367
err!("{}", err);
368-
req.callback.send(ConsensusClientResponse::Result {
368+
req.callback.send(ConsensusResponse::Result {
369369
res: Err(anyhow::anyhow!(err)),
370370
log_index: self.replication.last_log_index(),
371371
});
372372
},
373373
}
374374
}
375375

376-
async fn req_consensus(&mut self, req: ConsensusReq, send_in_mills: Option<u64>) {
376+
async fn req_consensus(&mut self, req: ConsensusRequest, send_in_mills: Option<u64>) {
377377
let log_index = self.replication.write_single_entry(
378378
req.entry,
379379
self.log_state().term,
@@ -387,7 +387,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
387387
self.replication.increase_con_idx_by(1);
388388
let _ = self.replication.flush();
389389
let res = self.commit_entry(entry.entry, log_index).await;
390-
req.callback.send(ConsensusClientResponse::Result { res, log_index });
390+
req.callback.send(ConsensusResponse::Result { res, log_index });
391391
return;
392392
}
393393
self.consensus_tracker
@@ -959,7 +959,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
959959
let res = self.commit_entry(log_entry.entry, log_index).await;
960960
let _ = self.replication.flush();
961961

962-
voting.callback.send(ConsensusClientResponse::Result { res, log_index });
962+
voting.callback.send(ConsensusResponse::Result { res, log_index });
963963
}
964964

965965
async fn commit_entry(&mut self, entry: LogEntry, index: u64) -> anyhow::Result<QueryIO> {
@@ -1340,7 +1340,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
13401340
}
13411341

13421342
let (callback, rx) = Callback::create();
1343-
let req = ConsensusReq {
1343+
let req = ConsensusRequest {
13441344
entry: LogEntry::MSet { entries: migrate_batch.entries.clone() },
13451345
callback,
13461346
conn_offset: None,
@@ -1365,7 +1365,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
13651365
});
13661366
}
13671367

1368-
async fn make_consensus_in_batch(&mut self, req: ConsensusReq) {
1368+
async fn make_consensus_in_batch(&mut self, req: ConsensusRequest) {
13691369
self.req_consensus(req, None).await;
13701370
let _ = self.replication.flush();
13711371
self.send_rpc().await;
@@ -1384,7 +1384,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
13841384

13851385
// make consensus request for delete
13861386
let (callback, rx) = Callback::create();
1387-
let req = ConsensusReq {
1387+
let req = ConsensusRequest {
13881388
entry: LogEntry::Delete { keys: pending_migration_batch.keys.clone() },
13891389
callback,
13901390
conn_offset: None,

duva/src/domains/cluster_actors/actor/tests/elections.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ async fn test_become_candidate_not_allow_write_request_processing() {
233233
let (tx, rx) = Callback::create();
234234
let session_req = ConnectionOffset::new(1, "client1".to_string());
235235

236-
let consensus_request = ConsensusReq {
236+
let consensus_request = ConsensusRequest {
237237
entry: LogEntry::Set { entry: CacheEntry::new("key".to_string(), "value") },
238238
callback: tx,
239239
conn_offset: Some(session_req),
@@ -248,5 +248,5 @@ async fn test_become_candidate_not_allow_write_request_processing() {
248248
assert!(value.is_null());
249249

250250
let res = rx.0.await.unwrap();
251-
assert!(matches!(res, ConsensusClientResponse::Result { res: Err(_), log_index: _ }))
251+
assert!(matches!(res, ConsensusResponse::Result { res: Err(_), log_index: _ }))
252252
}

duva/src/domains/cluster_actors/actor/tests/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,10 @@ impl Helper {
237237
}
238238

239239
fn consensus_request(
240-
callback: Callback<ConsensusClientResponse>,
240+
callback: Callback<ConsensusResponse>,
241241
session_req: Option<ConnectionOffset>,
242-
) -> ConsensusReq {
243-
ConsensusReq {
242+
) -> ConsensusRequest {
243+
ConsensusRequest {
244244
entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
245245
callback,
246246
conn_offset: session_req,

duva/src/domains/cluster_actors/actor/tests/replications.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ async fn req_consensus_inserts_consensus_voting() {
433433
let session_request = ConnectionOffset::new(1, client_id);
434434
let w_req = LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") };
435435
let consensus_request =
436-
ConsensusReq { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) };
436+
ConsensusRequest { entry: w_req.clone(), callback, conn_offset: Some(session_request.clone()) };
437437

438438
// WHEN
439439
leader_c_actor.req_consensus(consensus_request, None).await;
@@ -462,7 +462,7 @@ async fn test_leader_req_consensus_early_return_when_already_processed_session_r
462462
tokio::spawn(cluster_actor.handle());
463463
let (callback, rx) = Callback::create();
464464
handler
465-
.send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusReq {
465+
.send(ClusterCommand::Client(ClusterClientRequest::MakeConsensus(ConsensusRequest {
466466
entry: LogEntry::Set { entry: CacheEntry::new("foo".to_string(), "bar") },
467467
callback,
468468
conn_offset: Some(client_req),
@@ -619,7 +619,7 @@ async fn test_leader_req_consensus_with_processed_session() {
619619

620620
// WHEN - send request with already processed session
621621
let (tx, rx) = Callback::create();
622-
let consensus_request = ConsensusReq {
622+
let consensus_request = ConsensusRequest {
623623
entry: LogEntry::Set { entry: CacheEntry::new("test_key".to_string(), "test_value") },
624624
callback: tx,
625625
conn_offset: Some(session_req),
@@ -632,7 +632,7 @@ async fn test_leader_req_consensus_with_processed_session() {
632632
assert_eq!(cluster_actor.log_state().last_log_index, 0);
633633

634634
// Verify the response indicates already processed
635-
let ConsensusClientResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else {
635+
let ConsensusResponse::AlreadyProcessed { key, request_id: 1 } = rx.recv().await else {
636636
panic!("Expected AlreadyProcessed response");
637637
};
638638
assert_eq!(key, vec!["test_key".to_string()]);

duva/src/domains/cluster_actors/command.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ pub enum ClusterClientRequest {
5757
ReplicationState(Callback<ReplicationState>),
5858
Forget(PeerIdentifier, Callback<Option<()>>),
5959
ReplicaOf(PeerIdentifier, Callback<anyhow::Result<()>>),
60-
MakeConsensus(ConsensusReq),
60+
MakeConsensus(ConsensusRequest),
6161
ClusterNodes(Callback<Vec<ReplicationState>>),
6262
GetRoles(Callback<Vec<(PeerIdentifier, ReplicationRole)>>),
6363
SubscribeToTopologyChange(Callback<tokio::sync::broadcast::Receiver<Topology>>),
@@ -74,13 +74,6 @@ impl From<ClusterClientRequest> for ClusterCommand {
7474
}
7575
}
7676

77-
#[derive(Debug, PartialEq, Eq)]
78-
pub(crate) struct ConsensusReq {
79-
pub(crate) entry: LogEntry,
80-
pub(crate) callback: Callback<ConsensusClientResponse>,
81-
pub(crate) conn_offset: Option<ConnectionOffset>,
82-
}
83-
8477
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
8578
pub struct ConnectionOffset {
8679
pub(crate) offset: u64,
@@ -92,8 +85,15 @@ impl ConnectionOffset {
9285
}
9386
}
9487

88+
#[derive(Debug, PartialEq, Eq)]
89+
pub(crate) struct ConsensusRequest {
90+
pub(crate) entry: LogEntry,
91+
pub(crate) callback: Callback<ConsensusResponse>,
92+
pub(crate) conn_offset: Option<ConnectionOffset>,
93+
}
94+
9595
#[derive(Debug)]
96-
pub(crate) enum ConsensusClientResponse {
96+
pub(crate) enum ConsensusResponse {
9797
AlreadyProcessed { key: Vec<String>, request_id: u64 },
9898
Result { res: anyhow::Result<QueryIO>, log_index: u64 },
9999
}

duva/src/domains/peers/command.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::{
22
domains::{
33
caches::cache_objects::CacheEntry,
4-
cluster_actors::{ClusterCommand, ConsensusReq, hash_ring::HashRing},
4+
cluster_actors::{ClusterCommand, ConsensusRequest, hash_ring::HashRing},
55
replications::{ReplicationId, ReplicationState, WriteOperation},
66
},
77
prelude::PeerIdentifier,
@@ -188,12 +188,12 @@ pub(crate) struct QueuedKeysToMigrate {
188188

189189
#[derive(Debug, Default)]
190190
pub(crate) struct PendingRequests {
191-
requests: VecDeque<ConsensusReq>,
191+
requests: VecDeque<ConsensusRequest>,
192192
batches: HashMap<BatchId, QueuedKeysToMigrate>,
193193
pub(crate) callbacks: Vec<Callback<()>>,
194194
}
195195
impl PendingRequests {
196-
pub(crate) fn add_req(&mut self, req: ConsensusReq) {
196+
pub(crate) fn add_req(&mut self, req: ConsensusRequest) {
197197
self.requests.push_back(req);
198198
}
199199
pub(crate) fn store_batch(&mut self, id: BatchId, batch: QueuedKeysToMigrate) {
@@ -202,7 +202,7 @@ impl PendingRequests {
202202
pub(crate) fn pop_batch(&mut self, id: &BatchId) -> Option<QueuedKeysToMigrate> {
203203
self.batches.remove(id)
204204
}
205-
pub(crate) fn extract_requests(&mut self) -> VecDeque<ConsensusReq> {
205+
pub(crate) fn extract_requests(&mut self) -> VecDeque<ConsensusRequest> {
206206
std::mem::take(&mut self.requests)
207207
}
208208

duva/src/domains/replications/consensus/log.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
domains::{
3-
cluster_actors::{ConnectionOffset, ConsensusClientResponse},
3+
cluster_actors::{ConnectionOffset, ConsensusResponse},
44
peers::identifier::PeerIdentifier,
55
},
66
make_smart_pointer,
@@ -16,13 +16,13 @@ make_smart_pointer!(LogConsensusTracker, HashMap<u64, LogConsensusVoting>);
1616
#[derive(Debug)]
1717
pub struct LogConsensusVoting {
1818
pub(crate) voters: Vec<PeerIdentifier>,
19-
pub(crate) callback: Callback<ConsensusClientResponse>,
19+
pub(crate) callback: Callback<ConsensusResponse>,
2020
pub(crate) cnt: u8,
2121
pub(crate) conn_offset: Option<ConnectionOffset>,
2222
}
2323
impl LogConsensusVoting {
2424
pub(crate) fn new(
25-
callback: Callback<ConsensusClientResponse>,
25+
callback: Callback<ConsensusResponse>,
2626
replica_count: usize,
2727
conn_offset: Option<ConnectionOffset>,
2828
) -> Self {

0 commit comments

Comments
 (0)