diff --git a/src/mqtt_client.rs b/src/mqtt_client.rs index 36e90b3..4659054 100644 --- a/src/mqtt_client.rs +++ b/src/mqtt_client.rs @@ -489,6 +489,7 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate: fn update(&mut self) -> Result<(), Error> { if self.network.socket_was_closed() { + self.network.reset(); info!("Handling closed socket"); self.sm.process_event(Events::TcpDisconnect).unwrap(); self.network.allocate_socket()?; @@ -780,6 +781,7 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate: if received > 0 { debug!("Received {} bytes", received); } else { + self.client.update()?; return Ok(None); } } @@ -787,6 +789,7 @@ impl<'buf, TcpStack: TcpClientStack, Clock: embedded_time::Clock, Broker: crate: let packet = self.packet_reader.received_packet()?; info!("Received {:?}", packet); if let Some(result) = self.client.handle_packet(packet, &mut f)? { + self.client.update()?; return Ok(Some(result)); } } diff --git a/src/network_manager.rs b/src/network_manager.rs index 7e7afd0..5d17942 100644 --- a/src/network_manager.rs +++ b/src/network_manager.rs @@ -36,13 +36,13 @@ where } } - pub fn socket_was_closed(&mut self) -> bool { - let was_closed = self.connection_died; - if was_closed { - self.pending_write.take(); - } + pub fn socket_was_closed(&self) -> bool { + self.connection_died + } + + pub fn reset(&mut self) { + self.pending_write.take(); self.connection_died = false; - was_closed } /// Determine if there is a pending packet write that needs to be completed.