Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions pylabrobot/liquid_handling/backends/opentrons_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,69 @@ async def list_connected_modules(self) -> List[dict]:
"""List all connected temperature modules."""
return cast(List[dict], ot_api.modules.list_connected_modules())

def _pipette_id_for_channel(self, channel: int) -> str:
pipettes = []
if self.left_pipette is not None:
pipettes.append(self.left_pipette["pipetteId"])
if self.right_pipette is not None:
pipettes.append(self.right_pipette["pipetteId"])
if channel < 0 or channel >= len(pipettes):
raise NoChannelError(f"Channel {channel} not available on this OT-2 setup.")
return pipettes[channel]

async def prepare_for_manual_channel_operation(self, channel: int):
"""Validate channel exists (no-op otherwise for OT-2)."""

_ = self._pipette_id_for_channel(channel)

async def move_channel_x(self, channel: int, x: float):
"""Move a channel to an absolute x coordinate using savePosition to seed pose."""

pipette_id = self._pipette_id_for_channel(channel)
try:
res = ot_api.lh.save_position(pipette_id=pipette_id)
pos = res["data"]["result"]["position"]
current = Coordinate(pos["x"], pos["y"], pos["z"])
except Exception as exc: # noqa: BLE001
raise NoChannelError("Failed to query current pipette position") from exc

target = Coordinate(x=x, y=current.y, z=current.z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_channel_y(self, channel: int, y: float):
"""Move a channel to an absolute y coordinate using savePosition to seed pose."""

pipette_id = self._pipette_id_for_channel(channel)
try:
res = ot_api.lh.save_position(pipette_id=pipette_id)
pos = res["data"]["result"]["position"]
current = Coordinate(pos["x"], pos["y"], pos["z"])
except Exception as exc: # noqa: BLE001
raise NoChannelError("Failed to query current pipette position") from exc

target = Coordinate(x=current.x, y=y, z=current.z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_channel_z(self, channel: int, z: float):
"""Move a channel to an absolute z coordinate using savePosition to seed pose."""

pipette_id = self._pipette_id_for_channel(channel)
try:
res = ot_api.lh.save_position(pipette_id=pipette_id)
pos = res["data"]["result"]["position"]
current = Coordinate(pos["x"], pos["y"], pos["z"])
except Exception as exc: # noqa: BLE001
raise NoChannelError("Failed to query current pipette position") from exc

target = Coordinate(x=current.x, y=current.y, z=z)
await self.move_pipette_head(
location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id
)

async def move_pipette_head(
self,
location: Coordinate,
Expand Down