"""Draft follow-up e-mails for recent Gmail threads and log them to Sheets.

Pipeline (see ``automate_followup_emails``):

1. Authenticate against the Gmail and Sheets APIs.
2. Collect the thread IDs of messages from the last ``LOOKBACK_DAYS`` days.
3. For each thread, ask the chat model whether a follow-up is needed.
4. If so, classify the thread, draft a follow-up, and append a row to
   ``Sheet1``; otherwise append a "not needed" row to ``Sheet2``.

The drafted e-mail is only logged; nothing is sent unless
``send_followup_email`` is called explicitly.
"""

import base64
import json
import os
import time
import traceback
from datetime import datetime, timedelta, timezone
from email.message import EmailMessage

import openai
from dateutil.parser import parse as parse_date
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from openai import OpenAI

# SECURITY: the API key used to be hard-coded in this file. It is now read
# from the environment; the previously committed key must be treated as
# compromised and revoked.
openai_api_key = os.environ.get('OPENAI_API_KEY')

client = OpenAI(
    api_key=openai_api_key,
)

# Scopes for the Gmail and Sheets APIs.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/spreadsheets',
]

# Chat model used for every completion in this module.
MODEL_NAME = "gpt-4o-mini-2024-07-18"

# How far back (in days) get_old_emails() searches.
LOOKBACK_DAYS = 7

# Spreadsheet that automate_followup_emails() appends to by default.
DEFAULT_SPREADSHEET_ID = '1AziMFQdivL-na1G-DtUbARofrdemBgf-rjlPXRhrg1g'


# --------------------------------------------------------------------------
# Private helpers
# --------------------------------------------------------------------------

def _get_header(headers, name, default=''):
    """Return the value of the first header called *name*, else *default*.

    The comparison is case-insensitive because e-mail header names are.
    """
    wanted = name.lower()
    for header in headers:
        if header.get('name', '').lower() == wanted:
            return header.get('value', default)
    return default


def _decode_body_data(data, errors='replace'):
    """Decode a base64url body string to text; return None if it is empty."""
    if not data:
        return None
    # Re-add any stripped '=' padding so urlsafe_b64decode does not raise.
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors=errors)


def _plain_text_parts(payload):
    """Yield the decoded text of every text/plain part, depth-first.

    Parts that carry no inline ``data`` are skipped, and nested multipart
    containers are searched recursively.
    """
    for part in payload.get('parts') or []:
        if part.get('mimeType') == 'text/plain':
            text = _decode_body_data(part.get('body', {}).get('data'))
            if text is not None:
                yield text
        elif part.get('parts'):
            yield from _plain_text_parts(part)


def _ask_model(prompt, *, max_tokens, temperature=None):
    """Send *prompt* as a single user message and return the reply text.

    ``temperature`` is only forwarded when given, so callers that omit it
    get the API's default. Returns '' if the reply has no content.
    """
    params = {
        "model": MODEL_NAME,
        "max_tokens": max_tokens,
        "messages": [
            {"role": "system", "content": ""},
            {"role": "user", "content": prompt},
        ],
    }
    if temperature is not None:
        params["temperature"] = temperature
    response = client.chat.completions.create(**params)
    return response.choices[0].message.content or ""


# --------------------------------------------------------------------------
# Google API access
# --------------------------------------------------------------------------

