When you want to shut down a multiprocessing pool, I find this approach works best. I tried cancel_futures and similar options, but they all suffered from odd behavior caused by processes being terminated forcefully.
Instead, we let all workers finish gracefully: each worker polls a shared flag and exits on its own once the flag is set.
First, define a stop latch:
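from multiprocessing.managers import ValueProxy

class StopLatch:
    """Shared flag that workers poll to learn they should stop."""

    def __init__(self, value: ValueProxy):
        # A manager-backed proxy, so every process sees the same flag.
        self._value = value

    def stop(self):
        self._value.set(1)

    def should_stop(self) -> bool:
        return self._value.get() == 1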
When submitting a task, pass the latch along:
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor

with Manager() as manager:
    value = manager.Value("i", 0)
    stop_latch = StopLatch(value)
    with ProcessPoolExecutor(
        max_workers=max_workers, initializer=get_process_initializer()
    ) as executor:
        ...
        executor.submit(..., stop_latch=stop_latch)
        ...
        # Call stop_latch.stop() here when the workers should wind down.
In your busy worker function, check the stop latch:
for ....
    if stop_latch.should_stop():
        raise RuntimeError()
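Putting the pieces together, here is a minimal end-to-end sketch of the idea. The worker name busy_worker, the step count, the sleep durations, and the pool size are all made up for illustration, and the initializer from the snippet above is left out to keep the example self-contained.

```python
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
from multiprocessing.managers import ValueProxy


class StopLatch:
    """Shared flag that workers poll to learn they should stop."""

    def __init__(self, value: ValueProxy):
        self._value = value

    def stop(self):
        self._value.set(1)

    def should_stop(self) -> bool:
        return self._value.get() == 1


def busy_worker(n_steps: int, stop_latch: StopLatch) -> int:
    """Hypothetical worker: does n_steps units of fake work, checking the latch each time."""
    done = 0
    for _ in range(n_steps):
        if stop_latch.should_stop():
            raise RuntimeError(f"stop requested after {done} steps")
        time.sleep(0.01)  # stand-in for real work
        done += 1
    return done


if __name__ == "__main__":
    with Manager() as manager:
        stop_latch = StopLatch(manager.Value("i", 0))
        with ProcessPoolExecutor(max_workers=2) as executor:
            futures = [
                executor.submit(busy_worker, 10_000, stop_latch=stop_latch)
                for _ in range(4)
            ]
            time.sleep(0.2)    # let the first tasks get going
            stop_latch.stop()  # ask every worker to wind down
            for future in futures:
                # exception() blocks until the task has finished on its own
                print(type(future.exception()).__name__)
```

Tasks that are still queued when stop() is called see the flag on their first check and exit right away, so the pool drains quickly without any process being killed. Each future should end up carrying the RuntimeError raised by its worker.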
That’s it!