Skip to content

Commit 63c23f5

Browse files
authored
Merge pull request #39 from cherish-ltt/v1.1.x
v1.1.14
2 parents 04e6a6a + ebef51d commit 63c23f5

File tree

18 files changed

+61
-352
lines changed

18 files changed

+61
-352
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
/target
2+
*.DS_Store
23
.DS_Store

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "lynn_tcp"
3-
version = "1.1.13"
3+
version = "1.1.14"
44
edition = "2024"
55
rust-version = "1.87"
66
authors = ["lynn_tcp Contributors"]

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,21 +38,21 @@ Use `cargo add lynn_tcp` or:
3838

3939
```rust
4040
[dependencies]
41-
lynn_tcp = "=1.1.12"
41+
lynn_tcp = "=1.1.14"
4242
```
4343

4444
**server feature**
4545

4646
```rust
4747
[dependencies]
48-
lynn_tcp = { version = "=1.1.12" , features = "server" }
48+
lynn_tcp = { version = "=1.1.14" , features = "server" }
4949
```
5050

5151
**client feature**
5252

5353
```rust
5454
[dependencies]
55-
lynn_tcp = { version = "=1.1.12" , features = "client" }
55+
lynn_tcp = { version = "=1.1.14" , features = "client" }
5656
```
5757

5858
#### Server

src/app/common_api/mod.rs

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66
time::{Duration, SystemTime},
77
};
88

