"""
Parallel implementation of some functional functions
Alberto Bertogli (albertito@blitiri.com.ar)
----------------------------------------------------

The functions work by partitioning the sequence passed to them, and running
the function over the items of each part in parallel. The number of threads to
use is determined by the 'workers' parameter, which defaults to 2.

Be aware that, due to Python's threading implementation (the Global
Interpreter Lock, or "GIL"), purely CPU-bound parallel runs are probably not
going to be faster than sequential ones; the parallelism mostly pays off when
the function spends its time in I/O or other GIL-releasing calls.
"""

import threading


def splicegen(seq, start, stop):
	"Returns a generator to iterate over seq[start:stop]."
	for i in range(start, stop):
		yield seq[i]

def divide(seq, workers):
	"""divide(seq, workers) -> [gen1, gen2, ...]

	Divides the given sequence in 'workers' parts, and returns a list with
	generators for each parts.
	"""

	l = len(seq)
	wlen = l // workers
	r = []
	start = 0
	for i in range(workers - 1):
		stop = start + wlen
		part = splicegen(seq, start, stop)
		r.append(part)
		start = stop
	part = splicegen(seq, start, l)
	r.append(part)
	return r
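
# Example of the partitioning (a quick sketch, not part of the original
# module): dividing a 10-element sequence among 3 workers yields generators
# over indices [0:3], [3:6] and [6:10], the last one taking the remainder:
#
#	parts = divide(list(range(10)), 3)
#	[list(p) for p in parts]  ->  [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]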


class Holder(object):
	"""Holder class, which holds objects. It's used by the worker threads
	to return values."""
	def __init__(self):
		self._isset = False
		self._value = None

	def set(self, o):
		self._isset = True
		self._value = o

	def get(self):
		if not self._isset:
			raise ValueError("no value has been set")
		return self._value


def pmap(func, seq, workers = 2):
	"""pmap(function, sequence, workers = 2) -> generator

	This is a parallel version of map(). It takes a function and a
	sequence, and returns a generator over the results.

	"""

	# a map wrapper that stores its result inside the holder; list() forces
	# the (lazy) map object to be fully evaluated inside the worker thread
	def hmap(holder, f, s):
		holder.set(list(map(f, s)))

	# split the working set
	parts = divide(seq, workers)

	# launch the threads
	threads = []
	for p in parts:
		h = Holder()
		t = threading.Thread(target = hmap, args = (h, func, p))
		t.start()
		threads.append((t, h))

	# aggregate the results
	for t, h in threads:
		t.join()
		for i in h.get():
			yield i
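
# A minimal usage sketch (the 'slow' helper is illustrative, not part of the
# module); the threads pay off here because time.sleep releases the GIL,
# standing in for I/O or other blocking work:
#
#	import time
#	def slow(x):
#		time.sleep(0.1)
#		return x * x
#	squares = list(pmap(slow, range(20), workers = 4))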


def pfilter(func, seq, workers = 2):
	"""filter(function or None, sequence) -> generator

	This is a parallel version of filter(). It takes a function (or None)
	and a sequence, and returns a generator over the results.
	"""
	# same code structure as pmap: list() forces the (lazy) filter to be
	# evaluated inside the worker thread
	def hfilter(holder, f, s):
		holder.set(list(filter(f, s)))

	parts = divide(seq, workers)

	threads = []
	for p in parts:
		h = Holder()
		t = threading.Thread(target = hfilter, args = (h, func, p))
		t.start()
		threads.append((t, h))

	for t, h in threads:
		t.join()
		for i in h.get():
			yield i
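

# A small self-check sketch (the helpers below are illustrative, not part of
# the original module): verify that the parallel versions produce the same
# results as the builtins, in the same order.
if __name__ == '__main__':
	def square(x):
		return x * x

	def is_even(x):
		return x % 2 == 0

	data = list(range(100))

	expected_map = [square(x) for x in data]
	expected_filter = [x for x in data if is_even(x)]

	assert list(pmap(square, data, workers = 4)) == expected_map
	assert list(pfilter(is_even, data, workers = 4)) == expected_filter
	print("pmap/pfilter self-check passed")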