diff --git a/examples/simulated_focus_motor/focus_motor.py b/examples/simulated_focus_motor/focus_motor.py index feb5744..0d06a35 100644 --- a/examples/simulated_focus_motor/focus_motor.py +++ b/examples/simulated_focus_motor/focus_motor.py @@ -12,20 +12,13 @@ import magscope from magscope.datatypes import MatrixBuffer -from magscope.hardware import HardwareManagerBase -from magscope.ipc import register_ipc_command -from magscope.ipc_commands import Command, SetSimulatedFocusCommand +from magscope.hardware import FocusLimits, FocusMotorBase +from magscope.ipc_commands import FocusMoveCommand, SetSimulatedFocusCommand if TYPE_CHECKING: from magscope.camera import DummyCameraBeads -@dataclass(frozen=True) -class MoveFocusMotorCommand(Command): - target: float | None = None - speed: float | None = None - - @dataclass class FocusMotorState: position: float @@ -33,7 +26,7 @@ class FocusMotorState: speed: float -class SimulatedFocusMotor(HardwareManagerBase): +class SimulatedFocusMotor(FocusMotorBase): """Simulated focus/Z motor that publishes telemetry and adjusts camera focus.""" position_min_max: Final[tuple[float, float]] = (-10000.0, 10000.0) @@ -51,6 +44,7 @@ def __init__(self): def connect(self): self._is_connected = True self._update_camera_focus(force=True) + self._publish_focus_status(force=True) def disconnect(self): self._is_connected = False @@ -67,15 +61,20 @@ def fetch(self): self._buffer.write( np.array([[now, self._state.position, self._state.target]], dtype=float) ) + self._publish_focus_status() + def get_limits(self) -> FocusLimits: + minimum, maximum = self.position_min_max + return FocusLimits(minimum=minimum, maximum=maximum) + + def get_position(self) -> float: + return self._state.position - @register_ipc_command(MoveFocusMotorCommand) - def move(self, target: float | None = None, speed: float | None = None): - if target is not None: - clipped_target = float(np.clip(target, *self.position_min_max)) - self._state.target = clipped_target - if speed is not None: - clipped_speed = float(np.clip(speed, *self.speed_min_max)) - self._state.speed = clipped_speed + def move_to(self, position: float) -> None: + self._state.target = position + + def _apply_speed(self, speed: float) -> None: + clipped_speed = float(np.clip(speed, *self.speed_min_max)) + self._state.speed = clipped_speed def _advance_motion(self, now: float) -> bool: dt = now - self._last_time @@ -167,6 +166,9 @@ def _send_move_command(self) -> None: target = self._to_float(self.target_text.text()) speed = self._to_float(self.speed_text.text()) + if target is None: + return + if target is not None and not (SimulatedFocusMotor.position_min_max[0] <= target <= SimulatedFocusMotor.position_min_max[1]): warn( f"Target position {target} outside of range {SimulatedFocusMotor.position_min_max}", @@ -179,7 +181,7 @@ def _send_move_command(self) -> None: ) return - self.manager.send_ipc(MoveFocusMotorCommand(target=target, speed=speed)) + self.manager.send_ipc(FocusMoveCommand(position=target, speed=speed)) @staticmethod def _to_float(value: str) -> float | None: diff --git a/magscope/__init__.py b/magscope/__init__.py index 76aa089..e674dd0 100644 --- a/magscope/__init__.py +++ b/magscope/__init__.py @@ -2,7 +2,7 @@ from magscope.camera import CameraBase, CameraManager from magscope.datatypes import MatrixBuffer from magscope.ui import ControlPanelBase, TimeSeriesPlotBase, UIManager -from magscope.hardware import HardwareManagerBase +from magscope.hardware import FocusMotorBase, HardwareManagerBase from magscope.ipc import CommandRegistry, Delivery, register_ipc_command from magscope.ipc_commands import Command from magscope.processes import ManagerProcessBase diff --git a/magscope/beadlock.py b/magscope/beadlock.py index 07ea7e7..d76f7f6 100644 --- a/magscope/beadlock.py +++ b/magscope/beadlock.py @@ -5,10 +5,12 @@ from magscope.ipc import register_ipc_command from magscope.ipc_commands import ( + FocusMoveCommand, ExecuteXYLockCommand, MoveBeadsCommand, RemoveBeadFromPendingMovesCommand, RemoveBeadsFromPendingMovesCommand, + ShowMessageCommand, SetXYLockIntervalCommand, SetXYLockMaxCommand, SetXYLockOnCommand, @@ -53,6 +55,7 @@ def __init__(self): self.z_lock_interval: float self.z_lock_max: float self._z_lock_last_time: float = 0.0 + self._focus_motor_missing_notified: bool = False def setup(self): self.xy_lock_interval = self.settings['xy-lock default interval'] @@ -153,8 +156,64 @@ def do_z_lock(self, now=None): if now is None: now = time() self._z_lock_last_time = now + if not self._ensure_focus_motor(): + return + + if self.z_lock_target is None or self.tracks_buffer is None: + return + + if self.focus_status is None: + self.request_focus_status() + return + + tracks = self.tracks_buffer.peak_unsorted().copy() + bead_tracks = tracks[tracks[:, 4] == self.z_lock_bead, :] + valid = bead_tracks[~np.isnan(bead_tracks[:, [0, 3]]).any(axis=1)] + if valid.shape[0] == 0: + return + + order = np.argsort(valid[:, 0]) + latest_track = valid[order[-1], :] + z_position = float(latest_track[3]) + + z_error = self.z_lock_target - z_position + if np.isclose(z_error, 0.0): + return + + motor_status = self.focus_status + if motor_status is None: + return + + unclamped_target = motor_status.position + z_error + safe_target = np.clip( + unclamped_target, + motor_status.min_position, + motor_status.max_position, + ) + + delta = safe_target - motor_status.position + limited_delta = copysign(min(abs(delta), self.z_lock_max), delta) + if np.isclose(limited_delta, 0.0): + return + + target_position = motor_status.position + limited_delta + self.send_ipc(FocusMoveCommand(position=target_position)) + self.request_focus_status() + + def _ensure_focus_motor(self) -> bool: + if self.focus_motor_name: + self._focus_motor_missing_notified = False + return True - raise NotImplementedError + if not self._focus_motor_missing_notified: + command = ShowMessageCommand( + text='Z-lock requires a focus motor', + details='Attach a FocusMotorBase hardware manager via MagScope.add_hardware.', + ) + self.send_ipc(command) + self._focus_motor_missing_notified = True + + return False def set_bead_rois(self, value: dict[int, tuple[int, int, int, int]]): previous_bead_rois = getattr(self, 'bead_rois', {}).copy() @@ -235,11 +294,17 @@ def set_xy_lock_window(self, value: int): @register_ipc_command(SetZLockOnCommand) @register_script_command(SetZLockOnCommand) def set_z_lock_on(self, value: bool): + if value and not self._ensure_focus_motor(): + value = False + self.z_lock_on = value command = UpdateZLockEnabledCommand(value=value) self.send_ipc(command) + if self.z_lock_on: + self.request_focus_status() + @register_ipc_command(SetZLockBeadCommand) @register_script_command(SetZLockBeadCommand) def set_z_lock_bead(self, value: int): diff --git a/magscope/hardware.py b/magscope/hardware.py index 267e46d..46b6593 100644 --- a/magscope/hardware.py +++ b/magscope/hardware.py @@ -1,7 +1,14 @@ from abc import ABC, abstractmethod +from dataclasses import dataclass from magscope.datatypes import MatrixBuffer -from magscope.processes import ManagerProcessBase, SingletonABCMeta +from magscope.ipc import register_ipc_command +from magscope.ipc_commands import ( + FocusMoveCommand, + RequestFocusStatusCommand, + UpdateFocusStatusCommand, +) +from magscope.processes import FocusStatus, ManagerProcessBase, SingletonABCMeta class HardwareManagerBase(ManagerProcessBase, ABC, metaclass=SingletonABCMeta): @@ -42,4 +49,78 @@ def fetch(self): data and timestamp in the matrix buffer (self._buffer). The timestamp should be the seconds since the unix epoch: - (January 1, 1970, 00:00:00 UTC) """ \ No newline at end of file + (January 1, 1970, 00:00:00 UTC) + """ + + +@dataclass(frozen=True) +class FocusLimits: + minimum: float + maximum: float + + +class FocusMotorBase(HardwareManagerBase, ABC): + """Base class for user-provided focus/Z motors. + + Subclasses must implement absolute positioning along with limit reporting. + Each instance publishes its latest position and limits via + :class:`UpdateFocusStatusCommand` so other managers (e.g., Z-lock) can + react to changes in motor state. + """ + + def __init__(self): + super().__init__() + self._last_reported_status: FocusStatus | None = None + + @abstractmethod + def move_to(self, position: float) -> None: + """Move the focus axis to ``position`` in nanometers.""" + + @abstractmethod + def get_position(self) -> float: + """Return the current focus position in nanometers.""" + + @abstractmethod + def get_limits(self) -> FocusLimits: + """Return the minimum and maximum reachable positions.""" + + def _apply_speed(self, speed: float) -> None: + """Optionally adjust the motor speed. Subclasses may override.""" + + @register_ipc_command(FocusMoveCommand) + def handle_focus_move(self, position: float, speed: float | None = None) -> None: + """Move the motor to ``position`` and publish the new status.""" + + limits = self.get_limits() + clipped_position = min(max(position, limits.minimum), limits.maximum) + if speed is not None: + self._apply_speed(speed) + self.move_to(clipped_position) + self._publish_focus_status() + + @register_ipc_command(RequestFocusStatusCommand) + def handle_focus_status_request(self) -> None: + """Broadcast the current position and limits.""" + + self._publish_focus_status(force=True) + + def _publish_focus_status(self, *, force: bool = False) -> None: + """Send a status update if it changed or if ``force`` is True.""" + + limits = self.get_limits() + status = FocusStatus( + position=self.get_position(), + min_position=limits.minimum, + max_position=limits.maximum, + ) + + if not force and status == self._last_reported_status: + return + + self._last_reported_status = status + command = UpdateFocusStatusCommand( + position=status.position, + min_position=status.min_position, + max_position=status.max_position, + ) + self.send_ipc(command) diff --git a/magscope/ipc_commands.py b/magscope/ipc_commands.py index 2d64046..93f99fd 100644 --- a/magscope/ipc_commands.py +++ b/magscope/ipc_commands.py @@ -86,6 +86,24 @@ class MoveBeadsCommand(Command): moves: list[tuple[int, int, int]] +@dataclass(frozen=True) +class FocusMoveCommand(Command): + position: float + speed: float | None = None + + +@dataclass(frozen=True) +class RequestFocusStatusCommand(Command): + pass + + +@dataclass(frozen=True) +class UpdateFocusStatusCommand(Command): + position: float + min_position: float + max_position: float + + @dataclass(frozen=True) class UpdateXYLockEnabledCommand(Command): value: bool diff --git a/magscope/processes.py b/magscope/processes.py index 256896f..c1d921d 100644 --- a/magscope/processes.py +++ b/magscope/processes.py @@ -2,6 +2,7 @@ from abc import ABC, ABCMeta, abstractmethod from ctypes import c_int, c_uint8 +from dataclasses import dataclass from multiprocessing import Event, Process, Value import sys import traceback @@ -13,9 +14,10 @@ from magscope.ipc import (CommandRegistry, Delivery, UnknownCommandError, command_kwargs, drain_pipe_until_quit, register_ipc_command) from magscope.ipc_commands import (Command, LogExceptionCommand, QuitCommand, - SetAcquisitionDirCommand, SetAcquisitionDirOnCommand, - SetAcquisitionModeCommand, SetAcquisitionOnCommand, - SetBeadRoisCommand, SetSettingsCommand) + RequestFocusStatusCommand, SetAcquisitionDirCommand, + SetAcquisitionDirOnCommand, SetAcquisitionModeCommand, + SetAcquisitionOnCommand, SetBeadRoisCommand, + SetSettingsCommand, UpdateFocusStatusCommand) from magscope.settings import MagScopeSettings from magscope.utils import AcquisitionMode, register_script_command @@ -31,6 +33,13 @@ from magscope.hardware import HardwareManagerBase +@dataclass(frozen=True) +class FocusStatus: + position: float + min_position: float + max_position: float + + class InterprocessValues: def __init__(self): self.video_process_busy_count: ValueTypeUI8 = Value(c_uint8, 0) @@ -72,6 +81,8 @@ def __init__(self): self.bead_rois: dict[int, tuple[int, int, int, int]] = {} # x0 x1 y0 y1 self.camera_type: type[CameraBase] | None = None self.hardware_types: dict[str, type[HardwareManagerBase]] = {} + self.focus_motor_name: str | None = None + self.focus_status: FocusStatus | None = None self.locks: dict[str, LockType] | None = None self._magscope_quitting: EventType | None = None self.name: str = type(self).__name__ # Read-only @@ -97,6 +108,7 @@ def configure_shared_resources( *, camera_type: type[CameraBase] | None, hardware_types: dict[str, type[HardwareManagerBase]], + focus_motor_name: str | None, quitting_event: EventType, settings: MagScopeSettings, shared_values: InterprocessValues, @@ -112,6 +124,7 @@ def configure_shared_resources( """ self.camera_type = camera_type self.hardware_types = hardware_types + self.focus_motor_name = focus_motor_name self._magscope_quitting = quitting_event self.settings = settings.clone() self.shared_values = shared_values @@ -240,6 +253,24 @@ def receive_ipc(self): handler(**command_kwargs(command)) + def request_focus_status(self) -> None: + """Request an updated focus status broadcast from the focus motor.""" + + if self.focus_motor_name is None: + return + + self.send_ipc(RequestFocusStatusCommand()) + + @register_ipc_command(UpdateFocusStatusCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase') + def update_focus_status(self, position: float, min_position: float, max_position: float): + """Cache the most recent focus motor status.""" + + self.focus_status = FocusStatus( + position=position, + min_position=min_position, + max_position=max_position, + ) + @register_ipc_command(SetAcquisitionDirCommand, delivery=Delivery.BROADCAST, target='ManagerProcessBase') @register_script_command(SetAcquisitionDirCommand) def set_acquisition_dir(self, value: str | None): diff --git a/magscope/scope.py b/magscope/scope.py index ee8331f..2727d89 100644 --- a/magscope/scope.py +++ b/magscope/scope.py @@ -55,8 +55,8 @@ from magscope.beadlock import BeadLockManager from magscope.camera import CameraManager from magscope.datatypes import LiveProfileBuffer, MatrixBuffer, VideoBuffer +from magscope.hardware import FocusMotorBase, HardwareManagerBase from magscope.ui import ControlPanelBase, TimeSeriesPlotBase, UIManager -from magscope.hardware import HardwareManagerBase from magscope.ipc import ( broadcast_command, command_kwargs, @@ -110,6 +110,7 @@ def __init__( self._hardware: dict[str, HardwareManagerBase] = {} self._hardware_buffers: dict[str, MatrixBuffer] = {} + self._focus_motor_name: str | None = None self.processes: dict[str, ManagerProcessBase] = {} self.command_registry: CommandRegistry = CommandRegistry() @@ -204,6 +205,12 @@ def stop(self) -> None: def add_hardware(self, hardware: HardwareManagerBase): """Register a hardware manager so its process launches with MagScope.""" self._hardware[hardware.name] = hardware + if isinstance(hardware, FocusMotorBase): + if self._focus_motor_name and self._focus_motor_name != hardware.name: + warn( + f'Replacing focus motor {self._focus_motor_name} with {hardware.name}', + ) + self._focus_motor_name = hardware.name self.command_registry.register_manager(hardware) def add_control(self, control_type: type(ControlPanelBase), column: int): @@ -531,6 +538,7 @@ def _configure_processes_with_shared_resources(self): proc.configure_shared_resources( camera_type=camera_type, hardware_types=hardware_types, + focus_motor_name=self._focus_motor_name, quitting_event=self._quitting, settings=self._settings.clone(), shared_values=self.shared_values, diff --git a/magscope/ui/controls.py b/magscope/ui/controls.py index cad3e1e..89060b2 100644 --- a/magscope/ui/controls.py +++ b/magscope/ui/controls.py @@ -51,6 +51,7 @@ SetZLockOnCommand, SetZLockTargetCommand, StartScriptCommand, + ShowMessageCommand, UpdateScriptStepCommand, UpdateSettingsCommand, ) @@ -1496,12 +1497,58 @@ def generate_callback(self): except ValueError: return + if self.manager.focus_motor_name is None: + command = ShowMessageCommand( + text='No focus motor available for Z-LUT generation', + details='Attach a focus motor hardware manager before generating a Z-LUT.', + ) + self.manager.send_ipc(command) + return + + status = self.manager.focus_status + if status is None: + self.manager.request_focus_status() + command = ShowMessageCommand( + text='Waiting for focus motor status', + details='Focus position and limits are required before generating a Z-LUT.', + ) + self.manager.send_ipc(command) + return + + if start_nm < status.min_position or start_nm > status.max_position or stop_nm < status.min_position or stop_nm > status.max_position: + command = ShowMessageCommand( + text='Z-LUT range outside focus motor limits', + details=( + f'Requested range {start_nm}–{stop_nm} nm exceeds ' + f'focus motor bounds of {status.min_position}–{status.max_position} nm.' + ), + ) + self.manager.send_ipc(command) + return + + if step_nm <= 0: + command = ShowMessageCommand( + text='Z-LUT step must be positive', + details='Enter a positive step size in nanometers.', + ) + self.manager.send_ipc(command) + return + + n_steps = int(abs(stop_nm - start_nm) / step_nm) + 1 + # Output file name timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H-%M-%S') roi = self.manager.settings['ROI'] filename = f'Z-LUT {timestamp} {roi} {start_nm:.0f} {step_nm:.0f} {stop_nm:.0f}.txt' - raise NotImplementedError + command = ShowMessageCommand( + text='Z-LUT generation request queued', + details=( + f'Validated {n_steps} steps from {start_nm} nm to {stop_nm} nm ' + f'at {step_nm} nm increments. Intended output file: {filename}.' + ), + ) + self.manager.send_ipc(command) class ZLUTPanel(ControlPanelBase): diff --git a/tests/test_process_manager.py b/tests/test_process_manager.py index d12d7ef..3a307e0 100644 --- a/tests/test_process_manager.py +++ b/tests/test_process_manager.py @@ -224,6 +224,7 @@ def test_run_validates_dependencies(fake_buffers): proc.configure_shared_resources( camera_type=None, hardware_types={}, + focus_motor_name=None, quitting_event=FakeEvent(), settings=FakeSettings(), shared_values=processes.InterprocessValues(), @@ -252,6 +253,7 @@ def test_receive_ipc_dispatch_and_quit_flag(): proc.configure_shared_resources( camera_type=None, hardware_types={}, + focus_motor_name=None, quitting_event=quit_event, settings=FakeSettings(), shared_values=processes.InterprocessValues(), @@ -287,6 +289,7 @@ class Unknown(ipc_commands.Command): proc.configure_shared_resources( camera_type=None, hardware_types={}, + focus_motor_name=None, quitting_event=FakeEvent(), settings=FakeSettings(), shared_values=processes.InterprocessValues(), @@ -309,6 +312,7 @@ def test_quit_broadcasts_and_drains_pipe(): proc.configure_shared_resources( camera_type=None, hardware_types={}, + focus_motor_name=None, quitting_event=quitting_event, settings=FakeSettings(), shared_values=processes.InterprocessValues(), @@ -350,6 +354,7 @@ def log_exception(self, process_name: str, details: str): proc.configure_shared_resources( camera_type=None, hardware_types={}, + focus_motor_name=None, quitting_event=FakeEvent(), settings=FakeSettings(), shared_values=processes.InterprocessValues(), diff --git a/tests/test_scope.py b/tests/test_scope.py index 910d5b1..afa95ed 100644 --- a/tests/test_scope.py +++ b/tests/test_scope.py @@ -93,6 +93,7 @@ def configure_shared_resources( *, camera_type, hardware_types, + focus_motor_name, quitting_event, settings, shared_values, @@ -188,7 +189,10 @@ class InterprocessValues: "TimeSeriesPlotBase": type("TimeSeriesPlotBase", (), {}), "UIManager": UIManager, }, - "magscope.hardware": {"HardwareManagerBase": HardwareManagerBase}, + "magscope.hardware": { + "HardwareManagerBase": HardwareManagerBase, + "FocusMotorBase": HardwareManagerBase, + }, "magscope.processes": { "InterprocessValues": InterprocessValues, "ManagerProcessBase": StubManagerProcessBase,