9-
use bytes::Bytes;
9+
use bytes::BytesMut;
1010
use tokio::{
1111
io::{AsyncReadExt, ReadHalf, split},
1212
net::TcpStream,
@@ -16,12 +16,11 @@ use tokio::{
1616
use tracing::{error, info, warn};
1717

1818
use crate::{
19-
app::ReactorEventSender,
19+
app::{ReactorEventSender, event_api::event_api::ReactorEvent},
2020
const_config::{
2121
DEFAULT_MAX_RECEIVE_BYTES_SIZE, DEFAULT_MESSAGE_HEADER_MARK, DEFAULT_MESSAGE_TAIL_MARK,
2222
SERVER_MESSAGE_HEADER_MARK, SERVER_MESSAGE_TAIL_MARK,
2323
},
24-
dto_factory::input_dto::{IHandlerCombinedTrait, MsgSelect},
2524
handler::{ClientsContext, HandlerContext},
2625
lynn_tcp_dependents::{HandlerResult, InputBufVO},
2726
vo_factory::big_buf::BigBufReader,
@@ -127,7 +126,7 @@ pub(crate) async fn check_handler_result(
127126

128127
#[inline(always)]
129128
async fn send_response(
130-
response: &Bytes,
129+
response: &BytesMut,
131130
addrs: &Vec<SocketAddr>,
132131
clients: &ClientsStructType,
133132
) -> Option<Vec<SocketAddr>> {
@@ -161,41 +160,26 @@ pub(crate) async fn input_dto_build(
161160
handler_method: Arc<AsyncFunc>,
162161
reactor_event_sender: ReactorEventSender,
163162
) {
164-
tokio::spawn(async move {
165-
// Attempt to acquire a permit from the semaphore.
166-
let result_permit = process_permit.try_acquire();
167-
match result_permit {
168-
Ok(permit) => {
169-
// If the permit is acquired successfully, create a new `MsgSelect` instance and spawn a handler task.
170-
let result = MsgSelect::new(
171-
addr,
172-
HandlerContext::new(
173-
input_buf_vo,
174-
ClientsContext::new(ClientsStruct(clients.clone())),
175-
),
176-
);
177-
spawn_handler(result, clients, handler_method, reactor_event_sender).await;
178-
// Release the permit after the handler task is completed.
179-
drop(permit);
180-
}
181-
Err(_) => {
182-
// If the permit cannot be acquired, log a warning.
183-
warn!("addr:{} PROCESS_PERMIT_SIZE is full", addr)
184-
}
163+
// Attempt to acquire a permit from the semaphore.
164+
let result_permit = process_permit.try_acquire();
165+
match result_permit {
166+
Ok(permit) => {
167+
reactor_event_sender.push(ReactorEvent::crate_excute_task_event((
168+
handler_method,
169+
HandlerContext::new(
170+
input_buf_vo,
171+
ClientsContext::new(ClientsStruct(clients.clone())),
172+
),
173+
clients,
174+
)));
175+
// Release the permit after the handler task is completed.
176+
drop(permit);
185177
}
186-
});
187-
}
188-
189-
#[inline(always)]
190-
async fn spawn_handler(
191-
mut result: MsgSelect,
192-
clients: ClientsStructType,
193-
handler_method: Arc<AsyncFunc>,
194-
reactor_event_sender: ReactorEventSender,
195-
) {
196-
result
197-
.execute(clients, handler_method, reactor_event_sender)
198-
.await;
178+
Err(_) => {
179+
// If the permit cannot be acquired, log a warning.
180+
warn!("addr:{} PROCESS_PERMIT_SIZE is full", addr)
181+
}
182+
}
199183
}
200184

201185
#[inline(always)]
@@ -260,13 +244,13 @@ pub(crate) async fn push_read_half(
260244
let guard = router_map_async.deref();
261245
if let Some(map) = guard {
262246
if map.contains_key(&method_id) {
263-
let a = map.get(&method_id).unwrap();
247+
let handler_method = map.get(&method_id).unwrap();
264248
input_dto_build(
265249
addr,
266250
input_buf_vo,
267251
process_permit.clone(),
268252
clients.clone(),
269-
a.clone(),
253+
handler_method.clone(),
270254
reactor_event_sender.clone(),
271255
)
272256
.await;

src/app/lynn_server_config.rs

Lines changed: 4 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use std::net::{SocketAddr, ToSocketAddrs};
22

33
use crate::const_config::{
44
DEFAULT_ADDR, DEFAULT_CHECK_HEART_INTERVAL, DEFAULT_CHECK_HEART_TIMEOUT_TIME,
5-
DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_REACTOR_TASKPOOL_SIZE, DEFAULT_MAX_RECEIVE_BYTES_SIZE,
6-
DEFAULT_MESSAGE_HEADER_MARK, DEFAULT_MESSAGE_TAIL_MARK, DEFAULT_PROCESS_PERMIT_SIZE,
5+
DEFAULT_MAX_CONNECTIONS, DEFAULT_MAX_REACTOR_TASKPOOL_SIZE, DEFAULT_MESSAGE_HEADER_MARK,
6+
DEFAULT_MESSAGE_TAIL_MARK, DEFAULT_PROCESS_PERMIT_SIZE,
77
};
88

99
/// Represents the configuration for the Lynn server.
@@ -18,8 +18,6 @@ pub struct LynnServerConfig<'a> {
1818
server_max_connections: Option<&'a usize>,
1919
// The maximum number of threads for the server.
2020
server_max_reactor_taskpool_size: &'a usize,
21-
// The maximum number of bytes the server can receive.
22-
server_max_receive_bytes_reader_size: &'a usize,
2321
// The permit size for a single process.
2422
server_single_processs_permit: &'a usize,
2523
// The interval for checking heartbeats.
@@ -36,50 +34,6 @@ pub struct LynnServerConfig<'a> {
3634
///
3735
/// This implementation includes a constructor for the default configuration and methods to get the configuration parameters.
3836
impl<'a> LynnServerConfig<'a> {
39-
/// Creates a new LynnServerConfig instance with the given parameters.
40-
///
41-
/// # Parameters
42-
///
43-
/// * `server_ipv4` - The IPv4 address of the server.
44-
/// * `server_single_channel_size` - The size of a single channel.
45-
/// * `server_max_connections` - The maximum number of connections for the server.
46-
/// * `server_max_threadpool_size` - The maximum number of threads for the server.
47-
/// * `server_max_receive_bytes_reader_size` - The maximum number of bytes the server can receive.
48-
/// * `server_single_processs_permit` - The permit size for a single process.
49-
/// * `server_check_heart_interval` - The interval for checking heartbeats.
50-
/// * `server_check_heart_timeout_time` - The timeout time for checking heartbeats.
51-
///
52-
/// # Returns
53-
///
54-
/// A new LynnServerConfig instance.
55-
fn new<T>(
56-
server_addr: T,
57-
server_max_connections: Option<&'a usize>,
58-
server_max_reactor_taskpool_size: &'a usize,
59-
server_max_receive_bytes_reader_size: &'a usize,
60-
server_single_processs_permit: &'a usize,
61-
server_check_heart_interval: &'a u64,
62-
server_check_heart_timeout_time: &'a u64,
63-
message_header_mark: &'a u16,
64-
message_tail_mark: &'a u16,
65-
) -> Self
66-
where
67-
T: ToSocketAddrs,
68-
{
69-
let server_addr = server_addr.to_socket_addrs().unwrap().next().unwrap();
70-
Self {
71-
server_addr,
72-
server_max_connections,
73-
server_max_reactor_taskpool_size,
74-
server_max_receive_bytes_reader_size,
75-
server_single_processs_permit,
76-
server_check_heart_interval,
77-
server_check_heart_timeout_time,
78-
message_header_mark,
79-
message_tail_mark,
80-
}
81-
}
82-
8337
/// Creates a default LynnServerConfig instance.
8438
///
8539
/// # Returns
@@ -90,7 +44,6 @@ impl<'a> LynnServerConfig<'a> {
9044
server_addr: *DEFAULT_ADDR,
9145
server_max_connections: Some(&DEFAULT_MAX_CONNECTIONS),
9246
server_max_reactor_taskpool_size: &DEFAULT_MAX_REACTOR_TASKPOOL_SIZE,
93-
server_max_receive_bytes_reader_size: &DEFAULT_MAX_RECEIVE_BYTES_SIZE,
9447
server_single_processs_permit: &DEFAULT_PROCESS_PERMIT_SIZE,
9548
server_check_heart_interval: &DEFAULT_CHECK_HEART_INTERVAL,
9649
server_check_heart_timeout_time: &DEFAULT_CHECK_HEART_TIMEOUT_TIME,
@@ -144,29 +97,10 @@ impl<'a> LynnServerConfig<'a> {
14497
self.server_max_connections
14598
}
14699

147-
/// Gets the maximum number of threads for the server.
148-
///
149-
/// # Returns
150-
///
151-
/// The maximum number of threads for the server.
152-
#[deprecated(note = "use 'get_server_max_reactor_taskpool_size'", since = "1.1.12")]
153-
pub(crate) fn get_server_max_threadpool_size(&self) -> &usize {
154-
self.server_max_reactor_taskpool_size
155-
}
156-
157100
pub(crate) fn get_server_max_reactor_taskpool_size(&self) -> &usize {
158101
self.server_max_reactor_taskpool_size
159102
}
160103

161-
/// Gets the maximum number of bytes the server can receive.
162-
///
163-
/// # Returns
164-
///
165-
/// The maximum number of bytes the server can receive.
166-
pub(crate) fn get_server_max_receive_bytes_reader_size(&self) -> &usize {
167-
self.server_max_receive_bytes_reader_size
168-
}
169-
170104
/// Gets the mark for the message header.
171105
///
172106
/// # Returns
@@ -343,24 +277,6 @@ impl<'a> LynnServerConfigBuilder<'a> {
343277
self
344278
}
345279

346-
/// Sets the maximum number of bytes the server can receive.
347-
///
348-
/// # Parameters
349-
///
350-
/// * `server_max_receive_bytes_reader_size` - The maximum number of bytes the server can receive.
351-
///
352-
/// # Returns
353-
///
354-
/// The updated `LynnServerConfigBuilder` instance.
355-
pub fn with_server_max_receive_bytes_reader_size(
356-
mut self,
357-
server_max_receive_bytes_reader_size: &'a usize,
358-
) -> Self {
359-
self.lynn_config.server_max_receive_bytes_reader_size =
360-
server_max_receive_bytes_reader_size;
361-
self
362-
}
363-
364280
/// Builds the `LynnServerConfig` instance.
365281
///
366282
/// # Returns
@@ -379,7 +295,7 @@ impl<'a> LynnServerConfigBuilder<'a> {
379295
/// # Returns
380296
///
381297
/// The updated `LynnServerConfigBuilder` instance.
382-
pub(crate) fn with_message_header_mark(mut self, msg_header_mark: &'a u16) -> Self {
298+
pub fn with_message_header_mark(mut self, msg_header_mark: &'a u16) -> Self {
383299
self.lynn_config.message_header_mark = msg_header_mark;
384300
self
385301
}
@@ -393,7 +309,7 @@ impl<'a> LynnServerConfigBuilder<'a> {
393309
/// # Returns
394310
///
395311
/// The updated `LynnServerConfigBuilder` instance.
396-
pub(crate) fn with_message_tail_mark(mut self, msg_tail_mark: &'a u16) -> Self {
312+
pub fn with_message_tail_mark(mut self, msg_tail_mark: &'a u16) -> Self {
397313
self.lynn_config.message_tail_mark = msg_tail_mark;
398314
self
399315
}

src/app/lynn_server_user.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{sync::Arc, time::SystemTime};
22

3-
use bytes::Bytes;
3+
use bytes::BytesMut;
44
use tokio::{
55
io::{AsyncWriteExt, WriteHalf},
66
net::TcpStream,
@@ -15,8 +15,6 @@ use tracing::error;
1515
pub(crate) struct LynnUser {
1616
/// The write_half used to send data to the client.
1717
write_half: *mut WriteHalf<TcpStream>,
18-
/// An optional user ID.
19-
user_id: Option<u64>,
2018
/// The last time the user communicated.
2119
last_communicate_time: Arc<RwLock<SystemTime>>,
2220
mutex: Mutex<()>,
@@ -45,7 +43,6 @@ impl LynnUser {
4543
) -> Self {
4644
Self {
4745
write_half: Box::into_raw(Box::new(write_half)),
48-
user_id: None,
4946
last_communicate_time,
5047
mutex: Mutex::new(()),
5148
}
@@ -60,7 +57,7 @@ impl LynnUser {
6057
self.last_communicate_time.clone()
6158
}
6259

63-
pub(crate) async fn send_response(&self, response: &Bytes) {
60+
pub(crate) async fn send_response(&self, response: &BytesMut) {
6461
let _lock = self.mutex.lock().await;
6562
if !self.write_half.is_null() {
6663
if let Some(write_half) = unsafe { self.write_half.as_mut() } {

src/app/mod.rs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ mod tcp_reactor;
66
use std::{
77
collections::HashMap,
88
net::{SocketAddr, ToSocketAddrs},
9-
ops::DerefMut,
109
sync::Arc,
1110
};
1211

@@ -233,19 +232,6 @@ impl<'a> LynnServer<'a> {
233232
self.router_maps.0 = None;
234233
}
235234

236-
/// Removes a client from the server.
237-
///
238-
/// # Parameters
239-
///
240-
/// * `addr` - The address of the client to remove.
241-
async fn remove_client(&mut self, addr: SocketAddr) {
242-
let mut clients = self.clients.0.write().await;
243-
let guard = clients.deref_mut();
244-
if guard.contains_key(&addr) {
245-
guard.remove(&addr);
246-
}
247-
}
248-
249235
/// Checks the heartbeat of connected clients and removes those that have not sent messages for a long time.
250236
async fn check_heart(&self) {
251237
let clients = self.clients.0.clone();

src/app/tcp_reactor/event.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ impl EventManager {
131131
}
132132
}
133133

134+
#[inline(always)]
134135
fn get_event(
135136
local_queue: &Worker<ReactorEvent>,
136137
global_queue: &ReactorEventSender,

src/app/tcp_reactor/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ pub(crate) type NewSocketEventSender = Sender<(
3131
)>;
3232

3333
pub(super) struct TcpReactor {
34-
pub(crate) core_reactor: CoreReactor,
35-
pub(crate) event_manager: EventManager,
34+
core_reactor: CoreReactor,
35+
event_manager: EventManager,
3636
}
3737

3838
impl TcpReactor {

0 commit comments

Comments
 (0)