Bug report
Bug description:
I am running a small processor that handles large tasks. When a task raises an exception, I want to store all errors (including the other tasks' errors) and cancel any other queued work.
However, it seems that the ProcessPoolExecutor's _ExecutorManagerThread does not notify any waiting threads of the future cancellations, which makes the as_completed idiom hang indefinitely.
Minimal Reproducible Example
import concurrent.futures
import time


def task(n: int) -> int:
    if n == 2:
        raise Exception("Not gonna do it")
    else:
        time.sleep(0.1)
        return n


def main() -> None:
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=2,
    ) as executor:
        futures = [executor.submit(task, i) for i in range(1, 16)]
        for future in concurrent.futures.as_completed(futures):
            try:
                result = future.result()
            except Exception as e:
                print(f"Exception: {e}")
                executor.shutdown(wait=False, cancel_futures=True)
                continue
            print(f"Result: {result}")
        # Look for 'CANCELLED' here:
        print("Other futures states:", [f._state for f in futures])


if __name__ == "__main__":
    main()
Workaround
A workaround is to break out of the as_completed loop after the first future that completed with a failure and then gather the exceptions of the remaining non-cancelled futures post hoc, as sketched below.
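A minimal sketch of that workaround, reusing the task function from the example above (the errors list and the exact structure are illustrative, not a prescribed pattern):

import concurrent.futures
import time


def task(n: int) -> int:
    # Same task as in the example above: task 2 fails, the rest sleep briefly.
    if n == 2:
        raise Exception("Not gonna do it")
    time.sleep(0.1)
    return n


def main() -> None:
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = [executor.submit(task, i) for i in range(1, 16)]
        errors = []
        for future in concurrent.futures.as_completed(futures):
            try:
                print(f"Result: {future.result()}")
            except Exception as e:
                errors.append(e)
                executor.shutdown(wait=False, cancel_futures=True)
                # Break instead of continuing: as_completed would otherwise
                # wait forever on futures that were cancelled but never notified.
                break
        # Post hoc: collect exceptions from the futures that were not cancelled.
        for future in futures:
            try:
                exc = future.exception()  # blocks until still-running tasks finish
            except concurrent.futures.CancelledError:
                continue
            if exc is not None and exc not in errors:
                errors.append(exc)
        print(f"Collected {len(errors)} error(s)")
        print("Other futures states:", [f._state for f in futures])


if __name__ == "__main__":
    main()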
Suggested Fix
In cpython/Lib/concurrent/futures/process.py, lines 520 to 522 (commit a68ddea), one could replace the cancellation loop with:
for work_id, work_item in self.pending_work_items.items():
    canceled = work_item.future.cancel()
    if canceled:
        work_item.future.set_running_or_notify_cancel()
    else:
        new_pending_work_items[work_id] = work_item
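The extra call matters because Future.cancel() only flips the state to CANCELLED, while set_running_or_notify_cancel() is what moves it to CANCELLED_AND_NOTIFIED and signals the waiters that as_completed and wait install. A small standalone sketch (not part of the patch) illustrating the difference:

import concurrent.futures

f = concurrent.futures.Future()
f.cancel()                                   # state is now 'CANCELLED'
done, not_done = concurrent.futures.wait([f], timeout=0.1)
print(f._state, len(done))                   # CANCELLED 0 -- not reported as done

f.set_running_or_notify_cancel()             # state becomes 'CANCELLED_AND_NOTIFIED'
done, not_done = concurrent.futures.wait([f], timeout=0.1)
print(f._state, len(done))                   # CANCELLED_AND_NOTIFIED 1 -- now reported as done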
Similar to:
CPython versions tested on:
3.13
Operating systems tested on:
macOS