#!/usr/bin/env python
"""
A simple scriptable msn bot.
Can use gtalkbot plugins.
-----------------------------
Alberto Bertogli
albertito@blitiri.com.ar
"""
import os.path
import select
import socket
import sys
import thread
import time
import traceback

import msnlib
import msncb
# Null debug output: replace the debug hooks of both libraries with a
# function that ignores its argument and returns an empty string.
msnlib.debug = lambda s: ''
msncb.debug = lambda s: ''
#
# Our generic callbacks, used internally
#
# They depend on the md having a ._botobj reference to the bot object. Not so
# nice, but simple enough.
def generic_cb_msg(md, type, tid, params, sbd):
    """Message callback: parse an incoming message and hand it to the bot.

    Installed as the 'msg' callback by bot._setup().

    md      -- object carrying a ._botobj reference to the bot
    type, sbd -- passed through untouched to msncb.cb_msg()
    tid     -- string whose first whitespace-separated token is the
               sender's email
    params  -- raw message: header lines, a blank line, then the body

    Messages sent by 'Hotmail' and typing notifications
    (Content-Type text/x-msmsgscontrol) are silently dropped. Everything
    else goes to md._botobj._handle_msg(email, headers, body_lines) and is
    then forwarded to msncb.cb_msg().
    """
    email = tid.split()[0]
    if email == 'Hotmail':
        return

    lines = params.split('\n')
    headers = {}
    # index of the first body line: one past the blank separator line
    eoh = 1
    for raw in lines:
        if not raw.strip():
            # blank line ('\r' after splitting on '\n'): end of headers
            break
        eoh += 1
        if ':' not in raw:
            # malformed header line, skip it instead of blowing up
            continue
        name, value = raw.split(':', 1)
        # strip the leading space and the trailing '\r', otherwise the
        # values never compare equal to anything
        headers[name.strip()] = value.strip()

    # ignore optional parameters such as "; charset=UTF-8"
    content_type = headers.get('Content-Type', '').split(';', 1)[0].strip()
    if content_type == 'text/x-msmsgscontrol':
        # typing, ignore
        return

    md._botobj._handle_msg(email, headers, lines[eoh:])
    msncb.cb_msg(md, type, tid, params, sbd)
#
# The bot itself
#
class bot:
    """A simple MSN bot on top of msnlib.

    Incoming messages from users listed in the user dictionary are passed
    to every registered message handler; the non-empty results are joined
    and sent back as the reply. Anybody else gets a "Who are you?".
    """

    def __init__(self, email, passwd, userdict = None):
        """Stores the credentials; does not connect.

        userdict maps an email to an arbitrary per-user value which is
        handed to the message handlers. When omitted, an empty dictionary
        is used.
        """
        self.email = email
        self.passwd = passwd
        self.msg_handlers = []
        # so that reconnect() works even if login() was never called
        self.status = 'online'
        if userdict is None:
            userdict = {}
        self.userdict = userdict

    def _setup(self):
        "Creates a fresh msnlib descriptor and installs our callbacks"
        self.m = msnlib.msnd()
        self.m.cb = msncb.cb()

        self.m.email = self.email
        self.m.pwd = self.passwd

        # used by the generic callbacks
        self.m._botobj = self

        # generic callbacks
        self.m.cb.msg = generic_cb_msg

    def login(self, status = 'online'):
        "Logs into the MSN network"
        self._setup()
        self.m.login()
        self.m.sync()

        # mini loop so we are sure we get the entire list before going
        # on with the normal stuff
        while self.m.lst_total != self.m.syn_total:
            infd, outfd = self.get_pollable_fds()
            fds = select.select(infd, outfd, [], None)
            for i in fds[0] + fds[1]:
                self.m.read(i)

        self.change_status(status)
        self._check_users()

    def close(self):
        "Disconnects from the MSN network"
        self.m.disconnect()

    def reconnect(self):
        "Reconnects to the MSN network"
        # login() already runs _setup(), no need to do it twice
        self.login(self.status)

    def change_status(self, status):
        "Changes the status"
        self.status = status
        self.m.change_status(status)

    def _check_users(self):
        # add everyone in the userlist if they're not already in our
        # roster
        for email in self.userdict:
            if email not in self.m.users:
                self.m.useradd(email, email)

    def get_pollable_fds(self):
        "Returns pollable fds, used for network polling"
        return self.m.pollable()

    def loop(self):
        "Simple, exclusive network loop"
        while 1:
            infd, outfd = self.get_pollable_fds()
            fds = select.select(infd, outfd, [], 1)

            for i in fds[0] + fds[1]:
                try:
                    self.m.read(i)
                except (msnlib.SocketError, socket.error):
                    # print_exc() reports the exception being handled;
                    # print_last() only works for uncaught ones
                    traceback.print_exc()
                    if i != self.m:
                        # the user closed a connection
                        self.m.close(i)
                    else:
                        # main socket closed
                        return

    def register_msg_handler(self, f):
        """Registers a message handler

        Handlers are called as f(email, userinfo, headers, body_lines) and
        return the text to reply with, or a false value for no reply.
        """
        self.msg_handlers.append(f)

    def _handle_msg(self, email, header, msg):
        "Runs the handlers for a message and sends back their replies"
        if email not in self.userdict:
            self.m.sendmsg(email, "Who are you?")
            return

        reply = []
        for f in self.msg_handlers:
            r = f(email, self.userdict[email], header, msg)
            if r:
                reply.append(r)

        if reply:
            self.m.sendmsg(email, '\r\n'.join(reply))
