imap_address_extractor/extractor.py

145 lines
4.5 KiB
Python

"""Connect to an IMAPS mail account, extract all unique addresses, and write them to a single output file.
WARNING: after the addresses are written, the script deletes the messages from the searched folders.
References:
http://www.voidynullness.net/blog/2013/07/25/gmail-email-with-python-via-imap/
and
https://yuji.wordpress.com/2011/06/22/python-imaplib-imap-example-with-gmail/
Initially grabbed from : https://gist.github.com/abought/15a1e08705b121c1b7bd
"""
__author__ = 'mj'
import email.parser
import imaplib
import getpass
import sys
import re
import ssl
import os
from dotenv import load_dotenv
from pprint import pprint as pp
# Load environment variables from .env file
load_dotenv()
# User may want to change these parameters if running script as-is
# Search folders, multiple directories can be given
# TODO: A user will want to change this
SEARCH_FOLDER = ['"Trash"', '"INBOX"']
# Host name of the IMAPS server, read from the EMAIL_SERVER environment variable
DEFAULT_MAIL_SERVER = os.getenv("EMAIL_SERVER")
# Output file name
OUTPUT_FILE = "recipient_list.txt"
# No user parameters below this line
# Finds every address written as <nospam@nospam.com>. The character class
# excludes angle brackets so that a header listing several recipients yields
# one match per address rather than a single greedy match spanning them all.
ADDR_PATTERN = re.compile(r"<([^<>]+)>")
def connect(user, pwd, server=DEFAULT_MAIL_SERVER):
    """Open an IMAP-over-SSL session on *server* and log in.

    The TLS context comes from ``ssl.create_default_context()``. If the
    server rejects the credentials, "Failed to login" is printed and the
    process exits with status 1; otherwise the logged-in connection is
    returned.
    """
    tls_context = ssl.create_default_context()
    mailbox = imaplib.IMAP4_SSL(host=server, ssl_context=tls_context)
    try:
        mailbox.login(user, pwd)
    except imaplib.IMAP4.error:
        print("Failed to login")
        sys.exit(1)
    else:
        return mailbox
def print_folders(conn):
    """Print the name of every mailbox folder reported by ``conn.list()``.

    A LIST response line looks like ``(flags) "delimiter" name``. The
    hierarchy delimiter differs between servers (``"/"``, ``"."``, ``NIL``),
    so the name is taken as whatever follows the delimiter token instead of
    splitting on a hard-coded ``"/"``. Names are printed exactly as the
    server sent them (including any surrounding quotes), one per line,
    prefixed with a tab.
    """
    for entry in conn.list()[1]:
        if entry is None:
            # imaplib reports "no data" as a single None item.
            continue
        if isinstance(entry, tuple):
            # imaplib hands back (header, literal) when the server sends the
            # mailbox name as a literal; the name is the second element.
            folder_name = entry[1].decode(errors="replace")
        else:
            line = entry.decode(errors="replace")
            parsed = re.match(r'\(.*?\) (?:"(?:[^"\\]|\\.)*"|NIL) (.*)', line)
            # Fall back to the raw line rather than crash on an unexpected format.
            folder_name = parsed.group(1) if parsed else line
        print("\t", folder_name)
def get_mails_from_folder(conn, folder_name):
    """Select a folder (or label) and return the ids of all its messages.

    The folder is opened read-write (``readonly=False``) so that fetching
    messages afterwards marks them as seen.

    Returns a list of message ids as ``bytes``, in the form produced by
    ``conn.search``. An empty list is returned when the folder cannot be
    opened, when the search fails, or when the folder holds no messages, so
    callers can always iterate over the result.
    """
    typ, _ = conn.select(mailbox=folder_name, readonly=False)
    if typ != 'OK':
        print("Could not open specified folder. Known labels:")
        print_folders(conn)
        return []
    typ, data = conn.search(None, 'ALL')
    if typ != 'OK':
        print("Could not get mail list of folder: ", folder_name)
        return []
    # data[0] is a space-separated run of ids; it may be empty or None.
    if not data or not data[0]:
        return []
    return data[0].split()
