Source code for ephys_link.platforms.new_scale_manipulator

"""New Scale Manipulator class

Handles logic for calling New Scale API functions. Also includes extra logic for safe
function calls, error handling, managing per-manipulator attributes, and returning the
appropriate callback parameters like in :mod:`ephys_link.new_scale_handler`.
"""

from __future__ import annotations

import asyncio
import threading
from typing import TYPE_CHECKING

from vbl_aquarium.models.ephys_link import (
    CanWriteRequest,
    DriveToDepthRequest,
    DriveToDepthResponse,
    GotoPositionRequest,
    PositionalResponse,
)
from vbl_aquarium.models.unity import Vector4

import ephys_link.common as com
from ephys_link.common import vector4_to_array
from ephys_link.platform_manipulator import (
    HOURS_TO_SECONDS,
    MM_TO_UM,
    POSITION_POLL_DELAY,
    PlatformManipulator,
)

if TYPE_CHECKING:
    # noinspection PyUnresolvedReferences
    from NstMotorCtrl import NstCtrlAxis

# Constants
ACCELERATION_MULTIPLIER = 5
CUTOFF_MULTIPLIER = 0.005
AT_TARGET_FLAG = 0x040000


[docs]class NewScaleManipulator(PlatformManipulator): def __init__( self, manipulator_id: str, x_axis: NstCtrlAxis, y_axis: NstCtrlAxis, z_axis: NstCtrlAxis, ) -> None: """Construct a new Manipulator object :param manipulator_id: Manipulator ID :param x_axis: X axis object :param y_axis: Y axis object :param z_axis: Z axis object """ super().__init__() self._id = manipulator_id self._movement_tolerance = 0.01 self._x = x_axis self._y = y_axis self._z = z_axis self._axes = [self._x, self._y, self._z] # Set to CL control self._x.SetCL_Enable(True) self._y.SetCL_Enable(True) self._z.SetCL_Enable(True)
[docs] def query_all_axes(self): """Query all axes for their position and status""" for axis in self._axes: axis.QueryPosStatus()
[docs] def get_pos(self) -> PositionalResponse: """Get the current position of the manipulator and convert it into mm. :return: Position of manipulator in (x, y, z, z) in mm (or an empty array on error) and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ self.query_all_axes() # Get position data and convert from µm to mm try: # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") return PositionalResponse( position=Vector4( x=self._x.CurPosition / MM_TO_UM, y=self._y.CurPosition / MM_TO_UM, z=self._z.CurPosition / MM_TO_UM, w=self._z.CurPosition / MM_TO_UM, ) ) except Exception as e: print(f"[ERROR]\t\t Getting position of manipulator {self._id}") print(f"{e}\n") return PositionalResponse(error="Error getting position")
[docs] async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: """Move manipulator to position. :param request: The goto request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` :return: Resulting position of manipulator in (x, y, z, z) in mm (or an empty array on error) and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write if not self._can_write: print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Stop current movement if self._is_moving: for axis in self._axes: axis.Stop() self._is_moving = False try: target_position_um = request.position * MM_TO_UM # Restrict target position to just z-axis if inside brain if self._inside_brain: z_axis = target_position_um.z target_position_um = target_position_um.model_copy( update={**self.get_pos().position.model_dump(), "z": z_axis} ) # Mark movement as started self._is_moving = True # Send move command speed_um = request.speed * MM_TO_UM for i in range(3): self._axes[i].SetCL_Speed( speed_um, speed_um * ACCELERATION_MULTIPLIER, speed_um * CUTOFF_MULTIPLIER, ) self._axes[i].MoveAbsolute(vector4_to_array(target_position_um)[i]) # Check and wait for completion (while able to write) self.query_all_axes() while ( not (self._x.CurStatus & AT_TARGET_FLAG) or not (self._y.CurStatus & AT_TARGET_FLAG) or not (self._z.CurStatus & AT_TARGET_FLAG) ) and self._can_write: await asyncio.sleep(POSITION_POLL_DELAY) self.query_all_axes() # Get position final_position = self.get_pos().position # Mark movement as finished self._is_moving = False # Return success unless write was disabled during movement (meaning a stop occurred) if not self._can_write: com.dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Return error if movement did not reach target. if not all( abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position)[:3] ): com.dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position.") com.dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") return PositionalResponse(error="Manipulator did not reach target position") # Made it to the target. com.dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_position}\n") return PositionalResponse(position=final_position) except Exception as e: print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") print(f"{e}\n") return PositionalResponse(error="Error moving manipulator")
[docs] async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: """Drive the manipulator to a certain depth. :param request: The drive to depth request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` :return: Resulting depth of manipulator in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Check if able to write if not self._can_write: print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return DriveToDepthResponse(error="Manipulator movement canceled") # Stop current movement if self._is_moving: for axis in self._axes: axis.Stop() self._is_moving = False try: target_depth_um = request.depth * MM_TO_UM # Mark movement as started self._is_moving = True # Send move command to just z axis speed_um = request.speed * MM_TO_UM self._z.SetCL_Speed( speed_um, speed_um * ACCELERATION_MULTIPLIER, speed_um * CUTOFF_MULTIPLIER, ) self._z.MoveAbsolute(target_depth_um) # Check and wait for completion (while able to write) self._z.QueryPosStatus() while not (self._z.CurStatus & AT_TARGET_FLAG) and self._can_write: await asyncio.sleep(0.1) self._z.QueryPosStatus() # Get position final_depth = self.get_pos().position.w # Mark movement as finished self._is_moving = False # Return success unless write was disabled during movement (meaning a stop occurred) if not self._can_write: com.dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return DriveToDepthResponse(error="Manipulator movement canceled") # Return error if movement did not reach target. if not abs(final_depth - request.depth) < self._movement_tolerance: com.dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target depth") com.dprint(f"\t\t\t Expected: {request.depth}, Got: {final_depth}") return DriveToDepthResponse(error="Manipulator did not reach target depth") # Made it to the target. com.dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_depth}\n") return DriveToDepthResponse(depth=final_depth) except Exception as e: print(f"[ERROR]\t\t Moving manipulator {self._id} to depth {request.depth}") print(f"{e}\n") # Return 0 and error message on failure return DriveToDepthResponse(error="Error driving manipulator")
[docs] def calibrate(self) -> bool: """Calibrate the manipulator. :return: None """ return self._x.CalibrateFrequency() and self._y.CalibrateFrequency() and self._z.CalibrateFrequency()
[docs] def get_calibrated(self) -> bool: """Return the calibration state of the manipulator. :return: True if the manipulator is calibrated, False otherwise. :rtype: bool """ return self._calibrated
[docs] def set_calibrated(self) -> None: """Set the manipulator to calibrated :return: None """ self._calibrated = True
[docs] def set_can_write(self, request: CanWriteRequest) -> None: """Set if the manipulator can move. :param request: The can write request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.CanWriteRequest` :return: None """ self._can_write = request.can_write if request.can_write and request.hours > 0: if self._reset_timer: self._reset_timer.cancel() self._reset_timer = threading.Timer(request.hours * HOURS_TO_SECONDS, self.reset_can_write) self._reset_timer.start()
[docs] def reset_can_write(self) -> None: """Reset the :attr:`can_write` flag.""" self._can_write = False
[docs] def get_can_write(self) -> bool: """Return if the manipulator can move. :return: True if the manipulator can move, False otherwise. :rtype: bool """ return self._can_write
[docs] def set_inside_brain(self, inside: bool) -> None: """Set if the manipulator is inside the brain. :param inside: True if the manipulator is inside the brain, False otherwise. :type inside: bool :return: None """ self._inside_brain = inside
[docs] def stop(self) -> None: """Stop all axes on manipulator. :returns None """ for axis in self._axes: axis.Stop() self._can_write = False com.dprint(f"[SUCCESS]\t Stopped manipulator {self._id}\n")