author | Alberto Bertogli
<albertogli@telpin.com.ar> 2006-11-01 14:23:11 UTC |
committer | Alberto Bertogli
<albertogli@telpin.com.ar> 2006-11-01 14:23:11 UTC |
parent | 64beac82ec00b28119626692daf6f628e7dab4d9 |
pfunctional.py | +119 | -0 |
samples/pfunctional/sample1.py | +51 | -0 |
# ---------------------------------------------------------------------------
# pfunctional.py  (new file added by this commit)
# ---------------------------------------------------------------------------
"""
Parallel implementation of some functional functions.

The functions work by partitioning the sequence passed to them, and running
the function over the items of each part in parallel. The number of threads
to use is determined by the 'workers' parameter, which defaults to 2.

The sequence must support len() and integer indexing, because it is
partitioned by index.

Be aware that due to Python's internal threading implementation, purely CPU
bound parallel runs are probably not going to run faster than sequential
runs. Google for "GIL" (as in Global Interpreter Lock) for more information.
"""

import threading


def splicegen(seq, start, stop):
    "Returns a generator to iterate over seq[start:stop]."
    # Index instead of slicing so no copy of the sub-sequence is made.
    for i in range(start, stop):
        yield seq[i]


def divide(seq, workers):
    """divide(seq, workers) -> [gen1, gen2, ...]

    Divides the given sequence in 'workers' contiguous parts, and returns a
    list with one generator per part, in sequence order. Part sizes differ
    by at most one item; when there are fewer items than workers the
    trailing parts are empty.

    Raises ValueError if 'workers' is smaller than 1.
    """
    if workers < 1:
        raise ValueError("workers must be at least 1, got %r" % (workers,))

    # The first 'extra' parts take one additional item, so the remainder is
    # spread out instead of being piled onto the last part.
    base, extra = divmod(len(seq), workers)
    parts = []
    start = 0
    for index in range(workers):
        stop = start + base
        if index < extra:
            stop += 1
        parts.append(splicegen(seq, start, stop))
        start = stop
    return parts


class Holder (object):
    """Holder class, which holds objects. It's used by the worker threads
    to return values (or the exception that prevented them from producing
    one) to the thread that collects the results."""

    def __init__(self):
        self._isset = False
        self._value = None
        self._error = None

    def set(self, o):
        "Stores 'o' as the held value."
        self._isset = True
        self._value = o

    def set_error(self, exc):
        "Stores an exception instance to be raised by get()."
        self._error = exc

    def get(self):
        """Returns the held value.

        Raises the stored exception if set_error() was called, or
        RuntimeError if nothing was ever stored.
        """
        if self._error is not None:
            raise self._error
        if not self._isset:
            raise RuntimeError("Holder.get() called before a value was set")
        return self._value


def _run_partitioned(transform, func, seq, workers):
    """Runs transform(func, part) over each part of seq in its own thread
    and yields the results in the original sequence order.

    This is a generator: the threads are started when it is first iterated.
    """

    def work(holder, part):
        try:
            # list() forces the evaluation to happen here, inside the worker
            # thread; on Python 3 map() and filter() are lazy and would
            # otherwise be evaluated by whoever consumes the results.
            holder.set(list(transform(func, part)))
        except Exception as exc:
            # Hand the failure to the consumer, which re-raises it from
            # Holder.get().
            holder.set_error(exc)

    # launch the threads
    jobs = []
    for part in divide(seq, workers):
        holder = Holder()
        thread = threading.Thread(target = work, args = (holder, part))
        thread.start()
        jobs.append((thread, holder))

    # aggregate the results, part by part, so the output order matches seq
    for thread, holder in jobs:
        thread.join()
        for item in holder.get():
            yield item


def pmap(func, seq, workers = 2):
    """pmap(function, sequence, workers = 2) -> generator

    This is a parallel version of map(). It takes a function and a
    sequence, and returns a generator over the results, in the same order
    as the input sequence.

    An exception raised by the function in a worker thread is re-raised
    while iterating over the returned generator.
    """
    return _run_partitioned(map, func, seq, workers)


def pfilter(func, seq, workers = 2):
    """pfilter(function or None, sequence, workers = 2) -> generator

    This is a parallel version of filter(). It takes a function (or None)
    and a sequence, and returns a generator over the items that pass the
    filter, in the same order as the input sequence.

    An exception raised by the function in a worker thread is re-raised
    while iterating over the returned generator.
    """
    return _run_partitioned(filter, func, seq, workers)


# ---------------------------------------------------------------------------
# samples/pfunctional/sample1.py  (new file added by this commit)
# ---------------------------------------------------------------------------

import sys
import time

#
# pmap()
#

def fm(e):
    "Sample map function: logs start and end of the work, returns -e."
    tid = threading.current_thread()
    # Single-argument print() produces the same output on Python 2 and 3.
    print("%s p %s" % (tid, e))
    time.sleep(0.1)
    print("%s d %s" % (tid, e))
    return -e


#
# pfilter()
#

def ff(e):
    "Sample filter function: logs start and end, keeps items above 100."
    tid = threading.current_thread()
    print("%s p %s" % (tid, e))
    time.sleep(0.1)
    print("%s d %s" % (tid, e))
    return e > 100


#
# main
#

# with one thread this would take 20 seconds, with two threads, 10 seconds,
# with 4 threads, 5 seconds
if __name__ == '__main__':
    # Imported under the guard so that importing this listing as a module
    # does not require pfunctional to be on the path.
    from pfunctional import pmap, pfilter

    if len(sys.argv) < 2 or sys.argv[1] not in ['map', 'filter']:
        print("Use: ptest.py [map|filter]")
        sys.exit(1)
    arg = sys.argv[1]
    if arg == 'map':
        r = pmap(fm, range(200), workers = 4)
    elif arg == 'filter':
        r = pfilter(ff, range(200), workers = 4)

    print(list(r))