You are here: Home Code imap2gmail.py
About

I am a political scientist currently employed as a post-doctoral fellow at IIIS, Trinity College, Dublin. This site provides details of my background and research interests.

 

imap2gmail.py

Command-line tool to transfer messages from an existing IMAP account to a GMail account. Could easily be adapted to other ends. N.B. Now contains a fix for the missing trailing ':' after 'def flush(self)' (thanks to Stephan Jakoubek).

imap2gmail.py — Python Source, 10Kb

File contents

##################################################################
# CONFIG: Change these constants if you wish.
##################################################################

# This is overridden by the command-line argument if present.
DEFAULT_SMTP = 'gsmtp57.google.com'

# Time to wait between sending of each message to the SMTP server.
SMTP_SLEEP_INTERVAL = 2

##################################################################
# End CONFIG
##################################################################



# Command-line documentation... you'll have to figure out how to use the code direct from python ;-)
__doc__ = """
-i --imap_server    Sets the IMAP server that the to-be-exported-into-GMail messages currently reside on. (REQUIRED)
-u --user           Sets the username used to login to the IMAP server. (REQUIRED)
-t --to             Sets the gmail address that the messages are to be exported to. (REQUIRED)
-s --subroot        Sets the IMAP folder path that must be matched for all exports. (OPTIONAL)
                    e.g. 'INBOX.a_folder' would export all messages in INBOX.a_folder and its subfolders.
-r --smtp           Sets the SMTP server to use for sending the messages to GMail.  Defaults to '%s'. (OPTIONAL)
-p --passw          Sets the password used to login to the IMAP server.
                    If unspecified, user prompted to input it. (OPTIONAL)
-m --munge_subject  If this argument is present, the original IMAP folder path of each message is shoved into the
                    subject header. (OPTIONAL)
                    e.g. Subject: original subject [INBOX.a_folder]
                    "Yuck!", I hear you say.  Indeed.  But GMail won't filter on arbitrary headers (2004/8/10),
                    so this hack may be necessary for some.  If you're confident that GMail will soon allow you to
                    search/filter on arbitrary headers, you can forget about this.  The folder path is stuffed into
                    the 'X-Mail-Folder' header, and the flags are put into 'X-Flags', no matter whether you choose to
                    munge your subject or not.
-h --help           Displays this text.
""" % DEFAULT_SMTP


import imaplib, re, time, smtplib, sys, getpass, getopt
from email import message_from_string as mfs
from types import TupleType


folder_flags_re = re.compile('\A\((.*?)\)')
flags_re = re.compile('FLAGS \((.*?)\)')


class FolderListParser:
    """Parse, for example, this:
        '(\\NoInferiors \\Marked) "/" ~/mail/bris-zope'
    """

    def __init__(self, s):
        self._s = s

    def getFolderFlags(self):
        return folder_flags_re.findall(self._s)[0].split(' ')

    def getSeparator(self):
        ind = self._s.find('"')
        return self._s[ind+1:ind+2]

    def getPath(self):
        try:
            ind = self._s.find('"')
        except AttributeError:
            raise Exception(self._s)
        if self._s[ind+4] == '"':
            return self._s[ind+5:-1]
        return self._s[ind+4:]


class Logger:
    """A logger that doesn't complain about not being able to 'write'.
    """

    def __init__(self, log=None):
        self.log = log

    def write(self, text):
        if hasattr(self.log, 'write'):
            self.log.write(text)
        self.flush()

    def flush(self):
        if hasattr(self.log, 'flush'):
            self.log.flush()


class ProgressBar:
    """A simple progress bar, that's really meant for writing to sys.stdout,
    but will quite happily 'write' to anything that will let it.
    """

    def __init__(self, max_value, title='Progress Bar', logger=Logger()):
        self.max_value = max_value
        self.count = 0
        self.print_count = 0
        self.title = title
        self.logger = logger
        self.logger.write("""%s:
0-----------------------25-----------------------50-----------------------75---------------------100
""" % self.title )

    def _calculateCorrectPrintCount(self, count):
        if self.max_value == 0:
            return 100
        return divmod(count*100, self.max_value)[0]

    def logProgress(self, increment):
        new_count = self.count + increment
        correct_print_count = self._calculateCorrectPrintCount(new_count)
        if correct_print_count > self.print_count:
            diff = correct_print_count - self.print_count
            self.logger.write('#' * diff)
        self.count = new_count
        self.print_count = correct_print_count

    def finish(self, text=None):
        if self.print_count != self.max_value or self.max_value == 0:
            correct_print_count = self._calculateCorrectPrintCount(self.max_value)
            diff = correct_print_count - self.print_count
            self.logger.write('#' * diff)
        if text is None:
            text = 'Complete!'
        self.logger.write('\n%s\n\n' % text)


