Skip to content

Commit db51e04

Browse files
committed
[lafere] enable basic server requests #1
1 parent 82788ee commit db51e04

File tree

12 files changed

+473
-86
lines changed

12 files changed

+473
-86
lines changed

lafere/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ name = "request_response"
1313
test = true
1414
required-features = ["connection"]
1515

16+
[[example]]
17+
name = "server_requests"
18+
test = true
19+
required-features = ["connection"]
20+
1621
[features]
1722
default = ["connection"]
1823
connection = ["tokio"]

lafere/examples/request_response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,6 @@ async fn main() {
168168
}
169169

170170
#[test]
171-
fn run_main() {
171+
fn request_response_main() {
172172
main();
173173
}

lafere/examples/server_requests.rs

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
use std::time::Duration;
2+
3+
use tokio::io;
4+
5+
use lafere::packet::{
6+
BodyBytes, BodyBytesMut, Flags, Packet, PacketBytes, PacketError,
7+
PacketHeader,
8+
};
9+
use lafere::server::Message;
10+
use lafere::{client, server};
11+
12+
use bytes::{Bytes, BytesMut, BytesRead, BytesWrite};
13+
14+
#[derive(Debug, Clone)]
15+
struct MyPacket<B> {
16+
header: MyHeader,
17+
bytes: B,
18+
}
19+
20+
#[derive(Debug, Default, Clone, PartialEq, Eq)]
21+
struct MyHeader {
22+
length: u32,
23+
flags: Flags,
24+
id: u32,
25+
}
26+
27+
impl<B> MyPacket<B>
28+
where
29+
B: PacketBytes,
30+
{
31+
pub fn empty() -> Self {
32+
Self {
33+
header: MyHeader::default(),
34+
bytes: B::new(MyHeader::LEN as usize),
35+
}
36+
}
37+
38+
pub fn body(&self) -> BodyBytes<'_> {
39+
self.bytes.body()
40+
}
41+
42+
pub fn body_mut(&mut self) -> BodyBytesMut<'_> {
43+
self.bytes.body_mut()
44+
}
45+
}
46+
47+
impl<B> Packet<B> for MyPacket<B>
48+
where
49+
B: PacketBytes,
50+
{
51+
type Header = MyHeader;
52+
53+
fn header(&self) -> &Self::Header {
54+
&self.header
55+
}
56+
57+
fn header_mut(&mut self) -> &mut Self::Header {
58+
&mut self.header
59+
}
60+
61+
fn empty() -> Self {
62+
Self {
63+
header: MyHeader::default(),
64+
bytes: B::new(MyHeader::LEN as usize),
65+
}
66+
}
67+
68+
fn from_bytes_and_header(
69+
bytes: B,
70+
header: Self::Header,
71+
) -> Result<Self, PacketError> {
72+
Ok(Self { header, bytes })
73+
}
74+
75+
fn into_bytes(mut self) -> B {
76+
self.header.length = self.bytes.body().len() as u32;
77+
self.header.write_to(self.bytes.header_mut());
78+
self.bytes
79+
}
80+
}
81+
82+
impl MyHeader {
83+
fn write_to(&self, mut bytes: BytesMut) {
84+
bytes.write_u32(self.length);
85+
bytes.write_u8(self.flags.as_u8());
86+
bytes.write_u32(self.id);
87+
}
88+
}
89+
90+
impl PacketHeader for MyHeader {
91+
const LEN: u32 = 4 + 4 + 1;
92+
93+
fn from_bytes(mut bytes: Bytes) -> Result<Self, PacketError> {
94+
Ok(Self {
95+
length: bytes.read_u32(),
96+
flags: Flags::from_u8(bytes.read_u8())?,
97+
id: bytes.read_u32(),
98+
})
99+
}
100+
101+
fn body_len(&self) -> u32 {
102+
self.length
103+
}
104+
105+
fn flags(&self) -> &Flags {
106+
&self.flags
107+
}
108+
109+
fn set_flags(&mut self, flags: Flags) {
110+
self.flags = flags;
111+
}
112+
113+
fn id(&self) -> u32 {
114+
self.id
115+
}
116+
117+
fn set_id(&mut self, id: u32) {
118+
self.id = id;
119+
}
120+
}
121+
122+
#[tokio::main]
123+
async fn main() {
124+
let (client, server) = io::duplex(1024);
125+
126+
let mut alice = client::Connection::<MyPacket<_>>::new(
127+
client,
128+
client::Config {
129+
timeout: Duration::from_secs(1),
130+
body_limit: 1024,
131+
},
132+
None,
133+
);
134+
135+
let mut bob = server::Connection::<MyPacket<_>>::new(
136+
server,
137+
server::Config {
138+
timeout: Duration::from_secs(1),
139+
body_limit: 1024,
140+
},
141+
);
142+
143+
let client_task = tokio::spawn(async move {
144+
alice.enable_server_requests().await.unwrap();
145+
146+
// listen for a request from Bob
147+
let msg = alice.receive().await.unwrap();
148+
match msg {
149+
Message::Request(req, resp_sender) => {
150+
assert_eq!(req.body().as_slice(), b"Hello, Alice!");
151+
152+
let mut resp = MyPacket::empty();
153+
resp.body_mut().write(b"Hello, Back!");
154+
resp_sender.send(resp).unwrap();
155+
}
156+
_ => unreachable!(),
157+
}
158+
159+
// send a request to Bob
160+
let mut req = MyPacket::empty();
161+
req.body_mut().write(b"Hello, Bob!");
162+
163+
let resp = alice.request(req).await.unwrap();
164+
assert_eq!(resp.body().as_slice(), b"Hello, Back!");
165+
166+
assert!(alice.receive().await.is_none());
167+
});
168+
169+
let server_task = tokio::spawn(async move {
170+
// wait for Alice to enable server requests
171+
let msg = bob.receive().await.unwrap();
172+
assert!(matches!(msg, Message::EnableServerRequests));
173+
174+
// send a request to Alice
175+
let mut req = MyPacket::empty();
176+
req.body_mut().write(b"Hello, Alice!");
177+
178+
let resp = bob.request(req).await.unwrap();
179+
assert_eq!(resp.body().as_slice(), b"Hello, Back!");
180+
181+
// listen for a request from Alice
182+
let msg = bob.receive().await.unwrap();
183+
match msg {
184+
Message::Request(req, resp_sender) => {
185+
assert_eq!(req.body().as_slice(), b"Hello, Bob!");
186+
187+
let mut resp = MyPacket::empty();
188+
resp.body_mut().write(b"Hello, Back!");
189+
resp_sender.send(resp).unwrap();
190+
}
191+
_ => unreachable!(),
192+
}
193+
});
194+
195+
tokio::try_join!(client_task, server_task).unwrap();
196+
}
197+
198+
#[test]
199+
fn server_requests_main() {
200+
main();
201+
}

