"""Create a connection to IMAPS mail account extract all unique adresses and output a single file with the results to a S3 bucket. References: http://www.voidynullness.net/blog/2013/07/25/gmail-email-with-python-via-imap/ and https://yuji.wordpress.com/2011/06/22/python-imaplib-imap-example-with-gmail/ Initially grabbed from : https://gist.github.com/abought/15a1e08705b121c1b7bd Version: 1.0 on 22/06/2023 """ __author__ = 'mj' import email.parser import imaplib import getpass import sys import re import ssl import os import smtplib from dotenv import load_dotenv from email.message import EmailMessage from pprint import pprint as pp import boto3 from datetime import date # Load environment variables from .env file load_dotenv() # User may want to change these parameters if running script as-is # Search folders, multiple directories can be given # TODO: A user will want to change this SEARCH_FOLDER = ['"Trash"', '"INBOX"'] DEFAULT_MAIL_SERVER = os.getenv("EMAIL_SERVER") # Output file name with date timestamp today = date.today().strftime("%Y-%m-%d") OUTPUT_FILE = f"recipient_list_{today}.txt" # Email settings SENDER_SMTP = os.getenv("SENDER_SMTP") SENDER_EMAIL = os.getenv("SENDER_EMAIL") SENDER_PASSWORD = os.getenv("SENDER_PASSWORD") RECIPIENT_EMAIL = os.getenv("RECIPIENT_EMAIL") # S3 bucket settings AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") S3_BUCKET_NAME = os.getenv("S3_BUCKET_NAME") S3_BUCKET_REGION = os.getenv("S3_BUCKET_REGION") S3_OUTPUT_FILE_KEY = f"subdirectory/recipient_list_{today}.txt" # Modify the subdirectory path here # No user parameters below this line ADDR_PATTERN = re.compile("<(.+)>") # Finds email as def connect(user, pwd, server=DEFAULT_MAIL_SERVER): """Connect to [the specified] mail server. Return an open connection""" conn = imaplib.IMAP4_SSL(host=server, ssl_context=ssl.create_default_context()) try: conn.login(user, pwd) except imaplib.IMAP4.error: print("Failed to login") sys.exit(1) return conn def print_folders(conn): """Print a list of open mailbox folders""" for f in conn.list()[1]: folder_name = f.decode().split(' "/" ')[1] print("\t", folder_name) def get_mails_from_folder(conn, folder_name): """Fetch a specific folder (or label) from server""" typ, data = conn.select(mailbox=folder_name, readonly=False) # Set readonly=False to mark messages as seen if typ != 'OK': print("Could not open specified folder. Known labels:") print_folders(conn) return typ, data = conn.search(None, 'ALL') if typ != 'OK': print("Could not get mail list of folder: ", folder_name) return return data[0].split() def fetch_message(conn, msg_uid): """ Fetch a specific message uid (not sequential id!) from the given folder; return the parsed message. User must ensure that specified message ID exists in that folder. """ # TODO: Could we fetch just the envelope of the response to save bandwidth? typ, data = conn.fetch(msg_uid, '(RFC822)') if typ != 'OK': print("ERROR fetching message #", msg_uid) return return email.parser.BytesParser().parsebytes(data[0][1], headersonly=True) def get_recipients(msg): """Given a parsed message, extract and return recipient list""" recipients = [] addr_fields = ['From', 'To', 'Cc', 'Bcc', 'Reply-To', 'Sender'] for f in addr_fields: if msg[f] is None: continue # str conversion is needed for non-ascii chars rlist = ADDR_PATTERN.findall(str(msg[f])) recipients.extend(rlist) return recipients def publish_to_s3(bucket_name, file_name, data): s3 = boto3.client( 's3', aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, region_name=S3_BUCKET_REGION ) s3.put_object(Body=data, Bucket=bucket_name, Key=file_name) if __name__ == "__main__": # Retrieve username and password from environment variables username = os.getenv("EMAIL_USERNAME") password = os.getenv("EMAIL_PASSWORD") if username is None or password is None: print("Error: Email username or password not found in environment variables.") sys.exit(1) # Connect mail_conn = connect(username, password) # Open the output file in write mode with open(OUTPUT_FILE, 'w') as file: # Open folders and get list of email message uids all_recipients = [] for folder in SEARCH_FOLDER: # Switch to folder for mail_id in get_mails_from_folder(mail_conn, folder): data = fetch_message(mail_conn, mail_id) recip_list = get_recipients(data) all_recipients.extend(recip_list) # Get unique recipients unique_recipients = set(all_recipients) unique_recipient_count = len(unique_recipients) # Write each recipient on a new line in the output file for recipient in unique_recipients: file.write(recipient + '\n') # Publish the output file to S3 with open(OUTPUT_FILE, 'rb') as file: output_data = file.read() publish_to_s3(S3_BUCKET_NAME, S3_OUTPUT_FILE_KEY, output_data) # Delete the analyzed emails for folder in SEARCH_FOLDER: for mail_id in get_mails_from_folder(mail_conn, folder): mail_conn.store(mail_id, '+FLAGS', '\\Deleted') # Mark the email as deleted mail_conn.expunge() # Permanently remove the deleted emails # Close the connection mail_conn.close() mail_conn.logout() # Send email with the count of unique addresses msg = EmailMessage() msg['Subject'] = 'Mail extractor unique recipient count' msg['From'] = SENDER_EMAIL msg['To'] = RECIPIENT_EMAIL msg.set_content(f"The count of unique recipients is: {unique_recipient_count}") with smtplib.SMTP(SENDER_SMTP, 587) as server: server.starttls() server.login(SENDER_EMAIL, SENDER_PASSWORD) server.send_message(msg) print("Recipient list generated successfully, analyzed emails erased, and email sent.")