cpython/Doc/includes/mp_pool.py
Georg Brandl 7baf625bab Merged revisions 74613,74615,74619-74620,74622 via svnmerge from
svn+ssh://svn.python.org/python/branches/py3k

................
  r74613 | georg.brandl | 2009-09-01 09:34:27 +0200 (Di, 01 Sep 2009) | 1 line

  #6814: remove traces of xrange().
................
  r74615 | georg.brandl | 2009-09-01 09:42:40 +0200 (Di, 01 Sep 2009) | 9 lines

  Recorded merge of revisions 74614 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r74614 | georg.brandl | 2009-09-01 09:40:54 +0200 (Di, 01 Sep 2009) | 1 line

    #6813: better documentation for numberless string formats.
  ........
................
  r74619 | georg.brandl | 2009-09-01 10:02:03 +0200 (Di, 01 Sep 2009) | 1 line

  #6754: remove old struct member nb_inplace_divide.
................
  r74620 | georg.brandl | 2009-09-01 10:03:26 +0200 (Di, 01 Sep 2009) | 1 line

  #6732: fix return value of module init function in example.
................
  r74622 | georg.brandl | 2009-09-01 10:11:14 +0200 (Di, 01 Sep 2009) | 73 lines

  Merged revisions 74542,74544-74548,74550,74554-74555,74578,74588,74590,74603,74616-74618,74621 via svnmerge from
  svn+ssh://pythondev@svn.python.org/python/trunk

  ........
    r74542 | georg.brandl | 2009-08-23 23:28:56 +0200 (So, 23 Aug 2009) | 1 line

    Restore alphabetic order.
  ........
    r74544 | georg.brandl | 2009-08-24 19:12:30 +0200 (Mo, 24 Aug 2009) | 1 line

    #6775: fix python.org URLs in README.
  ........
    r74545 | georg.brandl | 2009-08-24 19:14:29 +0200 (Mo, 24 Aug 2009) | 1 line

    #6772: mention utf-8 as utf8 alias.
  ........
    r74546 | georg.brandl | 2009-08-24 19:20:40 +0200 (Mo, 24 Aug 2009) | 1 line

    #6725: spell "namespace" consistently.
  ........
    r74547 | georg.brandl | 2009-08-24 19:22:05 +0200 (Mo, 24 Aug 2009) | 1 line

    #6718: fix example.
  ........
    r74548 | georg.brandl | 2009-08-24 19:24:27 +0200 (Mo, 24 Aug 2009) | 1 line

    #6677: mention "deleting" as an alias for removing files.
  ........
    r74550 | georg.brandl | 2009-08-24 19:48:40 +0200 (Mo, 24 Aug 2009) | 1 line

    #6677: note that rmdir only removes empty directories.
  ........
    r74554 | georg.brandl | 2009-08-27 20:59:02 +0200 (Do, 27 Aug 2009) | 1 line

    Typo fix.
  ........
    r74555 | georg.brandl | 2009-08-27 21:02:43 +0200 (Do, 27 Aug 2009) | 1 line

    #6787: reference fix.
  ........
    r74578 | tarek.ziade | 2009-08-29 15:33:21 +0200 (Sa, 29 Aug 2009) | 1 line

    fixed #6801: symmetric_difference_update also accepts pipe
  ........
    r74588 | georg.brandl | 2009-08-30 10:35:01 +0200 (So, 30 Aug 2009) | 1 line

    #6803: fix old name.
  ........
    r74590 | georg.brandl | 2009-08-30 13:51:53 +0200 (So, 30 Aug 2009) | 1 line

    #6801: fix copy-paste oversight.
  ........
    r74603 | georg.brandl | 2009-08-31 08:38:29 +0200 (Mo, 31 Aug 2009) | 1 line

    other -> others where multiple arguments are accepted.
  ........
    r74616 | georg.brandl | 2009-09-01 09:46:26 +0200 (Di, 01 Sep 2009) | 1 line

    #6808: clarification.
  ........
    r74617 | georg.brandl | 2009-09-01 09:53:37 +0200 (Di, 01 Sep 2009) | 1 line

    #6765: hint that log(x, base) is not very sophisticated.
  ........
    r74618 | georg.brandl | 2009-09-01 10:00:47 +0200 (Di, 01 Sep 2009) | 1 line

    #6810: add a link to the section about frame objects instead of just a description where to find it.
  ........
    r74621 | georg.brandl | 2009-09-01 10:06:03 +0200 (Di, 01 Sep 2009) | 1 line

    #6638: fix wrong parameter name and markup a class.
  ........
................
2009-09-01 08:13:16 +00:00

315 lines
7.0 KiB
Python

#
# A test of `multiprocessing.Pool` class
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
def f(x):
return 1.0 / (x-5.0)
def pow3(x):
return x**3
def noop(x):
pass
#
# Test code
#
def test():
print('cpu_count() = %d\n' % multiprocessing.cpu_count())
#
# Create pool
#
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
pool = multiprocessing.Pool(PROCESSES)
print('pool = %s' % pool)
print()
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Simple benchmarks
#
N = 100000
print('def pow3(x): return x**3')
t = time.time()
A = list(map(pow3, range(N)))
print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
(N, time.time() - t))
t = time.time()
B = pool.map(pow3, range(N))
print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
(N, time.time() - t))
t = time.time()
C = list(pool.imap(pow3, range(N), chunksize=N//8))
print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
' seconds' % (N, N//8, time.time() - t))
assert A == B == C, (len(A), len(B), len(C))
print()
L = [None] * 1000000
print('def noop(x): pass')
print('L = [None] * 1000000')
t = time.time()
A = list(map(noop, L))
print('\tmap(noop, L):\n\t\t%s seconds' % \
(time.time() - t))
t = time.time()
B = pool.map(noop, L)
print('\tpool.map(noop, L):\n\t\t%s seconds' % \
(time.time() - t))
t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
(len(L)//8, time.time() - t))
assert A == B == C, (len(A), len(B), len(C))
print()
del A, B, C, L
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
#
# Testing callback
#
print('Testing callback:')
A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()
r = pool.map_async(pow3, list(range(10)), callback=A.extend)
r.wait()
if A == B:
print('\tcallbacks succeeded\n')
else:
print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
#
# Check there are no outstanding tasks
#
assert not pool._cache, 'cache = %r' % pool._cache
#
# Check close() methods
#
print('Testing close():')
for worker in pool._pool:
assert worker.is_alive()
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()
assert result.get() is None
for worker in pool._pool:
assert not worker.is_alive()
print('\tclose() succeeded\n')
#
# Check terminate() method
#
print('Testing terminate():')
pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
pool.terminate()
pool.join()
for worker in pool._pool:
assert not worker.is_alive()
print('\tterminate() succeeded\n')
#
# Check garbage collection
#
print('Testing garbage collection:')
pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
results = pool = None
time.sleep(DELTA * 2)
for worker in processes:
assert not worker.is_alive()
print('\tgarbage collection succeeded\n')
if __name__ == '__main__':
multiprocessing.freeze_support()
assert len(sys.argv) in (1, 2)
if len(sys.argv) == 1 or sys.argv[1] == 'processes':
print(' Using processes '.center(79, '-'))
elif sys.argv[1] == 'threads':
print(' Using threads '.center(79, '-'))
import multiprocessing.dummy as multiprocessing
else:
print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
raise SystemExit(2)
test()