lafere/src/client.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
use crate::error::{RequestError, TaskError};
2+
use crate::handler::server::Receiver;
23
use crate::handler::{
34
Configurator, StreamReceiver, StreamSender, TaskHandle, client::Sender,
45
};
56
use crate::packet::{Packet, PlainBytes};
67
use crate::plain;
8+
use crate::server::Request;
79
use crate::util::{ByteStream, PinnedFuture};
810

911
#[cfg(feature = "encrypted")]
@@ -45,7 +47,10 @@ impl<S> ReconStrat<S> {
4547

4648
/// A connection to a server
4749
pub struct Connection<P> {
48-
sender: Sender<P, Config>,
50+
sender: Sender<P>,
51+
receiver: Receiver<P>,
52+
receiver_enabled: bool,
53+
config: Configurator<Config>,
4954
task: TaskHandle,
5055
}
5156

@@ -82,19 +87,45 @@ impl<P> Connection<P> {
8287
}
8388

8489
/// Creates a new Stream.
85-
pub(crate) fn new_raw(sender: Sender<P, Config>, task: TaskHandle) -> Self {
86-
Self { sender, task }
90+
pub(crate) fn new_raw(
91+
sender: Sender<P>,
92+
receiver: Receiver<P>,
93+
config: Configurator<Config>,
94+
task: TaskHandle,
95+
) -> Self {
96+
Self {
97+
sender,
98+
receiver,
99+
receiver_enabled: false,
100+
config,
101+
task,
102+
}
87103
}
88104

89105
/// Update the connection configuration
90106
pub fn update_config(&self, cfg: Config) {
91-
self.sender.update_config(cfg);
107+
self.config.update(cfg);
92108
}
93109

94110
/// Get's a `Configurator` which allows to configure this connection
95111
/// without needing to have access to the connection
96112
pub fn configurator(&self) -> Configurator<Config> {
97-
self.sender.configurator()
113+
self.config.clone()
114+
}
115+
116+
pub async fn enable_server_requests(&mut self) -> Result<(), RequestError> {
117+
self.sender.enable_server_requests().await?;
118+
self.receiver_enabled = true;
119+
120+
Ok(())
121+
}
122+
123+
/// ## Panics
124+
/// - If server requests are not enabled
125+
pub async fn receive(&mut self) -> Option<Request<P>> {
126+
assert!(self.receiver_enabled);
127+
128+
self.receiver.receive().await
98129
}
99130

100131
/// Send a request waiting until a response is available or the connection

lafere/src/encrypted/stream.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use super::handshake::{client_handshake, server_handshake};
22
use crate::client::{Config as ClientConfig, Connection as Client, ReconStrat};
33
use crate::error::TaskError;
4-
use crate::handler::TaskHandle;
54
use crate::handler::handler::{
65
Handler, PacketStream, bg_stream, bg_stream_reconnect,
76
};
7+
use crate::handler::{Configurator, TaskHandle};
88
use crate::packet::builder::{PacketReceiver, PacketReceiverError};
99
use crate::packet::{EncryptedBytes, Packet};
1010
use crate::server::{Config as ServerConfig, Connection as Server};
@@ -30,7 +30,8 @@ where
3030
P: Packet<EncryptedBytes> + Send + 'static,
3131
P::Header: Send,
3232
{
33-
let (sender, _, mut cfg_rx, mut bg_handler) = Handler::new(cfg, false);
33+
let (sender, receiver, mut bg_handler) = Handler::new(false);
34+
let (cfg_tx, mut cfg_rx) = Configurator::new(cfg);
3435

3536
let (tx_close, mut rx_close) = oneshot::channel();
3637
let task = tokio::spawn(async move {
@@ -56,7 +57,7 @@ where
5657
task,
5758
};
5859

59-
Client::new_raw(sender, task)
60+
Client::new_raw(sender, receiver, cfg_tx, task)
6061
}
6162

6263
pub fn server<S, P>(
@@ -69,7 +70,8 @@ where
6970
P: Packet<EncryptedBytes> + Send + 'static,
7071
P::Header: Send,
7172
{
72-
let (_, receiver, mut cfg_rx, mut bg_handler) = Handler::new(cfg, true);
73+
let (sender, receiver, mut bg_handler) = Handler::new(true);
74+
let (cfg_tx, mut cfg_rx) = Configurator::new(cfg);
7375

7476
let (tx_close, mut rx_close) = oneshot::channel();
7577
let task = tokio::spawn(async move {
@@ -100,7 +102,7 @@ where
100102
task,
101103
};
102104

103-
Server::new_raw(receiver, task)
105+
Server::new_raw(sender, receiver, cfg_tx, task)
104106
}
105107

106108
/// inner manages a stream

lafere/src/handler/client.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
1-
use super::{Configurator, StreamReceiver, StreamSender};
1+
use super::{StreamReceiver, StreamSender};
22
use crate::error::RequestError;
33
use crate::handler::handler::InternalRequest;
4-
use crate::util::watch;
54

65
use tokio::sync::{mpsc, oneshot};
76

87
/// A sender that sends messages to the handler.
9-
pub struct Sender<P, C> {
8+
pub struct Sender<P> {
109
pub(super) inner: mpsc::Sender<InternalRequest<P>>,
11-
pub(super) cfg: watch::Sender<C>,
1210
}
1311

14-
impl<P, C> Sender<P, C> {
12+
impl<P> Sender<P> {
13+
// can only be sent from a client
14+
pub async fn enable_server_requests(&self) -> Result<(), RequestError> {
15+
self.inner
16+
.send(InternalRequest::EnableServerRequests)
17+
.await
18+
.map_err(|_| RequestError::ConnectionAlreadyClosed)
19+
}
20+
1521
/// Send a request waiting until a response is available.
1622
pub async fn request(&self, packet: P) -> Result<P, RequestError> {
1723
let (tx, rx) = oneshot::channel();
@@ -50,21 +56,12 @@ impl<P, C> Sender<P, C> {
5056

5157
Ok(StreamReceiver::new(rx))
5258
}
53-
54-
pub fn update_config(&self, cfg: C) {
55-
self.cfg.send(cfg);
56-
}
57-
58-
pub fn configurator(&self) -> Configurator<C> {
59-
Configurator::new(self.cfg.clone())
60-
}
6159
}
6260

63-
impl<P, C> Clone for Sender<P, C> {
61+
impl<P> Clone for Sender<P> {
6462
fn clone(&self) -> Self {
6563
Self {
6664
inner: self.inner.clone(),
67-
cfg: self.cfg.clone(),
6865
}
6966
}
7067
}

0 commit comments

Comments
 (0)