Skip to content

Commit e2273d1

Browse files
committed
borrow decode
1 parent 86afad9 commit e2273d1

File tree

8 files changed

+80
-40
lines changed

8 files changed

+80
-40
lines changed

duva-client/src/broker/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use duva::prelude::tokio::sync::mpsc::Receiver;
1414
use duva::prelude::tokio::sync::mpsc::Sender;
1515
use duva::prelude::uuid::Uuid;
1616
use duva::prelude::{
17-
ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses, ReplicationId,
17+
BytesMut, ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
18+
ReplicationId,
1819
};
1920
use duva::prelude::{PeerIdentifier, tokio};
2021
use duva::prelude::{Topology, anyhow};
@@ -115,8 +116,10 @@ impl Broker {
115116
conn_req: ConnectionRequest,
116117
) -> anyhow::Result<(ServerStreamReader, ServerStreamWriter, ConnectionResponse)> {
117118
stream.serialized_write(ConnectionRequests::Authenticate(conn_req)).await?; // client_id not exist
118-
119-
let ConnectionResponses::Authenticated(response) = stream.deserialized_read().await? else {
119+
let mut buffer = BytesMut::new();
120+
let ConnectionResponses::Authenticated(response) =
121+
stream.deserialized_read(&mut buffer).await?
122+
else {
120123
bail!("Authentication failed");
121124
};
122125

@@ -161,7 +164,10 @@ impl Broker {
161164
async fn discover_leader_from(&mut self, follower: PeerIdentifier) -> anyhow::Result<()> {
162165
let mut stream = TcpStream::connect(follower.as_str()).await?;
163166
stream.serialized_write(ConnectionRequests::Discovery).await?;
164-
let ConnectionResponses::Discovery { leader_id } = stream.deserialized_read().await? else {
167+
let mut buffer = BytesMut::new();
168+
let ConnectionResponses::Discovery { leader_id } =
169+
stream.deserialized_read(&mut buffer).await?
170+
else {
165171
bail!("Discovery failed!");
166172
};
167173

duva-client/src/broker/read_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::broker::BrokerMessage;
22
use duva::domains::TSerdeRead;
33

4-
use duva::prelude::ReplicationId;
54
use duva::prelude::tokio::{self, net::tcp::OwnedReadHalf, sync::oneshot};
5+
use duva::prelude::{BytesMut, ReplicationId};
66

77
pub struct ServerStreamReader(pub(crate) OwnedReadHalf);
88
impl ServerStreamReader {
@@ -15,9 +15,9 @@ impl ServerStreamReader {
1515

1616
let future = async move {
1717
let controller_sender = controller_sender.clone();
18-
1918
loop {
20-
match self.0.deserialized_reads().await {
19+
let mut buffer = BytesMut::new();
20+
match self.0.deserialized_reads(&mut buffer).await {
2121
Ok(server_responses) => {
2222
for res in server_responses {
2323
if controller_sender

duva/src/adapters/io/tokio_stream.rs

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ use crate::domains::{
66
TSerdeWrite,
77
};
88

9+
use async_trait::async_trait;
10+
use bincode::BorrowDecode;
11+
912
use bytes::BytesMut;
1013
use std::fmt::Debug;
1114
use std::io::ErrorKind;
@@ -65,7 +68,8 @@ impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerd
6568
Ok(parsed_values)
6669
}
6770
async fn receive_connection_msgs(&mut self) -> Result<String, IoError> {
68-
self.deserialized_read().await
71+
let mut buffer = BytesMut::new();
72+
self.deserialized_read(&mut buffer).await
6973
}
7074
}
7175

@@ -92,35 +96,53 @@ impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSer
9296
}
9397
}
9498

95-
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeRead for T {
96-
async fn deserialized_read<U>(&mut self) -> Result<U, IoError>
99+
#[async_trait]
100+
impl<T> TSerdeRead for T
101+
where
102+
T: AsyncReadExt + Unpin + Sync + Send + Debug + 'static,
103+
{
104+
async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result<U, IoError>
97105
where
98-
U: bincode::Decode<()>,
106+
U: BorrowDecode<'a, ()> + Send,
99107
{
100-
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
101-
self.read_bytes(&mut buffer).await?;
108+
self.read_bytes(buffer).await?;
102109

103-
let (request, _) = bincode::decode_from_slice(&buffer, SERDE_CONFIG)
104-
.map_err(|e| IoError::Custom(e.to_string()))?;
110+
let (request, _) = bincode::borrow_decode_from_slice(&buffer[..], SERDE_CONFIG).unwrap();
105111

106112
Ok(request)
107113
}
108114

109-
async fn deserialized_reads<U>(&mut self) -> Result<Vec<U>, IoError>
115+
async fn deserialized_reads<'a, U>(
116+
&mut self,
117+
buffer: &'a mut BytesMut,
118+
) -> Result<Vec<U>, IoError>
110119
where
111-
U: bincode::Decode<()>,
120+
U: BorrowDecode<'a, ()> + Send,
112121
{
113-
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
114-
self.read_bytes(&mut buffer).await?;
122+
// Read data from socket into buffer
123+
self.read_bytes(buffer).await?;
115124

116125
let mut parsed_values = Vec::new();
117126

118-
while !buffer.is_empty() {
119-
let (request, size) = bincode::decode_from_slice(&buffer, SERDE_CONFIG)
120-
.map_err(|e| IoError::Custom(e.to_string()))?;
121-
parsed_values.push(request);
122-
buffer = buffer.split_off(size);
127+
// Zero-copy slicing logic
128+
let mut slice = &buffer[..];
129+
130+
// Note: In a real protocol, you need a loop that checks if
131+
// there is enough data for a full frame before decoding.
132+
// This simple loop assumes the buffer contains perfect packets.
133+
while !slice.is_empty() {
134+
match bincode::borrow_decode_from_slice(slice, SERDE_CONFIG) {
135+
Ok((request, size)) => {
136+
parsed_values.push(request);
137+
slice = &slice[size..];
138+
},
139+
Err(_) => {
140+
// Stop if we can't decode anymore (partial packet)
141+
break;
142+
},
143+
}
123144
}
145+
124146
Ok(parsed_values)
125147
}
126148
}
@@ -226,7 +248,8 @@ pub mod test_tokio_stream_impl {
226248
let mut mock = MockAsyncStream::new(vec![encoded_msg.into()]);
227249

228250
// 2. Act
229-
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads().await;
251+
let mut buffer = BytesMut::new();
252+
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads(&mut buffer).await;
230253

231254
// 3. Assert
232255
let deserialized = result.unwrap();
@@ -247,7 +270,8 @@ pub mod test_tokio_stream_impl {
247270
let mut mock = MockAsyncStream::new(vec![raw_data]);
248271

249272
// 2. Act
250-
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads().await;
273+
let mut buffer = BytesMut::new();
274+
let result: Result<Vec<TestMessage>, IoError> = mock.deserialized_reads(&mut buffer).await;
251275

252276
// 3. Assert
253277
let deserialized = result.unwrap();

duva/src/domains/interface.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ use crate::domains::{
77
connections::connection_types::{ReadConnected, WriteConnected},
88
},
99
};
10+
use bincode::BorrowDecode;
11+
use bincode::config::Configuration;
1012
use bytes::BytesMut;
1113

1214
#[async_trait::async_trait]
@@ -33,14 +35,18 @@ pub trait TSerdeWrite {
3335
) -> impl std::future::Future<Output = Result<(), IoError>> + Send;
3436
}
3537

38+
#[async_trait::async_trait]
3639
pub trait TSerdeRead {
37-
fn deserialized_read<U: bincode::Decode<()>>(
38-
&mut self,
39-
) -> impl std::future::Future<Output = Result<U, IoError>> + Send;
40+
async fn deserialized_read<'a, U>(&mut self, buffer: &'a mut BytesMut) -> Result<U, IoError>
41+
where
42+
U: BorrowDecode<'a, ()> + Send; // 'U' lives as long as 'buffer'
4043

41-
fn deserialized_reads<U: bincode::Decode<()>>(
44+
async fn deserialized_reads<'a, U>(
4245
&mut self,
43-
) -> impl std::future::Future<Output = Result<Vec<U>, IoError>> + Send;
46+
buffer: &'a mut BytesMut,
47+
) -> Result<Vec<U>, IoError>
48+
where
49+
U: BorrowDecode<'a, ()> + Send;
4450
}
4551

4652
pub(crate) trait TAsyncReadWrite {

duva/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::domains::replications::*;
1212
use crate::domains::{TSerdeRead, TSerdeWrite};
1313
use crate::signals::SignalHandler;
1414
use anyhow::{Context, Result};
15+
use bytes::BytesMut;
1516
pub use config::Environment;
1617
use domains::IoError;
1718
use domains::caches::cache_manager::CacheManager;
@@ -235,7 +236,8 @@ impl StartUpFacade {
235236
async fn handle_client_stream(&self, stream: tokio::net::TcpStream) -> anyhow::Result<()> {
236237
let (mut read_half, write_half) = stream.into_split();
237238
let mut writer = ClientStreamWriter(write_half);
238-
let request = read_half.deserialized_read().await?;
239+
let mut buffer = BytesMut::new();
240+
let request = read_half.deserialized_read(&mut buffer).await?;
239241

240242
match request {
241243
ConnectionRequests::Discovery => {

duva/src/presentation/clients/authenticate.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@ impl ConnectionRequest {
1919
Ok((client_id.to_string(), self.request_id))
2020
}
2121
}
22-
#[derive(Debug, Clone, PartialEq, Eq, bincode::Decode, bincode::Encode)]
22+
#[derive(Debug, Clone, PartialEq, Eq, bincode::BorrowDecode, bincode::Encode)]
2323
pub enum ConnectionRequests {
2424
Discovery,
2525
Authenticate(ConnectionRequest),
2626
}
27-
#[derive(Debug, Clone, bincode::Decode, bincode::Encode)]
27+
#[derive(Debug, Clone, bincode::BorrowDecode, bincode::Encode)]
2828
pub enum ConnectionResponses {
2929
Discovery { leader_id: PeerIdentifier },
3030
Authenticated(ConnectionResponse),
3131
}
3232

33-
#[derive(Debug, Clone, Default, bincode::Decode, bincode::Encode)]
33+
#[derive(Debug, Clone, Default, bincode::BorrowDecode, bincode::Encode)]
3434
pub struct ConnectionResponse {
3535
pub client_id: String,
3636
pub request_id: u64,

duva/src/presentation/clients/request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,19 @@ use anyhow::Context;
1212
use chrono::{DateTime, Utc};
1313
use std::str::FromStr;
1414

15-
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
15+
#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
1616
pub struct SessionRequest {
1717
pub conn_offset: u64,
1818
pub action: ClientAction,
1919
}
2020

21-
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)]
21+
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
2222
pub enum ClientAction {
2323
NonMutating(NonMutatingAction),
2424
Mutating(LogEntry),
2525
}
2626

27-
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::Decode)]
27+
#[derive(Clone, Debug, PartialEq, Eq, bincode::Encode, bincode::BorrowDecode)]
2828
pub enum NonMutatingAction {
2929
Ping,
3030
Echo(String),
@@ -306,7 +306,7 @@ pub struct ClientRequest {
306306
pub(crate) conn_id: String,
307307
}
308308

309-
#[derive(Clone, Debug, bincode::Decode, bincode::Encode)]
309+
#[derive(Clone, Debug, bincode::BorrowDecode, bincode::Encode)]
310310
pub enum ServerResponse {
311311
WriteRes { res: QueryIO, log_index: u64, conn_offset: u64 },
312312
ReadRes { res: QueryIO, conn_offset: u64 },

duva/src/presentation/clients/stream.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use crate::prelude::ConnectionResponse;
1111
use crate::prelude::ConnectionResponses;
1212
use crate::presentation::clients::request::{ClientAction, ServerResponse, SessionRequest};
1313

14+
use bytes::BytesMut;
1415
use tokio::{
1516
net::tcp::{OwnedReadHalf, OwnedWriteHalf},
1617
sync::mpsc::Sender,
@@ -31,7 +32,8 @@ impl ClientStreamReader {
3132
) {
3233
loop {
3334
// * extract queries
34-
let query_ios = self.r.deserialized_reads::<SessionRequest>().await;
35+
let mut buffer = BytesMut::new();
36+
let query_ios = self.r.deserialized_reads::<SessionRequest>(&mut buffer).await;
3537
if let Err(err) = query_ios {
3638
info!("{}", err);
3739
if err.should_break() {

0 commit comments

Comments
 (0)