Skip to content

Commit 9084234

Browse files
committed
fix: connection drop and logging issues
1 parent 1215a6c commit 9084234

File tree

7 files changed

+132
-11
lines changed

7 files changed

+132
-11
lines changed

main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,12 @@ def signal_handler(sig, frame):
156156
# Endlosschleife, wenn kein Timeout gesetzt ist
157157
while True:
158158
time.sleep(1)
159+
if not controller.is_running:
160+
logger.error("Controller threads are dead. Exiting...")
161+
break
162+
163+
controller.disconnect()
164+
sys.exit(1)
159165

160166
except SignalduinoConnectionError as e:
161167
# Wird ausgelöst, wenn die Verbindung beim Start fehlschlägt (z.B. falscher Port, Gerät nicht angeschlossen)

sd_protocols/message_unsynced.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ def demodulate_mu(self, msg_data: Dict[str, Any], msg_type: str = "MU") -> List[
4545
mu_protocols = self.get_keys('clockabs')
4646

4747
for pid in mu_protocols:
48+
self._logging(f"MU checking PID {pid}", 5)
4849
# Prepare working copy of raw_data and patterns
4950
# (Perl does this per protocol iteration because filterfunc might modify them)
5051
current_raw_data = raw_data
@@ -94,6 +95,7 @@ def demodulate_mu(self, msg_data: Dict[str, Any], msg_type: str = "MU") -> List[
9495

9596
# Check one, zero, float
9697
for key in ['one', 'zero', 'float']:
98+
# print(f"DEBUG: Checking {key} for PID {pid}")
9799
prop_val = self.get_property(pid, key)
98100
if not prop_val:
99101
continue
@@ -200,13 +202,16 @@ def demodulate_mu(self, msg_data: Dict[str, Any], msg_type: str = "MU") -> List[
200202
regex_pattern = f"(?:{re.escape(start_str)})((?:{signal_group_inner}){{{length_min},}}{reconstruct_part})"
201203

202204
try:
205+
# print(f"DEBUG: Compiling regex for {pid}: {regex_pattern[:50]}...")
203206
matcher = re.compile(regex_pattern)
204207
except re.error as e:
205208
self._logging(f"MU Demod: Invalid regex for {pid}: {e}", 3)
206209
continue
207210

208211
# Perl iterates with /g
212+
# print(f"DEBUG: Executing finditer for {pid}")
209213
for match in matcher.finditer(current_raw_data):
214+
# print(f"DEBUG: Match found for {pid}")
210215
data_part = match.group(1)
211216

212217
# Check length max

sd_protocols/pattern_utils.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ def pattern_exists(search_pattern: List[float], pattern_list: Dict[str, float],
8787
# 2. Generate cartesian product of candidates
8888
# This gives us all possible assignments of Pattern IDs to the Unique Search Values
8989
# e.g. search=[1, -1], candidates(1)=['0'], candidates(-1)=['1'] -> product=[['0', '1']]
90+
91+
# Check for explosion risk
92+
total_combinations = 1
93+
for c in candidates_list:
94+
total_combinations *= len(c)
95+
96+
if total_combinations > 10000:
97+
if debug_callback:
98+
debug_callback(f"Too many combinations: {total_combinations}. Aborting pattern match.")
99+
print(f"DEBUG: Too many combinations: {total_combinations} for {search_pattern}")
100+
return -1
101+
90102
product = cartesian_product(candidates_list)
91103

92104
if debug_callback:

signalduino/constants.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
SDUINO_WRITEQUEUE_NEXT = 0.3
1111
SDUINO_WRITEQUEUE_TIMEOUT = 2
1212

13-
SDUINO_STATUS_HEARTBEAT_INTERVAL = 600.0 # 10 minutes
13+
SDUINO_STATUS_HEARTBEAT_INTERVAL = 10.0 # 10 seconds
1414

1515
SDUINO_DISPATCH_VERBOSE = 5
1616
SDUINO_MC_DISPATCH_VERBOSE = 5

signalduino/controller.py

Lines changed: 92 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import queue
44
import re
55
import threading
6+
import time
67
import os # NEU: Import für Umgebungsvariablen
78
from datetime import datetime, timedelta, timezone
89
from typing import Any, Callable, List, Optional, Pattern
@@ -50,6 +51,8 @@ def __init__(
5051
self._writer_thread: Optional[threading.Thread] = None
5152

5253
self._heartbeat_timer: Optional[threading.Timer] = None # NEU: Heartbeat Timer initialisieren
54+
self._init_timer_xq: Optional[threading.Timer] = None
55+
self._init_timer_start: Optional[threading.Timer] = None
5356

5457
self._stop_event = threading.Event()
5558
self._raw_message_queue: queue.Queue[str] = queue.Queue()
@@ -59,6 +62,9 @@ def __init__(
5962

6063
self.init_retry_count = 0
6164
self.init_reset_flag = False
65+
66+
self._keep_alive = False
67+
self._monitor_thread: Optional[threading.Thread] = None
6268

6369
def connect(self) -> None:
6470
"""Opens the transport and starts the worker threads."""
@@ -83,13 +89,33 @@ def connect(self) -> None:
8389
self._writer_thread = threading.Thread(target=self._writer_loop, name="sd-writer")
8490
self._writer_thread.start()
8591

86-
def disconnect(self) -> None:
92+
self._keep_alive = True
93+
if not self._monitor_thread or not self._monitor_thread.is_alive():
94+
self._monitor_thread = threading.Thread(target=self._monitor_loop, name="sd-monitor", daemon=True)
95+
self._monitor_thread.start()
96+
97+
@property
98+
def is_running(self) -> bool:
99+
"""Checks if the controller is running and threads are alive."""
100+
if self._stop_event.is_set():
101+
return False
102+
103+
# If threads are initialized, they must be alive
104+
if self._reader_thread and not self._reader_thread.is_alive():
105+
return False
106+
if self._parser_thread and not self._parser_thread.is_alive():
107+
return False
108+
if self._writer_thread and not self._writer_thread.is_alive():
109+
return False
110+
111+
return True
112+
113+
def disconnect(self, reconnect: bool = False) -> None:
87114
"""Stops the worker threads and closes the transport."""
88-
if not self.transport.is_open:
89-
self.logger.warning("disconnect() called but transport is not open.")
90-
return
115+
if not reconnect:
116+
self._keep_alive = False
91117

92-
self.logger.info("Disconnecting...")
118+
self.logger.info("Disconnecting... (reconnect=%s)", reconnect)
93119
self._stop_event.set()
94120

95121
# NEU: MQTT Publisher stoppen
@@ -100,6 +126,14 @@ def disconnect(self) -> None:
100126
self._heartbeat_timer.cancel()
101127
self._heartbeat_timer = None
102128

129+
if self._init_timer_xq:
130+
self._init_timer_xq.cancel()
131+
self._init_timer_xq = None
132+
133+
if self._init_timer_start:
134+
self._init_timer_start.cancel()
135+
self._init_timer_start = None
136+
103137
# Wake up threads that might be waiting on queues
104138
self._raw_message_queue.put("")
105139
self._write_queue.put(QueuedCommand("", 0))
@@ -111,29 +145,60 @@ def disconnect(self) -> None:
111145
if self._writer_thread:
112146
self._writer_thread.join(timeout=1)
113147

114-
self.transport.close()
148+
try:
149+
self.transport.close()
150+
except Exception as e:
151+
self.logger.warning("Error closing transport: %s", e)
115152
self.logger.info("Transport closed.")
116153

154+
def _monitor_loop(self) -> None:
155+
"""Monitors connection state and reconnects if enabled."""
156+
self.logger.info("Monitor loop started.")
157+
while True:
158+
time.sleep(5)
159+
if self._keep_alive and not self.is_running:
160+
self.logger.warning("Connection lost. Attempting auto-reconnect...")
161+
try:
162+
# Ensure everything is stopped before reconnecting
163+
self.disconnect(reconnect=True)
164+
time.sleep(2)
165+
self.connect()
166+
# Trigger init sequence
167+
self.initialize()
168+
except Exception as e:
169+
self.logger.error("Auto-reconnect failed: %s", e)
170+
117171
def initialize(self) -> None:
118172
"""Starts the initialization process."""
119173
self.logger.info("Initializing device...")
120174
self.init_retry_count = 0
121175
self.init_reset_flag = False
122176

177+
if self._stop_event.is_set():
178+
self.logger.warning("initialize called but stop event is set.")
179+
return
180+
123181
# Schedule Disable Receiver (XQ) and wait briefly
124-
threading.Timer(SDUINO_INIT_WAIT_XQ, self._send_xq).start()
182+
self._init_timer_xq = threading.Timer(SDUINO_INIT_WAIT_XQ, self._send_xq)
183+
self._init_timer_xq.start()
125184

126185
# Schedule StartInit (Get Version)
127-
threading.Timer(SDUINO_INIT_WAIT, self._start_init).start()
186+
self._init_timer_start = threading.Timer(SDUINO_INIT_WAIT, self._start_init)
187+
self._init_timer_start.start()
128188

129189
def _send_xq(self) -> None:
190+
if self._stop_event.is_set():
191+
return
130192
try:
131193
self.logger.debug("Sending XQ to disable receiver during init")
132194
self.commands.disable_receiver()
133195
except Exception as e:
134196
self.logger.warning("Failed to send XQ: %s", e)
135197

136198
def _start_init(self) -> None:
199+
if self._stop_event.is_set():
200+
return
201+
137202
self.logger.info("StartInit, get version, retry = %d", self.init_retry_count)
138203

139204
if self.init_retry_count == 0:
@@ -160,6 +225,9 @@ def _start_init(self) -> None:
160225
self._check_version_resp(response)
161226

162227
def _check_version_resp(self, msg: Optional[str]) -> None:
228+
if self._stop_event.is_set():
229+
return
230+
163231
if msg:
164232
self.logger.info("Initialized %s", msg.strip())
165233
self.init_reset_flag = False
@@ -263,6 +331,8 @@ def _parser_loop(self) -> None:
263331
except queue.Empty:
264332
continue
265333
except Exception:
334+
import traceback
335+
traceback.print_exc()
266336
if not self._stop_event.is_set():
267337
self.logger.exception("Unhandled exception in parser loop")
268338
self.logger.debug("Parser loop finished.")
@@ -422,7 +492,7 @@ def _publish_status_heartbeat(self) -> None:
422492
try:
423493
# 1. Heartbeat/Alive message (Retain: True)
424494
self.mqtt_publisher.publish_simple("status/alive", "online", retain=True)
425-
self.logger.debug("Published heartbeat status.")
495+
self.logger.info("Heartbeat executed. Status: alive")
426496

427497
# 2. Status data (version, ram, uptime)
428498
# Fetch data from device (non-blocking call, runs in timer thread)
@@ -443,6 +513,13 @@ def _publish_status_heartbeat(self) -> None:
443513
except Exception as e:
444514
self.logger.warning("Could not get free RAM for heartbeat: %s", e)
445515
status_data["free_ram"] = "error"
516+
# NEU: Wenn Heartbeat wegen Verbindungsfehler fehlschlägt, überprüfen und Disconnect initiieren.
517+
# Dies ist der erste Schritt zur Selbstheilung / Reconnect-Vorbereitung.
518+
if not self.transport.is_open and not self._stop_event.is_set():
519+
self.logger.error(
520+
"Heartbeat failed: Transport is closed. Triggering disconnect to stop worker threads."
521+
)
522+
self.disconnect(reconnect=True) # Stoppt Threads, setzt self._stop_event, erlaubt Reconnect
446523

447524
# Uptime
448525
try:
@@ -455,6 +532,12 @@ def _publish_status_heartbeat(self) -> None:
455532
except Exception as e:
456533
self.logger.warning("Could not get uptime for heartbeat: %s", e)
457534
status_data["uptime"] = "error"
535+
# NEU: Auch hier prüfen und Disconnect initiieren, falls Verbindung noch nicht bemerkt wurde
536+
if not self.transport.is_open and not self._stop_event.is_set():
537+
self.logger.error(
538+
"Heartbeat failed: Transport is closed. Triggering disconnect to stop worker threads."
539+
)
540+
self.disconnect(reconnect=True) # Stoppt Threads, setzt self._stop_event, erlaubt Reconnect
458541

459542
# Publish all collected data to a single status/data topic
460543
if status_data:

signalduino/parser/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def __init__(
2626
):
2727
self.protocols = protocols or SDProtocols()
2828
self.logger = logger or logging.getLogger(__name__)
29+
self.protocols.register_log_callback(self._log_adapter)
2930
self.rfmode = rfmode
3031
self.ms_parser = MSParser(self.protocols, self.logger)
3132
self.mu_parser = MUParser(self.protocols, self.logger)
@@ -47,6 +48,20 @@ def parse_line(self, line: str) -> List[DecodedMessage]:
4748

4849
return list(parser.parse(frame))
4950

51+
def _log_adapter(self, message: str, level: int):
52+
"""Adapts SDProtocols custom log levels to python logging."""
53+
# FHEM levels: 1=Error, 2=Warn, 3=Info, 4=More Info, 5=Debug
54+
if level <= 1:
55+
self.logger.error(message)
56+
elif level == 2:
57+
self.logger.warning(message)
58+
elif level == 3:
59+
self.logger.info(message)
60+
elif level == 4:
61+
self.logger.debug(message) # or info? keeping debug for now
62+
else:
63+
self.logger.debug(message)
64+
5065
def _select_parser(self, message_type: str | None):
5166
if not message_type:
5267
return None

signalduino/transport.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def readline(self, timeout: Optional[float] = None) -> Optional[str]:
8484
class TCPTransport(BaseTransport):
8585
"""TCP transport talking to firmware via sockets."""
8686

87-
def __init__(self, host: str, port: int, read_timeout: float = 0.5):
87+
def __init__(self, host: str, port: int, read_timeout: float = 10.0):
8888
self.host = host
8989
self.port = port
9090
self.read_timeout = read_timeout

0 commit comments

Comments
 (0)