1- //use tokio_util::codec::Framed;
2- //use tokio::net::TcpStream;
31use futures:: { SinkExt , StreamExt } ;
2+ use tokio:: time;
3+ use tracing:: { debug, info, warn, instrument} ;
44
55use crate :: core:: packet:: Packet ;
6- //use crate::core::codec::PacketCodec;
76use crate :: protocol:: message:: Message ;
87// Import secure handshake functions
98use crate :: protocol:: handshake:: { client_secure_handshake_init, client_secure_handshake_verify, client_derive_session_key} ;
9+ use crate :: protocol:: heartbeat:: { build_ping, is_pong} ;
10+ use crate :: protocol:: keepalive:: KeepAliveManager ;
1011use crate :: service:: secure:: SecureConnection ;
1112use crate :: transport:: remote;
1213use crate :: error:: { Result , ProtocolError } ;
14+ use crate :: utils:: timeout:: { with_timeout_error, DEFAULT_TIMEOUT , HANDSHAKE_TIMEOUT } ;
1315
1416/// High-level protocol client with post-handshake encryption
1517pub struct Client {
1618 conn : SecureConnection ,
19+ keep_alive : KeepAliveManager ,
1720}
1821
1922impl Client {
20- /// Connect and perform secure handshake
23+ /// Connect and perform secure handshake with timeout
24+ #[ instrument( skip( addr) , fields( address = %addr) ) ]
2125 pub async fn connect ( addr : & str ) -> Result < Self > {
22- let mut framed = remote:: connect ( addr) . await ?;
26+ // Connect with timeout
27+ let mut framed = with_timeout_error (
28+ async {
29+ remote:: connect ( addr) . await
30+ } ,
31+ DEFAULT_TIMEOUT
32+ ) . await ?;
2333
2434 // --- Legacy Handshake Support ---
2535 // Commented out legacy code for reference
@@ -37,9 +47,17 @@ impl Client {
3747 payload : init_bytes,
3848 } ) . await ?;
3949
40- // Step 2: Receive server response
41- let packet = framed. next ( ) . await . ok_or ( ProtocolError :: Timeout ) ??;
42- let response: Message = bincode:: deserialize ( & packet. payload ) ?;
50+ // Step 2: Receive server response with timeout
51+ let response = with_timeout_error (
52+ async {
53+ let packet = framed. next ( ) . await
54+ . ok_or ( ProtocolError :: ConnectionClosed ) ?
55+ . map_err ( |e| ProtocolError :: TransportError ( e. to_string ( ) ) ) ?;
56+ bincode:: deserialize :: < Message > ( & packet. payload )
57+ . map_err ( |e| ProtocolError :: DeserializeError ( e. to_string ( ) ) )
58+ } ,
59+ HANDSHAKE_TIMEOUT
60+ ) . await ?;
4361
4462 // Step 3: Verify server response and send confirmation
4563 // Extract data from server response
@@ -62,23 +80,99 @@ impl Client {
6280 // Step 4: Derive shared session key
6381 let key = client_derive_session_key ( ) ?;
6482 let conn = SecureConnection :: new ( framed, key) ;
83+ let keep_alive = KeepAliveManager :: new ( ) ;
6584
66- Ok ( Self { conn } )
85+ info ! ( "Connection established successfully" ) ;
86+ Ok ( Self { conn, keep_alive } )
6787 }
6888
6989 /// Securely send a message
90+ #[ instrument( skip( self , msg) ) ]
7091 pub async fn send ( & mut self , msg : Message ) -> Result < ( ) > {
71- self . conn . secure_send ( msg) . await
92+ let result = self . conn . secure_send ( msg) . await ;
93+ if result. is_ok ( ) {
94+ self . keep_alive . update_send ( ) ;
95+ }
96+ result
7297 }
7398
7499 /// Securely receive a message
100+ #[ instrument( skip( self ) ) ]
75101 pub async fn recv ( & mut self ) -> Result < Message > {
76- self . conn . secure_recv ( ) . await
102+ let result = self . conn . secure_recv ( ) . await ;
103+ if result. is_ok ( ) {
104+ self . keep_alive . update_recv ( ) ;
105+ }
106+ result
107+ }
108+
109+ /// Send a keep-alive ping to the server
110+ #[ instrument( skip( self ) ) ]
111+ pub async fn send_keepalive ( & mut self ) -> Result < ( ) > {
112+ debug ! ( "Sending keep-alive ping" ) ;
113+ let ping = build_ping ( ) ;
114+ self . send ( ping) . await
115+ }
116+
117+ /// Wait for messages with keep-alive handling
118+ #[ instrument( skip( self ) ) ]
119+ pub async fn recv_with_keepalive ( & mut self , timeout_duration : std:: time:: Duration ) -> Result < Message > {
120+ let mut ping_interval = time:: interval ( self . keep_alive . ping_interval ( ) ) ;
121+
122+ let timeout = time:: sleep ( timeout_duration) ;
123+ tokio:: pin!( timeout) ;
124+
125+ loop {
126+ tokio:: select! {
127+ // Check if we need to send a ping
128+ _ = ping_interval. tick( ) => {
129+ if self . keep_alive. should_ping( ) {
130+ self . send_keepalive( ) . await ?;
131+ }
132+
133+ // Check if connection is dead
134+ if self . keep_alive. is_connection_dead( ) {
135+ warn!( dead_seconds = ?self . keep_alive. time_since_last_recv( ) . as_secs( ) ,
136+ "Connection appears dead" ) ;
137+ return Err ( ProtocolError :: ConnectionTimeout ) ;
138+ }
139+ }
140+
141+ // Try to receive a message
142+ recv_result = self . conn. secure_recv:: <Message >( ) => {
143+ match recv_result {
144+ Ok ( msg) => {
145+ self . keep_alive. update_recv( ) ;
146+
147+ // Filter out pong messages, return everything else
148+ if !is_pong( & msg) {
149+ return Ok ( msg) ;
150+ } else {
151+ debug!( "Received pong response" ) ;
152+ // Continue waiting for non-pong messages
153+ }
154+ }
155+ Err ( ProtocolError :: Timeout ) => {
156+ // Timeout is expected, just continue the loop
157+ continue ;
158+ }
159+ Err ( e) => return Err ( e) ,
160+ }
161+ }
162+
163+ // User-provided timeout
164+ _ = & mut timeout => {
165+ return Err ( ProtocolError :: Timeout ) ;
166+ }
167+ }
168+ }
77169 }
78170
79- /// Send a message and wait for a response
171+ /// Send a message and wait for a response with keep-alive handling
172+ #[ instrument( skip( self , msg) ) ]
80173 pub async fn send_and_wait ( & mut self , msg : Message ) -> Result < Message > {
81174 self . send ( msg) . await ?;
82- self . recv ( ) . await
175+ // Use a reasonably long timeout for waiting for a response
176+ self . recv_with_keepalive ( std:: time:: Duration :: from_secs ( 30 ) ) . await
83177 }
84178}
0 commit comments