@@ -2477,10 +2477,10 @@ async def consolidate_tip_inventory(self, tip_racks: List[TipRack]):
2477
2477
visits to the same drop columns.
2478
2478
"""
2479
2479
2480
- def merge_sublists (lists : List [List [int ]], max_len : int ) -> List [List [int ]]:
2480
+ def merge_sublists (lists : List [List [TipSpot ]], max_len : int ) -> List [List [TipSpot ]]:
2481
2481
"""Merge adjacent sublists if combined length <= max_len, without splitting sublists."""
2482
- merged : List [List [int ]] = []
2483
- buffer : List [int ] = []
2482
+ merged : List [List [TipSpot ]] = []
2483
+ buffer : List [TipSpot ] = []
2484
2484
2485
2485
for sublist in lists :
2486
2486
if len (sublist ) == 0 :
@@ -2499,8 +2499,8 @@ def merge_sublists(lists: List[List[int]], max_len: int) -> List[List[int]]:
2499
2499
return merged
2500
2500
2501
2501
def divide_list_into_chunks (
2502
- list_l : List [Any ], chunk_size : int
2503
- ) -> Generator [List [Any ], None , None ]:
2502
+ list_l : List [TipSpot ], chunk_size : int
2503
+ ) -> Generator [List [TipSpot ], None , None ]:
2504
2504
"""Divides a list into smaller chunks of a specified size.
2505
2505
2506
2506
Parameters:
@@ -2523,7 +2523,7 @@ def divide_list_into_chunks(
2523
2523
continue # ignore non-partially-filled tip_racks
2524
2524
2525
2525
tipspots_w_tips = [
2526
- tip_spot for has_tip , tip_spot in zip (tip_status , tip_rack .children ) if has_tip
2526
+ tip_spot for has_tip , tip_spot in zip (tip_status , tip_rack .get_all_items () ) if has_tip
2527
2527
]
2528
2528
2529
2529
# Identify model by hashed unique physical characteristics
@@ -2547,7 +2547,9 @@ def divide_list_into_chunks(
2547
2547
for model , rack_list in clusters_by_model .items ():
2548
2548
print (f"Consolidating: - { ', ' .join ([rack .name for rack , _ in rack_list ])} " )
2549
2549
2550
- all_tip_spots_list = [tip_spot for tip_rack , _ in rack_list for tip_spot in tip_rack .children ]
2550
+ all_tip_spots_list = [
2551
+ tip_spot for tip_rack , _ in rack_list for tip_spot in tip_rack .get_all_items ()
2552
+ ]
2551
2553
2552
2554
# 1: Record current tip state
2553
2555
current_tip_presence_list = [tip_spot .has_tip () for tip_spot in all_tip_spots_list ]
@@ -2575,14 +2577,17 @@ def divide_list_into_chunks(
2575
2577
continue
2576
2578
2577
2579
# 4: Cluster target tip_spots by BOTH parent tip_rack & x-coordinate
2578
- sorted_tip_spots = sorted (
2579
- all_target_tip_spots , key = lambda tip : (tip .parent .name , round (tip .location .x , 3 ))
2580
- )
2580
+ def key_for_tip_spot (tip_spot : TipSpot ) -> Tuple [str , float ]:
2581
+ """Key function to sort tip spots by parent name and x-coordinate."""
2582
+ assert tip_spot .parent is not None and tip_spot .location is not None
2583
+ return (tip_spot .parent .name , round (tip_spot .location .x , 3 ))
2584
+
2585
+ sorted_tip_spots = sorted (all_target_tip_spots , key = key_for_tip_spot )
2581
2586
2582
2587
target_tip_clusters_by_parent_x : Dict [Tuple [str , float ], List [TipSpot ]] = {}
2583
2588
2584
2589
for tip_spot in sorted_tip_spots :
2585
- key = (tip_spot . parent . name , round ( tip_spot . location . x , 3 ) )
2590
+ key = key_for_tip_spot (tip_spot )
2586
2591
if key not in target_tip_clusters_by_parent_x :
2587
2592
target_tip_clusters_by_parent_x [key ] = []
2588
2593
target_tip_clusters_by_parent_x [key ].append (tip_spot )
@@ -2605,7 +2610,7 @@ def divide_list_into_chunks(
2605
2610
# by aggregating drop columns i.e. same drop column should not be visited twice!
2606
2611
if num_channels_available >= 8 : # physical constraint of tip_rack's having 8 rows
2607
2612
merged_target_tip_clusters = merge_sublists (
2608
- target_tip_clusters_by_parent_x .values (), max_len = 8
2613
+ list ( target_tip_clusters_by_parent_x .values () ), max_len = 8
2609
2614
)
2610
2615
else : # by chunking drop tip_spots list into size of available channels
2611
2616
merged_target_tip_clusters = list (
0 commit comments