"""
Parallel implementations of some functional-programming built-ins (map, filter)
Alberto Bertogli (albertito@blitiri.com.ar)
----------------------------------------------------
The functions work by partitioning the sequence passed to them, and running
the function over the items of each part in parallel. The number of threads to
use is determined by the 'workers' parameter, which defaults to 2.
Be aware that due to Python's internal threading implementation, purely CPU
bound parallel runs are probably not going to run faster than sequential runs.
Google for "GIL" (as in Global Interpreter Lock) for more information.
"""
import threading
def splicegen(seq, start, stop):
    """Lazily yield seq[start], seq[start + 1], ..., seq[stop - 1].

    Behaves like iterating over seq[start:stop], but no slice is built:
    each element is fetched by index only when the generator is advanced.
    """
    for index in range(start, stop):
        yield seq[index]
def divide(seq, workers):
    """divide(seq, workers) -> [gen1, gen2, ...]

    Divides the given sequence into 'workers' contiguous parts, and returns
    a list with one generator (built with splicegen()) per part, in order.

    seq must support len() and integer indexing. Part sizes differ by at
    most one item: the first len(seq) % workers parts each get one extra
    item. This spreads the work evenly instead of piling the whole remainder
    onto the last part. When there are fewer items than workers, the
    trailing parts are empty.

    Raises ValueError if workers is less than 1.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1, got %r" % (workers,))
    base, extra = divmod(len(seq), workers)
    parts = []
    start = 0
    for index in range(workers):
        # the first 'extra' parts absorb one leftover item each
        stop = start + base + (1 if index < extra else 0)
        parts.append(splicegen(seq, start, stop))
        start = stop
    return parts
class Holder (object):
    """Holder class, which holds a single object.

    It's used by the worker threads to hand their return values back to
    the thread that consumes them.
    """

    def __init__(self):
        self._isset = False  # becomes True once set() has been called
        self._value = None

    def set(self, o):
        """Store o as the held value, replacing any previous one."""
        self._isset = True
        self._value = o

    def get(self):
        """Return the held value.

        Raises RuntimeError if set() has never been called. None is a
        valid held value, which is why a separate flag is tracked.
        """
        if not self._isset:
            raise RuntimeError("Holder.get() called before a value was set")
        return self._value
def pmap(func, seq, workers=2):
    """pmap(function, sequence, workers = 2) -> generator

    This is a parallel version of map(). It takes a function and a
    sequence, and returns a generator over the results, in the same order
    as the input.

    The sequence is split with divide() into 'workers' parts, and each part
    is mapped in its own thread. Because this is a generator, the threads
    are only started when the result is first iterated. If function raises
    in a worker thread, that exception is re-raised to the consumer once
    the results of the failing part are reached.
    """
    # a map wrapper that stores its outcome inside the holder
    def hmap(holder, f, s):
        try:
            # list() forces the calls to happen here, in the worker thread.
            # On Python 3 map() is lazy, and storing the bare iterator would
            # make the consuming thread run f sequentially.
            holder.set((True, list(map(f, s))))
        except Exception as exc:
            # hand the error back to the consumer instead of losing it
            # together with the worker thread
            holder.set((False, exc))

    # split the working set
    parts = divide(seq, workers)

    # launch the threads
    threads = []
    for p in parts:
        h = Holder()
        t = threading.Thread(target=hmap, args=(h, func, p))
        t.start()
        threads.append((t, h))

    # aggregate the results, in input order
    for t, h in threads:
        t.join()
        ok, result = h.get()
        if not ok:
            raise result
        for i in result:
            yield i
def pfilter(func, seq, workers=2):
    """pfilter(function or None, sequence, workers = 2) -> generator

    This is a parallel version of filter(). It takes a function (or None)
    and a sequence, and returns a generator over the items that pass the
    filter, in the same order as the input.

    The sequence is split with divide() into 'workers' parts, and each part
    is filtered in its own thread. Because this is a generator, the threads
    are only started when the result is first iterated. If function raises
    in a worker thread, that exception is re-raised to the consumer once
    the results of the failing part are reached.
    """
    # we use the same code structure as pmap
    def hfilter(holder, f, s):
        try:
            # list() forces the calls to happen here, in the worker thread.
            # On Python 3 filter() is lazy, and storing the bare iterator
            # would make the consuming thread run f sequentially.
            holder.set((True, list(filter(f, s))))
        except Exception as exc:
            # hand the error back to the consumer instead of losing it
            # together with the worker thread
            holder.set((False, exc))

    parts = divide(seq, workers)

    threads = []
    for p in parts:
        h = Holder()
        t = threading.Thread(target=hfilter, args=(h, func, p))
        t.start()
        threads.append((t, h))

    for t, h in threads:
        t.join()
        ok, result = h.get()
        if not ok:
            raise result
        for i in result:
            yield i