import os import logging from datetime import datetime from dataclasses import dataclass from fastapi import FastAPI, Form, Response, WebSocket, WebSocketDisconnect from twilio.rest import Client from twilio.twiml.voice_response import VoiceResponse, Connect, Stream from openai import OpenAI import sqlite3 from dotenv import load_dotenv import requests from io import BytesIO import uuid import json from google.cloud import storage import asyncio import base64 import webrtcvad import pydub import numpy as np import urllib.parse import audioop load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class CallConfig: phone_number: str campaign_name: str script_template: str max_duration: int = 300 class DatabaseManager: def __init__(self, db_path: str = "calls.db"): self.db_path = db_path self.init_database() def init_database(self): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS calls ( id INTEGER PRIMARY KEY AUTOINCREMENT, phone_number TEXT NOT NULL, campaign_name TEXT, call_sid TEXT UNIQUE, status TEXT, start_time TIMESTAMP, end_time TIMESTAMP, conversation_log TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS call_transcripts ( id INTEGER PRIMARY KEY AUTOINCREMENT, call_sid TEXT, speaker TEXT, message TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (call_sid) REFERENCES calls (call_sid) ) ''') conn.commit() conn.close() def create_call_record(self, phone_number: str, campaign_name: str, call_sid: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO calls (phone_number, campaign_name, call_sid, status, start_time) VALUES (?, ?, ?, ?, ?) ''', (phone_number, campaign_name, call_sid, 'initiated', datetime.now())) conn.commit() conn.close() logger.info(f"Call record created: {call_sid}") def update_call_status(self, call_sid: str, status: str, end_time=None): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() if end_time: cursor.execute(''' UPDATE calls SET status=?, end_time=? WHERE call_sid=? ''', (status, end_time, call_sid)) else: cursor.execute(''' UPDATE calls SET status=? WHERE call_sid=? ''', (status, call_sid)) conn.commit() conn.close() def add_transcript(self, call_sid: str, speaker: str, message: str): conn = sqlite3.connect(self.db_path) cursor = conn.cursor() cursor.execute(''' INSERT INTO call_transcripts (call_sid, speaker, message) VALUES (?, ?, ?) ''', (call_sid, speaker, message)) conn.commit() conn.close() class OpenAIHandler: def __init__(self, openai_api_key: str, elevenlabs_api_key: str, kb_file_path: str = "kb_articles.json"): self.client = OpenAI(api_key=openai_api_key) self.elevenlabs_api_key = elevenlabs_api_key self.conversation_history = {} self.silent_attempts = {} self.gcs_client = storage.Client() self.knowledge_base = self.load_knowledge_base(kb_file_path) self.is_speaking = {} self.intro_playing = {} def load_knowledge_base(self, kb_file_path: str) -> list: try: with open(kb_file_path, 'r') as file: kb = json.load(file) logger.info(f"Loaded knowledge base from {kb_file_path}") return kb except Exception as e: logger.error(f"Error loading knowledge base: {e}") return [] def search_knowledge_base(self, user_input: str) -> str: user_input_lower = user_input.lower() relevant_content = [] for article in self.knowledge_base: if any(keyword.lower() in user_input_lower for keyword in article['keywords']): relevant_content.append(article['content']) return "\n".join(relevant_content) if relevant_content else "" def initialize_conversation(self, call_sid: str, script_template: str): kb_context = "\n".join([article['content'] for article in self.knowledge_base]) system_message = f"{script_template}\n\nKnowledge Base Context:\n{kb_context}" self.conversation_history[call_sid] = [{"role": "system", "content": system_message}] self.silent_attempts[call_sid] = 0 self.is_speaking[call_sid] = False self.intro_playing[call_sid] = False logger.info(f"Initialized conversation history for {call_sid}") def detect_conversation_end(self, call_sid: str, user_input: str) -> bool: try: prompt = ( f"Based on the conversation history and the latest user input: '{user_input}', " f"determine if the conversation should end. Return only 'true' or 'false'. " f"End the conversation if the user indicates they are done, " f"or if the conversation has naturally concluded." ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=10, temperature=0.5 ) decision = response.choices[0].message.content.strip().lower() logger.info(f"Conversation end detection for {call_sid}: {decision}") return decision == 'true' except Exception as e: logger.error(f"Error in detect_conversation_end: {e}") return False def get_intent(self, user_input: str) -> tuple: try: prompt = ( f"Analyze the following user input: '{user_input}'.\n" f"If the input is empty or unclear, return a JSON object with intent='neutral' and confidence=0.5.\n" f"Otherwise, return a JSON object with 'intent' (one of: 'not_interested', 'question', 'positive', 'interested', 'neutral', 'confused') " f"and 'confidence' (float between 0.0 and 1.0)." ) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], max_tokens=100, temperature=0.5 ) raw_response = response.choices[0].message.content.strip() logger.debug(f"Raw intent response for input '{user_input}': {raw_response}") try: result = json.loads(raw_response) intent = result.get("intent", "neutral") confidence = float(result.get("confidence", 0.5)) if intent not in ['not_interested', 'question', 'positive', 'interested', 'neutral', 'confused']: logger.warning(f"Invalid intent received: {intent}, defaulting to 'neutral'") intent = "neutral" if not (0.0 <= confidence <= 1.0): logger.warning(f"Invalid confidence received: {confidence}, defaulting to 0.5") confidence = 0.5 logger.info(f"Intent analysis: intent={intent}, confidence={confidence}") return intent, confidence except json.JSONDecodeError as e: logger.error(f"JSON parsing error in get_intent: {e}, raw response: {raw_response}") return "neutral", 0.5 except Exception as e: logger.error(f"Error in get_intent: {e}, input: {user_input}") return "neutral", 0.5 def analyze_tone(self, call_sid: str, user_input: str) -> dict: default_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} try: prompt = ( ''' Analyze the tone and emotion of the user's latest input, using the full context of the ongoing conversation. Return only a JSON object with the following fields: * "rate": one of "slow", "medium", or "fast" * "pitch": one of "low", "medium", or "high" * "stability": a float between 0.0 and 1.0 * "similarity_boost": a float between 0.0 and 1.0 ''' ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=100, temperature=0.5 ) raw_response = response.choices[0].message.content.strip() tone_params = json.loads(raw_response) if not isinstance(tone_params, dict): raise ValueError("Tone params is not a dictionary") if tone_params.get("rate") not in ["slow", "medium", "fast"]: tone_params["rate"] = default_params["rate"] if tone_params.get("pitch") not in ["low", "medium", "high"]: tone_params["pitch"] = default_params["pitch"] if not (0.0 <= tone_params.get("stability", 0.5) <= 1.0): tone_params["stability"] = default_params["stability"] if not (0.0 <= tone_params.get("similarity_boost", 0.75) <= 1.0): tone_params["similarity_boost"] = default_params["similarity_boost"] logger.info(f"Tone analysis for {call_sid}: {tone_params}") return tone_params except Exception as e: logger.error(f"Error in analyze_tone for {call_sid}: {e}") return default_params async def generate_elevenlabs_audio_stream(self, call_sid: str, text: str, tone_params: dict) -> tuple: self.is_speaking[call_sid] = True try: url = "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM/stream" headers = {"xi-api-key": self.elevenlabs_api_key} data = { "text": text, "model_id": "eleven_flash_v2_5", "voice_settings": { "stability": tone_params.get("stability", 0.5), "similarity_boost": tone_params.get("similarity_boost", 0.75), "speaker_boost": True, "style": 1 } } response = requests.post(url, json=data, headers=headers, stream=True) if response.status_code == 200: audio = pydub.AudioSegment.from_file(BytesIO(response.content), format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Generated ElevenLabs audio stream for {call_sid}") else: logger.error(f"ElevenLabs API error for {call_sid}: {response.status_code} - {response.text}") yield None except Exception as e: logger.error(f"Error in generate_elevenlabs_audio_stream for {call_sid}: {e}") yield None finally: self.is_speaking[call_sid] = False async def stream_local_audio(self, call_sid: str, audio_path: str = "initial_message.mp3") -> tuple: self.is_speaking[call_sid] = True self.intro_playing[call_sid] = True try: audio = pydub.AudioSegment.from_file(audio_path, format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Streamed local audio for {call_sid} from {audio_path}") except Exception as e: logger.error(f"Error streaming local audio for {call_sid}: {e}") yield None finally: self.is_speaking[call_sid] = False self.intro_playing[call_sid] = False def get_silent_response(self, call_sid: str, attempt: int) -> tuple: if attempt == 1: message = "Just checking if you can hear me?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} else: message = "It seems you're not there. Thanks for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} logger.info(f"Final silent response triggered for {call_sid}") self.conversation_history[call_sid].append({"role": "assistant", "content": message}) return tone_params, message, True self.conversation_history[call_sid].append({"role": "assistant", "content": message}) return tone_params, message, False async def get_ai_response(self, call_sid: str, user_input: str) -> tuple: try: if call_sid not in self.conversation_history: self.initialize_conversation(call_sid, "Default conversation initialization") if not user_input: self.conversation_history[call_sid].append({"role": "user", "content": "Start the conversation."}) else: if self.detect_conversation_end(call_sid, user_input): farewell = "Thank you for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} self.conversation_history[call_sid].append({"role": "assistant", "content": farewell}) return tone_params, farewell, True kb_response = self.search_knowledge_base(user_input) if kb_response: user_input = f"{user_input}\n\nRelevant Knowledge Base Information:\n{kb_response}" self.conversation_history[call_sid].append({"role": "user", "content": user_input}) prompt = ( ''' You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company. Your tone should be friendly, empathetic, and concise. ## Guidelines: - Start warmly: Greet and explain why you're calling. - Acknowledge hesitation: Ease pressure if they seem surprised. - Set boundaries: Clarify what you can help with. - Offer value: Ask for relevant details if they're open. - Confirm follow-up info: Verify name and email if provided. - Close professionally: End politely if uninterested. ## Response Rules: - Keep replies under 50 words in natural, conversational language - Ask clarifying questions if unclear - If uninterested, end politely - If engaged, ask a relevant follow-up or confirm info ''' ) temp_history = self.conversation_history[call_sid].copy() temp_history.append({"role": "system", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=150, temperature=0.7 ) raw_response = response.choices[0].message.content.strip() raw_response = raw_response.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') self.conversation_history[call_sid].append({"role": "assistant", "content": raw_response}) logger.info(f"Appended AI response to history for {call_sid}: {raw_response}") is_farewell = "thank you for your time" in raw_response.lower() or "goodbye" in raw_response.lower() tone_params = self.analyze_tone(call_sid, user_input or "Start the conversation.") return tone_params, raw_response, is_farewell except Exception as e: logger.error(f"Error in get_ai_response: {e}") error_message = "I'm sorry, I'm having trouble responding. We will call you back soon!" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} self.conversation_history[call_sid].append({"role": "assistant", "content": error_message}) return tone_params, error_message, True def upload_to_gcs(self, audio_stream, call_sid: str) -> str: bucket_name = 'farmbase-b2f7e.appspot.com' key = f"audio/{call_sid}_{uuid.uuid4()}.mp3" bucket = self.gcs_client.bucket(bucket_name) blob = bucket.blob(key) blob.upload_from_file(audio_stream, content_type='audio/mpeg') blob.make_public() return blob.public_url def cleanup_conversation(self, call_sid: str): if call_sid in self.conversation_history: logger.info(f"Cleaning up conversation history for {call_sid}") del self.conversation_history[call_sid] if call_sid in self.silent_attempts: logger.info(f"Cleaning up silent attempts for {call_sid}") del self.silent_attempts[call_sid] if call_sid in self.is_speaking: del self.is_speaking[call_sid] if call_sid in self.intro_playing: del self.intro_playing[call_sid] class TwilioCallManager: def __init__(self, account_sid: str, auth_token: str, from_number: str): self.client = Client(account_sid, auth_token) self.from_number = from_number self.db = DatabaseManager() self.openai_handler = None def set_openai_handler(self, openai_handler: OpenAIHandler): self.openai_handler = openai_handler def make_outbound_call(self, config: CallConfig, webhook_url: str) -> str: try: call = self.client.calls.create( to=config.phone_number, from_=self.from_number, url=f"{webhook_url}/twiml", method='POST', timeout=30, status_callback=f"{webhook_url}/call_status", status_callback_event=['initiated', 'ringing', 'answered', 'completed'] ) self.db.create_call_record(config.phone_number, config.campaign_name, call.sid) if self.openai_handler: self.openai_handler.initialize_conversation(call.sid, config.script_template) logger.info(f"Call initiated: {call.sid} to {config.phone_number}") return call.sid except Exception as e: logger.error(f"Error making call: {e}") return None app = FastAPI() call_manager = None openai_handler = None active_connections = {} @app.post("/twiml") async def twiml(): response = VoiceResponse() connect = Connect() stream = Stream(url=f"wss://{os.getenv('PUBLIC_URL').replace('https://', '')}/stream") connect.append(stream) response.append(connect) logger.info(f"TwiML generated: {str(response)}") return Response(content=str(response), media_type="text/xml") @app.post("/call_status") async def call_status(CallSid: str = Form(...), CallStatus: str = Form(...)): logger.info(f"Call {CallSid} status: {CallStatus}") if call_manager: if CallStatus == 'completed': call_manager.db.update_call_status(CallSid, CallStatus, datetime.now()) if openai_handler: openai_handler.cleanup_conversation(CallSid) if CallSid in active_connections: del active_connections[CallSid] else: call_manager.db.update_call_status(CallSid, CallStatus) return Response(status_code=200) async def safe_websocket_send(websocket: WebSocket, data: dict, call_sid: str = None): try: if call_sid and call_sid not in active_connections: logger.warning(f"Attempted to send data to inactive connection: {call_sid}") return False await websocket.send_json(data) return True except Exception as e: logger.error(f"Error sending WebSocket data for {call_sid}: {e}") if call_sid and call_sid in active_connections: del active_connections[call_sid] return False async def detect_voicemail(audio_segment: pydub.AudioSegment) -> bool: try: if audio_segment.duration_seconds < 0.1: logger.warning(f"Audio segment too short for voicemail detection: {audio_segment.duration_seconds}s") return False # Check audio energy to filter out silence/noise rms = audio_segment.rms ENERGY_THRESHOLD = 50 # Adjust based on testing if rms < ENERGY_THRESHOLD: logger.info(f"Audio energy too low for voicemail detection: RMS={rms}") return False wav_buffer = BytesIO() audio_segment = audio_segment.set_frame_rate(16000).set_channels(1).set_sample_width(2) audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) logger.info(f"Voicemail detection transcript: {transcript}") voicemail_indicators = ["leave a message", "voicemail", "not available", "after the tone", "beep"] is_voicemail = any(indicator in transcript.lower() for indicator in voicemail_indicators) logger.info(f"Voicemail detection result: {is_voicemail}") return is_voicemail except Exception as e: logger.error(f"Error in voicemail detection: {e}") return False async def detect_background_noise(audio_segment: pydub.AudioSegment) -> bool: try: rms = audio_segment.rms NOISE_THRESHOLD = 100 return rms > NOISE_THRESHOLD except Exception as e: logger.error(f"Error in background noise detection: {e}") return False async def detect_echo(audio_segment: pydub.AudioSegment) -> bool: try: return False except Exception as e: logger.error(f"Error in echo detection: {e}") return False async def handle_user_response(websocket: WebSocket, stream_sid: str, call_sid: str, intent: str, confidence: float, failed_attempts: int = 0): MAX_FAILED_ATTEMPTS = 3 try: if failed_attempts >= MAX_FAILED_ATTEMPTS: message = "I'm sorry, I'm having trouble understanding. We'll follow up later. Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return if confidence < 0.5: message = "Sorry, I didn’t catch that—could you repeat it?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 # 0.5 seconds start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(2) is_speech = vad.is_speech(audio_data, 8000) if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1: wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) call_manager.db.add_transcript(call_sid, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_sid, call_sid, intent, confidence, failed_attempts + (1 if confidence < 0.5 else 0)) return else: logger.warning(f"Audio too short after low confidence: {audio_segment.duration_seconds}s") await handle_user_response(websocket, stream_sid, call_sid, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD * 6: # 3 seconds call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return return if intent == "not_interested": message = "No problem—thank you for your time!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return elif intent == "question": tone_params, response, is_farewell = await openai_handler.get_ai_response(call_sid, "User asked a question.") call_manager.db.add_transcript(call_sid, "Assistant", response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, response, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if is_farewell: call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(2) is_speech = vad.is_speech(audio_data, 8000) if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1: wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) call_manager.db.add_transcript(call_sid, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_sid, call_sid, intent, confidence) return else: logger.warning(f"Audio too short after question: {audio_segment.duration_seconds}s") await handle_user_response(websocket, stream_sid, call_sid, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 return elif intent in ["positive", "interested"]: tone_params, response, is_farewell = await openai_handler.get_ai_response(call_sid, "User is interested.") call_manager.db.add_transcript(call_sid, "Assistant", response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, response, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if is_farewell: call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return elif intent in ["neutral", "confused"]: message = "This is a quick call about satellite-based mineral exploration. Do you hold a license?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(2) is_speech = vad.is_speech(audio_data, 8000) if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1: wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) call_manager.db.add_transcript(call_sid, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_sid, call_sid, intent, confidence) return else: logger.warning(f"Audio too short after neutral/confused: {audio_segment.duration_seconds}s") await handle_user_response(websocket, stream_sid, call_sid, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 return else: message = "I’ll follow up another time. Thanks!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_sid, "Assistant", message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params): if chunk and call_sid in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return except Exception as e: logger.error(f"Error in handle_user_response for {call_sid}: {e}") call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] @app.websocket("/stream") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() call_sid = None stream_sid = None try: vad = webrtcvad.Vad(2) audio_buffer = bytearray() voicemail_buffer = bytearray() is_speaking = False silent_chunks = 0 message_playing = False voicemail_check_counter = 0 initial_phase = True call_start_time = asyncio.get_event_loop().time() CHUNK_DURATION_MS = 20 SAMPLE_RATE = 8000 CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000) SILENCE_THRESHOLD = 25 # 0.5 seconds LONG_SILENCE_THRESHOLD = 150 # 3 seconds MIN_VOICEMAIL_DURATION_MS = 500 # 0.5 seconds VOICEMAIL_CHECK_INTERVAL = 25 # 0.5 seconds INITIAL_PHASE_DURATION = 10 # 10 seconds MAX_CALL_DURATION = 300 # 5 minutes first_message_sent = False intent_failed_attempts = 0 MAX_INTENT_FAILED_ATTEMPTS = 3 voicemail_failed_attempts = 0 MAX_VOICEMAIL_FAILED_ATTEMPTS = 5 while asyncio.get_event_loop().time() - call_start_time < MAX_CALL_DURATION: try: message = await websocket.receive_text() data = json.loads(message) event = data.get("event") if event == "connected": logger.info("WebSocket connected") continue if event == "start": stream_sid = data["streamSid"] call_sid = data["start"]["callSid"] active_connections[call_sid] = True logger.info(f"Media stream started: call_sid={call_sid}, stream_sid={stream_sid}") silent_chunks = 0 is_speaking = False if not first_message_sent and call_sid: await asyncio.sleep(1) hardcoded_message = "Hi, this is Joanne from Farmonaut. I'm reaching out because we help teams like yours with mineral detection using satellite data. This is out of the blue, but may I have a moment to explain?" call_manager.db.add_transcript(call_sid, "Assistant", hardcoded_message) openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": hardcoded_message}) openai_handler.intro_playing[call_sid] = True try: async for chunk in openai_handler.stream_local_audio(call_sid, "initial_message.mp3"): if chunk and call_sid in active_connections: if openai_handler.is_speaking.get(call_sid, False): success = await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if not success: break await asyncio.sleep(0.02) else: break first_message_sent = True openai_handler.intro_playing[call_sid] = False except Exception as e: logger.error(f"Error streaming initial audio for {call_sid}: {e}") openai_handler.intro_playing[call_sid] = False await asyncio.sleep(3) continue if event == "media" and call_sid in active_connections: audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue if asyncio.get_event_loop().time() - call_start_time > INITIAL_PHASE_DURATION: initial_phase = False if openai_handler.intro_playing.get(call_sid, False) or openai_handler.is_speaking.get(call_sid, False): if vad.is_speech(audio_data, SAMPLE_RATE): logger.info(f"User interruption detected for {call_sid}") openai_handler.is_speaking[call_sid] = False openai_handler.intro_playing[call_sid] = False await safe_websocket_send(websocket, {"event": "clear", "streamSid": stream_sid}, call_sid) audio_buffer.clear() voicemail_buffer.clear() audio_buffer.extend(audio_data) is_speaking = True silent_chunks = 0 intent_failed_attempts = 0 voicemail_failed_attempts = 0 continue if initial_phase: voicemail_buffer.extend(audio_data) voicemail_check_counter += 1 if voicemail_check_counter >= VOICEMAIL_CHECK_INTERVAL: voicemail_check_counter = 0 pcm_audio = audioop.ulaw2lin(voicemail_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=SAMPLE_RATE, channels=1 ) if audio_segment.duration_seconds >= MIN_VOICEMAIL_DURATION_MS / 1000: # Check if audio contains speech using VAD is_speech = False for i in range(0, len(voicemail_buffer), CHUNK_SIZE): if vad.is_speech(voicemail_buffer[i:i+CHUNK_SIZE], SAMPLE_RATE): is_speech = True break if is_speech: if await detect_voicemail(audio_segment): message = "Voicemail detected. Exiting." call_manager.db.add_transcript(call_sid, "System", message) call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return else: voicemail_failed_attempts += 1 if voicemail_failed_attempts >= MAX_VOICEMAIL_FAILED_ATTEMPTS: logger.info(f"Max voicemail failed attempts reached for {call_sid}, stopping voicemail checks") initial_phase = False else: logger.info(f"No speech detected in voicemail buffer for {call_sid}, RMS={audio_segment.rms}") voicemail_buffer.clear() is_speech = vad.is_speech(audio_data, SAMPLE_RATE) if is_speech: if not is_speaking: logger.info(f"User speech detected for {call_sid}") is_speaking = True audio_buffer.clear() audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: message_playing = True pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=SAMPLE_RATE, channels=1 ) if audio_segment.duration_seconds >= 0.1: wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text" ) logger.info(f"User speech transcript: {transcript}") call_manager.db.add_transcript(call_sid, "User", transcript) voicemail_indicators = ["leave a message", "voicemail", "not available", "after the tone", "beep"] if any(indicator in transcript.lower() for indicator in voicemail_indicators): message = "Voicemail detected. Exiting." call_manager.db.add_transcript(call_sid, "System", message) call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return intent, confidence = openai_handler.get_intent(transcript) if intent == "neutral" and confidence == 0.5: intent_failed_attempts += 1 else: intent_failed_attempts = 0 await handle_user_response(websocket, stream_sid, call_sid, intent, confidence, intent_failed_attempts) audio_buffer.clear() message_playing = False silent_chunks = 0 else: logger.warning(f"User audio too short: {audio_segment.duration_seconds}s") intent_failed_attempts += 1 await handle_user_response(websocket, stream_sid, call_sid, "neutral", 0.5, intent_failed_attempts) audio_buffer.clear() message_playing = False silent_chunks = 0 else: if not message_playing and first_message_sent: silent_chunks += 1 if silent_chunks >= LONG_SILENCE_THRESHOLD: message_playing = True if call_sid not in openai_handler.silent_attempts: openai_handler.silent_attempts[call_sid] = 0 openai_handler.silent_attempts[call_sid] += 1 tone_params, text_response, is_farewell = openai_handler.get_silent_response( call_sid, openai_handler.silent_attempts[call_sid] ) call_manager.db.add_transcript(call_sid, "Assistant", text_response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, text_response, tone_params): if chunk and call_sid in active_connections: success = await safe_websocket_send(websocket, { "event": "media", "streamSid": stream_sid, "media": {"payload": chunk} }, call_sid) if not success or is_farewell: break if is_farewell: call_manager.db.add_transcript(call_sid, "System", "Silent after prompt. Exiting.") call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] return await asyncio.sleep(3) message_playing = False silent_chunks = 0 intent_failed_attempts = 0 voicemail_failed_attempts = 0 continue if event == "stop": logger.info(f"Media stream stopped for {call_sid}") call_manager.db.add_transcript(call_sid, "System", "Call dropped unexpectedly.") break except WebSocketDisconnect: logger.info(f"WebSocket disconnected for {call_sid}") call_manager.db.add_transcript(call_sid, "System", "Call dropped unexpectedly.") break except Exception as e: logger.error(f"Error in WebSocket message processing for {call_sid}: {e}") break except Exception as e: logger.error(f"Error in WebSocket endpoint for {call_sid}: {e}") finally: if call_sid: if call_manager and openai_handler: call_manager.db.update_call_status(call_sid, "completed", datetime.now()) openai_handler.cleanup_conversation(call_sid) if call_sid in active_connections: del active_connections[call_sid] try: await websocket.close() except: pass def initialize_system(): global call_manager, openai_handler twilio_sid = os.getenv('TWILIO_ACCOUNT_SID') twilio_token = os.getenv('TWILIO_AUTH_TOKEN') twilio_number = os.getenv('TWILIO_PHONE_NUMBER') openai_key = os.getenv('OPENAI_API_KEY') elevenlabs_api_key = os.getenv('ELEVENLABS_API_KEY') if not all([twilio_sid, twilio_token, twilio_number, openai_key, elevenlabs_api_key]): raise ValueError("Missing required environment variables") call_manager = TwilioCallManager(twilio_sid, twilio_token, twilio_number) openai_handler = OpenAIHandler(openai_key, elevenlabs_api_key, kb_file_path="kb_articles.json") call_manager.set_openai_handler(openai_handler) logger.info("System initialized successfully") def example_usage(): initialize_system() script_template = """ You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company and there is no prior conversation. Your role is to deliver a friendly, concise introduction that: * Builds quick rapport * Clearly states who you are and why you're calling * Keeps things low-pressure and empathetic ## Your message should: 1. Greet the recipient warmly 2. State your name and the reason for calling 3. Acknowledge that this is unexpected 4. Invite a quick moment of their time ## Examples: * "Hi, this is Joanne from Farmonaut—just reaching out because we help teams like yours with mineral detection using satellite data." * "Hey there, I know this is out of the blue—mind if I take 20 seconds to explain why I'm calling?" ## Output Rules: * Your message must be natural, human-like, and under 50 words * Do not include any follow-up questions or requests for info in this first message * This is just your intro line—keep it brief and warm * Do not reference any prior interaction or knowledge """ config = CallConfig( phone_number="+16696666882", campaign_name="Simple Promotion Call", script_template=script_template ) webhook_url = os.getenv('PUBLIC_URL', "https://d4c3b5c10eed.ngrok-free.app") call_sid = call_manager.make_outbound_call(config, webhook_url) if call_sid: print(f"Call initiated successfully: {call_sid}") else: print("Failed to initiate call") if __name__ == '__main__': example_usage() import uvicorn uvicorn.run(app, host="0.0.0.0", port=8080)