class IMAPFolder:

    def __init__(self, path, logger=Logger()):
        self.path = path
        self.logger = logger

    def select(self, conn):
        typ, data = conn.select(self.path)
        if typ != 'OK':
            raise Exception('Could not select "%s"' % self.path)
        try:
            msg_count = int(data[0])
        except IndexError:
            msg_count = 0
        return msg_count

    def getMessages(self, conn):
        msg_count = self.select(conn)
        self.logger.write("Fetching %s messages from '%s'\n" % (msg_count, self.path))
        r = conn.fetch('1:*', '(RFC822 FLAGS)')
        messages = []
        if r[0] != 'OK':
            raise Exception('Bad response from server: %s' % r)
        for each in r[1]:
            if isinstance(each, TupleType):
                msg = mfs(each[1])
                # Add flags to X-Flags header
                flags = flags_re.search(each[0])
                if flags:
                    msg['X-Flags'] = ' '.join(flags.groups())
                # Add folder path to X-Mail-Folder
                msg['X-Mail-Folder'] = self.path
                messages.append(msg)
        return (messages, msg_count)


class IMAPAccount:

    def __init__(self, server, user, passw, logger=Logger()):
        self.server = server
        self.user = user
        self.passw = passw
        self.logger = logger
        self._conn = None

    def getConn(self):
        if self._conn is None or self._conn.state != 'AUTH':
            conn = imaplib.IMAP4(self.server)
            conn.login(self.user, self.passw)
            self._conn = conn
            self.logger.write("Connected to '%s' as '%s'\n" % (self.server, self.user))
        return self._conn

    def listFolders(self, subroot=None):
        conn = self.getConn()
        r = conn.list()
        folders = []
        folder_set = []
        if subroot is not None:
            for each in r[1]:
                if each.find(subroot) != -1:
                    folder_set.append(each)
        else:
            folder_set = r[1]
        for each in folder_set:
            p = FolderListParser(each)
            imapfolder = IMAPFolder(p.getPath(), logger=self.logger)
            folders.append(imapfolder)
        folder_paths = [each.path for each in folders]
        self.logger.write("Found IMAP folders: '%s'\n" % "', '".join(folder_paths))
        return folders


class GMailImporter:

    def __init__(  self
                 , imapserver
                 , user
                 , passw
                 , gmailaddr
                 , smtpsrv=DEFAULT_SMTP
                 , subroot=None
                 , mungesubject=None
                 , logger=Logger()
                ):
        self.imapacct = IMAPAccount(imapserver, user, passw, logger)
        self.gmailaddr = gmailaddr
        self.smtpsrv = smtpsrv
        self.subroot = subroot
        self.mungesubject = mungesubject
        self.logger = logger

    def doImport(self):
        server = smtplib.SMTP(self.smtpsrv)
        #server.set_debuglevel(5)
        conn = self.imapacct.getConn()
        folders = self.imapacct.listFolders(self.subroot)
        for folder in folders:
            count = 0
            msgs, msg_count = folder.getMessages(conn)
            progress_title = 'GMail upload progress for %s (%s messages in total)' % (folder.path, msg_count)
            progress_log = ProgressBar(msg_count, progress_title, self.logger)
            for msg in msgs:
                if self.mungesubject:
                    munged_subject = '%s [%s]' % (msg['Subject'], folder.path)
                    msg.replace_header('Subject', munged_subject)
                server.sendmail(msg['From'], self.gmailaddr, msg.as_string())
                progress_log.logProgress(1)
                count += 1
                time.sleep(SMTP_SLEEP_INTERVAL)
            progress_log.finish()
        server.quit()



def usage():
    print __doc__


def main(argv):
    try:
        opts, args = getopt.getopt(  argv
                                   , "hi:u:t:r:s:p:m"
                                   , [  "help"
                                      , "imap_server="
                                      , "user="
                                      , "to="
                                      , "smtp="
                                      , "subroot="
                                      , "munge_subject"
                                      , "passw="
                                      ]
                                   )
    except getopt.GetoptError:
        usage()
        sys.exit(2)
    imap_server = user = gmailaddr = subroot = passw = mungesubject = None
    smtpsrv = DEFAULT_SMTP
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            usage()
            sys.exit()
        elif opt in ("-i", "--imap_server"):
            imap_server = arg
        elif opt in ("-u", "--user"):
            user = arg
        elif opt in ("-t", "--to"):
            gmailaddr = arg
        elif opt in ("-s", "--subroot"):
            subroot = arg
        elif opt in ("-r", "--smtp"):
            smtpsrv = arg
        elif opt in ("-m", "--munge_subject"):
            mungesubject = 1
        elif opt in ("-p", "--passw"):
            passw = arg
    if imap_server is None or user is None or gmailaddr is None:
        usage()
        sys.exit(2)
    if passw is None:
        passw = getpass.getpass('IMAP server password: ')
    logger = Logger(sys.stdout)
    g = GMailImporter(imap_server, user, passw, gmailaddr, smtpsrv, subroot, mungesubject, logger)
    g.doImport()

if __name__ == '__main__':
    main(sys.argv[1:])
Document Actions