Pool workers to handle incoming deliveries, ACKs etc. #906

@stebet

Description

After reading through a few issues (#876, #874) and doing some profiling I think I've found what the underlying issue is. Would love to get input from @danielmarbach, @bollhals and @bording on this.

Creating a new IModel currently requires creating a new ConsumerDispatcher, which in turn creates a WorkPool to handle incoming deliveries, ACKs etc. The WorkPool is backed by a Channel and a Task that reads from that channel and executes the work. The problem is that every time a new IModel is created, a new Task is created as well, which requires scheduling, and that scheduling doesn't happen immediately, especially under high churn. This effectively makes it fairly expensive both to create and to dispose of an IModel instance. You can easily see this by profiling a program that does nothing more than create and dispose a lot of IModel instances on the latest versions.
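To make the cost concrete, here is a minimal sketch of the per-model pattern described above — a Channel plus one long-running reader Task per IModel. The type and member names are illustrative, not the client's actual internals:

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: one WorkPool per IModel, as described above.
class WorkPool
{
    private readonly Channel<Action> _channel =
        Channel.CreateUnbounded<Action>(new UnboundedChannelOptions { SingleReader = true });
    private readonly Task _worker;

    public WorkPool()
    {
        // A new Task per IModel: this scheduling is the cost paid on every
        // model create, and the Task must be drained again on every dispose.
        _worker = Task.Run(LoopAsync);
    }

    public void Enqueue(Action work) => _channel.Writer.TryWrite(work);

    private async Task LoopAsync()
    {
        // Drain the channel until the writer is completed on dispose.
        while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false))
            while (_channel.Reader.TryRead(out var work))
                work();
    }

    public Task StopAsync()
    {
        _channel.Writer.Complete();
        return _worker;
    }
}
```

Under high channel churn, each of these `Task.Run` calls competes for ThreadPool scheduling, which is where the profiling cost shows up.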

I think it would be very beneficial to keep a pool of WorkPool-like worker tasks on the Connection itself. The IConsumerDispatchers could grab workers from that pool and hand them the Work items to execute as they come in, instead of scheduling a new Task for every model. The new concurrent-processing configuration by @danielmarbach might need a little extra thought to handle parallel processing of deliveries per IModel using a shared pool, but if the Semaphores are still kept on the dispatchers, which are per model, and those in turn decide how many tasks to hand over to the shared worker pool in parallel, I think it should still work fine.

So basically I'm proposing something like this:

  • Connection will have a shared pool of workers (initial size = Environment.ProcessorCount?, max size = something sane) to handle incoming actions/promises.
  • Model has a dispatcher which knows how many actions/promises it can process concurrently.
  • When a model has work to do (BasicDeliver, BasicAck etc.), it increments its semaphore, grabs a worker from the connection's worker pool (or perhaps creates a new one if none is available), and schedules the work, handing over its semaphore as well. When the worker is done executing, the semaphore is released and the worker is returned to the pool. That means that if a new worker was created, the pool effectively grows as needed, and unless a lot of deliveries are taking place we should seldom need to spin up a new worker task; in the worst case, if we have emptied the pool, the task is scheduled on the ThreadPool just like before.
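The steps above could be sketched roughly like this — all names are hypothetical and this is just one way to wire it up, with the workers owned by the Connection and the concurrency limit kept per model on the dispatcher's SemaphoreSlim:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: worker tasks shared per Connection drain one channel.
class SharedWorkerPool
{
    private readonly Channel<Action> _work = Channel.CreateUnbounded<Action>();

    public SharedWorkerPool(int workerCount)
    {
        // Workers live for the connection's lifetime, so model create/dispose
        // no longer schedules or tears down Tasks.
        for (int i = 0; i < workerCount; i++)
            _ = Task.Run(LoopAsync);
    }

    public void Schedule(Action work) => _work.Writer.TryWrite(work);

    private async Task LoopAsync()
    {
        await foreach (var work in _work.Reader.ReadAllAsync().ConfigureAwait(false))
            work();
    }
}

// Hypothetical per-model dispatcher: the semaphore stays per IModel.
class ModelDispatcher
{
    private readonly SharedWorkerPool _pool;
    private readonly SemaphoreSlim _concurrency;

    public ModelDispatcher(SharedWorkerPool pool, int concurrency)
    {
        _pool = pool;
        _concurrency = new SemaphoreSlim(concurrency);
    }

    public async Task DispatchAsync(Action work)
    {
        await _concurrency.WaitAsync().ConfigureAwait(false); // per-model limit
        _pool.Schedule(() =>
        {
            try { work(); }
            finally { _concurrency.Release(); } // free a slot for this model
        });
    }
}
```

This sketch uses a fixed-size pool for simplicity; the grow-on-demand / ThreadPool-fallback behavior from the bullet above would sit in `Schedule`.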

This makes sense in my head at least, and could also increase throughput for high-churn scenarios.
