Skip to content

Build lh.probe_tip_presence_via_pickup() #524

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions pylabrobot/liquid_handling/liquid_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2276,6 +2276,85 @@ def assign_child_resource(
"lh.deck.assign_child_resource() instead."
)

async def probe_tip_presence_via_pickup(
self,
tip_spots: List[TipSpot],
use_channels: Optional[List[int]] = None
) -> List[int]:
"""
Probe tip presence by attempting pickup on each TipSpot.

Args:
tip_spots: TipSpots to probe.
use_channels: Channels to use (must match tip_spots length).

Returns:
List[int]: 1 if tip is present, 0 otherwise.
"""
Comment on lines +2283 to +2293
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't you want to return a bool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found it doesn't matter: The important point is that the returns of this method (a list of True/False or 1/0) will be used in simple array/matrix additions/subtractions.
For that purpose True/False or 1/0 are equivalent.

I am happy to change it to True / False :)


if use_channels is None:
use_channels = list(range(self.backend.num_channels))

if len(use_channels) > self.backend.num_channels:
raise ValueError(
"Liquid handler given more channels to use than exist: "
f"Given {len(use_channels)} channels to use but liquid handler "
f"only has {self.backend.num_channels}."
)

if len(use_channels) != len(tip_spots):
raise ValueError(
f"Length mismatch: received {len(use_channels)} channels for "
f"{len(tip_spots)} tip spots. One channel must be assigned per tip spot."
)

presence_flags = [1] * len(tip_spots)
z_height = tip_spots[0].get_absolute_location(z="top").z + 5

# Step 1: Cluster tip spots by x-coordinate
clusters_by_x = {}
for idx, tip_spot in enumerate(tip_spots):
x = tip_spot.location.x
clusters_by_x.setdefault(x, []).append((tip_spot, use_channels[idx], idx))

sorted_clusters = [clusters_by_x[x] for x in sorted(clusters_by_x)]

# Step 2: Probe each cluster
for cluster in sorted_clusters:
tip_subset, channel_subset, index_subset = zip(*cluster)

try:
await self.pick_up_tips(
list(tip_subset),
Comment on lines +2327 to +2328
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is list conversion necessary?

use_channels=list(channel_subset),
minimum_traverse_height_at_beginning_of_a_command=z_height,
z_position_at_end_of_a_command=z_height
)
except ChannelizedError as e:
for ch in e.errors:
if ch in channel_subset:
failed_local_idx = channel_subset.index(ch)
presence_flags[index_subset[failed_local_idx]] = 0

# Step 3: Drop tips immediately after probing
successful = [
(spot, ch) for spot, ch, i in cluster
if presence_flags[i] == 1
]
if successful:
Comment on lines +2340 to +2344
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe if any(successful) so it's more readable

try:
await self.drop_tips(
[spot for spot, _ in successful],
use_channels=[ch for _, ch in successful],
# minimum_traverse_height_at_beginning_of_a_command=z_height,
z_position_at_end_of_a_command=z_height
)
except Exception as e:
print(f"Warning: drop_tips failed for cluster at x={cluster[0][0].location.x}: {e}")
Comment on lines +2351 to +2353
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do you know it's this cluster?


return presence_flags



class OperationCallback(Protocol):
def __call__(self, handler: "LiquidHandler", *args: Any, **kwargs: Any) -> None:
Expand Down
Loading