From c8ce715a82fd8034ef1d809b262346c15f2490c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Fran=C3=A7ois=20Natali?= Date: Tue, 17 Apr 2012 18:45:57 +0200 Subject: [PATCH] Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt. --- Doc/library/multiprocessing.rst | 6 +++ Lib/multiprocessing/managers.py | 19 +++++++++ Lib/multiprocessing/synchronize.py | 19 +++++++++ Lib/test/test_multiprocessing.py | 67 ++++++++++++++++++++++++++++++ Misc/NEWS | 2 + 5 files changed, 113 insertions(+) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 1c7b9b95b95..b9dfd19b316 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -897,6 +897,9 @@ object -- see :ref:`multiprocessing-managers`. If *lock* is specified then it should be a :class:`Lock` or :class:`RLock` object from :mod:`multiprocessing`. + .. versionchanged:: 3.3 + The :meth:`wait_for` method was added. + .. class:: Event() A clone of :class:`threading.Event`. @@ -1281,6 +1284,9 @@ their parent process exits. The manager classes are defined in the If *lock* is supplied then it should be a proxy for a :class:`threading.Lock` or :class:`threading.RLock` object. + .. versionchanged:: 3.3 + The :meth:`wait_for` method was added. + .. method:: Event() Create a shared :class:`threading.Event` object and return a proxy for it. diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index eaf912c1245..d1c9d4578ea 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -48,6 +48,7 @@ from traceback import format_exc from multiprocessing import Process, current_process, active_children, Pool, util, connection from multiprocessing.process import AuthenticationString from multiprocessing.forking import exit, Popen, ForkingPickler +from time import time as _time # # Register some things for pickling @@ -996,6 +997,24 @@ class ConditionProxy(AcquirerProxy): return self._callmethod('notify') def notify_all(self): return self._callmethod('notify_all') + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + class EventProxy(BaseProxy): _exposed_ = ('is_set', 'set', 'clear', 'wait') diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py index e35bbff185c..532ac5c1dd2 100644 --- a/Lib/multiprocessing/synchronize.py +++ b/Lib/multiprocessing/synchronize.py @@ -43,6 +43,7 @@ import _multiprocessing from multiprocessing.process import current_process from multiprocessing.util import register_after_fork, debug from multiprocessing.forking import assert_spawning, Popen +from time import time as _time # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. @@ -290,6 +291,24 @@ class Condition(object): while self._wait_semaphore.acquire(False): pass + def wait_for(self, predicate, timeout=None): + result = predicate() + if result: + return result + if timeout is not None: + endtime = _time() + timeout + else: + endtime = None + waittime = None + while not result: + if endtime is not None: + waittime = endtime - _time() + if waittime <= 0: + break + self.wait(waittime) + result = predicate() + return result + # # Event # diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/test_multiprocessing.py index 2bcdb4e07c0..bbde366e5db 100644 --- a/Lib/test/test_multiprocessing.py +++ b/Lib/test/test_multiprocessing.py @@ -887,6 +887,73 @@ class _TestCondition(BaseTestCase): self.assertEqual(res, False) self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) + @classmethod + def _test_waitfor_f(cls, cond, state): + with cond: + state.value = 0 + cond.notify() + result = cond.wait_for(lambda : state.value==4) + if not result or state.value != 4: + sys.exit(1) + + @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') + def test_waitfor(self): + # based on test in test/lock_tests.py + cond = self.Condition() + state = self.Value('i', -1) + + p = self.Process(target=self._test_waitfor_f, args=(cond, state)) + p.daemon = True + p.start() + + with cond: + result = cond.wait_for(lambda : state.value==0) + self.assertTrue(result) + self.assertEqual(state.value, 0) + + for i in range(4): + time.sleep(0.01) + with cond: + state.value += 1 + cond.notify() + + p.join(5) + self.assertFalse(p.is_alive()) + self.assertEqual(p.exitcode, 0) + + @classmethod + def _test_waitfor_timeout_f(cls, cond, state, success): + with cond: + expected = 0.1 + dt = time.time() + result = cond.wait_for(lambda : state.value==4, timeout=expected) + dt = time.time() - dt + # borrow logic in assertTimeout() from test/lock_tests.py + if not result and expected * 0.6 < dt < expected * 10.0: + success.value = True + + @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') + def test_waitfor_timeout(self): + # based on test in test/lock_tests.py + cond = self.Condition() + state = self.Value('i', 0) + success = self.Value('i', False) + + p = self.Process(target=self._test_waitfor_timeout_f, + args=(cond, state, success)) + p.daemon = True + p.start() + + # Only increment 3 times, so state == 4 is never reached. + for i in range(3): + time.sleep(0.01) + with cond: + state.value += 1 + cond.notify() + + p.join(5) + self.assertTrue(success.value) + class _TestEvent(BaseTestCase): diff --git a/Misc/NEWS b/Misc/NEWS index 54da24dac11..f1837e732cf 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -39,6 +39,8 @@ Core and Builtins Library ------- +- Issue #14087: multiprocessing: add Condition.wait_for(). Patch by sbt. + - Issue #14452: SysLogHandler no longer inserts a UTF-8 BOM into the message. - Issue #14386: Expose the dict_proxy internal type as types.MappingProxyType.