Source code for ephys_link.platforms.sensapex_manipulator

"""Sensapex Manipulator class

Handles logic for calling Sensapex API functions. Also includes extra logic for safe
function calls, error handling, managing per-manipulator attributes, and returning the
appropriate callback parameters like in :mod:`ephys_link.sensapex_handler`.
"""

from __future__ import annotations

import asyncio
import threading
from typing import TYPE_CHECKING

from vbl_aquarium.models.ephys_link import (
    CanWriteRequest,
    DriveToDepthRequest,
    DriveToDepthResponse,
    GotoPositionRequest,
    PositionalResponse,
)
from vbl_aquarium.models.unity import Vector4

from ephys_link.common import dprint, vector4_to_array
from ephys_link.platform_manipulator import (
    HOURS_TO_SECONDS,
    MM_TO_UM,
    POSITION_POLL_DELAY,
    PlatformManipulator,
)

if TYPE_CHECKING:
    from sensapex import SensapexDevice


[docs]class SensapexManipulator(PlatformManipulator): """Representation of a single Sensapex manipulator :param device: A Sensapex device :type device: :class: `sensapex.SensapexDevice` """ def __init__(self, device: SensapexDevice) -> None: """Construct a new Manipulator object :param device: A Sensapex device """ super().__init__() self._device = device self._id = device.dev_id # Device functions
[docs] def get_pos(self) -> PositionalResponse: """Get the current position of the manipulator and convert it into mm. :return: Position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") return PositionalResponse( position=Vector4( **dict( zip( Vector4.model_fields.keys(), [axis / MM_TO_UM for axis in self._device.get_pos(1)], strict=False, ) ) ) ) except Exception as e: print(f"[ERROR]\t\t Getting position of manipulator {self._id}") print(f"{e}\n") return PositionalResponse(error="Error getting position")
[docs] async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: """Move manipulator to position. :param request: The goto request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` :return: Resulting position in (x, y, z, w) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write if not self._can_write: print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Stop current movement if self._is_moving: self._device.stop() self._is_moving = False try: target_position_um = request.position * MM_TO_UM # Restrict target position to just depth-axis if inside brain if self._inside_brain: d_axis = target_position_um.w target_position_um = target_position_um.model_copy( update={**self.get_pos().position.model_dump(), "w": d_axis} ) # Mark movement as started self._is_moving = True # Send move command movement = self._device.goto_pos( vector4_to_array(target_position_um), request.speed * MM_TO_UM, ) # Wait for movement to finish while not movement.finished: await asyncio.sleep(POSITION_POLL_DELAY) # Get position final_position = self.get_pos().position # Mark movement as finished. self._is_moving = False # Return success unless write was disabled during movement (meaning a stop occurred). if not self._can_write: dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Return error if movement did not reach target. if not all( abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position) ): dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position") dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") return PositionalResponse(error="Manipulator did not reach target position") # Made it to the target. dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position {final_position}\n") return PositionalResponse(position=final_position) except Exception as e: print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") print(f"{e}\n") return PositionalResponse(error="Error moving manipulator")
[docs] async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: """Drive the manipulator to a certain depth. :param request: The drive to depth request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` :return: Resulting depth in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Get position before this movement target_pos = self.get_pos().position target_pos = target_pos.model_copy(update={"w": request.depth}) movement_result = await self.goto_pos(GotoPositionRequest(**request.model_dump(), position=target_pos)) return DriveToDepthResponse(depth=movement_result.position.w, error=movement_result.error)
[docs] def set_inside_brain(self, inside: bool) -> None: """Set if the manipulator is inside the brain. Used to signal that the brain should move at :const:`INSIDE_BRAIN_SPEED_LIMIT` :param inside: True if the manipulator is inside the brain, False otherwise. :type inside: bool :return: None """ self._inside_brain = inside
[docs] def get_can_write(self) -> bool: """Return if the manipulator can move. :return: True if the manipulator can move, False otherwise. :rtype: bool """ return self._can_write
[docs] def set_can_write(self, request: CanWriteRequest) -> None: """Set if the manipulator can move. :param request: The can write request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` :return: None """ self._can_write = request.can_write if request.can_write and request.hours > 0: if self._reset_timer: self._reset_timer.cancel() self._reset_timer = threading.Timer(request.hours * HOURS_TO_SECONDS, self.reset_can_write) self._reset_timer.start()
[docs] def reset_can_write(self) -> None: """Reset the :attr:`can_write` flag.""" self._can_write = False
# Calibration
[docs] def call_calibrate(self) -> None: """Calibrate the manipulator. :return: None """ self._device.calibrate_zero_position()
[docs] def get_calibrated(self) -> bool: """Return the calibration state of the manipulator. :return: True if the manipulator is calibrated, False otherwise. :rtype: bool """ return self._calibrated
[docs] def set_calibrated(self) -> None: """Set the manipulator to be calibrated. :return: None """ self._calibrated = True
[docs] def stop(self) -> None: """Stop the manipulator :return: None """ self._can_write = False self._device.stop() dprint(f"[SUCCESS]\t Stopped manipulator {self._id}")