Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions examples/simulated_focus_motor/focus_motor.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,21 @@

import magscope
from magscope.datatypes import MatrixBuffer
from magscope.hardware import HardwareManagerBase
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import Command, SetSimulatedFocusCommand
from magscope.hardware import FocusLimits, FocusMotorBase
from magscope.ipc_commands import FocusMoveCommand, SetSimulatedFocusCommand

if TYPE_CHECKING:
from magscope.camera import DummyCameraBeads


@dataclass(frozen=True)
class MoveFocusMotorCommand(Command):
target: float | None = None
speed: float | None = None


@dataclass
class FocusMotorState:
position: float
target: float
speed: float


class SimulatedFocusMotor(HardwareManagerBase):
class SimulatedFocusMotor(FocusMotorBase):
"""Simulated focus/Z motor that publishes telemetry and adjusts camera focus."""

position_min_max: Final[tuple[float, float]] = (-10000.0, 10000.0)
Expand All @@ -51,6 +44,7 @@ def __init__(self):
def connect(self):
self._is_connected = True
self._update_camera_focus(force=True)
self._publish_focus_status(force=True)

def disconnect(self):
self._is_connected = False
Expand All @@ -67,15 +61,20 @@ def fetch(self):
self._buffer.write(
np.array([[now, self._state.position, self._state.target]], dtype=float)
)
self._publish_focus_status()
def get_limits(self) -> FocusLimits:
minimum, maximum = self.position_min_max
return FocusLimits(minimum=minimum, maximum=maximum)

def get_position(self) -> float:
return self._state.position

@register_ipc_command(MoveFocusMotorCommand)
def move(self, target: float | None = None, speed: float | None = None):
if target is not None:
clipped_target = float(np.clip(target, *self.position_min_max))
self._state.target = clipped_target
if speed is not None:
clipped_speed = float(np.clip(speed, *self.speed_min_max))
self._state.speed = clipped_speed
def move_to(self, position: float) -> None:
self._state.target = position

def _apply_speed(self, speed: float) -> None:
clipped_speed = float(np.clip(speed, *self.speed_min_max))
self._state.speed = clipped_speed

def _advance_motion(self, now: float) -> bool:
dt = now - self._last_time
Expand Down Expand Up @@ -167,6 +166,9 @@ def _send_move_command(self) -> None:
target = self._to_float(self.target_text.text())
speed = self._to_float(self.speed_text.text())

if target is None:
return

if target is not None and not (SimulatedFocusMotor.position_min_max[0] <= target <= SimulatedFocusMotor.position_min_max[1]):
warn(
f"Target position {target} outside of range {SimulatedFocusMotor.position_min_max}",
Expand All @@ -179,7 +181,7 @@ def _send_move_command(self) -> None:
)
return

self.manager.send_ipc(MoveFocusMotorCommand(target=target, speed=speed))
self.manager.send_ipc(FocusMoveCommand(position=target, speed=speed))

@staticmethod
def _to_float(value: str) -> float | None:
Expand Down
2 changes: 1 addition & 1 deletion magscope/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from magscope.camera import CameraBase, CameraManager
from magscope.datatypes import MatrixBuffer
from magscope.ui import ControlPanelBase, TimeSeriesPlotBase, UIManager
from magscope.hardware import HardwareManagerBase
from magscope.hardware import FocusMotorBase, HardwareManagerBase
from magscope.ipc import CommandRegistry, Delivery, register_ipc_command
from magscope.ipc_commands import Command
from magscope.processes import ManagerProcessBase
Expand Down
67 changes: 66 additions & 1 deletion magscope/beadlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
FocusMoveCommand,
ExecuteXYLockCommand,
MoveBeadsCommand,
RemoveBeadFromPendingMovesCommand,
RemoveBeadsFromPendingMovesCommand,
ShowMessageCommand,
SetXYLockIntervalCommand,
SetXYLockMaxCommand,
SetXYLockOnCommand,
Expand Down Expand Up @@ -53,6 +55,7 @@ def __init__(self):
self.z_lock_interval: float
self.z_lock_max: float
self._z_lock_last_time: float = 0.0
self._focus_motor_missing_notified: bool = False

def setup(self):
self.xy_lock_interval = self.settings['xy-lock default interval']
Expand Down Expand Up @@ -153,8 +156,64 @@ def do_z_lock(self, now=None):
if now is None:
now = time()
self._z_lock_last_time = now
if not self._ensure_focus_motor():
return

if self.z_lock_target is None or self.tracks_buffer is None:
return

if self.focus_status is None:
self.request_focus_status()
return

tracks = self.tracks_buffer.peak_unsorted().copy()
bead_tracks = tracks[tracks[:, 4] == self.z_lock_bead, :]
valid = bead_tracks[~np.isnan(bead_tracks[:, [0, 3]]).any(axis=1)]
if valid.shape[0] == 0:
return

order = np.argsort(valid[:, 0])
latest_track = valid[order[-1], :]
z_position = float(latest_track[3])

z_error = self.z_lock_target - z_position
if np.isclose(z_error, 0.0):
return

motor_status = self.focus_status
if motor_status is None:
return

