"""
adatasks - An ada-alike tasks implementation for Python
Alberto Bertogli (albertito@blitiri.com.ar)
3/Jul/2005
"""
import thread, threading
class EntryQueue:
"Queue of pending entries to a task."
def __init__(self, mutex):
self.lock = threading.Condition(mutex)
self.queue = []
def inqueue(self, entry):
for e, params in self.queue:
if e == entry:
return True
return False
def isempty(self):
if self.queue:
return False
return True
def push(self, entry, params):
self.queue.append((entry, params))
def pop(self, entry):
for e, params in self.queue:
if e == entry:
self.queue.remove((e, params))
return params
raise 'NeverGetHere!'
def popany(self):
if self.queue:
return self.queue.pop(0)
return None
# entry queue dictionary, indexes queues per task
entrydb = {}
entrydb_lock = threading.Lock()
def create_queue(name):
"Creates a queue for the task with the given name."
entrydb_lock.acquire()
if not entrydb.has_key(name):
entrydb[name] = EntryQueue(entrydb_lock)
entrydb_lock.release()
def get_queue(name):
"Gets a tasks queue."
entrydb_lock.acquire()
q = entrydb[name]
entrydb_lock.release()
return q
# list of tasks to launch
tolaunch = []
tolaunch_lock = threading.Lock()
# list of running tasks: running[task] = running_count
running = {}
running_lock = threading.Condition()
class Task:
"A task."
def __init__(self, f):
self.f = f
self.fname = f.func_name
create_queue(self.fname)
def __run(self, *params):
try:
self.f(*params)
finally:
running_lock.acquire()
running[self.fname] -= 1
running_lock.notifyAll()
running_lock.release()
def __call__(self, *params):
running_lock.acquire()
if not running.has_key(self.fname):
running[self.fname] = 1
else:
running[self.fname] += 1
running_lock.release()
thread.start_new_thread(self.__run, params)
def __getattr__(self, entry):
if entry in dir(self.f):
# return f's attribute if one exists with the given
# name
return self.f.__getattribute__(entry)
# write the entry wrapper
def wrapper(*params):
q = get_queue(self.fname)
q.lock.acquire()
q.push(entry, params)
q.lock.notifyAll()
q.lock.release()
return
# return the wrapper
return wrapper
def accept(self, entry):
q = get_queue(self.fname)
q.lock.acquire()
while not q.inqueue(entry):
q.lock.wait()
ret = q.pop(entry)
q.lock.release()
return ret
def select(self, *entries):
q = get_queue(self.fname)
q.lock.acquire()
if not entries:
r = q.popany()
while r == None:
q.lock.wait()
r = q.popany()
q.lock.release()
return r
else:
loop = True
while loop:
for e in entries:
if q.inqueue(e):
loop = False
break
else:
q.lock.wait()
r = q.pop(e)
q.lock.release()
return (e, r)
def task(f):
"Task decorator"
task = Task(f)
tolaunch_lock.acquire()
tolaunch.append(task)
tolaunch_lock.release()
return task
def launch():
"Launch pending tasks"
global tolaunch
tolaunch_lock.acquire()
for t in tolaunch:
t()
tolaunch = []
tolaunch_lock.release()
def wait_for_task(task):
"Wait for the given task to end running"
running_lock.acquire()
while not (running.has_key(task.fname) and running[task.fname] == 0):
running_lock.wait()
running_lock.release()
def wait_for_all():
"Wait for all tasks to end running"
running_lock.acquire()
while True:
for t in running.keys():
if running[t] != 0:
break
else:
break
running_lock.wait()
running_lock.release()
#
# Asynchronous functions, a simple and obvious idea which might be useful for
# some applications. If you want to see formal documentation about this, check
# this out:
# http://research.microsoft.com/Users/luca/Papers/Polyphony%20(TOPLAS).pdf
# (the paper is about much more intresting things, but mentions stuff like
# this in detail).
#
def async(f):
"""Asynchronous decorator. It makes function calls asynchronous, they
will run in a new independant thread and you can't get their return
value or wait for them; if you need any of those things use the ada
tasks, this is just too simple."""
def newf(*args, **kwargs):
thread.start_new_thread(f, args, kwargs)
return newf