def authenticate_gmail_sheets():
    """Authenticate and return ``(gmail_service, sheets_service)``.

    Credentials are loaded from ``token.json`` when it exists. If they are
    missing or invalid they are refreshed, or obtained through the local
    OAuth flow using ``credentials.json``, and then written back to
    ``token.json``.
    """
    creds = None
    if os.path.exists('token.json'):
        creds = Credentials.from_authorized_user_file('token.json', SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = InstalledAppFlow.from_client_secrets_file(
                'credentials.json', SCOPES)
            creds = flow.run_local_server(port=0)
        with open('token.json', 'w') as token:
            token.write(creds.to_json())

    gmail_service = build('gmail', 'v1', credentials=creds)
    sheets_service = build('sheets', 'v4', credentials=creds)
    return gmail_service, sheets_service


def get_old_emails(gmail_service):
    """Return the thread IDs of messages from the last LOOKBACK_DAYS days.

    Each thread ID appears once, in the order first seen. Messages without
    a parseable ``Date`` header are skipped. No "was it answered?"
    filtering happens here; automate_followup_emails() leaves that
    decision to should_send_followup().

    NOTE(review): only one ``messages.list`` call is made, so presumably
    only the first page of results is considered — confirm whether
    pagination is needed.
    """
    # Bug fix: the original called ``datetime.timedelta`` on the datetime
    # *class*, which raised AttributeError on every run.
    cutoff = datetime.now(timezone.utc) - timedelta(days=LOOKBACK_DAYS)
    query = f"after:{cutoff.strftime('%Y/%m/%d')}"

    result = gmail_service.users().messages().list(
        userId='me', q=query).execute()
    messages = result.get('messages', [])

    old_emails = []
    seen = set()
    for msg in messages:
        message = gmail_service.users().messages().get(
            userId='me', id=msg['id']).execute()
        thread_id = message['threadId']
        if thread_id in seen:
            # Several messages of one thread would otherwise make the
            # thread be processed (and logged) several times.
            continue

        date_str = _get_header(message['payload']['headers'], 'Date', None)
        if date_str is None:
            print(f"Message {msg['id']} has no Date header")
            continue
        try:
            parse_date(date_str)
        except (ValueError, OverflowError):
            print(f"Error parsing date: {date_str}")
            continue

        seen.add(thread_id)
        old_emails.append(thread_id)
    return old_emails


def get_thread_messages(service, user_id, thread_id):
    """Return ``(last_sender, bodies)`` for the last three thread messages.

    ``last_sender`` is the ``From`` header of the newest message (None if
    absent). ``bodies`` holds the first text/plain body of each of the
    last three messages; messages without one are reported on stdout and
    left out.
    """
    thread = service.users().threads().get(
        userId=user_id, id=thread_id).execute()
    messages = thread['messages']
    if not messages:
        return None, []

    last_sender = _get_header(messages[-1]['payload']['headers'], 'From', None)
    print(f"Last sender's email ID: {last_sender}")

    bodies = []
    recent_messages = messages[-3:]
    for i, message in enumerate(recent_messages):
        body = next(_plain_text_parts(message['payload']), None)
        if body:
            bodies.append(body)
        else:
            print(f"Email {len(recent_messages) - i} has no text/plain body")
    return last_sender, bodies


def get_last_three_emails(gmail_service, thread_id):
    """Return date, sender, subject and body for the last three messages.

    Each list item is a dict with keys ``date``, ``from``, ``subject`` and
    ``body``. For multipart messages the body is the concatenation of
    every top-level part that carries inline data; otherwise it is the
    payload's own body. Missing headers yield ''.
    """
    thread = gmail_service.users().threads().get(
        userId='me', id=thread_id).execute()

    emails = []
    for message in thread['messages'][-3:]:
        payload = message['payload']
        headers = payload['headers']

        body = ""
        if 'parts' in payload:
            for part in payload['parts']:
                text = _decode_body_data(
                    part.get('body', {}).get('data'), errors='ignore')
                if text is not None:
                    body += text + "\n"
        elif 'body' in payload:
            text = _decode_body_data(
                payload['body'].get('data'), errors='ignore')
            if text is not None:
                body = text + "\n"

        emails.append({
            'date': _get_header(headers, 'Date'),
            'from': _get_header(headers, 'From'),
            # Bug fix: the original used headers[0], which is whatever
            # header happens to come first, not the Subject.
            'subject': _get_header(headers, 'Subject'),
            'body': body,
        })
    return emails


def get_thread(service, user_id, thread_id):
    """Return id, date, sender and body for the last three thread messages.

    Each list item is a dict with keys ``id``, ``date``, ``sender`` and
    ``body``. For multipart messages the body is the last text/plain part
    found; otherwise it is the payload's own body. Missing values are ''.
    """
    thread = service.users().threads().get(
        userId=user_id, id=thread_id).execute()

    thread_data = []
    for msg in thread['messages'][-3:]:
        payload = msg['payload']
        headers = payload['headers']
        msg_data = {
            'id': msg['id'],
            'date': _get_header(headers, 'Date'),
            'sender': _get_header(headers, 'From'),
            'body': '',
        }

        if 'parts' in payload:
            texts = list(_plain_text_parts(payload))
            if texts:
                msg_data['body'] = texts[-1]
        else:
            text = _decode_body_data(payload.get('body', {}).get('data'))
            if text is not None:
                msg_data['body'] = text

        thread_data.append(msg_data)
    return thread_data


def send_followup_email(gmail_service, thread_id, to_email, followup_email,
                        subject=None):
    """Send *followup_email* to *to_email* inside thread *thread_id*.

    The original built a raw message with no ``To`` header and ignored
    *to_email*; the message is now built with ``EmailMessage`` so the
    recipient is set. *subject* is optional and only added when given.

    NOTE(review): Gmail presumably needs a matching Subject (and reply
    headers) to keep the message in the thread — confirm against the
    Gmail API threading rules.

    Returns the API response of ``users.messages.send``.
    """
    mime = EmailMessage()
    if to_email:
        mime['To'] = to_email
    if subject:
        mime['Subject'] = subject
    mime.set_content(followup_email)

    raw = base64.urlsafe_b64encode(mime.as_bytes()).decode('ascii')
    return gmail_service.users().messages().send(
        userId='me',
        body={'raw': raw, 'threadId': thread_id},
    ).execute()


def update_google_sheet(sheets_service, spreadsheet_id, sheet_name, data):
    """Append the rows in *data* (a list of lists) to *sheet_name*."""
    sheets_service.spreadsheets().values().append(
        spreadsheetId=spreadsheet_id,
        range=(sheet_name + '!A1'),
        valueInputOption='RAW',
        insertDataOption='INSERT_ROWS',
        body={'values': data},
    ).execute()


# --------------------------------------------------------------------------
# Model-backed decisions
# --------------------------------------------------------------------------

def generate_followup_email(last_three_emails, sender_details):
    """Ask the chat model to draft a follow-up body for the thread.

    *last_three_emails* is stringified as-is into the prompt;
    *sender_details* is interpolated into the instructions.
    """
    prompt = (
        f"Based on the following email thread. The sender is {sender_details}. "
        "The follow-up email should only have body (no subject). "
        "Start the email with Recipient Name from the prompt below. "
        "Generate a professional and polite follow-up email:\n\n"
    )
    prompt += str(last_three_emails)
    return _ask_model(prompt, max_tokens=500)


def detect_context_type(last_three_emails):
    """Ask the chat model to classify the thread into one category.

    Returns the model's reply with surrounding whitespace removed; the
    reply is not validated against the category list.
    """
    context_types = [
        "Subscription", "Support", "Inquiry", "Feedback", "Updates",
        "Billing", "Promotion", "Newsletter", "Webinar", "Partnership",
        "Demo", "Training", "Renewal", "Survey", "Announcement",
        "Invitation", "Reminder", "Confirmation", "Cancellation", "Report",
        "Alert", "Onboarding", "Discount", "Referral", "Technical Issue",
    ]
    thread_text = str(last_three_emails)
    prompt = (
        "Classify the following email thread into one of the following "
        f"categories: {', '.join(context_types)}:\n\n{thread_text}"
    )
    return _ask_model(prompt, max_tokens=10, temperature=0).strip()


def should_send_followup(last_three_emails):
    """Return True when the chat model says a follow-up is required.

    *last_three_emails* is a list of dicts as returned by get_thread();
    the last item's ``sender`` is added to the prompt. An empty list
    returns False without calling the model.
    """
    if not last_three_emails:
        return False

    prompt = "Based on the following email thread, determine if a follow-up email is required. A follow-up email is necessary if: 1. the recipient's response was more than 5 days ago, 2. no response was received. No follow-up should be sent if a. the email is between people from farmonaut.com, b. a email conversation which looks to be complete. Respond with 'Yes' or 'No'.\n\n"
    prompt += "Thread Conversation: " + str(last_three_emails) + "\n\n, From: " + str(last_three_emails[-1]["sender"])
    prompt += "Is a follow-up email required? Answer yes or no"

    followup_needed = _ask_model(
        prompt, max_tokens=10, temperature=0).strip().lower()
    print(followup_needed)

    # Bug fix: an exact ``== 'yes'`` comparison rejected replies such as
    # "Yes." or "yes\n".
    return followup_needed.startswith('yes')


# --------------------------------------------------------------------------
# Entry point
# --------------------------------------------------------------------------

def automate_followup_emails(spreadsheet_id=DEFAULT_SPREADSHEET_ID):
    """Run the whole pipeline and log one row per thread to the sheet.

    Threads needing a follow-up are appended to ``Sheet1`` together with
    the drafted e-mail; all others go to ``Sheet2``. A failure on one
    thread is printed and does not stop the remaining threads.
    """
    gmail_service, sheets_service = authenticate_gmail_sheets()
    old_emails = get_old_emails(gmail_service)

    sender_details = {
        "name": "Ankur Omar",
        "Position": "Director and CEO",
        "Email": "ankuromar@farmonaut.com",
        "Company": "Farmonaut Technologies Pvt Ltd",
    }

    for thread_id in old_emails:
        try:
            thread_data = get_thread(gmail_service, 'me', thread_id)
            if not thread_data:
                continue
            last = thread_data[-1]

            if should_send_followup(thread_data):
                context_type = detect_context_type(thread_data)
                followup_email = generate_followup_email(
                    thread_data, sender_details)
                sheet_name = 'Sheet1'
                row = [last["sender"], context_type, last["date"],
                       str(datetime.now()), last["body"], followup_email]
            else:
                sheet_name = 'Sheet2'
                row = [last["sender"], 'not needed', last["date"],
                       str(datetime.now()), last["body"],
                       'No follow-up email needed']

            update_google_sheet(
                sheets_service, spreadsheet_id, sheet_name, [row])
        except Exception:
            # Best-effort per thread: report and move on to the next one.
            print(traceback.format_exc())


if __name__ == "__main__":
    automate_followup_emails()