Skip to content

error handling: option 3 (PoC) #542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

error handling: option 3 (PoC) #542

wants to merge 5 commits into from

Conversation

rickwierenga
Copy link
Member

we are still thinking about implementing a general purpose error handler. see the thread here. See one possible option implemented partially in #507.

In this PR i implemented option 3 of that thread: an error_handler argument for each method in every front end.

a very similar example of usage as before in #507:

from pylabrobot.liquid_handling.errors import ChannelizedError

# Simple test backend based on LiquidHandlerChatterboxBackend
# that will raise an error for tip spot A1
from pylabrobot.liquid_handling.backends.chatterbox import LiquidHandlerChatterboxBackend
class AnnoyBackend(LiquidHandlerChatterboxBackend):
  async def pick_up_tips(self, ops, use_channels, **backend_kwargs):
    if ops[0].resource.get_identifier() == "A1":
      raise ChannelizedError(
        {0: "A1 is not supported"},
      )
    await super().pick_up_tips(ops, use_channels, **backend_kwargs)

# LH setup
from pylabrobot.liquid_handling import LiquidHandler
from pylabrobot.resources.hamilton import STARDeck
backend = AnnoyBackend()
lh = LiquidHandler(backend=backend, deck=STARDeck())
await lh.setup()

# setting up deck
from pylabrobot.resources import TIP_CAR_480_A00, HTF
tip_car = TIP_CAR_480_A00("tip_car")
tip_car[0] = tip_rack = HTF("tips")
lh.deck.assign_child_resource(tip_car, rails=1)

# - NEW -
# error handling: pass using the new `error_handler` keyword argument:
from pylabrobot.liquid_handling.error_handlers import try_next_tip_spot
error_handler = try_next_tip_spot(iter(tip_rack["B1:H1"]))
await lh.pick_up_tips(tip_rack["A1"], error_handler=error_handler)

let's discuss whether this is a good option in the error handling thread on the forum. specific implementation details are to be discussed after we decide what the api should look like

@rickwierenga
Copy link
Member Author

rickwierenga commented May 30, 2025

example of choose_handler, extending the example from above:

# updated AnnoyBackend:
class AnnoyBackend(LiquidHandlerChatterboxBackend):
  async def pick_up_tips(self, ops, use_channels, **backend_kwargs):
    print("called with", ops, use_channels)
    if 1 in use_channels:
      raise ValueError("Channel 1 is not available")
    if ops[0].resource.get_identifier() == "A1":
      raise ChannelizedError(
        {0: "A1 is not supported"},
      )
    await super().pick_up_tips(ops, use_channels, **backend_kwargs)

############################

# custom error handler
async def different_channel(func, error, **kwargs):
  kwargs["use_channels"] = [0]
  return await func(**kwargs) 

# setting an error handler on LH and picking up a tip from A1
from pylabrobot.error_handling import choose_handler
from pylabrobot.liquid_handling.error_handlers import try_next_tip_spot
error_handler = choose_handler(
  {
    ValueError: different_channel,
    ChannelizedError: try_next_tip_spot(iter(tip_rack["B1:H1"])),
  }
)
await lh.pick_up_tips(tip_rack["A1"], use_channels=[1], error_handler=error_handler)

@rickwierenga
Copy link
Member Author

rickwierenga commented Jun 3, 2025

added until_success and basic_retry_handler, which can be used to simply retry a call with the exact same arguments and run a function until there's success (with an optional max_tries).

Here's an example:

# updated AnnoyBackend:
class AnnoyBackend(LiquidHandlerChatterboxBackend):
  def __init__(self):
    super().__init__()
    self.n = 0
  async def pick_up_tips(self, ops, use_channels, **backend_kwargs):
    self.n += 1
    if self.n < 4:
      raise ValueError("n is too low: {n}".format(n=self.n))
    return await super().pick_up_tips(ops, use_channels, **backend_kwargs)

############################

# use until_success with basic_retry_handler
from pylabrobot.error_handling import until_success, basic_retry_handler
error_handler = until_success(basic_retry_handler, max_tries=2)
await lh.pick_up_tips(tip_rack["A1"], use_channels=[1], error_handler=error_handler)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant