git » pymisc » master » tree

[master] / adatasks.py

"""
adatasks - An Ada-like task implementation for Python
Alberto Bertogli (albertito@blitiri.com.ar)
3/Jul/2005
"""

import thread, threading

class EntryQueue:
	"""Queue of pending entries to a task.

	Items are (entry, params) tuples kept in arrival order. The queue
	does no locking of its own: the code using it in this module
	acquires self.lock around every operation.
	"""

	def __init__(self, mutex):
		"""Create an empty queue whose condition variable, self.lock,
		is built on the given mutex."""
		self.lock = threading.Condition(mutex)
		self.queue = []

	def inqueue(self, entry):
		"Tell whether at least one call to the given entry is pending."
		for e, params in self.queue:
			if e == entry:
				return True
		return False

	def isempty(self):
		"Tell whether there are no pending calls at all."
		return not self.queue

	def push(self, entry, params):
		"Append a call to the given entry, with its parameters."
		self.queue.append((entry, params))

	def pop(self, entry):
		"""Remove the oldest pending call to the given entry and return
		its parameters.

		Raises LookupError if there is no such call; callers are
		expected to check with inqueue() first.
		"""
		for i, (e, params) in enumerate(self.queue):
			if e == entry:
				# delete by position, no need to search the list again
				del self.queue[i]
				return params
		# string exceptions are not accepted by newer interpreters, so
		# use a real exception class
		raise LookupError('no pending call to entry %r' % (entry,))

	def popany(self):
		"""Remove and return the oldest pending (entry, params) tuple,
		or None if the queue is empty."""
		if self.queue:
			return self.queue.pop(0)
		return None


# Entry queue dictionary, indexes queues per task: entrydb[task name] is that
# task's EntryQueue.
# entrydb_lock guards the dictionary; create_queue() also hands this same lock
# to every EntryQueue as the mutex of its condition variable, so all the
# queues share it.
entrydb = {}
entrydb_lock = threading.Lock()

def create_queue(name):
	"""Creates a queue for the task with the given name.

	Does nothing if a queue with that name already exists, so tasks
	with the same name end up sharing one queue.
	"""
	entrydb_lock.acquire()
	try:
		if name not in entrydb:
			entrydb[name] = EntryQueue(entrydb_lock)
	finally:
		# release even if the lookup fails (e.g. unhashable name), a
		# lock left held here would block every later queue operation
		entrydb_lock.release()

def get_queue(name):
	"""Gets a task's queue.

	Raises KeyError if no queue was created for the given name.
	"""
	entrydb_lock.acquire()
	try:
		return entrydb[name]
	finally:
		# release on the KeyError path too, otherwise one lookup of an
		# unknown name would leave the lock held for good
		entrydb_lock.release()


# list of tasks to launch: task() appends to it and launch() starts every
# task in it; tolaunch_lock guards the list
tolaunch = []
tolaunch_lock = threading.Lock()

# list of running tasks: running[task] = running_count, indexed by the task's
# function name. running_lock is a condition variable: it guards the
# dictionary and is notified each time a task's thread ends, which is what
# wait_for_task() and wait_for_all() wait on.
running = {}
running_lock = threading.Condition()


