@@ -2478,14 +2478,12 @@ async def consolidate_tip_inventory(self, tip_racks: List[TipRack]):
    """

    def merge_sublists(lists: List[List[int]], max_len: int) -> List[List[int]]:
-       """
-       Merge adjacent sublists if combined length <= max_len,
-       without splitting sublists."""
+       """Merge adjacent sublists if combined length <= max_len, without splitting sublists."""
      merged: List[List[int]] = []
      buffer: List[int] = []

      for sublist in lists:
-         if not sublist:
+         if len(sublist) == 0:
          continue  # skip empty sublists

        if len(buffer) + len(sublist) <= max_len:
@@ -2495,23 +2493,22 @@ def merge_sublists(lists: List[List[int]], max_len: int) -> List[List[int]]:
          merged.append(buffer)
          buffer = sublist  # start new buffer

-       if buffer:
+       if len(buffer) > 0:
        merged.append(buffer)

      return merged

    def divide_list_into_chunks(
      list_l: List[Any], chunk_size: int
    ) -> Generator[List[Any], None, None]:
-       """
-       Divides a list into smaller chunks of a specified size.
+       """Divides a list into smaller chunks of a specified size.

      Parameters:
-         - list_l (List[Any]): The list to be divided into chunks.
-         - chunk_size (int): The size of each chunk.
+         - list_l: The list to be divided into chunks.
+         - chunk_size: The size of each chunk.

      Returns:
-         - Generator[List[Any], None, None]: A generator that yields chunks of the list.
+         A generator that yields chunks of the list.

      """
      for i in range(0, len(list_l), chunk_size):
        yield list_l[i : i + chunk_size]
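
For reference, here is a standalone sketch of how these two helpers behave on plain lists. The diff does not show the middle of merge_sublists' loop (the branch that extends the buffer), so that part is reconstructed from the surrounding lines and should be read as an assumption; the input values are invented for illustration.

# Example only -- not part of the diff.
from typing import Any, Generator, List


def merge_sublists(lists: List[List[int]], max_len: int) -> List[List[int]]:
  """Merge adjacent sublists if combined length <= max_len, without splitting sublists."""
  merged: List[List[int]] = []
  buffer: List[int] = []
  for sublist in lists:
    if len(sublist) == 0:
      continue  # skip empty sublists
    if len(buffer) + len(sublist) <= max_len:
      buffer.extend(sublist)  # assumed branch: keep accumulating while it still fits
    else:
      merged.append(buffer)
      buffer = sublist  # start new buffer
  if len(buffer) > 0:
    merged.append(buffer)
  return merged


def divide_list_into_chunks(list_l: List[Any], chunk_size: int) -> Generator[List[Any], None, None]:
  """Divides a list into smaller chunks of a specified size."""
  for i in range(0, len(list_l), chunk_size):
    yield list_l[i : i + chunk_size]


print(merge_sublists([[1, 2], [3], [], [4, 5, 6]], max_len=4))
# -> [[1, 2, 3], [4, 5, 6]]
print(list(divide_list_into_chunks([1, 2, 3, 4, 5], chunk_size=2)))
# -> [[1, 2], [3, 4], [5]]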
@@ -2520,33 +2517,35 @@ def divide_list_into_chunks(

    for idx, tip_rack in enumerate(tip_racks):
      # Only consider partially-filled tip_racks
-       tip_status = [tip_spot.tracker.has_tip for tip_spot in tip_rack.children]
-       partially_filled = any(tip_status) and not all(tip_status)
-
-       if partially_filled:
-         tipspots_w_tips = [i for b, i in zip(tip_status, tip_rack.children) if b]
+       tip_status = [tip_spot.tracker.has_tip for tip_spot in tip_rack.get_all_items()]

-         # Identify model by hashed unique physical characteristics
-         current_model = hash(tipspots_w_tips[0].tracker.get_tip())
+       if not (any(tip_status) and not all(tip_status)):
+         continue  # ignore non-partially-filled tip_racks

-         num_empty_tipspots = len(tip_status) - len(tipspots_w_tips)
+       tipspots_w_tips = [tip_spot for has_tip, tip_spot in zip(tip_status, tip_rack.children) if has_tip]

-         sanity_check = all(
-           hash(tip_spot.tracker.get_tip()) == current_model for tip_spot in tipspots_w_tips[1:]
+       # Identify model by hashed unique physical characteristics
+       current_model = hash(tipspots_w_tips[0].tracker.get_tip())
+       if not all(
+         hash(tip_spot.tracker.get_tip()) == current_model for tip_spot in tipspots_w_tips[1:]
+       ):
+         raise ValueError(
+           f"Tip rack {tip_rack.name} has mixed tip models, cannot consolidate: "
+           f"{[tip_spot.tracker.get_tip() for tip_spot in tipspots_w_tips]}"
        )

-         if sanity_check:
-           clusters_by_model.setdefault(current_model, []).append((tip_rack, num_empty_tipspots))
+       num_empty_tipspots = len(tip_status) - len(tipspots_w_tips)
+       clusters_by_model.setdefault(current_model, []).append((tip_rack, num_empty_tipspots))

-     # Sort partially-filled tipracks by minimal fill_len
+     # Sort partially-filled tip_racks from fullest to emptiest (fewest empty spots first)
    for model, rack_list in clusters_by_model.items():
      rack_list.sort(key=lambda x: x[1])

    # Consolidate one tip model at a time across all tip_racks of that model
    for model, rack_list in clusters_by_model.items():
-       print(f"Consolidating:\n - {', '.join([rack.name for rack, num in rack_list])}")
+       print(f"Consolidating: - {', '.join([rack.name for rack, _ in rack_list])}")

-       all_tip_spots_list = [tip for tip_rack, _ in rack_list for tip in tip_rack.children]
+       all_tip_spots_list = [tip_spot for tip_rack, _ in rack_list for tip_spot in tip_rack.children]

      # 1: Record current tip state
      current_tip_presence_list = [tip_spot.has_tip() for tip_spot in all_tip_spots_list]
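
The hunk above classifies each rack as partially filled via any(...) and not all(...), then clusters racks by a hash of their tip model together with their number of empty spots. A minimal sketch of that bookkeeping, with plain booleans standing in for TipSpot trackers (rack names and fill patterns are invented):

# Example only -- not part of the diff.
from typing import Dict, List, Tuple

racks: Dict[str, List[bool]] = {
  "rack_A": [True, True, False, False],    # partially filled -> kept
  "rack_B": [True, True, True, True],      # full -> skipped
  "rack_C": [False, False, False, False],  # empty -> skipped
}

tip_model_hash = hash("some_tip_model")  # stands in for hash(tip_spot.tracker.get_tip())
clusters_by_model: Dict[int, List[Tuple[str, int]]] = {}

for name, tip_status in racks.items():
  if not (any(tip_status) and not all(tip_status)):
    continue  # ignore non-partially-filled racks
  num_empty = len(tip_status) - sum(tip_status)
  clusters_by_model.setdefault(tip_model_hash, []).append((name, num_empty))

print(clusters_by_model)
# -> {<model hash>: [('rack_A', 2)]}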
@@ -2555,11 +2554,7 @@ def divide_list_into_chunks(
      total_length = len(all_tip_spots_list)
      num_tips_per_model = sum(current_tip_presence_list)

-       target_tip_presence_list = [
-         # True if i < num_tips_per_model else False for i in range(total_length)
-         i < num_tips_per_model
-         for i in range(total_length)
-       ]
+       target_tip_presence_list = [i < num_tips_per_model for i in range(total_length)]

      # 3: Calculate tip_spots involved in tip movement
      tip_movement_list = [
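
The expression inside tip_movement_list is cut off by the hunk boundary, so its exact form is not visible here. A plausible reading, consistent with target spots later being selected where the value is -1, is current minus target per spot; sketched below on invented data:

# Example only -- not part of the diff; the +1/-1 encoding is an assumption.
current_tip_presence_list = [True, False, True, False, False, True]  # invented
num_tips_per_model = sum(current_tip_presence_list)
total_length = len(current_tip_presence_list)

# Target state: pack all tips to the front of the combined spot list
target_tip_presence_list = [i < num_tips_per_model for i in range(total_length)]

# Assumed: +1 = spot must give up its tip, -1 = spot must receive a tip, 0 = unchanged
tip_movement_list = [
  int(current) - int(target)
  for current, target in zip(current_tip_presence_list, target_tip_presence_list)
]

origin_indices = [i for i, v in enumerate(tip_movement_list) if v == 1]
target_indices = [i for i, v in enumerate(tip_movement_list) if v == -1]
print(tip_movement_list, origin_indices, target_indices)
# -> [0, -1, 0, 0, 0, 1] [5] [1]  (one tip moves from spot 5 to spot 1)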
@@ -2572,23 +2567,24 @@ def divide_list_into_chunks(
      tip_target_indices = [i for i, v in enumerate(tip_movement_list) if v == -1]
      all_target_tip_spots = [all_tip_spots_list[idx] for idx in tip_target_indices]

+       # Only continue if tip_racks are not already consolidated
+       if len(all_target_tip_spots) == 0:
+         print("Tips already optimally consolidated!")
+         continue
+
      # 4: Cluster target tip_spots by BOTH parent tip_rack & x-coordinate
      sorted_tip_spots = sorted(
-         all_target_tip_spots, key=lambda tip: (str(tip.parent), round(tip.location.x, 3))
+         all_target_tip_spots, key=lambda tip: (tip.parent.name, round(tip.location.x, 3))
      )

      target_tip_clusters_by_parent_x: Dict[Tuple[str, float], List[TipSpot]] = {}

      for tip_spot in sorted_tip_spots:
-         key = (str(tip_spot.parent), round(tip_spot.location.x, 3))
+         key = (tip_spot.parent.name, round(tip_spot.location.x, 3))
        if key not in target_tip_clusters_by_parent_x:
          target_tip_clusters_by_parent_x[key] = []
        target_tip_clusters_by_parent_x[key].append(tip_spot)

-       # Only continue if tip_racks are not already consolidated
-       if len(target_tip_clusters_by_parent_x) > 0:
-         raise ValueError(f"No channel capable of handling tips on deck: {current_tip_model}")
-
      current_tip_model = all_origin_tip_spots[0].tracker.get_tip()

      # Ensure there are channels that can pick up the tip model
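
Step 4 above groups the target spots by (parent rack name, x coordinate), i.e. by column within each rack, so a single drop visit can serve a whole column. A sketch of that grouping with a stand-in dataclass instead of PLR's TipSpot (coordinates invented):

# Example only -- not part of the diff.
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FakeSpot:
  parent_name: str
  x: float


spots = [
  FakeSpot("rack_A", 10.0),
  FakeSpot("rack_A", 10.0),  # same rack, same column as the spot above
  FakeSpot("rack_A", 19.0),
  FakeSpot("rack_B", 10.0),  # same x, but different rack -> different cluster
]

sorted_spots = sorted(spots, key=lambda s: (s.parent_name, round(s.x, 3)))

clusters: Dict[Tuple[str, float], List[FakeSpot]] = {}
for spot in sorted_spots:
  key = (spot.parent_name, round(spot.x, 3))
  if key not in clusters:
    clusters[key] = []
  clusters[key].append(spot)

print({key: len(cluster) for key, cluster in clusters.items()})
# -> {('rack_A', 10.0): 2, ('rack_A', 19.0): 1, ('rack_B', 10.0): 1}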
@@ -2600,31 +2596,29 @@ def divide_list_into_chunks(
        ]
      )

-       # 5: Optimise speed
-       if num_channels_available > 0:
-         # by aggregating drop columns i.e. same drop column should not be visited twice!
-         if num_channels_available >= 8:  # physical constraint of tip_rack's having 8 rows
-           merged_target_tip_clusters = merge_sublists(
-             target_tip_clusters_by_parent_x.values(), max_len=8
-           )
-
-         else:  # by chunking drop tip_spots list into size of available channels
-           merged_target_tip_clusters = list(
-             divide_list_into_chunks(all_target_tip_spots, chunk_size=num_channels_available)
-           )
+       # 5: Optimize speed
+       if num_channels_available == 0:
+         raise ValueError(f"No channel capable of handling tips on deck: {current_tip_model}")

-         len_transfers = len(merged_target_tip_clusters)
+       # by aggregating drop columns i.e. same drop column should not be visited twice!
+       if num_channels_available >= 8:  # physical constraint of tip_rack's having 8 rows
+         merged_target_tip_clusters = merge_sublists(
+           target_tip_clusters_by_parent_x.values(), max_len=8
+         )
+       else:  # by chunking drop tip_spots list into size of available channels
+         merged_target_tip_clusters = list(
+           divide_list_into_chunks(all_target_tip_spots, chunk_size=num_channels_available)
+         )

-         # 6: Execute tip movement/consolidation
-         for idx, target_tip_spots in enumerate(merged_target_tip_clusters):
-           print(f" - tip transfer cycle: {idx} / {len_transfers - 1}")
-           num_channels = len(target_tip_spots)
-           use_channels = list(range(num_channels))
+       len_transfers = len(merged_target_tip_clusters)

-           origin_tip_spots = [all_origin_tip_spots.pop(0) for idx in range(num_channels)]
+       # 6: Execute tip movement/consolidation
+       for idx, target_tip_spots in enumerate(merged_target_tip_clusters):
+         print(f" - tip transfer cycle: {idx + 1} / {len_transfers}")
+         num_channels = len(target_tip_spots)
+         use_channels = list(range(num_channels))

-           await self.pick_up_tips(origin_tip_spots, use_channels=use_channels)
+         origin_tip_spots = [all_origin_tip_spots.pop(0) for _ in range(num_channels)]

-           await self.drop_tips(target_tip_spots, use_channels=use_channels)
-       else:
-         print("Tips already optimally consolidated!")
+         await self.pick_up_tips(origin_tip_spots, use_channels=use_channels)
+         await self.drop_tips(target_tip_spots, use_channels=use_channels)
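
Once merged, the method can be awaited like any other LiquidHandler coroutine. A hypothetical usage sketch; the handler and tip racks are assumed to be set up elsewhere and the names are placeholders:

# Example only -- not part of the diff; `lh` and the tip racks are placeholders
# for an already-configured LiquidHandler and racks assigned to its deck.
import asyncio


async def tidy_up(lh, partially_used_racks):
  # Packs tips to the front of each cluster of same-model racks, raising if a rack
  # mixes tip models or if no channel on deck can handle the tip model.
  await lh.consolidate_tip_inventory(tip_racks=partially_used_racks)


# asyncio.run(tidy_up(lh, [tip_rack_1, tip_rack_2, tip_rack_3]))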