#!/usr/bin/env python """ A simple scriptable msn bot. Can use gtalkbot plugins. ----------------------------- Alberto Bertogli albertito@blitiri.com.ar """ import sys import os.path import time import select import socket import thread import msnlib import msncb # null debug output msnlib.debug = lambda s: '' msncb.debug = lambda s: '' # # Our generic callbacks, used internally # # They depend on the md having a ._botobj reference to the bot object. Not so # nice, but simple enough. def generic_cb_msg(md, type, tid, params, sbd): t = tid.split() email = t[0] if email == 'Hotmail': return lines = params.split('\n') headers = {} eoh = 1 for i in lines: if i == '\r': break t, v = i.split(':', 1) headers[t] = v eoh += 1 if headers.get('Content-Type', '') == 'text/x-msmsgscontrol': # typing, ignore return md._botobj._handle_msg(email, headers, lines[eoh:]) msncb.cb_msg(md, type, tid, params, sbd) # # The bot itself # class bot: def __init__(self, email, passwd, userdict = None): self.email = email self.passwd = passwd self.msg_handlers = [] if userdict: self.userdict = userdict else: self.userdict = {} def _setup(self): self.m = msnlib.msnd() self.m.cb = msncb.cb() self.m.email = self.email self.m.pwd = self.passwd # used by the generic callbacks self.m._botobj = self # generic callbacks self.m.cb.msg = generic_cb_msg def login(self, status = 'online'): "Logs into the MSN network" self._setup() self.m.login() self.m.sync() # mini loop so we are sure we get the entire list before going # on with the normal stuff while self.m.lst_total != self.m.syn_total: infd, outfd = self.get_pollable_fds() fds = select.select(infd, outfd, [], None) for i in fds[0] + fds[1]: self.m.read(i) self.change_status(status) self._check_users() def close(self): self.m.disconnect() def reconnect(self): "Reconnects to the MSN network" self._setup() self.login(self.status) def change_status(self, status): "Changes the status" self.status = status self.m.change_status(status) def _check_users(self): # add everyone in the userlist if they're not already in our # roster for email in self.userdict.keys(): if email not in self.m.users: self.m.useradd(email, email) def get_pollable_fds(self): "Returns pollable fds, used for network pooling" return self.m.pollable() def loop(self): "Simple, exclusive network loop" while 1: infd, outfd = self.get_pollable_fds() fds = select.select(infd, outfd, [], 1) for i in fds[0] + fds[1]: try: self.m.read(i) except (msnlib.SocketError, socket.error), err: traceback.print_last() if i != self.m: # the user closed a connection m.close(i) else: # main socket closed return def register_msg_handler(self, f): "Registers a message handler" self.msg_handlers.append(f) def _handle_msg(self, email, header, msg): if email not in self.userdict: self.m.sendmsg(email, "Who are you?") return reply = [] for f in self.msg_handlers: r = f(email, self.userdict[email], header, msg) if r: reply.append(r) if reply: self.m.sendmsg(email, '\r\n'.join(reply)) # # Message handlers # def sample_msg_handler(email, info, header, msg): return "Echo!\n" + '\n'.join(msg) # gtalkbot-compatible message handler class gtalkbot_msg_handler: def __init__(self, path): self.plugins = [] sys.path.insert(0, path) for f in os.listdir(path): if f.endswith('.py'): root, ext = os.path.splitext(f) self.plugins.append(__import__(root)) sys.path.pop(0) self.verbs = {} for p in self.plugins: for v in p.Verbs(): if v not in self.verbs: self.verbs[v] = [] self.verbs[v].append(p) self.authenticated_users = [] def handle_msg(self, email, info, header, msg): # XXX: this only handles the first line vl = msg[0].split(None, 1) if not vl: return if len(vl) < 2: verb, line = vl[0], '' else: verb, line = vl if email not in self.authenticated_users and verb != 'auth': return 'You need to authenticate\n' \ + 'Use: auth ' if verb == 'auth': if line != info: return 'Wrong password, try again' self.authenticated_users.append(email) return 'Welcome!' elif verb == 'help': if not line: return 'Use: help ' reply = [] for p in self.plugins: if 'Help' not in dir(p): continue r = p.Help(line) if r: reply.append(r) if reply: return '\r\n'.join(reply) else: return 'Sorry, no help for ' + line elif verb in self.verbs: reply = [] for p in self.verbs[verb]: r = p.Command(verb, line) if r: reply.append(r) if reply: return '\r\n'.join(reply) else: return 'Unknown verb' else: return 'Unknown verb' def __call__(self, email, info, header, msg): return self.handle_msg(email, info, header, msg) def main(): # get the login email and password from the parameters try: email = sys.argv[1] passwd = sys.argv[2] userlistfname = sys.argv[3] pluginspath = sys.argv[4] except: print "Use: msnsbot email password userlist pluginspath" sys.exit(1) # create a user dictionary with email as key, and anything else as # value (as a single string) userlist = [ line.strip().split(None, 1) \ for line in open(userlistfname) ] userdict = dict( [ x for x in userlist if len(x) > 1 ] ) b = bot(email, passwd, userdict) #b.register_msg_handler(sample_msg_handler) b.register_msg_handler(gtalkbot_msg_handler(pluginspath)) b.login() b.loop() if __name__ == '__main__': main()