def fetch_message(conn, msg_uid):
    """Fetch one message from the currently selected folder and parse its headers.

    ``msg_uid`` is passed straight to ``conn.fetch``, so it must be an id of
    the kind ``conn.search`` returns for the selected folder (despite the
    parameter name, ``fetch`` is not the UID variant of the command).

    Returns the message parsed with ``headersonly=True``, or ``None`` when
    the server reports an error or sends no message data for that id.
    """
    # TODO: Could we fetch just the envelope of the response to save bandwidth?
    typ, data = conn.fetch(msg_uid, '(RFC822)')
    if typ != 'OK':
        print("ERROR fetching message #", msg_uid)
        return None
    # The message arrives as a (header, literal) tuple; other items in the
    # response (closing b')' or None when nothing matched) carry no content.
    for part in data or ():
        if isinstance(part, tuple) and len(part) > 1:
            return email.parser.BytesParser().parsebytes(part[1], headersonly=True)
    print("ERROR fetching message #", msg_uid)
    return None
def get_recipients(msg):
    """Return every e-mail address found in the address headers of *msg*.

    The From, To, Cc, Bcc, Reply-To and Sender headers are examined, in that
    order. Addresses are parsed with ``email.utils.getaddresses``, so both
    ``Name <user@host>`` and bare ``user@host`` forms are recognised, headers
    listing several recipients yield one entry per address, and repeated
    headers of the same name are all read.

    Returns a list of address strings; duplicates are kept.
    """
    from email.utils import getaddresses

    addr_fields = ['From', 'To', 'Cc', 'Bcc', 'Reply-To', 'Sender']
    recipients = []
    for field in addr_fields:
        # str conversion is needed for non-ascii chars (values may be Header objects)
        values = [str(value) for value in msg.get_all(field, [])]
        # getaddresses yields (display name, address); drop entries with no address.
        recipients.extend(addr for _name, addr in getaddresses(values) if addr)
    return recipients
if __name__ == "__main__":
    # Script entry point: collect the addresses from every folder in
    # SEARCH_FOLDER, write the unique ones to OUTPUT_FILE, then delete the
    # messages that were analyzed.

    # Retrieve username and password from environment variables
    username = os.getenv("EMAIL_USERNAME")
    password = os.getenv("EMAIL_PASSWORD")
    if username is None or password is None:
        print("Error: Email username or password not found in environment variables.")
        sys.exit(1)
    # Connect
    mail_conn = connect(username, password)
    try:
        all_recipients = []
        # folder -> ids of the messages whose headers were actually parsed
        analyzed = {}
        for folder in SEARCH_FOLDER:
            parsed_ids = analyzed.setdefault(folder, [])
            # `or []`: the helper may return None when the folder cannot be opened.
            for mail_id in get_mails_from_folder(mail_conn, folder) or []:
                message = fetch_message(mail_conn, mail_id)
                if message is None:
                    # Fetch failed; leave this message on the server.
                    continue
                all_recipients.extend(get_recipients(message))
                parsed_ids.append(mail_id)

        # Write the unique recipients, one per line, before anything is
        # deleted so that a failed write never costs any mail.
        with open(OUTPUT_FILE, 'w', encoding='utf-8') as out_file:
            for recipient in sorted(set(all_recipients)):
                out_file.write(recipient + '\n')

        # Delete only the messages recorded above, not whatever the folder
        # holds now: mail that arrived in the meantime was never analyzed.
        # NOTE(review): the ids are the ones conn.search returned earlier;
        # this assumes no other client expunges the folder in between.
        for folder, mail_ids in analyzed.items():
            if not mail_ids:
                continue
            typ, _ = mail_conn.select(mailbox=folder, readonly=False)
            if typ != 'OK':
                print("Could not reopen folder for deletion: ", folder)
                continue
            for mail_id in mail_ids:
                mail_conn.store(mail_id, '+FLAGS', '\\Deleted')  # Mark the email as deleted
            # Expunge while this folder is still selected.
            mail_conn.expunge()  # Permanently remove the deleted emails
    finally:
        # Close the connection. close() is rejected by imaplib when no folder
        # is selected (e.g. none could be opened); logging out is still wanted.
        try:
            mail_conn.close()
        except imaplib.IMAP4.error:
            pass
        mail_conn.logout()
    print("Recipient list generated successfully and analyzed emails erased.")