author | Alberto Bertogli
<albertito@blitiri.com.ar> 2008-10-24 03:54:56 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2008-10-24 03:55:18 UTC |
parent | 4e9bd45a3717f348b727d5107a74c1769db66ab8 |
pickle_rpc.py | +184 | -0 |
samples/pickle_rpc/c1.py | +12 | -0 |
samples/pickle_rpc/s1.py | +22 | -0 |
"""
pickle_rpc - RPC using pickle for serialization
Alberto Bertogli (albertito@blitiri.com.ar)
-----------------------------------------------

This module implements a simple RPC using pickle for serialization. It
provides an interface similar to (but not exactly like) XML-RPC.

It should work under any Unix, Windows and Mac systems.

SECURITY WARNING: both the server and the client unpickle whatever the other
end sends. Unpickling can execute arbitrary code, so only use this module
between mutually trusted peers on a trusted network.

To create an RPC server:

    srv = pickle_rpc.Server('localhost')
    srv.register(my_function)
    srv.loop()

To create an RPC client (we call it "Remote" because the object represents a
"remote execution server"):

    r = pickle_rpc.Remote('localhost')
    print(r.my_function(1, a = 'b'))

Wire format: a request is the pickled tuple (funcname, args, kwargs). A reply
is one of the pickled tuples

    (replies.SUCCESS, result)
    (replies.UNKNOWN,)
    (replies.EXCEPTION, exc_type, exc_value, traceback_string)

Sample server (samples/pickle_rpc/s1.py):

    import pickle_rpc

    def f(*args, **kwargs):
        print('F!', args, kwargs)
        return ('Go!', args, kwargs)

    def exc():
        raise KeyError('No no no')

    def long(n):
        l = []
        for i in range(n):
            l.append((i, i + 1, i + 2, i + 3))
        return l

    srv = pickle_rpc.Server('localhost')
    srv.register(f)
    srv.register(exc)
    srv.register(long)
    srv.loop()

Sample client (samples/pickle_rpc/c1.py):

    import pickle_rpc
    cli = pickle_rpc.Remote('localhost')

    print(cli.f(1, a = 2))
    print(len(cli.long(10000)))

    try:
        cli.exc()
    except KeyError as info:
        print('successfuly caught', KeyError, info)

"""

import sys
import os
import traceback
import socket
import select
import errno

try:
    import cPickle as pickle
except ImportError:
    import pickle


# default listening port
default_port = 1642


# valid replies
class replies:
    """Reply codes; the first element of every reply tuple is one of these."""
    SUCCESS = 0
    UNKNOWN = 1
    EXCEPTION = 2


class SockFD (object):
    """File descriptor wrapper for socket objects.

    Implements .write(), .read() and .readline(), which is what pickle needs
    to work, plus .fileno() so instances can be handed to select().
    """

    def __init__(self, sock):
        """Wrap the connected socket `sock`."""
        self.sock = sock
        # makefile() gives a binary buffered reader on every platform;
        # os.fdopen() on a socket descriptor opens in text mode and does
        # not work with Windows sockets at all.
        self.fd = sock.makefile('rb')

    def write(self, s):
        """Send all of `s` to the peer."""
        # send() may transmit only part of the buffer; sendall() loops
        # until everything is written or an error is raised.
        self.sock.sendall(s)

    def read(self, size = -1):
        """Read `size` bytes, or until EOF when size is -1."""
        if size == -1:
            return self.fd.read()
        return self.fd.read(size)

    def readline(self, size = -1):
        """Read one line, at most `size` bytes when size is not -1."""
        return self.fd.readline(size)

    def fileno(self):
        """Return the descriptor of the underlying socket."""
        return self.sock.fileno()

    def close(self):
        """Close both the reader and the socket. Safe to call twice."""
        fd, sock = self.fd, self.sock
        self.fd = None
        self.sock = None
        try:
            if fd is not None:
                fd.close()
        finally:
            # Closing the makefile() object does not close the socket
            # itself, so do it explicitly.
            if sock is not None:
                sock.close()


