author | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-02-12 21:51:28 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2009-02-12 21:51:28 UTC |
parent | 34b679ea393a3b77c03f8259486ba1527c2db530 |
utils/msnsbot | +278 | -0 |
diff --git a/utils/msnsbot b/utils/msnsbot new file mode 100755 index 0000000..8e790a1 --- /dev/null +++ b/utils/msnsbot @@ -0,0 +1,278 @@ +#!/usr/bin/env python + +""" + +A simple scriptable msn bot. + +Can use gtalkbot plugins. + +----------------------------- +Alberto Bertogli +albertito@blitiri.com.ar +""" + + +import sys +import os.path +import time +import select +import socket +import thread + +import msnlib +import msncb + +# null debug output +msnlib.debug = lambda s: '' +msncb.debug = lambda s: '' + + +# +# Our generic callbacks, used internally +# +# They depend on the md having a ._botobj reference to the bot object. Not so +# nice, but simple enough. + +def generic_cb_msg(md, type, tid, params, sbd): + t = tid.split() + email = t[0] + + if email == 'Hotmail': + return + + lines = params.split('\n') + headers = {} + eoh = 1 + for i in lines: + if i == '\r': + break + t, v = i.split(':', 1) + headers[t] = v + eoh += 1 + + if headers.get('Content-Type', '') == 'text/x-msmsgscontrol': + # typing, ignore + return + + md._botobj._handle_msg(email, headers, lines[eoh:]) + msncb.cb_msg(md, type, tid, params, sbd) + + + +# +# The bot itself +# + +class bot: + def __init__(self, email, passwd, userdict = None): + self.email = email + self.passwd = passwd + self.msg_handlers = [] + if userdict: + self.userdict = userdict + else: + self.userdict = {} + + def _setup(self): + self.m = msnlib.msnd() + self.m.cb = msncb.cb() + self.m.email = self.email + self.m.pwd = self.passwd + + # used by the generic callbacks + self.m._botobj = self + + # generic callbacks + self.m.cb.msg = generic_cb_msg + + def login(self, status = 'online'): + "Logs into the MSN network" + self._setup() + self.m.login() + self.m.sync() + + # mini loop so we are sure we get the entire list before going + # on with the normal stuff + while self.m.lst_total != self.m.syn_total: + infd, outfd = self.get_pollable_fds() + fds = select.select(infd, outfd, [], None) + for i in fds[0] + fds[1]: + self.m.read(i) + + self.change_status(status) + self._check_users() + + def close(self): + self.m.disconnect() + + def reconnect(self): + "Reconnects to the MSN network" + self._setup() + self.login(self.status) + + def change_status(self, status): + "Changes the status" + self.status = status + self.m.change_status(status) + + def _check_users(self): + # add everyone in the userlist if they're not already in our + # roster + for email in self.userdict.keys(): + if email not in self.m.users: + self.m.useradd(email, email) + + def get_pollable_fds(self): + "Returns pollable fds, used for network pooling" + return self.m.pollable() + + def loop(self): + "Simple, exclusive network loop" + while 1: + infd, outfd = self.get_pollable_fds() + + fds = select.select(infd, outfd, [], 1) + + for i in fds[0] + fds[1]: + try: + self.m.read(i) + except ('SocketError', socket.error), err: + traceback.print_last() + if i != self.m: + # the user closed a connection + m.close(i) + else: + # main socket closed + return + + def register_msg_handler(self, f): + "Registers a message handler" + self.msg_handlers.append(f) + + def _handle_msg(self, email, header, msg): + if email not in self.userdict: + self.m.sendmsg(email, "Who are you?") + return + + reply = [] + + for f in self.msg_handlers: + r = f(email, self.userdict[email], header, msg) + if r: + reply.append(r) + + if reply: + self.m.sendmsg(email, '\r\n'.join(reply)) + + +# +# Message handlers +# + +def sample_msg_handler(email, info, header, msg): + return "Echo!\n" + '\n'.join(msg) + + +# gtalkbot-compatible message handler +class gtalkbot_msg_handler: + def __init__(self, path): + self.plugins = [] + sys.path.insert(0, path) + for f in os.listdir(path): + if f.endswith('.py'): + root, ext = os.path.splitext(f) + self.plugins.append(__import__(root)) + sys.path.pop(0) + self.verbs = {} + + for p in self.plugins: + for v in p.Verbs(): + if v not in self.verbs: + self.verbs[v] = [] + self.verbs[v].append(p) + + self.authenticated_users = [] + + def handle_msg(self, email, info, header, msg): + # XXX: this only handles the first line + vl = msg[0].split(None, 1) + if not vl: + return + if len(vl) < 2: + verb, line = vl[0], '' + else: + verb, line = vl + + if email not in self.authenticated_users and verb != 'auth': + return 'You need to authenticate\n' \ + + 'Use: auth <password>' + + if verb == 'auth': + if line != info: + return 'Wrong password, try again' + self.authenticated_users.append(email) + return 'Welcome!' + + elif verb == 'help': + if not line: + return 'Use: help <verb>' + + reply = [] + for p in self.plugins: + if 'Help' not in dir(p): + continue + r = p.Help(line) + if r: + reply.append(r) + if reply: + return '\r\n'.join(reply) + else: + return 'Sorry, no help for ' + line + + elif verb in self.verbs: + reply = [] + for p in self.verbs[verb]: + r = p.Command(verb, line) + if r: + reply.append(r) + if reply: + return '\r\n'.join(reply) + else: + return 'Unknown verb' + + else: + return 'Unknown verb' + + + def __call__(self, email, info, header, msg): + return self.handle_msg(email, info, header, msg) + + +def main(): + + # get the login email and password from the parameters + try: + email = sys.argv[1] + passwd = sys.argv[2] + userlistfname = sys.argv[3] + pluginspath = sys.argv[4] + except: + print "Use: msnsbot email password userlist pluginspath" + sys.exit(1) + + # create a user dictionary with email as key, and anything else as + # value (as a single string) + userlist = [ line.strip().split(None, 1) \ + for line in open(userlistfname) ] + userdict = dict( [ x for x in userlist if len(x) > 1 ] ) + + b = bot(email, passwd, userdict) + #b.register_msg_handler(sample_msg_handler) + b.register_msg_handler(gtalkbot_msg_handler(pluginspath)) + b.login() + b.loop() + +if __name__ == '__main__': + main() + +