git » pymisc » commit 4d1641f

Add a parallel implementation of map and filter.

author Alberto Bertogli
2006-11-01 14:23:11 UTC
committer Alberto Bertogli
2006-11-01 14:23:11 UTC
parent 64beac82ec00b28119626692daf6f628e7dab4d9

Add a parallel implementation of map and filter.

pfunctional.py +119 -0
samples/pfunctional/sample1.py +51 -0

diff --git a/pfunctional.py b/pfunctional.py
new file mode 100644
index 0000000..c987439
--- /dev/null
+++ b/pfunctional.py
@@ -0,0 +1,119 @@
+
+"""
+Parallel implementation of some functional functions.
+
+The functions work by partitioning the sequence passed to them, and running
+the function over the items of each part in parallel. The number of threads to
+use is determined by the 'workers' parameter, which defaults to 2.
+
+
+Be aware that due to Python's internal threading implementation, purely CPU
+bound parallel runs are probably not going to run faster than sequential runs.
+Google for "GIL" (as in Global Interpreter Lock) for more information.
+"""
+
+import threading
+
+
+def splicegen(seq, start, stop):
+	"Returns a generator to iterate over seq[start:stop]."
+	i = start
+	while i < stop:
+		yield seq[i]
+		i += 1
+
+def divide(seq, workers):
+	"""divide(seq, workers) -> [gen1, gen2, ...]
+
+	Divides the given sequence in 'workers' parts, and returns a list with
+	generators for each parts.
+	"""
+
+	l = len(seq)
+	wlen = l // workers
+	r = []
+	start = 0
+	for i in range(workers - 1):
+		stop = start + wlen
+		part = splicegen(seq, start, stop)
+		r.append(part)
+		start = stop
+	part = splicegen(seq, start, l)
+	r.append(part)
+	return r
+
+
+class Holder (object):
+	"""Holder class, which holds objects. It's used by the worker threads
+	to return values."""
+	def __init__(self):
+		self._isset = False
+		self._value = None
+
+	def set(self, o):
+		self._isset = True
+		self._value = o
+
+	def get(self):
+		if not self._isset:
+			raise
+		return self._value
+
+
+def pmap(func, seq, workers = 2):
+	"""pmap(function, sequence, workers = 2) -> generator
+
+	This is a paralell version of map(). It takes a function and a
+	sequence, and returns a generator over the results.
+
+	"""
+
+	# a map wrapper that returns inside the holder
+	def hmap(holder, f, s):
+		holder.set(map(f, s))
+
+	# split the working set
+	parts = divide(seq, workers)
+
+	# launch the threads
+	threads = []
+	for p in parts:
+		h = Holder()
+		t = threading.Thread(target = hmap, args = (h, func, p))
+		t.start()
+		threads.append((t, h))
+
+	# aggregate the results
+	for t, h in threads:
+		t.join()
+		for i in h.get():
+			yield i
+
+
+def pfilter(func, seq, workers = 2):
+	"""filter(function or None, sequence) -> generator
+
+	This is a paralell version of filter(). It takes a function (or None)
+	and a sequence, and returns a generator over the results.
+	"""
+	# we use the same code structure than pmap
+
+	def hfilter(holder, f, s):
+		holder.set(filter(f, s))
+
+	parts = divide(seq, workers)
+
+	threads = []
+	for p in parts:
+		h = Holder()
+		t = threading.Thread(target = hfilter, args = (h, func, p))
+		t.start()
+		threads.append((t, h))
+
+	for t, h in threads:
+		t.join()
+		for i in h.get():
+			yield i
+
+
+
diff --git a/samples/pfunctional/sample1.py b/samples/pfunctional/sample1.py
new file mode 100644
index 0000000..7f8a7d5
--- /dev/null
+++ b/samples/pfunctional/sample1.py
@@ -0,0 +1,51 @@
+
+import sys
+import threading
+import time
+
+from pfunctional import pmap, pfilter
+
+#
+# pmap()
+#
+
+def fm(e):
+	tid = threading.currentThread()
+	print tid, 'p', e
+	time.sleep(0.1)
+	print tid, 'd', e
+	return -e
+
+
+#
+# pfilter()
+#
+
+def ff(e):
+	tid = threading.currentThread()
+	print tid, 'p', e
+	time.sleep(0.1)
+	print tid, 'd', e
+	if e > 100: return True
+	return False
+
+
+#
+# main
+#
+
+# with one thread this would take 20 seconds, with two threads, 10 seconds,
+# with 4 threads, 5 seconds
+if __name__ == '__main__':
+	if len(sys.argv) < 2 or sys.argv[1] not in ['map', 'filter']:
+		print "Use: ptest.py [map|filter]"
+		sys.exit(1)
+	arg = sys.argv[1]
+	if arg == 'map':
+		r = pmap(fm, range(200), workers = 4)
+	elif arg == 'filter':
+		r = pfilter(ff, range(200), workers = 4)
+
+	print list(r)
+
+