Source code for ephys_link.platform_handler
"""Handle communications with platform specific API
Handles relaying WebSocket messages to the appropriate platform API functions and
conducting error checks on the input and output values
Function names here are the same as the WebSocket events. They are called when the
server receives an event from a client. In general, each function does the following:
1. Receive extracted arguments from :mod:`ephys_link.server`
2. Call and check the appropriate platform API function (overloaded by each platform)
3. Log/handle successes and failures
4. Return the callback parameters to :mod:`ephys_link.server`
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from ephys_link import common as com
if TYPE_CHECKING:
import socketio
[docs]class PlatformHandler(ABC):
"""An abstract class that defines the interface for a manipulator handler."""
def __init__(self):
"""Initialize the manipulator handler with a dictionary of manipulators."""
# Registered manipulators are stored as a dictionary of IDs (string) to
# manipulator objects
self.manipulators = {}
self.num_axes = 4
# Platform axes dimensions in mm
self.dimensions = [20, 20, 20, 20]
# Platform Handler Methods
[docs] def reset(self) -> bool:
"""Reset handler.
:return: True if successful, False otherwise.
:rtype: bool
"""
stop_result = self.stop()
self.manipulators.clear()
return stop_result
[docs] def stop(self) -> bool:
"""Stop handler.
:return: True if successful, False otherwise.
:rtype: bool
"""
try:
for manipulator in self.manipulators.values():
if hasattr(manipulator, "stop"):
manipulator.stop()
except Exception as e:
print(f"[ERROR]\t\t Could not stop manipulators: {e}\n")
return False
else:
return True
[docs] def get_manipulators(self) -> com.GetManipulatorsOutputData:
"""Get all registered manipulators.
:return: Result of connected manipulators, platform information, and error message (if any).
:rtype: :class:`ephys_link.common.GetManipulatorsOutputData`
"""
error = "Error getting manipulators"
try:
devices = self._get_manipulators()
error = ""
except Exception as e:
print(f"[ERROR]\t\t Getting manipulators: {type(e)}: {e}\n")
return com.GetManipulatorsOutputData([], self.num_axes, self.dimensions, error)
else:
return com.GetManipulatorsOutputData(devices, self.num_axes, self.dimensions, error)
[docs] def register_manipulator(self, manipulator_id: str) -> str:
"""Register a manipulator.
:param manipulator_id: The ID of the manipulator to register.
:type manipulator_id: str
:return: Error message on error, empty string otherwise.
:rtype: str
"""
# Check if manipulator is already registered
if manipulator_id in self.manipulators:
print(f"[ERROR]\t\t Manipulator already registered:" f" {manipulator_id}\n")
return "Manipulator already registered"
try:
# Register manipulator
self._register_manipulator(manipulator_id)
except ValueError as ve:
# Manipulator not found in UMP
print(f"[ERROR]\t\t Manipulator not found: {manipulator_id}: {ve}\n")
return "Manipulator not found"
except Exception as e:
# Other error
print(f"[ERROR]\t\t Registering manipulator: {manipulator_id}")
print(f"{type(e)}: {e}\n")
return "Error registering manipulator"
else:
com.dprint(f"[SUCCESS]\t Registered manipulator: {manipulator_id}\n")
return ""
[docs] def unregister_manipulator(self, manipulator_id: str) -> str:
"""Unregister a manipulator.
:param manipulator_id: The ID of the manipulator to unregister.
:type manipulator_id: str
:return: Error message on error, empty string otherwise.
:rtype: str
"""
# Check if manipulator is not registered
if manipulator_id not in self.manipulators:
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n")
return "Manipulator not registered"
try:
# Unregister manipulator
self._unregister_manipulator(manipulator_id)
except Exception as e:
# Other error
print(f"[ERROR]\t\t Unregistering manipulator: {manipulator_id}")
print(f"{e}\n")
return "Error unregistering manipulator"
else:
com.dprint(f"[SUCCESS]\t Unregistered manipulator: {manipulator_id}\n")
return ""
[docs] def get_pos(self, manipulator_id: str) -> com.PositionalOutputData:
"""Get the current position of a manipulator.
:param manipulator_id: The ID of the manipulator to get the position of.
:type manipulator_id: str
:return: Positional information for the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.PositionalOutputData`
"""
try:
# Check calibration status
if (
hasattr(self.manipulators[manipulator_id], "get_calibrated")
and not self.manipulators[manipulator_id].get_calibrated()
):
print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n")
return com.PositionalOutputData([], "Manipulator not calibrated")
# Get position and convert to unified space
manipulator_pos = self._get_pos(manipulator_id)
# Shortcut return for Pathfinder
if self.num_axes == -1:
return manipulator_pos
# Error check and convert position to unified space
if manipulator_pos["error"] != "":
return manipulator_pos
return com.PositionalOutputData(self._platform_space_to_unified_space(manipulator_pos["position"]), "")
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}")
return com.PositionalOutputData([], "Manipulator not registered")
[docs] def get_angles(self, manipulator_id: str) -> com.AngularOutputData:
"""Get the current position of a manipulator.
:param manipulator_id: The ID of the manipulator to get the angles of.
:type manipulator_id: str
:return: Angular information for the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.AngularOutputData`
"""
try:
# Check calibration status
if (
hasattr(self.manipulators[manipulator_id], "get_calibrated")
and not self.manipulators[manipulator_id].get_calibrated()
):
print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n")
return com.AngularOutputData([], "Manipulator not calibrated")
# Get position
return self._get_angles(manipulator_id)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}")
return com.AngularOutputData([], "Manipulator not registered")
[docs] def get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData:
"""Get the number of shanks on the probe
:param manipulator_id: The ID of the manipulator to get the number of shanks of.
:type manipulator_id: str
:return: Number of shanks on the probe.
:rtype: :class:`ephys_link.common.ShankCountOutputData`
"""
return self._get_shank_count(manipulator_id)
[docs] async def goto_pos(self, manipulator_id: str, position: list[float], speed: float) -> com.PositionalOutputData:
"""Move manipulator to position
:param manipulator_id: The ID of the manipulator to move
:type manipulator_id: str
:param position: The position to move to in (x, y, z, w) in mm
:type position: list[float]
:param speed: The speed to move at (in mm/s)
:type speed: float
:return: Resulting position of the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.PositionalOutputData`
"""
try:
# Check calibration status
if not self.manipulators[manipulator_id].get_calibrated():
print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n")
return com.PositionalOutputData([], "Manipulator not calibrated")
# Check write state
if not self.manipulators[manipulator_id].get_can_write():
print(f"[ERROR]\t\t Cannot write to manipulator: {manipulator_id}")
return com.PositionalOutputData([], "Cannot write to manipulator")
# Convert position to platform space, move, and convert final position back to
# unified space
end_position = await self._goto_pos(manipulator_id, self._unified_space_to_platform_space(position), speed)
if end_position["error"] != "":
return end_position
return com.PositionalOutputData(self._platform_space_to_unified_space(end_position["position"]), "")
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n")
return com.PositionalOutputData([], "Manipulator not registered")
[docs] async def drive_to_depth(self, manipulator_id: str, depth: float, speed: float) -> com.DriveToDepthOutputData:
"""Drive manipulator to depth
:param manipulator_id: The ID of the manipulator to drive
:type manipulator_id: str
:param depth: The depth to drive to in mm
:type depth: float
:param speed: The speed to drive at (in mm/s)
:type speed: float
:return: Resulting depth of the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.DriveToDepthOutputData`
"""
try:
# Check calibration status
if not self.manipulators[manipulator_id].get_calibrated():
print(f"[ERROR]\t\t Calibration not complete: {manipulator_id}\n")
return com.DriveToDepthOutputData(0, "Manipulator not calibrated")
# Check write state
if not self.manipulators[manipulator_id].get_can_write():
print(f"[ERROR]\t\t Cannot write to manipulator: {manipulator_id}")
return com.DriveToDepthOutputData(0, "Cannot write to manipulator")
end_depth = await self._drive_to_depth(
manipulator_id,
self._unified_space_to_platform_space([0, 0, 0, depth])[3],
speed,
)
if end_depth["error"] != "":
return end_depth
return com.DriveToDepthOutputData(
self._platform_space_to_unified_space([0, 0, 0, end_depth["depth"]])[3],
"",
)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n")
return com.DriveToDepthOutputData(0, "Manipulator " "not registered")
[docs] def set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutputData:
"""Set manipulator inside brain state (restricts motion)
:param manipulator_id: The ID of the manipulator to set the state of
:type manipulator_id: str
:param inside: True if inside brain, False if outside
:type inside: bool
:return: New inside brain state of the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.StateOutputData`
"""
try:
# Check calibration status
if (
hasattr(self.manipulators[manipulator_id], "get_calibrated")
and not self.manipulators[manipulator_id].get_calibrated()
):
print("[ERROR]\t\t Calibration not complete\n")
return com.StateOutputData(False, "Manipulator not calibrated")
return self._set_inside_brain(manipulator_id, inside)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator {manipulator_id} not registered\n")
return com.StateOutputData(False, "Manipulator not " "registered")
except Exception as e:
# Other error
print(f"[ERROR]\t\t Set manipulator {manipulator_id} inside brain " f"state")
print(f"{e}\n")
return com.StateOutputData(False, "Error setting " "inside brain")
[docs] async def calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str:
"""Calibrate manipulator
:param manipulator_id: ID of manipulator to calibrate
:type manipulator_id: str
:param sio: SocketIO object (to call sleep)
:type sio: :class:`socketio.AsyncServer`
:return: Error message on error, empty string otherwise.
:rtype: str
"""
try:
# Check write state
if not self.manipulators[manipulator_id].get_can_write():
print(f"[ERROR]\t\t Cannot write to manipulator: {manipulator_id}")
return "Cannot write to manipulator"
return await self._calibrate(manipulator_id, sio)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator {manipulator_id} not registered\n")
return "Manipulator not registered"
except Exception as e:
# Other error
print(f"[ERROR]\t\t Calibrate manipulator {manipulator_id}")
print(f"{e}\n")
return "Error calibrating manipulator"
[docs] def bypass_calibration(self, manipulator_id: str) -> str:
"""Bypass calibration of manipulator
:param manipulator_id: ID of manipulator to bypass calibration
:type manipulator_id: str
:return: Error message on error, empty string otherwise.
:rtype: str
"""
try:
# Bypass calibration
return self._bypass_calibration(manipulator_id)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator {manipulator_id} not registered\n")
return "Manipulator not registered"
except Exception as e:
# Other error
print(f"[ERROR]\t\t Bypass calibration of manipulator {manipulator_id}")
print(f"{e}\n")
return "Error bypassing calibration"
[docs] def set_can_write(
self,
manipulator_id: str,
can_write: bool,
hours: float,
sio: socketio.AsyncServer,
) -> com.StateOutputData:
"""Set manipulator can_write state (enables/disabled moving manipulator)
:param manipulator_id: The ID of the manipulator to set the state of
:type manipulator_id: str
:param can_write: True if allowed to move, False if outside
:type can_write: bool
:param hours: The number of hours to allow writing (0 = forever)
:type hours: float
:param sio: SocketIO object from server to emit reset event
:type sio: :class:`socketio.AsyncServer`
:return: New can_write state of the manipulator and error message (if any).
:rtype: :class:`ephys_link.common.StateOutputData`
"""
try:
return self._set_can_write(manipulator_id, can_write, hours, sio)
except KeyError:
# Manipulator not found in registered manipulators
print(f"[ERROR]\t\t Manipulator not registered: {manipulator_id}\n")
return com.StateOutputData(False, "Manipulator not " "registered")
except Exception as e:
# Other error
print(f"[ERROR]\t\t Set manipulator {manipulator_id} can_write state")
print(f"{e}\n")
return com.StateOutputData(False, "Error setting " "can_write")
# Platform specific methods to override
@abstractmethod
def _get_manipulators(self) -> list:
raise NotImplementedError
@abstractmethod
def _register_manipulator(self, manipulator_id: str) -> None:
raise NotImplementedError
@abstractmethod
def _unregister_manipulator(self, manipulator_id: str) -> None:
raise NotImplementedError
@abstractmethod
def _get_pos(self, manipulator_id: str) -> com.PositionalOutputData:
raise NotImplementedError
@abstractmethod
def _get_angles(self, manipulator_id: str) -> com.AngularOutputData:
raise NotImplementedError
@abstractmethod
def _get_shank_count(self, manipulator_id: str) -> com.ShankCountOutputData:
raise NotImplementedError
@abstractmethod
async def _goto_pos(self, manipulator_id: str, position: list[float], speed: float) -> com.PositionalOutputData:
raise NotImplementedError
@abstractmethod
async def _drive_to_depth(self, manipulator_id: str, depth: float, speed: float) -> com.DriveToDepthOutputData:
raise NotImplementedError
@abstractmethod
def _set_inside_brain(self, manipulator_id: str, inside: bool) -> com.StateOutputData:
raise NotImplementedError
@abstractmethod
async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str:
"""Calibrate manipulator
:param manipulator_id: ID of manipulator to calibrate
:type manipulator_id: str
:param sio: SocketIO object (to call sleep)
:type sio: :class:`socketio.AsyncServer`
:return: Callback parameters (manipulator ID, error message)
:rtype: str
"""
@abstractmethod
def _bypass_calibration(self, manipulator_id: str) -> str:
raise NotImplementedError
@abstractmethod
def _set_can_write(
self,
manipulator_id: str,
can_write: bool,
hours: float,
sio: socketio.AsyncServer,
) -> com.StateOutputData:
raise NotImplementedError
@abstractmethod
def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]:
"""Convert position in platform space to position in unified manipulator space
:param platform_position: Position in platform space (x, y, z, w) in mm
:type platform_position: list[float]
:return: Position in unified manipulator space (x, y, z, w) in mm
:rtype: list[float]
"""
raise NotImplementedError
@abstractmethod
def _unified_space_to_platform_space(self, unified_position: list[float]) -> list[float]:
"""Convert position in unified manipulator space to position in platform space
:param unified_position: Position in unified manipulator space (x, y, z, w) in mm
:type unified_position: list[float]
:return: Position in platform space (x, y, z, w) in mm
:rtype: list[float]
"""
raise NotImplementedError