git » msnlib » master » tree

[master] / utils / msnsbot

#!/usr/bin/env python

"""

A simple scriptable msn bot.

Can use gtalkbot plugins.

-----------------------------
Alberto Bertogli
albertito@blitiri.com.ar
"""


import sys
import os.path
import time
import select
import socket
import thread

import msnlib
import msncb

# null debug output
msnlib.debug = lambda s: ''
msncb.debug = lambda s: ''


#
# Our generic callbacks, used internally
#
# They depend on the md having a ._botobj reference to the bot object. Not so
# nice, but simple enough.

def generic_cb_msg(md, type, tid, params, sbd):
	t = tid.split()
	email = t[0]

	if email == 'Hotmail':
		return

	lines = params.split('\n')
	headers = {}
	eoh = 1
	for i in lines:
		if i == '\r':
			break
		t, v = i.split(':', 1)
		headers[t] = v
		eoh += 1

	if headers.get('Content-Type', '') == 'text/x-msmsgscontrol':
		# typing, ignore
		return

	md._botobj._handle_msg(email, headers, lines[eoh:])
	msncb.cb_msg(md, type, tid, params, sbd)



#
# The bot itself
#

class bot:
	def __init__(self, email, passwd, userdict = None):
		self.email = email
		self.passwd = passwd
		self.msg_handlers = []
		if userdict:
			self.userdict = userdict
		else:
			self.userdict = {}

	def _setup(self):
		self.m = msnlib.msnd()
		self.m.cb = msncb.cb()
		self.m.email = self.email
		self.m.pwd = self.passwd

		# used by the generic callbacks
		self.m._botobj = self

		# generic callbacks
		self.m.cb.msg = generic_cb_msg

	def login(self, status = 'online'):
		"Logs into the MSN network"
		self._setup()
		self.m.login()
		self.m.sync()

		# mini loop so we are sure we get the entire list before going
		# on with the normal stuff
		while self.m.lst_total != self.m.syn_total:
			infd, outfd = self.get_pollable_fds()
			fds = select.select(infd, outfd, [], None)
			for i in fds[0] + fds[1]:
				self.m.read(i)

		self.change_status(status)
		self._check_users()

	def close(self):
		self.m.disconnect()

	def reconnect(self):
		"Reconnects to the MSN network"
		self._setup()
		self.login(self.status)

	def change_status(self, status):
		"Changes the status"
		self.status = status
		self.m.change_status(status)

	def _check_users(self):
		# add everyone in the userlist if they're not already in our
		# roster
		for email in self.userdict.keys():
			if email not in self.m.users:
				self.m.useradd(email, email)

	def get_pollable_fds(self):
		"Returns pollable fds, used for network pooling"
		return self.m.pollable()

	def loop(self):
		"Simple, exclusive network loop"
		while 1:
			infd, outfd = self.get_pollable_fds()

			fds = select.select(infd, outfd, [], 1)

			for i in fds[0] + fds[1]:
				try:
					self.m.read(i)
				except (msnlib.SocketError, socket.error), err:
					traceback.print_last()
					if i != self.m:
						# the user closed a connection
						m.close(i)
					else:
						# main socket closed
						return

	def register_msg_handler(self, f):
		"Registers a message handler"
		self.msg_handlers.append(f)

	def _handle_msg(self, email, header, msg):
		if email not in self.userdict:
			self.m.sendmsg(email, "Who are you?")
			return

		reply = []

		for f in self.msg_handlers:
			r = f(email, self.userdict[email], header, msg)
			if r:
				reply.append(r)

		if reply:
			self.m.sendmsg(email, '\r\n'.join(reply))


#
# Message handlers
#

def sample_msg_handler(email, info, header, msg):
	return "Echo!\n" + '\n'.join(msg)


# gtalkbot-compatible message handler
class gtalkbot_msg_handler:
	def __init__(self, path):
		self.plugins = []
		sys.path.insert(0, path)
		for f in os.listdir(path):
			if f.endswith('.py'):
				root, ext = os.path.splitext(f)
				self.plugins.append(__import__(root))
		sys.path.pop(0)
		self.verbs = {}

		for p in self.plugins:
			for v in p.Verbs():
				if v not in self.verbs:
					self.verbs[v] = []
				self.verbs[v].append(p)

		self.authenticated_users = []

	def handle_msg(self, email, info, header, msg):
		# XXX: this only handles the first line
		vl = msg[0].split(None, 1)
		if not vl:
			return
		if len(vl) < 2:
			verb, line = vl[0], ''
		else:
			verb, line = vl

		if email not in self.authenticated_users and verb != 'auth':
			return 'You need to authenticate\n' \
				+ 'Use: auth <password>'

		if verb == 'auth':
			if line != info:
				return 'Wrong password, try again'
			self.authenticated_users.append(email)
			return 'Welcome!'

		elif verb == 'help':
			if not line:
				return 'Use: help <verb>'

			reply = []
			for p in self.plugins:
				if 'Help' not in dir(p):
					continue
				r = p.Help(line)
				if r:
					reply.append(r)
			if reply:
				return '\r\n'.join(reply)
			else:
				return 'Sorry, no help for ' + line

		elif verb in self.verbs:
			reply = []
			for p in self.verbs[verb]:
				r = p.Command(verb, line)
				if r:
					reply.append(r)
			if reply:
				return '\r\n'.join(reply)
			else:
				return 'Unknown verb'

		else:
			return 'Unknown verb'


	def __call__(self, email, info, header, msg):
		return self.handle_msg(email, info, header, msg)


def main():

	# get the login email and password from the parameters
	try:
		email = sys.argv[1]
		passwd = sys.argv[2]
		userlistfname = sys.argv[3]
		pluginspath = sys.argv[4]
	except:
		print "Use: msnsbot email password userlist pluginspath"
		sys.exit(1)

	# create a user dictionary with email as key, and anything else as
	# value (as a single string)
	userlist = [ line.strip().split(None, 1) \
			for line in open(userlistfname) ]
	userdict = dict( [ x for x in userlist if len(x) > 1 ] )

	b = bot(email, passwd, userdict)
	#b.register_msg_handler(sample_msg_handler)
	b.register_msg_handler(gtalkbot_msg_handler(pluginspath))
	b.login()
	b.loop()

if __name__ == '__main__':
	main()