Skip to content

Commit 82788ee

Browse files
committed
[lafere] refactor to prepare to allow server requests #1
1 parent 47f8c51 commit 82788ee

File tree

11 files changed

+850
-1043
lines changed

11 files changed

+850
-1043
lines changed

lafere/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl<S> ReconStrat<S> {
4545

4646
/// A connection to a server
4747
pub struct Connection<P> {
48-
sender: Sender<P>,
48+
sender: Sender<P, Config>,
4949
task: TaskHandle,
5050
}
5151

@@ -82,7 +82,7 @@ impl<P> Connection<P> {
8282
}
8383

8484
/// Creates a new Stream.
85-
pub(crate) fn new_raw(sender: Sender<P>, task: TaskHandle) -> Self {
85+
pub(crate) fn new_raw(sender: Sender<P, Config>, task: TaskHandle) -> Self {
8686
Self { sender, task }
8787
}
8888

lafere/src/encrypted/stream.rs

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,54 @@
11
use super::handshake::{client_handshake, server_handshake};
22
use crate::client::{Config as ClientConfig, Connection as Client, ReconStrat};
33
use crate::error::TaskError;
4-
use crate::handler::{SendBack, TaskHandle, client, server};
4+
use crate::handler::TaskHandle;
5+
use crate::handler::handler::{
6+
Handler, PacketStream, bg_stream, bg_stream_reconnect,
7+
};
58
use crate::packet::builder::{PacketReceiver, PacketReceiverError};
69
use crate::packet::{EncryptedBytes, Packet};
710
use crate::server::{Config as ServerConfig, Connection as Server};
8-
use crate::util::{ByteStream, TimeoutReader, watch};
11+
use crate::util::{ByteStream, TimeoutReader};
912

1013
use std::io;
1114

1215
use tokio::io::AsyncWriteExt;
1316
use tokio::sync::oneshot;
14-
use tokio::time::{Duration, MissedTickBehavior, interval};
17+
use tokio::time::Duration;
1518

1619
use crypto::cipher::Key;
1720
use crypto::signature as sign;
1821

1922
pub fn client<S, P>(
2023
stream: S,
2124
cfg: ClientConfig,
22-
mut recon_strat: Option<ReconStrat<S>>,
25+
recon_strat: Option<ReconStrat<S>>,
2326
sign: sign::PublicKey,
2427
) -> Client<P>
2528
where
2629
S: ByteStream,
2730
P: Packet<EncryptedBytes> + Send + 'static,
2831
P::Header: Send,
2932
{
30-
let (sender, mut cfg_rx, mut bg_handler) = client::Handler::new(cfg);
33+
let (sender, _, mut cfg_rx, mut bg_handler) = Handler::new(cfg, false);
3134

3235
let (tx_close, mut rx_close) = oneshot::channel();
3336
let task = tokio::spawn(async move {
34-
client_bg_reconnect!(client_bg_stream(
37+
bg_stream_reconnect(
3538
stream,
36-
bg_handler,
37-
cfg_rx,
38-
rx_close,
39+
&mut bg_handler,
40+
&mut cfg_rx,
41+
&mut rx_close,
42+
|stream: &mut EncryptedPacketStream<_, _>, cfg| {
43+
stream.stream.set_timeout(cfg.timeout);
44+
stream.builder.set_body_limit(cfg.body_limit);
45+
},
3946
recon_strat,
4047
|stream, cfg| {
41-
PacketStream::client(stream, sign.clone(), cfg).await
42-
}
43-
));
48+
EncryptedPacketStream::client(stream, sign.clone(), cfg)
49+
},
50+
)
51+
.await
4452
});
4553

4654
let task = TaskHandle {
@@ -61,17 +69,22 @@ where
6169
P: Packet<EncryptedBytes> + Send + 'static,
6270
P::Header: Send,
6371
{
64-
let (receiver, mut cfg_rx, mut bg_handler) = server::Handler::new(cfg);
72+
let (_, receiver, mut cfg_rx, mut bg_handler) = Handler::new(cfg, true);
6573

6674
let (tx_close, mut rx_close) = oneshot::channel();
6775
let task = tokio::spawn(async move {
6876
let stream =
69-
PacketStream::server(stream, sign, cfg_rx.newest()).await?;
70-
let r = server_bg_stream(
77+
EncryptedPacketStream::server(stream, sign, cfg_rx.newest())
78+
.await?;
79+
let r = bg_stream(
7180
stream,
7281
&mut bg_handler,
7382
&mut cfg_rx,
7483
&mut rx_close,
84+
|stream, cfg| {
85+
stream.stream.set_timeout(cfg.timeout);
86+
stream.builder.set_body_limit(cfg.body_limit);
87+
},
7588
)
7689
.await;
7790

@@ -91,7 +104,7 @@ where
91104
}
92105

93106
/// inner manages a stream
94-
struct PacketStream<S, P>
107+
struct EncryptedPacketStream<S, P>
95108
where
96109
S: ByteStream,
97110
P: Packet<EncryptedBytes>,
@@ -103,15 +116,11 @@ where
103116
builder: PacketReceiver<P, EncryptedBytes>,
104117
}
105118

106-
impl<S, P> PacketStream<S, P>
119+
impl<S, P> EncryptedPacketStream<S, P>
107120
where
108121
S: ByteStream,
109122
P: Packet<EncryptedBytes>,
110123
{
111-
fn timeout(&self) -> Duration {
112-
self.stream.timeout()
113-
}
114-
115124
async fn client(
116125
stream: S,
117126
sign: sign::PublicKey,
@@ -141,6 +150,16 @@ where
141150
builder: PacketReceiver::new(cfg.body_limit),
142151
})
143152
}
153+
}
154+
155+
impl<S, P> PacketStream<P, EncryptedBytes> for EncryptedPacketStream<S, P>
156+
where
157+
S: ByteStream,
158+
P: Packet<EncryptedBytes>,
159+
{
160+
fn timeout(&self) -> Duration {
161+
self.stream.timeout()
162+
}
144163

145164
async fn send(&mut self, packet: P) -> io::Result<()> {
146165
let mut bytes = packet.into_bytes();
@@ -172,24 +191,10 @@ where
172191
}
173192
}
174193

175-
bg_stream!(
176-
client_bg_stream,
177-
client::Handler<P, EncryptedBytes>,
178-
EncryptedBytes,
179-
ClientConfig
180-
);
181-
bg_stream!(
182-
server_bg_stream,
183-
server::Handler<P, EncryptedBytes>,
184-
EncryptedBytes,
185-
ServerConfig
186-
);
187-
188194
#[cfg(test)]
189195
mod tests {
190196
use super::*;
191-
use crate::packet::test::TestPacket;
192-
use crate::server::Message;
197+
use crate::{packet::test::TestPacket, server::Request};
193198
use crypto::signature::Keypair;
194199

195200
use tokio::net::{TcpListener, TcpStream};
@@ -239,7 +244,7 @@ mod tests {
239244
// let's receive a request message
240245
let req = bob.receive().await.unwrap();
241246
match req {
242-
Message::Request(req, resp) => {
247+
Request::Request(req, resp) => {
243248
assert_eq!(req.num1, 1);
244249
assert_eq!(req.num2, 2);
245250

@@ -252,7 +257,7 @@ mod tests {
252257

253258
let req = bob.receive().await.unwrap();
254259
match req {
255-
Message::RequestReceiver(req, stream) => {
260+
Request::RequestReceiver(req, stream) => {
256261
assert_eq!(req.num1, 5);
257262
assert_eq!(req.num2, 6);
258263

@@ -268,7 +273,7 @@ mod tests {
268273

269274
let req = bob.receive().await.unwrap();
270275
match req {
271-
Message::RequestSender(req, mut stream) => {
276+
Request::RequestSender(req, mut stream) => {
272277
assert_eq!(req.num1, 11);
273278
assert_eq!(req.num2, 12);
274279

0 commit comments

Comments
 (0)