import os import logging from datetime import datetime from dataclasses import dataclass from fastapi import FastAPI, Form, Response, WebSocket, WebSocketDisconnect from twilio.rest import Client from twilio.twiml.voice_response import VoiceResponse, Connect, Stream from openai import OpenAI import sqlite3 from dotenv import load_dotenv import requests from io import BytesIO import uuid import json from google.cloud import storage import asyncio import base64 import webrtcvad import pydub import numpy as np import urllib.parse import audioop # For μ-law conversion load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class CallConfig: phone_number: str campaign_name: str script_template: str max_duration: int = 300 # 5 minutes class DatabaseManager: def __init__(self, db_path: str = "calls.db"): self.db_path = db_path self.init_database() def init_database(self): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS calls ( id INTEGER PRIMARY KEY AUTOINCREMENT, phone_number TEXT NOT NULL, campaign_name TEXT, call_sid TEXT UNIQUE, status TEXT, start_time TIMESTAMP, end_time TIMESTAMP, conversation_log TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS call_transcripts ( id INTEGER PRIMARY KEY AUTOINCREMENT, call_sid TEXT, speaker TEXT, message TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (call_sid) REFERENCES calls (call_sid) ) ''') conn.commit() conn.close() def create_call_record(self, phone_number: str, campaign_name: str, call_sid: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO calls (phone_number, campaign_name, call_sid, status, start_time) VALUES (?, ?, ?, ?, ?) ''', (phone_number, campaign_name, call_sid, 'initiated', datetime.now())) conn.commit() conn.close() logger.info(f"Call record created: {call_sid}") def update_call_status(self, call_sid: str, status: str, end_time=None): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if end_time: cursor.execute(''' UPDATE calls SET status=?, end_time=? WHERE call_sid=? ''', (status, end_time, call_sid)) else: cursor.execute(''' UPDATE calls SET status=? WHERE call_sid=? ''', (status, call_sid)) conn.commit() conn.close() def add_transcript(self, call_sid: str, speaker: str, message: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO call_transcripts (call_sid, speaker, message) VALUES (?, ?, ?) ''', (call_sid, speaker, message)) conn.commit() conn.close() class OpenAIHandler: def __init__(self, openai_api_key: str, elevenlabs_api_key: str, kb_file_path: str = "kb_articles.json"): self.client = OpenAI(api_key=openai_api_key) self.elevenlabs_api_key = elevenlabs_api_key self.conversation_history = {} self.silent_attempts = {} self.gcs_client = storage.Client() self.knowledge_base = self.load_knowledge_base(kb_file_path) def load_knowledge_base(self, kb_file_path: str) -> list: try: with open(kb_file_path, 'r') as file: kb = json.load(file) logger.info(f"Loaded knowledge base from {kb_file_path}") return kb except Exception as e: logger.error(f"Error loading knowledge base: {e}") return [] def search_knowledge_base(self, user_input: str) -> str: user_input_lower = user_input.lower() relevant_content = [] for article in self.knowledge_base: if any(keyword.lower() in user_input_lower for keyword in article['keywords']): relevant_content.append(article['content']) return "\n".join(relevant_content) if relevant_content else "" def initialize_conversation(self, call_sid: str, script_template: str): kb_context = "\n".join([article['content'] for article in self.knowledge_base]) system_message = f"{script_template}\n\nKnowledge Base Context:\n{kb_context}" self.conversation_history[call_sid] = [{"role": "system", "content": system_message}] self.silent_attempts[call_sid] = 0 logger.info(f"Initialized conversation history for {call_sid}") def detect_conversation_end(self, call_sid: str, user_input: str) -> bool: try: prompt = ( f"Based on the conversation history and the latest user input: '{user_input}', " f"determine if the conversation should end. Return only 'true' or 'false'. " f"End the conversation if the user indicates they are done, " f"or if the conversation has naturally concluded." ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=10, temperature=0.5 ) decision = response.choices[0].message.content.strip().lower() logger.info(f"Conversation end detection for {call_sid}: {decision}") return decision == 'true' except Exception as e: logger.error(f"Error in detect_conversation_end: {e}") return False def analyze_tone(self, call_sid: str, user_input: str) -> dict: default_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} try: prompt = ( ''' Analyze the tone and emotion of the user's latest input, using the full context of the ongoing conversation. Return only a JSON object with the following fields: * "rate": one of "slow", "medium", or "fast" * "pitch": one of "low", "medium", or "high" * "stability": a float between 0.0 and 1.0 * "similarity_boost": a float between 0.0 and 1.0 ''' ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=100, temperature=0.5 ) raw_response = response.choices[0].message.content.strip() tone_params = json.loads(raw_response) if not isinstance(tone_params, dict): raise ValueError("Tone params is not a dictionary") if tone_params.get("rate") not in ["slow", "medium", "fast"]: tone_params["rate"] = default_params["rate"] if tone_params.get("pitch") not in ["low", "medium", "high"]: tone_params["pitch"] = default_params["pitch"] if not (0.0 <= tone_params.get("stability", 0.5) <= 1.0): tone_params["stability"] = default_params["stability"] if not (0.0 <= tone_params.get("similarity_boost", 0.75) <= 1.0): tone_params["similarity_boost"] = default_params["similarity_boost"] logger.info(f"Tone analysis for {call_sid}: {tone_params}") return tone_params except Exception as e: logger.error(f"Error in analyze_tone for {call_sid}: {e}") return default_params async def generate_elevenlabs_audio_stream(self, call_sid: str, text: str, tone_params: dict) -> tuple: try: url = "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM/stream" headers = {"xi-api-key": self.elevenlabs_api_key} data = { "text": text, "model_id": "eleven_flash_v2_5", "voice_settings": { "stability": tone_params.get("stability", 0.5), "similarity_boost": tone_params.get("similarity_boost", 0.75), "speaker_boost": True, "style": 1 } } response = requests.post(url, json=data, headers=headers, stream=True) if response.status_code == 200: audio = pydub.AudioSegment.from_file(BytesIO(response.content), format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 # 1 second at 8kHz for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Generated ElevenLabs audio stream for {call_sid}") else: logger.error(f"ElevenLabs API error for {call_sid}: {response.status_code} - {response.text}") yield None except Exception as e: logger.error(f"Error in generate_elevenlabs_audio_stream for {call_sid}: {e}") yield None async def stream_local_audio(self, call_sid: str, audio_path: str = "initial_message.mp3") -> tuple: try: audio = pydub.AudioSegment.from_file(audio_path, format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 # 1 second at 8kHz for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Streamed local audio for {call_sid} from {audio_path}") except Exception as e: logger.error(f"Error streaming local audio for {call_sid}: {e}") yield None def get_silent_response(self, call_sid: str, attempt: int) -> tuple: if attempt == 1: message = "Are you still there? I'd love to help with any questions about our satellite technology." tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} else: message = "It seems you're not there. Thanks for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} logger.info(f"Final silent response triggered for {call_sid}") self.conversation_history[call_sid].append({"role": "assistant", "content": message}) return tone_params, message, True self.conversation_history[call_sid].append({"role": "assistant", "content": message}) return tone_params, message, False async def get_ai_response(self, call_sid: str, user_input: str) -> tuple: try: if call_sid not in self.conversation_history: self.initialize_conversation(call_sid, "Default conversation initialization") if not user_input: self.conversation_history[call_sid].append({"role": "user", "content": "Start the conversation."}) else: if self.detect_conversation_end(call_sid, user_input): farewell = "Thank you for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} self.conversation_history[call_sid].append({"role": "assistant", "content": farewell}) return tone_params, farewell, True kb_response = self.search_knowledge_base(user_input) if kb_response: user_input = f"{user_input}\n\nRelevant Knowledge Base Information:\n{kb_response}" self.conversation_history[call_sid].append({"role": "user", "content": user_input}) prompt = ( ''' You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company. Your tone should be friendly, empathetic, and concise. ## Guidelines: - Start warmly: Greet and explain why you're calling. - Acknowledge hesitation: Ease pressure if they seem surprised. - Set boundaries: Clarify what you can help with. - Offer value: Ask for relevant details if they're open. - Confirm follow-up info: Verify name and email if provided. - Close professionally: End politely if uninterested. ## Response Rules: - Keep replies under 50 words in natural, conversational language - Ask clarifying questions if unclear - If uninterested, end politely - If engaged, ask a relevant follow-up or confirm info ''' ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "system", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=150, temperature=0.7 ) raw_response = response.choices[0].message.content.strip() raw_response = raw_response.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') self.conversation_history[call_sid].append({"role": "assistant", "content": raw_response}) logger.info(f"Appended AI response to history for {call_sid}: {raw_response}") is_farewell = "thank you for your time" in raw_response.lower() or "goodbye" in raw_response.lower() tone_params = self.analyze_tone(call_sid, user_input or "Start the conversation.") return tone_params, raw_response, is_farewell except Exception as e: logger.error(f"Error in get_ai_response: {e}") error_message = "I'm sorry, I'm having trouble responding. We will call you back soon!" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} self.conversation_history[call_sid].append({"role": "assistant", "content": error_message}) return tone_params, error_message, True def upload_to_gcs(self, audio_stream, call_sid: str) -> str: bucket_name = 'farmbase-b2f7e.appspot.com' key = f"audio/{call_sid}_{uuid.uuid4()}.mp3" bucket = self.gcs_client.bucket(bucket_name) blob = bucket.blob(key) blob.upload_from_file(audio_stream, content_type='audio/mpeg') blob.make_public() return blob.public_url def cleanup_conversation(self, call_sid: str): if call_sid in self.conversation_history: logger.info(f"Cleaning up conversation history for {call_sid}") del self.conversation_history[call_sid] if call_sid in self.silent_attempts: logger.info(f"Cleaning up silent attempts for {call_sid}") del self.silent_attempts[call_sid] class TwilioCallManager: def __init__(self, account_sid: str, auth_token: str, from_number: str): self.client = Client(account_sid, auth_token) self.from_number = from_number self.db = DatabaseManager() self.openai_handler = None def set_openai_handler(self, openai_handler: OpenAIHandler): self.openai_handler = openai_handler def make_outbound_call(self, config: CallConfig, webhook_url: str) -> str: try: call = self.client.calls.create( to=config.phone_number, from_=self.from_number, url=f"{webhook_url}/twiml", method='POST', timeout=30, status_callback=f"{webhook_url}/call_status", status_callback_event=['initiated', 'ringing', 'answered', 'completed'] ) self.db.create_call_record(config.phone_number, config.campaign_name, call.sid) if self.openai_handler: self.openai_handler.initialize_conversation(call.sid, config.script_template) logger.info(f"Call initiated: {call.sid} to {config.phone_number}") return call.sid except Exception as e: logger.error(f"Error making call: {e}") return None app = FastAPI() call_manager = None openai_handler = None active_connections = {} @app.post("/twiml") async def twiml(): response = VoiceResponse() connect = Connect() stream = Stream(url=f"wss://{os.getenv('PUBLIC_URL').replace('https://', '')}/stream") connect.append(stream) response.append(connect) logger.info(f"TwiML generated: {str(response)}") return Response(content=str(response), media_type="text/xml") @app.post("/call_status") async def call_status(CallSid: str = Form(...), CallStatus: str = Form(...)): logger.info(f"Call {CallSid} status: {CallStatus}") if call_manager: if CallStatus == 'completed': call_manager.db.update_call_status(CallSid, CallStatus, datetime.now()) if openai_handler: openai_handler.cleanup_conversation(CallSid) # Clean up active connection tracking if CallSid in active_connections: del active_connections[CallSid] else: call_manager.db.update_call_status(CallSid, CallStatus) return Response(status_code=200) async def safe_websocket_send(websocket: WebSocket, data: dict, call_sid: str = None): """Safely send data through websocket with proper error handling""" try: if call_sid and call_sid not in active_connections: logger.warning(f"Attempted to send data to inactive connection: {call_sid}") return False await websocket.send_json(data) return True except Exception as e: logger.error(f"Error sending WebSocket data for {call_sid}: {e}") if call_sid and call_sid in active_connections: del active_connections[call_sid] return False @app.websocket("/stream") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() call_sid = None stream_sid = None try: vad = webrtcvad.Vad(2) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_DURATION_MS = 20 SAMPLE_RATE = 8000 CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000) SILENCE_THRESHOLD = 30 # Increased threshold first_message_sent = False while True: try: message = await websocket.receive_text() data = json.loads(message) event = data.get("event") if event == "connected": logger.info("WebSocket connected") continue if event == "start": stream_sid = data["streamSid"] call_sid = data["start"]["callSid"] active_connections[call_sid] = True logger.info(f"Media stream started: call_sid={call_sid}, stream_sid={stream_sid}") if not first_message_sent and call_sid: # Add delay to ensure connection is stable await asyncio.sleep(1) hardcoded_message = "Hi, this is Joanne from Farmonaut. I'm reaching out because we help teams like yours with mineral detection using satellite data. This is out of the blue, but may I have a moment to explain?" call_manager.db.add_transcript(call_sid, "Assistant", hardcoded_message) if call_sid in openai_handler.conversation_history: openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": hardcoded_message}) # Stream initial audio with better error handling try: async for chunk in openai_handler.stream_local_audio(call_sid, "initial_message.mp3"): if chunk and call_sid in active_connections: success = await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if not success: break await asyncio.sleep(0.02) # Small delay between chunks except Exception as e: logger.error(f"Error streaming initial audio for {call_sid}: {e}") first_message_sent = True continue if event == "media" and call_sid in active_connections: try: audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: # Expected size for 20ms at 8kHz μ-law continue is_speech = vad.is_speech(audio_data, SAMPLE_RATE) if is_speech: if not is_speaking: await safe_websocket_send(websocket, { "event": "clear", "streamSid": stream_sid }, call_sid) logger.info(f"User speech detected, clearing audio for {call_sid}") is_speaking = True audio_buffer.clear() audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: await process_user_audio(websocket, stream_sid, call_sid, audio_buffer) audio_buffer.clear() silent_chunks = 0 else: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD * 3: # Much longer wait for silent response await handle_silence(websocket, stream_sid, call_sid) silent_chunks = 0 except Exception as e: logger.error(f"Error processing media for {call_sid}: {e}") continue if event == "stop": logger.info(f"Media stream stopped for {call_sid}") break except WebSocketDisconnect: logger.info(f"WebSocket disconnected for {call_sid}") break except Exception as e: logger.error(f"Error in WebSocket message processing for {call_sid}: {e}") continue except Exception as e: logger.error(f"Error in WebSocket endpoint for {call_sid}: {e}") finally: # Cleanup if call_sid: if call_manager and openai_handler: call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] try: await websocket.close() except: pass async def process_user_audio(websocket: WebSocket, stream_sid: str, call_sid: str, audio_buffer: bytearray): """Process user audio input and generate response""" try: # Convert μ-law to 16-bit PCM pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav") wav_buffer.seek(0) # Transcribe audio transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) if transcript.strip(): call_manager.db.add_transcript(call_sid, "User", transcript) logger.info(f"Transcribed user input for {call_sid}: {transcript}") # Generate AI response tone_params, text_response, is_farewell = await openai_handler.get_ai_response(call_sid, transcript) call_manager.db.add_transcript(call_sid, "Assistant", text_response) # Stream AI response async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, text_response, tone_params): if chunk and call_sid in active_connections: success = await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if not success or is_farewell: break if is_farewell: await safe_websocket_send(websocket, { "event": "clear", "streamSid": stream_sid }, call_sid) if call_sid in active_connections: del active_connections[call_sid] except Exception as e: logger.error(f"Error processing user audio for {call_sid}: {e}") error_message = "Sorry, I couldn't understand you. Let's try again." tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", error_message) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, error_message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) async def handle_silence(websocket: WebSocket, stream_sid: str, call_sid: str): """Handle periods of silence from the user""" try: if call_sid not in openai_handler.silent_attempts: openai_handler.silent_attempts[call_sid] = 0 openai_handler.silent_attempts[call_sid] += 1 tone_params, text_response, is_farewell = openai_handler.get_silent_response( call_sid, openai_handler.silent_attempts[call_sid] ) call_manager.db.add_transcript(call_sid, "Assistant", text_response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, text_response, tone_params): if chunk and call_sid in active_connections: success = await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if not success or is_farewell: break if is_farewell: await safe_websocket_send(websocket, { "event": "clear", "streamSid": stream_sid }, call_sid) if call_sid in active_connections: del active_connections[call_sid] except Exception as e: logger.error(f"Error handling silence for {call_sid}: {e}") def initialize_system(): global call_manager, openai_handler twilio_sid = os.getenv('TWILIO_ACCOUNT_SID') twilio_token = os.getenv('TWILIO_AUTH_TOKEN') twilio_number = os.getenv('TWILIO_PHONE_NUMBER') openai_key = os.getenv('OPENAI_API_KEY') elevenlabs_api_key = os.getenv('ELEVENLABS_API_KEY') if not all([twilio_sid, twilio_token, twilio_number, openai_key, elevenlabs_api_key]): raise ValueError("Missing required environment variables") call_manager = TwilioCallManager(twilio_sid, twilio_token, twilio_number) openai_handler = OpenAIHandler(openai_key, elevenlabs_api_key, kb_file_path="kb_articles.json") call_manager.set_openai_handler(openai_handler) logger.info("System initialized successfully") def example_usage(): initialize_system() script_template = """ You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company and there is no prior conversation. Your role is to deliver a friendly, concise introduction that: * Builds quick rapport * Clearly states who you are and why you're calling * Keeps things low-pressure and empathetic ## Your message should: 1. Greet the recipient warmly 2. State your name and the reason for calling 3. Acknowledge that this is unexpected 4. Invite a quick moment of their time ## Examples: * "Hi, this is Joanne from Farmonaut—just reaching out because we help teams like yours with mineral detection using satellite data." * "Hey there, I know this is out of the blue—mind if I take 20 seconds to explain why I'm calling?" ## Output Rules: * Your message must be natural, human-like, and under 50 words * Do not include any follow-up questions or requests for info in this first message * This is just your intro line—keep it brief and warm * Do not reference any prior interaction or knowledge """ config = CallConfig( phone_number="+16696666882", campaign_name="Simple Promotion Call", script_template=script_template ) webhook_url = os.getenv('PUBLIC_URL', "https://d4c3b5c10eed.ngrok-free.app") call_sid = call_manager.make_outbound_call(config, webhook_url) if call_sid: print(f"Call initiated successfully: {call_sid}") else: print("Failed to initiate call") if __name__ == '__main__': example_usage() import uvicorn uvicorn.run(app, host="0.0.0.0", port=8080)