#
# Message handlers
#
def sample_msg_handler(email, info, header, msg):
    "Example handler: replies with the received lines after an 'Echo!' line"
    body = '\n'.join(msg)
    return ''.join(("Echo!\n", body))
# gtalkbot-compatible message handler
class gtalkbot_msg_handler:
    """Message handler that dispatches commands to gtalkbot-style plugins.

    Every '.py' file found in the plugin directory is imported as a
    plugin. A plugin module provides Verbs(), returning the verbs it
    implements, and Command(verb, line), returning the reply for a
    command; it may also provide Help(verb).

    Users must first send 'auth <password>', where the password is the
    value stored for them in the bot's user dictionary.
    """

    def __init__(self, path):
        "Imports all the plugins found in the directory 'path'"
        self.plugins = []

        sys.path.insert(0, path)
        try:
            # sorted, so the plugin (and reply) order is predictable
            for f in sorted(os.listdir(path)):
                if f.endswith('.py'):
                    root, ext = os.path.splitext(f)
                    self.plugins.append(__import__(root))
        finally:
            # always undo our sys.path change, even if a plugin failed
            # to import; remove by value in case a plugin touched
            # sys.path too
            try:
                sys.path.remove(path)
            except ValueError:
                pass

        # verb -> list of plugins implementing it
        self.verbs = {}
        for p in self.plugins:
            for v in p.Verbs():
                if v not in self.verbs:
                    self.verbs[v] = []
                self.verbs[v].append(p)

        self.authenticated_users = []

    def handle_msg(self, email, info, header, msg):
        """Handles a message, returns the reply (or None for no reply)

        email  -- sender of the message
        info   -- the sender's entry in the user dictionary, used as the
                  password
        header -- message headers (unused)
        msg    -- list with the lines of the message body
        """
        # XXX: this only handles the first line
        if not msg:
            return

        # strip first, so a trailing '\r' does not end up in the argument
        vl = msg[0].strip().split(None, 1)
        if not vl:
            return

        if len(vl) < 2:
            verb, line = vl[0], ''
        else:
            verb, line = vl

        if email not in self.authenticated_users and verb != 'auth':
            return 'You need to authenticate\n' \
                + 'Use: auth <password>'

        if verb == 'auth':
            if line != info:
                return 'Wrong password, try again'
            self.authenticated_users.append(email)
            return 'Welcome!'

        if verb == 'help':
            if not line:
                return 'Use: help <verb>'
            reply = []
            for p in self.plugins:
                # Help() is optional
                if not hasattr(p, 'Help'):
                    continue
                r = p.Help(line)
                if r:
                    reply.append(r)
            if reply:
                return '\r\n'.join(reply)
            return 'Sorry, no help for ' + line

        # plugin verbs; a verb nobody answers is reported as unknown too
        reply = []
        for p in self.verbs.get(verb, []):
            r = p.Command(verb, line)
            if r:
                reply.append(r)
        if reply:
            return '\r\n'.join(reply)
        return 'Unknown verb'

    def __call__(self, email, info, header, msg):
        "Lets the object be registered directly as a message handler"
        return self.handle_msg(email, info, header, msg)
def main():
    """Command line entry point.

    Expects four arguments: email, password, user list file and plugins
    path. Prints a usage line and exits with status 1 if any is missing;
    otherwise logs the bot in and runs its network loop.
    """
    # get the login email and password from the parameters
    try:
        email, passwd, userlistfname, pluginspath = sys.argv[1:5]
    except ValueError:
        # fewer than four arguments were given
        print("Use: msnsbot email password userlist pluginspath")
        sys.exit(1)

    # create a user dictionary with email as key, and anything else as
    # value (as a single string)
    with open(userlistfname) as userfile:
        userlist = [line.strip().split(None, 1) for line in userfile]
    # lines without a value (including empty ones) are skipped
    userdict = dict(x for x in userlist if len(x) > 1)

    b = bot(email, passwd, userdict)
    #b.register_msg_handler(sample_msg_handler)
    b.register_msg_handler(gtalkbot_msg_handler(pluginspath))
    b.login()
    b.loop()
# Run the bot only when this file is executed as a script.
if __name__ == '__main__':
    main()