
batched-parallel Updates for Occurrence Consumer #82044

@roggenkemper

Description


batched-parallel mode has performance issues because it uses a threadpool. Even with rate limiting, there are still performance limitations in its current form.

There are some ideas for how to fix this:
Add a pre-step that partitions messages by fingerprint and passes them to run_task_with_multiprocessing

  • Similar to

        batch_processor = RunTask(
            function=batch_write_to_redis,
            next_step=commit_step,
        )
        batch_step = BatchStep(
            max_batch_size=self.max_batch_size,
            max_batch_time=self.max_batch_time,
            next_step=batch_processor,
        )

    we could do something along the lines of https://gist.github.com/roggenkemper/a782981eed3739d9ee1f4b36160365a4
  • BatchStep processes a batch of messages and produces a list of batches, where each sublist contains the messages that share a fingerprint
  • Unbatch then emits each of those sublists individually, so that the multiprocessing step receives a batch of messages per fingerprint, rather than individual messages
  • Overall: first a parallel step to deserialize, then batch, then process
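The partition step described above could be sketched as a plain function (a sketch only; the message shape and the `"fingerprint"` key are assumptions about the payload, not the consumer's actual types):

```python
from collections import defaultdict


def partition_by_fingerprint(batch):
    """Split one batch of messages into sublists keyed by fingerprint.

    Each message is assumed to be a dict carrying a "fingerprint" key;
    the real consumer would pull this out of the Kafka payload.
    """
    groups = defaultdict(list)
    for message in batch:
        # Fingerprints may be lists in the payload; convert to a tuple
        # so the value is hashable and usable as a dict key.
        key = tuple(message["fingerprint"])
        groups[key].append(message)
    # One sublist per fingerprint, ready for an Unbatch step to
    # re-emit each sublist as a single unit of work.
    return list(groups.values())
```

A downstream Unbatch step would then forward each sublist into run_task_with_multiprocessing, so all messages for a given fingerprint are processed together by one worker.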

Adding a timer around

    payload = orjson.loads(item.payload.value)

could be useful too, to gain insight into deserialization performance.
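A minimal sketch of such a timer, using `time.monotonic()`; the `record_timing` hook is a hypothetical stand-in for the real metrics client, and the standard-library `json` module stands in for `orjson` so the example is self-contained:

```python
import json
import time


def record_timing(metric, seconds):
    # Placeholder for the real metrics backend (e.g. a statsd client).
    print(f"{metric}: {seconds * 1000:.3f} ms")


def timed_loads(raw):
    """Deserialize a payload while measuring how long it takes."""
    start = time.monotonic()
    payload = json.loads(raw)  # the consumer uses orjson.loads here
    record_timing("occurrence_consumer.payload_deserialize", time.monotonic() - start)
    return payload
```

Emitting this as a timing metric per message would show whether deserialization is a meaningful share of the consumer's latency before investing in the larger pipeline restructure.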
