
Conversation

@Ostrzyciel
Contributor

GitHub issue resolved: #5120

This is an issue reported ~a year ago where RDF4J would deadlock while processing federated joins with large intermediate result sets.

I found that the issue is with BatchingServiceIteration, which is used within a federated join. When the constructor for this iteration is run, it eagerly sends out HTTP requests for the join to the other endpoint. Normally, this succeeds and the iteration results are consumed. Note that the consumer of the iteration is on the same thread as the code that starts the HTTP requests.

The assumption here (I think) is that we can send all of the requests and forget about them. However, the HTTP pool has a limited size (25 by default), and once it is exhausted, the iteration is stuck waiting for some request to finish. A request can only finish once its results are consumed by the parent iteration, and that can't happen, because the parent is on the same thread.
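
To make the failure mode concrete, here is a minimal stand-alone sketch (not RDF4J code; the pool size and class are made up): a semaphore stands in for the HTTP connection pool, each request holds a permit until its result is consumed, and the single thread that sends all requests up front is also the only one that could consume the results.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Minimal illustration of the deadlock described above; not RDF4J code.
public class SameThreadPoolExhaustion {

    static final int POOL_SIZE = 2; // stands in for the HTTP pool limit (25 by default)
    static final Semaphore connectionPool = new Semaphore(POOL_SIZE);
    static final LinkedBlockingQueue<Integer> pendingResults = new LinkedBlockingQueue<>();

    static void sendRequest(int id) throws InterruptedException {
        connectionPool.acquire(); // blocks forever once the pool is exhausted
        pendingResults.put(id);   // "response" waiting to be consumed
    }

    static void consumeResult() throws InterruptedException {
        pendingResults.take();
        connectionPool.release(); // only consuming a result frees a connection
    }

    public static void main(String[] args) throws InterruptedException {
        // Eager phase: send everything up front, like the iteration's constructor did.
        for (int i = 0; i < POOL_SIZE + 1; i++) {
            sendRequest(i); // hangs on the third request: the pool is exhausted, and the
        }                   // only thread that could consume a result is this one
        // Consuming phase: never reached.
        for (int i = 0; i < POOL_SIZE + 1; i++) {
            consumeResult();
        }
    }
}
```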

I don't think any other operator in RDF4J does eager evaluation, so it's understandable that this one is badly broken.

To fix this, I created a secondary queue for federated requests, and then a thread that reads from this queue and sends the request to the HTTP pool. The queue is unbounded, so this may materialize the other side of the join in memory pretty aggressively. However, (1) this was the original behavior to start with, so I'm not making this worse; (2) I don't see an easy way around this without refactoring the federated join code in a major way to allow async processing. Maybe someone has a better idea.
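
The shape of the fix is roughly as in the sketch below (illustrative only; the names and types are made up, not the actual classes in this PR): the producing thread only enqueues materialized batches into an unbounded queue, while a dedicated consumer thread takes them off the queue and performs the blocking HTTP work, so the thread that drains the results is never the one blocked on the connection pool.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Illustrative sketch only; not the actual RDF4J classes or method names.
class BatchedRequestDispatcher<B> {

    // Unbounded: enqueueing never blocks, but memory use can grow if the consumer is slow.
    private final LinkedBlockingQueue<B> requestQueue = new LinkedBlockingQueue<>();
    private final Thread consumer;

    BatchedRequestDispatcher(Consumer<B> sendRequest) {
        consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // The blocking HTTP call happens here, off the producing thread,
                    // so the producer stays free to drain results and release connections.
                    sendRequest.accept(requestQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // closed or cancelled
            }
        }, "request-queue-consumer");
        consumer.setDaemon(true);
        consumer.start();
    }

    void enqueue(B batch) throws InterruptedException {
        requestQueue.put(batch);
    }

    void shutdown() {
        consumer.interrupt();
    }
}
```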

In any case, this does work. I ran it with the repro test case (https://github.com/tkuhn/rdf4j-timeout-test) and now the queries complete without issues, even if I issue a lot of them in parallel.


PR Author Checklist (see the contributor guidelines for more details):

  • my pull request is self-contained
  • I've added tests for the changes I made
  • I've applied code formatting (you can use mvn process-resources to format from the command line)
  • I've squashed my commits where necessary
  • every commit message starts with the issue number (GH-xxxx) followed by a meaningful description of the change

};
if (queueConsumerTask != null) {
    // This should never happen, but just to be sure...
    throw new QueryEvaluationException("Queue consumer task already running");

Contributor

This could probably happen if the queue is closed asynchronously because of a timeout.

Contributor Author

This can only happen if run() is invoked more than once, which should not happen, since it's only called in the constructor.

Actually, run() could be made protected, but that's a separate issue.

queueConsumerTask = ForkJoinPool.commonPool().submit(queueConsumer);

// Produce batches of input bindings and send them to the consumer via the request queue
while (!isClosed() && leftIter.hasNext()) {

Contributor

Would it make sense to check whether the current thread is interrupted, to make it easier to cancel a query that is stuck? Unless this is already handled elsewhere in such a way that we don't need to check here.

Contributor Author

Interruption state is checked in requestQueue.put(...) and then also handled in JoinExecutorBase. This should be fine, I think.
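
To illustrate why an explicit check would be redundant (using a plain java.util.concurrent.LinkedBlockingQueue as a stand-in for the internal request queue, not the actual class used here): put() throws InterruptedException when the calling thread has been interrupted, even on an unbounded queue where the call would not otherwise block.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Stand-in demo, not RDF4J code: put() observes the interrupt flag and throws,
// so a producer loop built around it terminates without an explicit isInterrupted() check.
public class InterruptiblePut {
    public static void main(String[] args) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread.currentThread().interrupt();
        try {
            queue.put("batch");
            System.out.println("put() ignored the interrupt"); // not reached
        } catch (InterruptedException e) {
            System.out.println("put() threw InterruptedException as expected");
        }
    }
}
```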

// The request queue is of unbounded size, so this will not block. On the other hand,
// if the consumer is slow, we may accumulate a large number of pending requests here.
// I don't have a better solution at the moment, though.
requestQueue.put(materializedIter);

Contributor

I have a question about spilling to disk. It's probably out of scope for this PR, since you are primarily fixing a bug.

Would this be a good place in the code to check for low mem and spill to disk?

Contributor Author

Yes, it would, I think. If we had a huge intermediate result set, we could store the queue on disk.
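
Out of scope here, but just to sketch the kind of check that could live at that point (the helper below is hypothetical; nothing like it exists in RDF4J, and an actual disk-backed queue would need its own design):

```java
// Hypothetical helper, not part of RDF4J: decide when queued batches should start
// being spilled to a disk-backed store instead of staying on the heap.
final class MemoryPressure {

    private MemoryPressure() {
    }

    static boolean lowOnMemory() {
        Runtime rt = Runtime.getRuntime();
        long usedBytes = rt.totalMemory() - rt.freeMemory();
        // e.g. start spilling once roughly 80% of the maximum heap is in use
        return usedBytes > (long) (rt.maxMemory() * 0.8);
    }
}
```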

requestQueue.close();

// Wait for completion of the consumer and propagate exceptions
queueConsumerTask.get();

Contributor

Shouldn't we catch the ExecutionException and rethrow the nested exception?

Contributor

Also, are we injecting a particular thread pool at any point, or are we relying on the common ForkJoinPool? If it's the latter, I've experienced deadlocks with it; I believe they happen when a thread steals a task while waiting for its own task to unblock. Honestly, I don't quite understand what was going on, but the gist was not to use the common ForkJoinPool when there are locks or I/O/network calls in the code.

Contributor Author

> Shouldn't we catch the ExecutionException and rethrow the nested exception?

Yes, I fixed that.
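
Roughly along these lines (sketch only, not necessarily the exact code in the fix): wait for the consumer task and rethrow the underlying cause instead of the ExecutionException wrapper.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.eclipse.rdf4j.query.QueryEvaluationException;

// Illustrative only: surface the consumer task's failure as a query evaluation error.
final class ConsumerTaskAwaiter {

    static void await(Future<?> queueConsumerTask) {
        try {
            queueConsumerTask.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryEvaluationException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof QueryEvaluationException) {
                throw (QueryEvaluationException) cause;
            }
            throw new QueryEvaluationException(cause);
        }
    }
}
```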

Contributor Author

> Also, are we injecting a particular thread pool at any point, or are we relying on the common ForkJoinPool? If it's the latter, I've experienced deadlocks with it; I believe they happen when a thread steals a task while waiting for its own task to unblock. Honestly, I don't quite understand what was going on, but the gist was not to use the common ForkJoinPool when there are locks or I/O/network calls in the code.

Yeah, I made it use the common thread pool. What you are describing sounds like thread exhaustion due to synchronous I/O handling mixed with app logic in the same pool. Good catch, these should be separated (and really, there should be well-defined pools for non-blocking vs blocking operations).

I created a new single thread executor for this instead.
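
Something like this, roughly (illustrative sketch; the wrapper class and thread name are made up): a dedicated single-thread executor with a named daemon thread keeps the blocking request handling off the common ForkJoinPool entirely.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative sketch, not the PR's exact code.
final class RequestConsumerExecutor {

    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "federated-service-request-consumer"); // hypothetical name
        t.setDaemon(true);
        return t;
    });

    Future<?> submit(Runnable queueConsumer) {
        return executor.submit(queueConsumer);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```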