class Task:
	"""A task: a function that runs in its own thread and gets called
	through entries.

	Calling the instance starts a new thread running the wrapped
	function. Reading an attribute that exists neither on the instance
	nor on the wrapped function gives an entry: a callable that queues
	(entry name, params) for the task and returns immediately. Queued
	calls are taken out with accept() and select().

	The queue and the running count are indexed by the function's
	name, so tasks wrapping functions with the same name share both.
	"""

	def __init__(self, f):
		"Wrap the function f and make sure its entry queue exists."
		self.f = f
		self.fname = f.__name__
		create_queue(self.fname)

	def __finished(self):
		"Take one instance off the running count and wake up waiters."
		running_lock.acquire()
		try:
			running[self.fname] -= 1
			running_lock.notifyAll()
		finally:
			running_lock.release()

	def __run(self, *params):
		"Thread body: run the function and account for its end."
		try:
			self.f(*params)
		finally:
			# count the task as ended even if it raised
			self.__finished()

	def __call__(self, *params):
		"""Start a new thread running the function with the given
		parameters; returns without waiting for it."""
		running_lock.acquire()
		try:
			running[self.fname] = running.get(self.fname, 0) + 1
		finally:
			running_lock.release()

		started = False
		try:
			thread.start_new_thread(self.__run, params)
			started = True
		finally:
			if not started:
				# no thread will ever decrement the count we just
				# added, undo it so waiters do not hang on it
				self.__finished()

	def __getattr__(self, entry):
		"""Give the function's attribute with the given name if it has
		one, otherwise an entry wrapper that queues a call.

		Raises AttributeError for special (__x__) names the function
		does not have, and for the instance's own attributes when
		they are not set yet.
		"""
		if entry in ('f', 'fname'):
			# we only get here when they are missing from the instance
			# (e.g. while it is being copied); reading self.f below
			# would call us again, forever
			raise AttributeError(entry)

		if entry in dir(self.f):
			# return f's attribute if one exists with the given
			# name
			return getattr(self.f, entry)

		if entry.startswith('__') and entry.endswith('__'):
			# special method lookups (copying, pickling, truth
			# testing, comparisons) must fail cleanly instead of
			# getting a wrapper that queues a bogus entry
			raise AttributeError(entry)

		# write the entry wrapper
		def wrapper(*params):
			q = get_queue(self.fname)
			q.lock.acquire()
			try:
				q.push(entry, params)
				q.lock.notifyAll()
			finally:
				q.lock.release()

		# return the wrapper
		return wrapper

	def accept(self, entry):
		"""Wait until there is a pending call to the given entry, take
		the oldest one out of the queue and return its parameters (a
		tuple)."""
		q = get_queue(self.fname)
		q.lock.acquire()
		try:
			while not q.inqueue(entry):
				q.lock.wait()
			return q.pop(entry)
		finally:
			# also runs when wait() gets interrupted, so the lock is
			# never left held
			q.lock.release()

	def select(self, *entries):
		"""Wait for a pending call and take it out of the queue.

		Without arguments any entry will do and the oldest pending
		call is taken. With arguments only those entries are
		considered, and they are checked in the order given.

		Returns an (entry, params) tuple.
		"""
		q = get_queue(self.fname)

		q.lock.acquire()
		try:
			if not entries:
				r = q.popany()
				while r is None:
					q.lock.wait()
					r = q.popany()
				return r

			while True:
				for e in entries:
					if q.inqueue(e):
						return (e, q.pop(e))
				# none of them is pending, sleep until something
				# gets queued and look again
				q.lock.wait()
		finally:
			q.lock.release()


def task(f):
	"""Task decorator: wraps f in a Task, queues it to be started by
	the next launch() and returns the Task."""
	wrapped = Task(f)
	tolaunch_lock.acquire()
	try:
		tolaunch.append(wrapped)
	finally:
		tolaunch_lock.release()
	return wrapped


def launch():
	"""Launch pending tasks.

	Starts every task queued by the task decorator, in order, and
	takes them off the pending list. If starting one fails the
	exception is passed on to the caller; the tasks already started are
	still taken off the list, while the failed one and those after it
	stay pending.
	"""
	tolaunch_lock.acquire()
	launched = 0
	try:
		for t in tolaunch:
			t()
			launched += 1
	finally:
		# drop only what was really started, so a later launch() does
		# not start it twice, and never leave the lock held
		del tolaunch[:launched]
		tolaunch_lock.release()


def wait_for_task(task):
	"""Wait for the given task to end running.

	Returns once the task is in the running table with a count of
	zero; a task that was never started is not in the table, so it is
	waited for until it has been started and has ended.
	"""
	running_lock.acquire()
	try:
		# get() gives None for a task that is not in the table yet,
		# which keeps us waiting just like a non-zero count does
		while running.get(task.fname) != 0:
			running_lock.wait()
	finally:
		# release even if wait() is interrupted
		running_lock.release()


def wait_for_all():
	"""Wait for all tasks to end running.

	Returns once every count in the running table is zero, which is
	right away if no task was ever started.
	"""
	running_lock.acquire()
	try:
		# the list holds the counts of the tasks still running; it is
		# rebuilt after each wakeup
		while [count for count in running.values() if count != 0]:
			running_lock.wait()
	finally:
		# release even if wait() is interrupted
		running_lock.release()



#
# Asynchronous functions, a simple and obvious idea which might be useful for
# some applications. If you want to see formal documentation about this, check
# this out:
# http://research.microsoft.com/Users/luca/Papers/Polyphony%20(TOPLAS).pdf
# (the paper is about much more interesting things, but mentions stuff like
# this in detail).
#
def async(f):
	"""Asynchronous decorator. It makes function calls asynchronous, they
	will run in a new independant thread and you can't get their return
	value or wait for them; if you need any of those things use the ada
	tasks, this is just too simple."""
	def newf(*args, **kwargs):
		thread.start_new_thread(f, args, kwargs)
	return newf