unclamped_target = motor_status.position + z_error
safe_target = np.clip(
unclamped_target,
motor_status.min_position,
motor_status.max_position,
)

delta = safe_target - motor_status.position
limited_delta = copysign(min(abs(delta), self.z_lock_max), delta)
if np.isclose(limited_delta, 0.0):
return

target_position = motor_status.position + limited_delta
self.send_ipc(FocusMoveCommand(position=target_position))
self.request_focus_status()

def _ensure_focus_motor(self) -> bool:
if self.focus_motor_name:
self._focus_motor_missing_notified = False
return True

raise NotImplementedError
if not self._focus_motor_missing_notified:
command = ShowMessageCommand(
text='Z-lock requires a focus motor',
details='Attach a FocusMotorBase hardware manager via MagScope.add_hardware.',
)
self.send_ipc(command)
self._focus_motor_missing_notified = True

return False

def set_bead_rois(self, value: dict[int, tuple[int, int, int, int]]):
previous_bead_rois = getattr(self, 'bead_rois', {}).copy()
Expand Down Expand Up @@ -235,11 +294,17 @@ def set_xy_lock_window(self, value: int):
@register_ipc_command(SetZLockOnCommand)
@register_script_command(SetZLockOnCommand)
def set_z_lock_on(self, value: bool):
if value and not self._ensure_focus_motor():
value = False

self.z_lock_on = value

command = UpdateZLockEnabledCommand(value=value)
self.send_ipc(command)

if self.z_lock_on:
self.request_focus_status()

@register_ipc_command(SetZLockBeadCommand)
@register_script_command(SetZLockBeadCommand)
def set_z_lock_bead(self, value: int):
Expand Down
85 changes: 83 additions & 2 deletions magscope/hardware.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass

from magscope.datatypes import MatrixBuffer
from magscope.processes import ManagerProcessBase, SingletonABCMeta
from magscope.ipc import register_ipc_command
from magscope.ipc_commands import (
FocusMoveCommand,
RequestFocusStatusCommand,
UpdateFocusStatusCommand,
)
from magscope.processes import FocusStatus, ManagerProcessBase, SingletonABCMeta


class HardwareManagerBase(ManagerProcessBase, ABC, metaclass=SingletonABCMeta):
Expand Down Expand Up @@ -42,4 +49,78 @@ def fetch(self):
data and timestamp in the matrix buffer (self._buffer).

The timestamp should be the seconds since the unix epoch:
(January 1, 1970, 00:00:00 UTC) """
(January 1, 1970, 00:00:00 UTC)
"""


@dataclass(frozen=True)
class FocusLimits:
minimum: float
maximum: float


class FocusMotorBase(HardwareManagerBase, ABC):
"""Base class for user-provided focus/Z motors.

Subclasses must implement absolute positioning along with limit reporting.
Each instance publishes its latest position and limits via
:class:`UpdateFocusStatusCommand` so other managers (e.g., Z-lock) can
react to changes in motor state.
"""

def __init__(self):
super().__init__()
self._last_reported_status: FocusStatus | None = None

@abstractmethod
def move_to(self, position: float) -> None:
"""Move the focus axis to ``position`` in nanometers."""

@abstractmethod
def get_position(self) -> float:
"""Return the current focus position in nanometers."""

@abstractmethod
def get_limits(self) -> FocusLimits:
"""Return the minimum and maximum reachable positions."""

def _apply_speed(self, speed: float) -> None:
"""Optionally adjust the motor speed. Subclasses may override."""

@register_ipc_command(FocusMoveCommand)
def handle_focus_move(self, position: float, speed: float | None = None) -> None:
"""Move the motor to ``position`` and publish the new status."""

limits = self.get_limits()
clipped_position = min(max(position, limits.minimum), limits.maximum)
if speed is not None:
self._apply_speed(speed)
self.move_to(clipped_position)
self._publish_focus_status()

@register_ipc_command(RequestFocusStatusCommand)
def handle_focus_status_request(self) -> None:
"""Broadcast the current position and limits."""

self._publish_focus_status(force=True)

def _publish_focus_status(self, *, force: bool = False) -> None:
"""Send a status update if it changed or if ``force`` is True."""

limits = self.get_limits()
status = FocusStatus(
position=self.get_position(),
min_position=limits.minimum,
max_position=limits.maximum,
)

if not force and status == self._last_reported_status:
return

self._last_reported_status = status
command = UpdateFocusStatusCommand(
position=status.position,
min_position=status.min_position,
max_position=status.max_position,
)
self.send_ipc(command)
18 changes: 18 additions & 0 deletions magscope/ipc_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ class MoveBeadsCommand(Command):
moves: list[tuple[int, int, int]]


@dataclass(frozen=True)
class FocusMoveCommand(Command):
position: float
speed: float | None = None


@dataclass(frozen=True)
class RequestFocusStatusCommand(Command):
pass


@dataclass(frozen=True)
class UpdateFocusStatusCommand(Command):
position: float
min_position: float
max_position: float


@dataclass(frozen=True)
class UpdateXYLockEnabledCommand(Command):
value: bool
Expand Down
Loading