Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add multiprocessing queue shutdown
* Include docs
  • Loading branch information
EpicWink committed May 6, 2023
commit 75f6067e28c27780294eb08e1dbe1226ff9b6c8f
24 changes: 22 additions & 2 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see
free slot was available within that time. Otherwise (*block* is
``False``), put an item on the queue if a free slot is immediately
available, else raise the :exc:`queue.Full` exception (*timeout* is
ignored in that case).
ignored in that case). Raises :exc:`ShutDown` if the queue has been shut
down.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
exception if no item was available within that time. Otherwise (block is
``False``), return an item if one is immediately available, else raise the
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
:exc:`queue.ShutDown` if the queue has been shut down and is empty, or if
the queue has been shut down immediately.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see

Equivalent to ``get(False)``.

.. method:: shutdown(immediate=False)

Shut-down the queue, making queue gets and puts raise
:exc:`queue.ShutDown`.

By default, gets will only raise once the queue is empty. Set
*immediate* to true to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.12

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down Expand Up @@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see
Raises a :exc:`ValueError` if called more times than there were items
placed in the queue.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


.. method:: join()

Expand All @@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see
it is complete. When the count of unfinished tasks drops to zero,
:meth:`~queue.Queue.join` unblocks.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


Miscellaneous
~~~~~~~~~~~~~
Expand Down
26 changes: 25 additions & 1 deletion Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import types
import weakref
import errno
import ctypes

from queue import Empty, Full
from queue import Empty, Full, ShutDown

import _multiprocessing

Expand All @@ -28,6 +29,10 @@

from .util import debug, info, Finalize, register_after_fork, is_exiting

_queue_alive = 0
_queue_shutdown = 1
_queue_shutdown_immediate = 2

#
# Queue type using a pipe, buffer and thread
#
Expand All @@ -50,6 +55,9 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._shutdown_state = context._default_context.Value(
ctypes.c_uint8, lock=self._rlock
)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)
Expand Down Expand Up @@ -86,20 +94,28 @@ def _reset(self, after_fork=False):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if not self._sem.acquire(block, timeout):
raise Full

with self._notempty:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
if self._thread is None:
self._start_thread()
self._buffer.append(obj)
self._notempty.notify()

def get(self, block=True, timeout=None):
if self._shutdown_state.value == _queue_shutdown_immediate:
raise ShutDown
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
if self._shutdown_state.value != _queue_alive:
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
Expand All @@ -111,13 +127,19 @@ def get(self, block=True, timeout=None):
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if self._shutdown_state.value != _queue_alive:
raise ShutDown
raise Empty
if self._shutdown_state.value != _queue_alive :
raise ShutDown
elif not self._poll():
raise Empty
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()
if self._shutdown_state.value == _queue_shutdown:
raise ShutDown
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand Down Expand Up @@ -329,6 +351,8 @@ def task_done(self):

def join(self):
with self._cond:
if self._shutdown_state.value == _queue_shutdown_immediate:
return
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()

Expand Down
35 changes: 35 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,41 @@ def test_closed_queue_put_get_exceptions(self):
q.put('foo')
with self.assertRaisesRegex(ValueError, 'is closed'):
q.get()

def test_shutdown_empty(self):
q = multiprocessing.Queue()
q.shutdown()
try:
q.put("data")
self.fail("Didn't appear to shut-down queue")
except pyqueue.ShutDown:
pass
try:
q.get()
self.fail("Didn't appear to shut-down queue")
except pyqueue.ShutDown:
pass

def test_shutdown_nonempty(self):
q = multiprocessing.Queue()
q.put("data")
q.shutdown()
q.get()
try:
q.get()
self.fail("Didn't appear to shut-down queue")
except pyqueue.ShutDown:
pass

def test_shutdown_immediate(self):
q = multiprocessing.Queue()
q.put("data")
q.shutdown(immediate=True)
try:
q.get()
self.fail("Didn't appear to shut-down queue")
except pyqueue.ShutDown:
pass
#
#
#
Expand Down