class Server (object):
    "Pickle-RPC server"

    def __init__(self, ip, port = default_port):
        """Create a TCP socket bound to (ip, port); loop() starts serving."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind((ip, port))
        # open client connections, as SockFD objects
        self.fds = []
        # exported name -> callable
        self.functions = {}

    def register(self, func, name = None):
        """Export `func` under `name` (default: func.__name__)."""
        if not name:
            name = func.__name__
        self.functions[name] = func

    def loop(self):
        """Accept connections and serve requests forever."""
        self.sock.listen(15)
        while True:
            # NOTE: select() only looks at the kernel socket buffers. This
            # assumes clients wait for each reply before sending the next
            # request, as Remote does.
            watched = [self.sock] + self.fds
            readable, _, _ = select.select(watched, [], [])

            for fd in readable:
                if fd is self.sock:
                    self.new_connection()
                else:
                    self.recv(fd)

    def new_connection(self):
        """Accept one pending connection and start tracking it."""
        conn, _addr = self.sock.accept()
        self.fds.append(SockFD(conn))

    def end_connection(self, fd):
        """Stop tracking `fd` and close it, ignoring errors on close."""
        if fd in self.fds:
            self.fds.remove(fd)
        try:
            fd.close()
        except (IOError, socket.error):
            pass

    def recv(self, fd):
        """Read one request from `fd`, run it and send back the reply.

        A request that cannot be unpickled, or that is not a tuple of
        (funcname, args, kwargs), terminates the connection.
        """
        try:
            # SECURITY: unpickling data from the network can run arbitrary
            # code; only expose this server to trusted clients.
            req = pickle.load(fd)
        except Exception:
            # Besides socket.error and EOFError, malformed input can raise
            # UnpicklingError, AttributeError, ImportError, IndexError...
            # None of them may take the whole server down.
            self.end_connection(fd)
            return

        if not isinstance(req, tuple) or len(req) < 3:
            # invalid request, break the connection
            self.end_connection(fd)
            return

        funcname, params, kwparams = req[0], req[1], req[2]
        try:
            known = funcname in self.functions
        except TypeError:
            # unhashable function name (e.g. a list)
            known = False
        if not known:
            self.send(fd, (replies.UNKNOWN,))
            return

        try:
            r = self.functions[funcname](*params, **kwparams)
        except Exception:
            # Not a bare except: KeyboardInterrupt and SystemExit must
            # still be able to stop the server.
            e, i = sys.exc_info()[:2]
            strtb = traceback.format_exc()
            self.send(fd, (replies.EXCEPTION, e, i, strtb))
            return

        self.send(fd, (replies.SUCCESS, r))

    def send(self, fd, r):
        """Pickle the reply tuple `r` and write it to `fd`.

        If `r` cannot be pickled, an EXCEPTION reply describing the problem
        is sent instead. A connection that fails on write is ended.
        """
        try:
            # Serialize fully before writing so that a pickling failure
            # cannot leave half a reply on the wire.
            data = pickle.dumps(r, -1)
        except Exception:
            strtb = traceback.format_exc()
            err = TypeError('reply could not be pickled')
            data = pickle.dumps(
                (replies.EXCEPTION, TypeError, err, strtb), -1)

        try:
            fd.write(data)
        except (socket.error, EOFError):
            self.end_connection(fd)


class UnknownFunction (Exception):
    "Raised by Remote when the server has no function with the given name"
    pass

class Remote (object):
    "Pickle-RPC client"

    def __init__(self, ip, port = default_port):
        """Connect to the pickle-RPC server listening at (ip, port)."""
        self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__sock.connect((ip, port))
        self.__fd = SockFD(self.__sock)
        self.__last_tb_str = ''

    def _last_tb(self):
        """Returns a string showing the server's last traceback,
        useful for debugging."""
        return self.__last_tb_str

    def __getattr__(self, name):
        """Return a stub that calls the remote function called `name`."""
        # Special-method probes (hasattr(r, '__deepcopy__') and friends)
        # must fail normally instead of turning into remote calls.
        if name.startswith('__') and name.endswith('__'):
            raise AttributeError(name)

        def stub_func(*args, **kwargs):
            return self.__rpc(name, args, kwargs)

        return stub_func

    def __rpc(self, name, args, kwargs):
        """Send one request and return the result or raise the error.

        Raises whatever exception the remote function raised,
        UnknownFunction if the server does not export `name`, and
        ValueError if the reply code is not one of `replies`.
        """
        msg = (name, args, kwargs)
        pickle.dump(msg, self.__fd, -1)
        # SECURITY: the reply is unpickled too, so the server must be
        # trusted as well.
        rep = pickle.load(self.__fd)

        code = rep[0]
        if code == replies.SUCCESS:
            return rep[1]
        if code == replies.EXCEPTION:
            e, i, strtb = rep[1:]
            self.__last_tb_str = strtb
            # Same effect as the Python-2-only "raise e, i", in a form
            # that is also valid on Python 3.
            if isinstance(i, BaseException):
                raise i
            raise e(i)
        if code == replies.UNKNOWN:
            raise UnknownFunction(name)
        raise ValueError('unknown reply code from server: %r' % (code,))