# Source code for ephys_link.platforms.new_scale_pathfinder_handler

"""Handle communications with New Scale's HTTP server

Implements New Scale specific API calls.

This is a subclass of :class:`ephys_link.platform_handler.PlatformHandler`.
"""

from __future__ import annotations

import json
from sys import exit
from typing import TYPE_CHECKING
from urllib import request
from urllib.error import URLError

from vbl_aquarium.models.ephys_link import (
    AngularResponse,
    BooleanStateResponse,
    CanWriteRequest,
    DriveToDepthRequest,
    DriveToDepthResponse,
    GotoPositionRequest,
    InsideBrainRequest,
    PositionalResponse,
    ShankCountResponse,
)
from vbl_aquarium.models.unity import Vector3, Vector4

from ephys_link.platform_handler import PlatformHandler

if TYPE_CHECKING:
    import socketio


class NewScalePathfinderHandler(PlatformHandler):
    """Handler for New Scale HTTP server

    Data is read by polling ``http://localhost:<port>`` and parsing the JSON
    payload. Registered manipulators are tracked in ``self.manipulators`` as a
    mapping from manipulator ID to that manipulator's index in the payload's
    ``ProbeArray`` list.

    Movement, calibration, write-permission and coordinate-space conversion
    methods are not implemented by this handler and raise
    :class:`NotImplementedError`.
    """

    # Valid New Scale manipulator IDs
    VALID_MANIPULATOR_IDS = (
        "A", "B", "C", "D", "E", "F", "G", "H", "I", "J",
        "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T",
        "U", "V", "W", "X", "Y", "Z",
        "AA", "AB", "AC", "AD", "AE", "AF", "AG",
        "AH", "AI", "AJ", "AK", "AL", "AM", "AN",
    )  # fmt: skip

    def __init__(self, port: int = 8080) -> None:
        """
        Initialize New Scale via Pathfinder handler

        Probes the HTTP server once; if it cannot be reached, the user is
        told to start it and the process exits with status 1.

        :param port: Port of New Scale Pathfinder HTTP server
        :type port: int
        """
        super().__init__()

        self.num_axes = -1
        self.dimensions = Vector4(x=15, y=15, z=15, w=0)

        self.port = port

        # Test connection to New Scale HTTP server
        try:
            # Only reachability matters here; close the response right away.
            with request.urlopen(f"http://localhost:{self.port}"):
                pass
        except URLError:
            print(f"New Scale Pathfinder HTTP server not online on port {self.port}")
            print("Please start the HTTP server and try again.")
            input("Press Enter to exit...")
            exit(1)

    def query_data(self) -> dict | None:
        """Query New Scale HTTP server for data and return as dict.

        Any failure (connection, read or JSON decoding) is reported on stdout
        and ``None`` is returned instead of raising.

        :return: Parsed JSON data from New Scale HTTP server, or ``None`` if
            the query failed.
        :rtype: dict | None
        """
        try:
            # Context manager makes sure the HTTP response is always closed.
            with request.urlopen(f"http://localhost:{self.port}") as response:
                return json.loads(response.read())
        except Exception as e:
            print(f"[ERROR]\t\t Unable to query for New Scale data: {type(e)} {e}\n")
            return None

    def query_manipulator_data(self, manipulator_id: str) -> dict:
        """Query New Scale HTTP server for data on a specific manipulator.

        The index cached in ``self.manipulators`` is tried first. If that slot
        no longer holds the requested manipulator (or no longer exists), the
        probe array is searched by ID and the cached index is refreshed.

        :param manipulator_id: manipulator ID.
        :return: Parsed JSON data for a particular manipulator.
        :rtype: dict
        :raises KeyError: if the manipulator ID is not in ``self.manipulators``.
        :raises ValueError: if manipulator ID is not found in query.
        :raises TypeError: if the server query failed (``query_data`` returned
            ``None``).
        """
        probe_array = self.query_data()["ProbeArray"]
        cached_index = self.manipulators[manipulator_id]

        # Fast path: the cached index still points at the right manipulator.
        # The bounds check guards against the probe array having shrunk.
        if 0 <= cached_index < len(probe_array):
            manipulator_data = probe_array[cached_index]
            if manipulator_data["Id"] == manipulator_id:
                return manipulator_data

        # If the order of the manipulators switched (somehow), recalculate the
        # index from the same snapshot and update the manipulators dict.
        for index, data in enumerate(probe_array):
            if data["Id"] == manipulator_id:
                # Index 0 is a valid position, so update unconditionally.
                self.manipulators[manipulator_id] = index
                return data

        # If data query was unsuccessful
        msg = f"Unable to find manipulator {manipulator_id}"
        raise ValueError(msg)

    def _get_manipulators(self) -> list:
        """Return the ``Id`` of every entry in the server's ``ProbeArray``."""
        return [probe["Id"] for probe in self.query_data()["ProbeArray"]]

    def _register_manipulator(self, manipulator_id: str) -> None:
        """Record the manipulator's ``ProbeArray`` index in ``self.manipulators``.

        :param manipulator_id: manipulator ID
        :raises ValueError: if the ID is not a valid New Scale ID or is not
            present in the server's ``ProbeArray``.
        """
        # Check if ID is a valid New Scale manipulator ID
        if manipulator_id not in self.VALID_MANIPULATOR_IDS:
            msg = f"Invalid manipulator ID {manipulator_id}"
            raise ValueError(msg)

        # Check if ID is connected and get its index from a single query, so
        # the membership check and the index cannot disagree.
        manipulator_index = next(
            (index for index, data in enumerate(self.query_data()["ProbeArray"]) if data["Id"] == manipulator_id),
            None,
        )
        if manipulator_index is None:
            msg = f"Manipulator {manipulator_id} not connected"
            raise ValueError(msg)

        self.manipulators[manipulator_id] = manipulator_index

    def _unregister_manipulator(self, manipulator_id: str) -> None:
        """Remove the manipulator from ``self.manipulators``.

        :param manipulator_id: manipulator ID
        :raises KeyError: if the manipulator is not registered.
        """
        del self.manipulators[manipulator_id]

    def _get_pos(self, manipulator_id: str) -> PositionalResponse:
        """Get the current position of the manipulator in mm

        Reads ``Tip_X_ML``, ``Tip_Y_AP`` and ``Tip_Z_DV`` from the
        manipulator's data; ``w`` is always 0.

        :param manipulator_id: manipulator ID
        :return: Callback parameters (position in (x, y, z, w) (or an empty array on error) in mm, error message)
        """
        manipulator_data = self.query_manipulator_data(manipulator_id)

        return PositionalResponse(
            position=Vector4(
                x=manipulator_data["Tip_X_ML"], y=manipulator_data["Tip_Y_AP"], z=manipulator_data["Tip_Z_DV"], w=0
            )
        )

    def _get_angles(self, manipulator_id: str) -> AngularResponse:
        """Get the manipulator's angles.

        ``x`` is ``Polar`` minus the server-wide ``PosteriorAngle``, with 360
        added when that difference is not positive; ``y`` is ``Pitch``; ``z``
        is ``ShankOrientation`` (0 when the key is absent).

        :param manipulator_id: manipulator ID
        :return: Angular response holding the (x, y, z) angles.
        """
        manipulator_data = self.query_manipulator_data(manipulator_id)

        # Apply PosteriorAngle to Polar to get the correct angle.
        adjusted_polar = manipulator_data["Polar"] - self.query_data()["PosteriorAngle"]

        return AngularResponse(
            angles=Vector3(
                x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar,
                y=manipulator_data["Pitch"],
                z=manipulator_data.get("ShankOrientation", 0),
            )
        )

    def _get_shank_count(self, manipulator_id: str) -> ShankCountResponse:
        """Get the manipulator's ``ShankCount`` (1 when the key is absent).

        :param manipulator_id: manipulator ID
        :return: Shank count response, or one carrying an error message if no
            probe with this ID is present.
        """
        for probe in self.query_data()["ProbeArray"]:
            if probe["Id"] == manipulator_id:
                return ShankCountResponse(shank_count=probe.get("ShankCount", 1))

        return ShankCountResponse(error="Unable to find manipulator")

    async def _goto_pos(self, _: GotoPositionRequest) -> PositionalResponse:
        """Not implemented by this handler."""
        raise NotImplementedError

    async def _drive_to_depth(self, _: DriveToDepthRequest) -> DriveToDepthResponse:
        """Not implemented by this handler."""
        raise NotImplementedError

    def _set_inside_brain(self, _: InsideBrainRequest) -> BooleanStateResponse:
        """Not implemented by this handler."""
        raise NotImplementedError

    async def _calibrate(self, manipulator_id: str, sio: socketio.AsyncServer) -> str:
        """Not implemented by this handler."""
        raise NotImplementedError

    def _bypass_calibration(self, manipulator_id: str) -> str:
        """Do nothing and return an empty string."""
        return ""

    def _set_can_write(self, _: CanWriteRequest) -> BooleanStateResponse:
        """Not implemented by this handler."""
        raise NotImplementedError

    def _unified_space_to_platform_space(self, unified_position: list[float]) -> list[float]:
        """Not implemented by this handler."""
        raise NotImplementedError

    def _platform_space_to_unified_space(self, platform_position: list[float]) -> list[float]:
        """Not implemented by this handler."""
        raise NotImplementedError