git » pymisc » commit 98d9167

Add a pickle_rpc module

author Alberto Bertogli
2008-10-24 03:54:56 UTC
committer Alberto Bertogli
2008-10-24 03:55:18 UTC
parent 4e9bd45a3717f348b727d5107a74c1769db66ab8

Add a pickle_rpc module

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

pickle_rpc.py +184 -0
samples/pickle_rpc/c1.py +12 -0
samples/pickle_rpc/s1.py +22 -0

diff --git a/pickle_rpc.py b/pickle_rpc.py
new file mode 100644
index 0000000..eebc0bb
--- /dev/null
+++ b/pickle_rpc.py
@@ -0,0 +1,184 @@
+
+"""
+pickle_rpc - RPC using pickle for serialization
+Alberto Bertogli (albertito@blitiri.com.ar)
+-----------------------------------------------
+
+This module implements a simple RPC using pickle for serialization. It
+provides an interface similar to (but not exactly like) XML-RPC.
+
+It should work on any Unix, Windows or Mac system.
+
+To create an RPC server:
+
+	srv = pickle_rpc.Server('localhost')
+	srv.register(my_function)
+	srv.loop()
+
+To create an RPC client (we call it "Remote" because the object represents a
+"remote execution server"):
+
+	r = pickle_rpc.Remote('localhost')
+	print r.my_function(1, a = 'b')
+
+"""
+
+import sys
+import os
+import traceback
+import socket
+import select
+import errno
+
+try:
+	import cPickle as pickle  # C implementation, preferred when present
+except ImportError:
+	import pickle
+
+
+# default listening port (TCP), used by both Server and Remote
+default_port = 1642
+
+
+# valid replies: the first element of every reply tuple the server sends
+class replies:
+	SUCCESS = 0  # followed by the function's return value
+	UNKNOWN = 1  # no function is registered under the requested name
+	EXCEPTION = 2  # followed by exception type, value and traceback text
+
+
+class SockFD (object):
+	"""File-like wrapper for socket objects. Implements .write(), .read()
+	and .readline(), which is what pickle needs to work."""
+
+	def __init__(self, sock):
+		self.sock = sock
+		# makefile() is portable; os.fdopen(sock.fileno()) fails on Windows
+		self.fd = sock.makefile('rb')
+
+	def write(self, s):
+		# send() may transmit only part of s, sendall() never does
+		self.sock.sendall(s)
+
+	def read(self, size = -1):
+		return self.fd.read(size)
+
+	def readline(self, size = -1):
+		return self.fd.readline(size)
+
+	def fileno(self):  # lets select() wait on this object
+		return self.sock.fileno()
+
+	def close(self):
+		self.fd.close()
+		self.sock.close()  # release the connection now, not at GC time
+		self.fd = self.sock = None
+
+
+class Server (object):
+	"Pickle-RPC server: exports registered functions to Remote clients"
+	def __init__(self, ip, port = default_port):
+		self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		self.sock.bind((ip, port))
+		self.fds = []  # SockFD wrappers of the connections being served
+		self.functions = {}  # exported name -> callable
+
+	def register(self, func, name = None):
+		"Export func to clients as name (defaults to func.__name__)."
+		self.functions[name or func.__name__] = func
+
+	def loop(self):
+		"Start listening and serve connections and requests forever."
+		self.sock.listen(15)
+		while True:
+			ifd, ofd, efd = select.select([self.sock] + self.fds, [], [])
+			for fd in ifd:
+				if fd is self.sock:
+					self.new_connection()
+				else:
+					self.recv(fd)
+
+	def new_connection(self):
+		"Accept the pending connection and start watching it."
+		conn, addr = self.sock.accept()
+		self.fds.append(SockFD(conn))
+
+	def end_connection(self, fd):
+		"Stop watching fd and close it; no-op if fd is already gone."
+		if fd in self.fds:
+			self.fds.remove(fd)
+			try:
+				fd.close()
+			except IOError:
+				pass
+
+	def recv(self, fd):
+		"Read one (name, args, kwargs) request from fd and answer it."
+		try:
+			# NOTE: unpickling can run arbitrary code; trusted peers only
+			req = pickle.load(fd)
+			if not isinstance(req, tuple) or len(req) != 3:
+				# invalid request, break the connection
+				raise socket.error
+		except Exception:  # bad input can raise almost anything
+			self.end_connection(fd)
+			return
+
+		funcname, params, kwparams = req
+		if funcname not in self.functions:
+			self.send(fd, (replies.UNKNOWN,))
+			return
+
+		try:
+			r = self.functions[funcname](*params, **kwparams)
+		except Exception:
+			e, i = sys.exc_info()[:2]
+			strtb = traceback.format_exc()
+			self.send(fd, (replies.EXCEPTION, e, i, strtb))
+		else:
+			self.send(fd, (replies.SUCCESS, r))
+
+	def send(self, fd, r):
+		"Pickle reply r into fd; drop the connection if that fails."
+		try:
+			pickle.dump(r, fd, -1)
+		except (socket.error, EOFError, pickle.PicklingError, TypeError):
+			self.end_connection(fd)
+
+
+class UnknownFunction (Exception):  # raised by Remote on an UNKNOWN reply
+	pass
+
+class Remote (object):
+	"Pickle-RPC client: unknown attributes become stubs for remote functions"
+	def __init__(self, ip, port = default_port):
+		self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+		self.__sock.connect((ip, port))
+		self.__fd = SockFD(self.__sock)
+		self.__last_tb_str = ''
+
+	def _last_tb(self):
+		"Return the server-side traceback (a string) of the last failed call."
+		return self.__last_tb_str
+
+	def __getattr__(self, name):
+		"Return a stub which runs the remote function `name` when called."
+		def stub_func(*args, **kwargs):
+			return self.__rpc(name, args, kwargs)
+		return stub_func
+
+	def __rpc(self, name, args, kwargs):
+		"Send one request; return its result or raise what the call raised."
+		pickle.dump((name, args, kwargs), self.__fd, -1)
+		# NOTE: unpickling can run arbitrary code; trusted servers only
+		rep = pickle.load(self.__fd)
+		if rep[0] == replies.SUCCESS:
+			return rep[1]
+		elif rep[0] == replies.EXCEPTION:
+			e, i, self.__last_tb_str = rep[1:]
+			raise e, i
+		elif rep[0] == replies.UNKNOWN:
+			raise UnknownFunction(name)
+		raise ValueError('unexpected reply code: %r' % (rep[0],))
+
+
diff --git a/samples/pickle_rpc/c1.py b/samples/pickle_rpc/c1.py
new file mode 100644
index 0000000..8d471b2
--- /dev/null
+++ b/samples/pickle_rpc/c1.py
@@ -0,0 +1,12 @@
+
+import pickle_rpc
+remote = pickle_rpc.Remote('localhost')  # expects s1.py to be serving
+
+print remote.f(1, a = 2)  # the server echoes the arguments back
+print len(remote.long(10000))  # a reply holding 10000 tuples
+
+try:
+	remote.exc()  # raises KeyError on the server side
+except KeyError, err:
+	print 'successfuly caught', KeyError, err
+
diff --git a/samples/pickle_rpc/s1.py b/samples/pickle_rpc/s1.py
new file mode 100644
index 0000000..c703071
--- /dev/null
+++ b/samples/pickle_rpc/s1.py
@@ -0,0 +1,22 @@
+
+import pickle_rpc
+
+def f(*args, **kwargs):
+	"Print the arguments and echo them back to the caller."
+	print 'F!', args, kwargs
+	return ('Go!', args, kwargs)
+
+def exc():
+	"Always fail, to exercise exception forwarding."
+	raise KeyError('No no no')
+
+def long(n):
+	"Return n 4-tuples of consecutive integers, i.e. a large reply."
+	return [(i, i + 1, i + 2, i + 3) for i in range(n)]
+
+# export the three sample functions under their own names, then serve
+server = pickle_rpc.Server('localhost')
+for func in (f, exc, long):
+	server.register(func)
+server.loop()
+