Source code for ephys_link.platforms.ump3_manipulator

"""Sensapex uMp-3 Manipulator class

Extends from :class:`ephys_link.platforms.sensapex_manipulator.SensapexManipulator` to support the uMp-3 manipulator.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING

from vbl_aquarium.models.ephys_link import (
    DriveToDepthRequest,
    DriveToDepthResponse,
    GotoPositionRequest,
    PositionalResponse,
)
from vbl_aquarium.models.unity import Vector4

from ephys_link.common import dprint, vector4_to_array
from ephys_link.platform_manipulator import (
    MM_TO_UM,
    POSITION_POLL_DELAY,
)
from ephys_link.platforms.sensapex_manipulator import SensapexManipulator

if TYPE_CHECKING:
    from sensapex import SensapexDevice


[docs]class UMP3Manipulator(SensapexManipulator): """Representation of a single Sensapex manipulator :param device: A Sensapex device :type device: :class: `sensapex.SensapexDevice` """ def __init__(self, device: SensapexDevice) -> None: """Construct a new Manipulator object :param device: A Sensapex device """ super().__init__(device) # Device functions
[docs] def get_pos(self) -> PositionalResponse: """Get the current position of the manipulator and convert it into mm. :return: Position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ try: position = [axis / MM_TO_UM for axis in self._device.get_pos(1)] # Add the depth axis to the end of the position. position.append(position[0]) # com.dprint(f"[SUCCESS]\t Got position of manipulator {self._id}\n") return PositionalResponse( position=Vector4(**dict(zip(Vector4.model_fields.keys(), position, strict=False))) ) except Exception as e: print(f"[ERROR]\t\t Getting position of manipulator {self._id}") print(f"{e}\n") return PositionalResponse(error="Error getting position")
[docs] async def goto_pos(self, request: GotoPositionRequest) -> PositionalResponse: """Move manipulator to position. :param request: The goto request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.GotoPositionRequest` :return: Resulting position in (x, y, z, x) (or an empty array on error) in mm and error message (if any). :rtype: :class:`ephys_link.common.PositionalOutputData` """ # Check if able to write if not self._can_write: print(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Stop current movement if self._is_moving: self._device.stop() self._is_moving = False try: target_position_um = request.position * MM_TO_UM # Restrict target position to just depth-axis if inside brain if self._inside_brain: d_axis = target_position_um.x target_position_um = target_position_um.model_copy( update={**self.get_pos().position.model_dump(), "x": d_axis} ) # Mark movement as started self._is_moving = True # Send move command movement = self._device.goto_pos( vector4_to_array(target_position_um), request.speed * MM_TO_UM, ) # Wait for movement to finish while not movement.finished: await asyncio.sleep(POSITION_POLL_DELAY) # Get position final_position = self.get_pos().position # Mark movement as finished self._is_moving = False # Return success unless write was disabled during movement (meaning a stop occurred) if not self._can_write: dprint(f"[ERROR]\t\t Manipulator {self._id} movement canceled") return PositionalResponse(error="Manipulator movement canceled") # Return error if movement did not reach target. if not all( abs(axis) < self._movement_tolerance for axis in vector4_to_array(final_position - request.position) ): dprint(f"[ERROR]\t\t Manipulator {self._id} did not reach target position") dprint(f"\t\t\t Expected: {request.position}, Got: {final_position}") return PositionalResponse(error="Manipulator did not reach target position") # Made it to the target. dprint(f"[SUCCESS]\t Moved manipulator {self._id} to position" f" {final_position}\n") return PositionalResponse(position=final_position) except Exception as e: print(f"[ERROR]\t\t Moving manipulator {self._id} to position {request.position}") print(f"{e}\n") return PositionalResponse(error="Error moving manipulator")
[docs] async def drive_to_depth(self, request: DriveToDepthRequest) -> DriveToDepthResponse: """Drive the manipulator to a certain depth. :param request: The drive to depth request parsed from the server. :type request: :class:`vbl_aquarium.models.ephys_link.DriveToDepthRequest` :return: Resulting depth in mm (or 0 on error) and error message (if any). :rtype: :class:`ephys_link.common.DriveToDepthOutputData` """ # Get position before this movement target_pos = self.get_pos().position target_pos = target_pos.model_copy(update={"x": request.depth}) movement_result = await self.goto_pos(GotoPositionRequest(**request.model_dump(), position=target_pos)) return DriveToDepthResponse(depth=movement_result.position.w, error=movement_result.error)