"""
shawk.Client
------------
Define the Client interface in Shawk.
"""
from __future__ import print_function
from threading import Timer
import email
import csv
import re
import smtplib
import imapclient
from shawk.Contact import Contact
from shawk.Message import Message
from shawk import SMS_Address_Regex, sms_to_mail
[docs]class Client(object):
"""Define the main Shawk interface."""
def __init__(self, user, pwd, inbox=True, auto=True):
"""
Initialize the client and configure SMTP for sending messages.
This will establish an SMTP connection for sending messages to contacts.
Note that each instance of Client should be configured to a unique user (email).
"""
self.__user = user
self.auto_refresh_enabled = auto
self.contacts = {}
self.inbox = []
self.latest_messages = []
self.refresh_interval = 10 # Time in seconds
self.text_handlers = {}
self.default_text_handler = lambda x: print('Shawk received message: %s' % x)
# Configure SMTP
self.smtp = smtplib.SMTP("smtp.gmail.com", 587)
self.smtp.starttls()
self.smtp.login(str(user), str(pwd))
# Handle optional arguments
if inbox:
self.setup_inbox(pwd, auto=self.auto_refresh_enabled)
def __repr__(self):
"""Return the object representation of the Client."""
return "<shawk.Client({})>".format(self.__user)
def __str__(self):
"""Return the String representation of the Client."""
return "A Shawk SMS Client for {}".format(self.__user)
def __del__(self):
"""Delete the object."""
self.smtp.quit()
try:
self.imap.logout()
except AttributeError:
pass
[docs] def setup_inbox(self, password, user=None, folder='INBOX', refresh=False, auto=False, ssl=True):
"""
Configure an IMAP connection for receiving SMS.
Optionally configure behaviours such as auto-refresh,
refresh immediately once configured, or specify a folder.
Folder specifications are useful if you configure your Gmail account to
filter messages from certain senders to be moved to a specific folder,
that way they don't clutter your Gmail Inbox folder.
"""
# Apply user if not provided
if not user:
user = self.__user
# Connect IMAP server
self.imap = imapclient.IMAPClient('imap.gmail.com', ssl=ssl)
self.imap.login(user, password)
self.imap.select_folder(folder, readonly=True)
# Refresh if requested
if refresh and not auto:
self.refresh()
if auto:
self.enable_auto_refresh()
self.auto_refresh()
[docs] def enable_auto_refresh(self, start=True):
"""
Enable auto refresh of inbox.
Will also begin refreshing now, but can be disabled with `start=False`.
"""
self.auto_refresh_enabled = True
if start:
self.auto_refresh()
[docs] def disable_auto_refresh(self):
"""Disable auto refresh of inbox."""
self.auto_refresh_enabled = False
[docs] def auto_refresh(self):
"""Refresh the inbox automatically on an interval."""
if self.auto_refresh_enabled:
if self.imap:
self.refresh()
Timer(self.refresh_interval, self.auto_refresh, ()).start()
else:
raise Exception("No inbox is setup")
[docs] def refresh(self):
"""Refresh the inbox only once."""
# Get raw messages from imap
uids = self.imap.search('ALL')
raw_msgs = self.imap.fetch(uids, ['BODY[TEXT]', 'BODY[HEADER.FIELDS (FROM)]', 'INTERNALDATE'])
# Convert messages to string format and simplify structure
messages = []
for uid in raw_msgs:
obj = {}
obj['UID'] = uid
for key, value in raw_msgs[uid].items():
try:
if key.decode('utf-8') == 'BODY[HEADER.FIELDS (FROM)]':
obj['FROM'] = email.utils.parseaddr(value.decode('utf-8'))[1]
else:
if key.decode('utf-8') == 'BODY[TEXT]':
obj['BODY'] = value.decode('utf-8')
else:
obj[key.decode('utf-8')] = value.decode('utf-8')
except AttributeError:
obj[key.decode('utf-8')] = value
messages.append(obj)
# Find sms messages in messages
self.latest_messages = []
for msg in messages:
# If the sender's email address is in our supported gateways
if re.match(SMS_Address_Regex, msg['FROM']):
# Create Message object
contact = self.get_contact_from_address(msg['FROM'])
new_msg = Message(msg['BODY'], (contact or msg['FROM']), msg['INTERNALDATE'])
# Add to inbox and latest_messages
if new_msg not in self.inbox:
self.latest_messages.append(new_msg)
self.inbox.append(new_msg)
# Handle the new texts
for msg in self.latest_messages:
matched = False
# For each regex and function in text_handlers
for regex, func in self.text_handlers.items():
match = regex.match(msg.text)
# If a match occurred, call the function
if match:
matched = True
func(self, msg, match)
# If we did not match any specific regex
if not matched:
self.default_text_handler(self, msg)
[docs] def set_refresh_interval(self, time):
"""Define the refresh interval for auto refresh."""
self.refresh_interval = time
[docs] def get_refresh_interval(self):
"""Return the refresh interval for auto refresh."""
return self.refresh_interval
[docs] def text_handler(self, pattern=None, modifiers=''):
"""
Define a decorator that accepts a regular expression in string form for handlers.
Sets the default text handler if no string is provided.
"""
# Collect modifiers
modifiers = modifiers.lower()
flags = None
if 's' in modifiers:
flags = re.DOTALL if not flags else flags | re.DOTALL
if 'i' in modifiers:
flags = re.IGNORECASE if not flags else flags | re.IGNORECASE
if 'm' in modifiers:
flags = re.MULTILINE if not flags else flags | re.MULTILINE
if 'l' in modifiers:
flags = re.LOCALE if not flags else flags | re.LOCALE
if 'u' in modifiers:
flags = re.UNICODE if not flags else flags | re.UNICODE
if 'x' in modifiers:
flags = re.VERBOSE if not flags else flags | re.VERBOSE
# Compile the regular expression
try:
if flags:
text_regex = re.compile(pattern, flags)
else:
text_regex = re.compile(pattern)
except Exception as e:
# If text was provided
if pattern:
raise Exception("An error occured while compiling regex: ", e)
else:
pass
def decorator(func):
"""Closure that receives function."""
# Set the default text handler if no regex is provided
if not pattern:
self.default_text_handler = func
else:
self.text_handlers[text_regex] = func
# Return unmodified function
return func
# Return decorator
return decorator
def __sendmail(self, address, message):
"""Send the content of message to address."""
return self.smtp.sendmail('0', address, message)
[docs] def send(self, message, contact=None, address=None, number=None, name=None, carrier=None):
"""
Send a message.
Can determine a contact to use via a number of different specifications,
but it is advised to specify a Contact object if possible.
However, passing a specific address takes precedence over any other input.
"""
if not contact and not name and not number and not address:
raise Exception("No contact information provided")
if contact:
assert(type(contact) is Contact)
# Convert Message instances to string
if isinstance(message, Message):
message = message.text
# Send message to recipient
if address:
return self.__sendmail(address, message)
if contact:
return self.__sendmail(contact.get_address(), message)
# Address is not readily available, determine from other inputs
# Find address if given number
if number:
number = str(number)
# Get address of recipient
try:
address = self.contacts[number].get_address()
return self.__sendmail(address, message)
except KeyError:
# Number not in contacts
if not carrier:
# Not enough information
raise Exception("Could not find number {} in contacts; require carrier information".format(number))
else:
# Build address
address = sms_to_mail(number, carrier)
# Send the message
return self.__sendmail(address, message)
# Find address if only given name
if name and not address:
name = str(name)
for _, each in self.contacts.items():
if each.name == name:
address = each.get_address()
# Send the message
return self.__sendmail(address, message)
if not number:
# Name was not found in contacts
raise Exception("No contact found matching the name {}".format(name))