import os
import logging
from datetime import datetime
from dataclasses import dataclass
from fastapi import FastAPI, Form, Response, WebSocket, WebSocketDisconnect
from twilio.rest import Client
from twilio.twiml.voice_response import VoiceResponse, Connect, Stream
from openai import OpenAI
import sqlite3
from dotenv import load_dotenv
import requests
from io import BytesIO
import uuid
import json
from google.cloud import storage
import asyncio
import base64
import webrtcvad
import pydub
import numpy as np
import urllib.parse
import audioop  # For μ-law conversion

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class CallConfig:
    phone_number: str
    campaign_name: str
    script_template: str
    max_duration: int = 300  # 5 minutes


class DatabaseManager:
    def __init__(self, db_path: str = "calls.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS calls (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                phone_number TEXT NOT NULL,
                campaign_name TEXT,
                call_sid TEXT UNIQUE,
                status TEXT,
                start_time TIMESTAMP,
                end_time TIMESTAMP,
                conversation_log TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS call_transcripts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                call_sid TEXT,
                speaker TEXT,
                message TEXT,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (call_sid) REFERENCES calls (call_sid)
            )
        ''')
        conn.commit()
        conn.close()

    def create_call_record(self, phone_number: str, campaign_name: str, call_sid: str):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO calls (phone_number, campaign_name, call_sid, status, start_time)
            VALUES (?, ?, ?, ?, ?)
        ''', (phone_number, campaign_name, call_sid, 'initiated', datetime.now()))
        conn.commit()
        conn.close()
        logger.info(f"Call record created: {call_sid}")

    def update_call_status(self, call_sid: str, status: str, end_time=None):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        if end_time:
            cursor.execute('''
                UPDATE calls SET status=?, end_time=? WHERE call_sid=?
            ''', (status, end_time, call_sid))
        else:
            cursor.execute('''
                UPDATE calls SET status=? WHERE call_sid=?
            ''', (status, call_sid))
        conn.commit()
        conn.close()

    def add_transcript(self, call_sid: str, speaker: str, message: str):
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('''
            INSERT INTO call_transcripts (call_sid, speaker, message)
            VALUES (?, ?, ?)
        ''', (call_sid, speaker, message))
        conn.commit()
        conn.close()


class OpenAIHandler:
    def __init__(self, openai_api_key: str, elevenlabs_api_key: str, kb_file_path: str = "kb_articles.json"):
        self.client = OpenAI(api_key=openai_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.conversation_history = {}
        self.silent_attempts = {}
        self.gcs_client = storage.Client()
        self.knowledge_base = self.load_knowledge_base(kb_file_path)
        self.is_speaking = {}  # Track if AI is speaking
        self.intro_playing = {}  # Track if intro is playing

    def load_knowledge_base(self, kb_file_path: str) -> list:
        try:
            with open(kb_file_path, 'r') as file:
                kb = json.load(file)
            logger.info(f"Loaded knowledge base from {kb_file_path}")
            return kb
        except Exception as e:
            logger.error(f"Error loading knowledge base: {e}")
            return []

    def search_knowledge_base(self, user_input: str) -> str:
        user_input_lower = user_input.lower()
        relevant_content = []
        for article in self.knowledge_base:
            if any(keyword.lower() in user_input_lower for keyword in article['keywords']):
                relevant_content.append(article['content'])
        return "\n".join(relevant_content) if relevant_content else ""
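
    # Illustrative sketch (assumption, not taken from the real kb_articles.json):
    # load_knowledge_base()/search_knowledge_base() above expect the file to hold a
    # JSON list of articles, each with a "keywords" list and a "content" string,
    # roughly like:
    #
    #   [
    #     {"keywords": ["pricing", "cost"], "content": "Placeholder pricing blurb."},
    #     {"keywords": ["satellite", "mineral"], "content": "Placeholder product blurb."}
    #   ]
    #
    # The field names come from the code above; the example values are placeholders.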

    def initialize_conversation(self, call_sid: str, script_template: str):
        kb_context = "\n".join([article['content'] for article in self.knowledge_base])
        system_message = f"{script_template}\n\nKnowledge Base Context:\n{kb_context}"
        self.conversation_history[call_sid] = [{"role": "system", "content": system_message}]
        self.silent_attempts[call_sid] = 0
        self.is_speaking[call_sid] = False
        self.intro_playing[call_sid] = False
        logger.info(f"Initialized conversation history for {call_sid}")

    def detect_conversation_end(self, call_sid: str, user_input: str) -> bool:
        try:
            prompt = (
                f"Based on the conversation history and the latest user input: '{user_input}', "
                f"determine if the conversation should end. Return only 'true' or 'false'. "
                f"End the conversation if the user indicates they are done, "
                f"or if the conversation has naturally concluded."
            )
            temp_history = self.conversation_history[call_sid].copy()
            temp_history.append({"role": "user", "content": prompt})
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=temp_history,
                max_tokens=10,
                temperature=0.5
            )
            decision = response.choices[0].message.content.strip().lower()
            logger.info(f"Conversation end detection for {call_sid}: {decision}")
            return decision == 'true'
        except Exception as e:
            logger.error(f"Error in detect_conversation_end: {e}")
            return False

    def get_intent(self, user_input: str) -> tuple:
        try:
            prompt = (
                f"Analyze the following user input: '{user_input}'.\n"
                f"Return a JSON object with 'intent' (one of: 'not_interested', 'question', 'positive', 'interested', 'neutral', 'confused') "
                f"and 'confidence' (float between 0.0 and 1.0)."
            )
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[{"role": "user", "content": prompt}],
                max_tokens=100,
                temperature=0.5
            )
            result = json.loads(response.choices[0].message.content.strip())
            intent = result.get("intent", "neutral")
            confidence = float(result.get("confidence", 0.5))
            logger.info(f"Intent analysis: intent={intent}, confidence={confidence}")
            return intent, confidence
        except Exception as e:
            logger.error(f"Error in get_intent: {e}")
            return "neutral", 0.5

    def analyze_tone(self, call_sid: str, user_input: str) -> dict:
        default_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
        try:
            prompt = (
                '''
                Analyze the tone and emotion of the user's latest input, using the full context of the ongoing conversation.
                Return only a JSON object with the following fields:
                * "rate": one of "slow", "medium", or "fast"
                * "pitch": one of "low", "medium", or "high"
                * "stability": a float between 0.0 and 1.0
                * "similarity_boost": a float between 0.0 and 1.0
                '''
            )
            temp_history = self.conversation_history[call_sid].copy()
            temp_history.append({"role": "user", "content": prompt})
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=temp_history,
                max_tokens=100,
                temperature=0.5
            )
            raw_response = response.choices[0].message.content.strip()
            tone_params = json.loads(raw_response)
            if not isinstance(tone_params, dict):
                raise ValueError("Tone params is not a dictionary")
            if tone_params.get("rate") not in ["slow", "medium", "fast"]:
                tone_params["rate"] = default_params["rate"]
            if tone_params.get("pitch") not in ["low", "medium", "high"]:
                tone_params["pitch"] = default_params["pitch"]
            if not (0.0 <= tone_params.get("stability", 0.5) <= 1.0):
                tone_params["stability"] = default_params["stability"]
            if not (0.0 <= tone_params.get("similarity_boost", 0.75) <= 1.0):
                tone_params["similarity_boost"] = default_params["similarity_boost"]
            logger.info(f"Tone analysis for {call_sid}: {tone_params}")
            return tone_params
        except Exception as e:
            logger.error(f"Error in analyze_tone for {call_sid}: {e}")
            return default_params

    async def generate_elevenlabs_audio_stream(self, call_sid: str, text: str, tone_params: dict) -> tuple:
        self.is_speaking[call_sid] = True
        try:
            url = "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM/stream"
            headers = {"xi-api-key": self.elevenlabs_api_key}
            data = {
                "text": text,
                "model_id": "eleven_flash_v2_5",
                "voice_settings": {
                    "stability": tone_params.get("stability", 0.5),
                    "similarity_boost": tone_params.get("similarity_boost", 0.75),
                    "speaker_boost": True,
                    "style": 1
                }
            }
            response = requests.post(url, json=data, headers=headers, stream=True)
            if response.status_code == 200:
                audio = pydub.AudioSegment.from_file(BytesIO(response.content), format="mp3")
                audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2)
                raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16)
                ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2)
                chunk_size = 8000  # 1 second at 8kHz
                for i in range(0, len(ulaw_audio), chunk_size):
                    chunk = ulaw_audio[i:i+chunk_size]
                    b64_chunk = base64.b64encode(chunk).decode('utf-8')
                    yield b64_chunk
                logger.info(f"Generated ElevenLabs audio stream for {call_sid}")
            else:
                logger.error(f"ElevenLabs API error for {call_sid}: {response.status_code} - {response.text}")
                yield None
        except Exception as e:
            logger.error(f"Error in generate_elevenlabs_audio_stream for {call_sid}: {e}")
            yield None
        finally:
            self.is_speaking[call_sid] = False

    async def stream_local_audio(self, call_sid: str, audio_path: str = "initial_message.mp3") -> tuple:
        self.is_speaking[call_sid] = True
        self.intro_playing[call_sid] = True
        try:
            audio = pydub.AudioSegment.from_file(audio_path, format="mp3")
            audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2)
            raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16)
            ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2)
            chunk_size = 8000  # 1 second at 8kHz
            for i in range(0, len(ulaw_audio), chunk_size):
                chunk = ulaw_audio[i:i+chunk_size]
                b64_chunk = base64.b64encode(chunk).decode('utf-8')
                yield b64_chunk
            logger.info(f"Streamed local audio for {call_sid} from {audio_path}")
        except Exception as e:
            logger.error(f"Error streaming local audio for {call_sid}: {e}")
            yield None
        finally:
            self.is_speaking[call_sid] = False
            self.intro_playing[call_sid] = False

    def get_silent_response(self, call_sid: str, attempt: int) -> tuple:
        if attempt == 1:
            message = "Just checking if you can hear me?"
            tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
        else:
            message = "It seems you're not there. Thanks for your time! Goodbye!"
            tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75}
            logger.info(f"Final silent response triggered for {call_sid}")
            self.conversation_history[call_sid].append({"role": "assistant", "content": message})
            return tone_params, message, True
        self.conversation_history[call_sid].append({"role": "assistant", "content": message})
        return tone_params, message, False

    async def get_ai_response(self, call_sid: str, user_input: str) -> tuple:
        try:
            if call_sid not in self.conversation_history:
                self.initialize_conversation(call_sid, "Default conversation initialization")
            if not user_input:
                self.conversation_history[call_sid].append({"role": "user", "content": "Start the conversation."})
            else:
                if self.detect_conversation_end(call_sid, user_input):
                    farewell = "Thank you for your time! Goodbye!"
                    tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75}
                    self.conversation_history[call_sid].append({"role": "assistant", "content": farewell})
                    return tone_params, farewell, True
                kb_response = self.search_knowledge_base(user_input)
                if kb_response:
                    user_input = f"{user_input}\n\nRelevant Knowledge Base Information:\n{kb_response}"
                self.conversation_history[call_sid].append({"role": "user", "content": user_input})
            prompt = (
                '''
                You are an AI Sales Representative making a cold outbound call.
                The recipient has no prior knowledge of your company.
                Your tone should be friendly, empathetic, and concise.

                ## Guidelines:
                - Start warmly: Greet and explain why you're calling.
                - Acknowledge hesitation: Ease pressure if they seem surprised.
                - Set boundaries: Clarify what you can help with.
                - Offer value: Ask for relevant details if they're open.
                - Confirm follow-up info: Verify name and email if provided.
                - Close professionally: End politely if uninterested.
                ## Response Rules:
                - Keep replies under 50 words in natural, conversational language
                - Ask clarifying questions if unclear
                - If uninterested, end politely
                - If engaged, ask a relevant follow-up or confirm info
                '''
            )
            temp_history = self.conversation_history[call_sid].copy()
            temp_history.append({"role": "system", "content": prompt})
            response = self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=temp_history,
                max_tokens=150,
                temperature=0.7
            )
            raw_response = response.choices[0].message.content.strip()
            # Decode any HTML entities the model may have emitted
            raw_response = (raw_response.replace('&amp;', '&').replace('&lt;', '<')
                            .replace('&gt;', '>').replace('&quot;', '"').replace('&#39;', "'"))
            self.conversation_history[call_sid].append({"role": "assistant", "content": raw_response})
            logger.info(f"Appended AI response to history for {call_sid}: {raw_response}")
            is_farewell = "thank you for your time" in raw_response.lower() or "goodbye" in raw_response.lower()
            tone_params = self.analyze_tone(call_sid, user_input or "Start the conversation.")
            return tone_params, raw_response, is_farewell
        except Exception as e:
            logger.error(f"Error in get_ai_response: {e}")
            error_message = "I'm sorry, I'm having trouble responding. We will call you back soon!"
            tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
            self.conversation_history[call_sid].append({"role": "assistant", "content": error_message})
            return tone_params, error_message, True

    def upload_to_gcs(self, audio_stream, call_sid: str) -> str:
        bucket_name = 'farmbase-b2f7e.appspot.com'
        key = f"audio/{call_sid}_{uuid.uuid4()}.mp3"
        bucket = self.gcs_client.bucket(bucket_name)
        blob = bucket.blob(key)
        blob.upload_from_file(audio_stream, content_type='audio/mpeg')
        blob.make_public()
        return blob.public_url

    def cleanup_conversation(self, call_sid: str):
        if call_sid in self.conversation_history:
            logger.info(f"Cleaning up conversation history for {call_sid}")
            del self.conversation_history[call_sid]
        if call_sid in self.silent_attempts:
            logger.info(f"Cleaning up silent attempts for {call_sid}")
            del self.silent_attempts[call_sid]
        if call_sid in self.is_speaking:
            del self.is_speaking[call_sid]
        if call_sid in self.intro_playing:
            del self.intro_playing[call_sid]


class TwilioCallManager:
    def __init__(self, account_sid: str, auth_token: str, from_number: str):
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number
        self.db = DatabaseManager()
        self.openai_handler = None

    def set_openai_handler(self, openai_handler: OpenAIHandler):
        self.openai_handler = openai_handler

    def make_outbound_call(self, config: CallConfig, webhook_url: str) -> str:
        try:
            call = self.client.calls.create(
                to=config.phone_number,
                from_=self.from_number,
                url=f"{webhook_url}/twiml",
                method='POST',
                timeout=30,
                status_callback=f"{webhook_url}/call_status",
                status_callback_event=['initiated', 'ringing', 'answered', 'completed']
            )
            self.db.create_call_record(config.phone_number, config.campaign_name, call.sid)
            if self.openai_handler:
                self.openai_handler.initialize_conversation(call.sid, config.script_template)
            logger.info(f"Call initiated: {call.sid} to {config.phone_number}")
            return call.sid
        except Exception as e:
            logger.error(f"Error making call: {e}")
            return None


app = FastAPI()
call_manager = None
openai_handler = None
active_connections = {}


@app.post("/twiml")
async def twiml():
    response = VoiceResponse()
    connect = Connect()
    stream = Stream(url=f"wss://{os.getenv('PUBLIC_URL').replace('https://', '')}/stream")
    connect.append(stream)
    response.append(connect)
    logger.info(f"TwiML generated: {str(response)}")
    return Response(content=str(response), media_type="text/xml")
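
# For orientation, the TwiML returned by /twiml above should look roughly like the
# sketch below (illustrative, not captured from a live call; the host comes from
# PUBLIC_URL):
#
#   <?xml version="1.0" encoding="UTF-8"?>
#   <Response>
#     <Connect>
#       <Stream url="wss://<PUBLIC_URL host>/stream" />
#     </Connect>
#   </Response>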

@app.post("/call_status")
async def call_status(CallSid: str = Form(...), CallStatus: str = Form(...)):
    logger.info(f"Call {CallSid} status: {CallStatus}")
    if call_manager:
        if CallStatus == 'completed':
            call_manager.db.update_call_status(CallSid, CallStatus, datetime.now())
            if openai_handler:
                openai_handler.cleanup_conversation(CallSid)
            if CallSid in active_connections:
                del active_connections[CallSid]
        else:
            call_manager.db.update_call_status(CallSid, CallStatus)
    return Response(status_code=200)


async def safe_websocket_send(websocket: WebSocket, data: dict, call_sid: str = None):
    try:
        if call_sid and call_sid not in active_connections:
            logger.warning(f"Attempted to send data to inactive connection: {call_sid}")
            return False
        await websocket.send_json(data)
        return True
    except Exception as e:
        logger.error(f"Error sending WebSocket data for {call_sid}: {e}")
        if call_sid and call_sid in active_connections:
            del active_connections[call_sid]
        return False


async def detect_voicemail(audio_segment: pydub.AudioSegment) -> bool:
    try:
        # Ensure audio_segment is in the correct format
        wav_buffer = BytesIO()
        # Export with explicit format and parameters to ensure compatibility
        audio_segment = audio_segment.set_frame_rate(16000).set_channels(1).set_sample_width(2)
        audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le")
        wav_buffer.seek(0)
        # Assign a filename with .wav extension for the API
        wav_buffer.name = "audio.wav"
        transcript = openai_handler.client.audio.transcriptions.create(
            model="whisper-1",
            file=wav_buffer,
            response_format="text"
        )
        voicemail_indicators = ["leave a message", "voicemail", "not available", "after the tone", "beep"]
        is_voicemail = any(indicator in transcript.lower() for indicator in voicemail_indicators)
        logger.info(f"Voicemail detection result: {is_voicemail}, transcript: {transcript}")
        return is_voicemail
    except Exception as e:
        logger.error(f"Error in voicemail detection: {e}")
        return False


async def detect_background_noise(audio_segment: pydub.AudioSegment) -> bool:
    try:
        # Simple noise detection based on RMS energy
        rms = audio_segment.rms
        NOISE_THRESHOLD = 100  # Adjust based on testing
        return rms > NOISE_THRESHOLD
    except Exception as e:
        logger.error(f"Error in background noise detection: {e}")
        return False


async def detect_echo(audio_segment: pydub.AudioSegment) -> bool:
    try:
        # Simple echo detection (placeholder - implement based on audio analysis)
        # Could use cross-correlation or delay analysis in production
        return False  # Placeholder
    except Exception as e:
        logger.error(f"Error in echo detection: {e}")
        return False


async def handle_user_response(websocket: WebSocket, stream_sid: str, call_sid: str, intent: str, confidence: float):
    try:
        if confidence < 0.5:
            message = "Sorry, I didn’t catch that—could you repeat it?"
            tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
            call_manager.db.add_transcript(call_sid, "Assistant", message)
            openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message})
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            await asyncio.sleep(3)
            audio_buffer = bytearray()
            is_speaking = False
            silent_chunks = 0
            CHUNK_SIZE = int(8000 * 20 / 1000)  # 20ms at 8kHz
            SILENCE_THRESHOLD = 50  # 1 second
            start_time = asyncio.get_event_loop().time()
            while asyncio.get_event_loop().time() - start_time < 3.0:
                try:
                    message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02)
                    data = json.loads(message)
                    if data.get("event") == "media":
                        audio_data = base64.b64decode(data["media"]["payload"])
                        if len(audio_data) != 160:
                            continue
                        vad = webrtcvad.Vad(2)
                        is_speech = vad.is_speech(audio_data, 8000)
                        if is_speech:
                            is_speaking = True
                            audio_buffer.extend(audio_data)
                            silent_chunks = 0
                        else:
                            if is_speaking:
                                silent_chunks += 1
                                if silent_chunks >= SILENCE_THRESHOLD:
                                    is_speaking = False
                                    if len(audio_buffer) > 0:
                                        pcm_audio = audioop.ulaw2lin(audio_buffer, 2)
                                        audio_segment = pydub.AudioSegment(
                                            data=pcm_audio,
                                            sample_width=2,
                                            frame_rate=8000,
                                            channels=1
                                        )
                                        transcript = openai_handler.client.audio.transcriptions.create(
                                            model="whisper-1",
                                            file=BytesIO(audio_segment.export(format="wav").read()),
                                            response_format="text"
                                        )
                                        call_manager.db.add_transcript(call_sid, "User", transcript)
                                        new_intent, new_confidence = openai_handler.get_intent(transcript)
                                        await handle_user_response(websocket, stream_sid, call_sid, new_intent, new_confidence)
                                        return
                            else:
                                silent_chunks += 1
                except asyncio.TimeoutError:
                    silent_chunks += 1
                    if silent_chunks >= SILENCE_THRESHOLD * 3:  # 3 seconds
                        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                        openai_handler.cleanup_conversation(call_sid)
                        if call_sid in active_connections:
                            del active_connections[call_sid]
                        return
            return
        if intent == "not_interested":
            message = "No problem—thank you for your time!"
            tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75}
            call_manager.db.add_transcript(call_sid, "Assistant", message)
            openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message})
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            call_manager.db.update_call_status(call_sid, "completed", datetime.now())
            openai_handler.cleanup_conversation(call_sid)
            if call_sid in active_connections:
                del active_connections[call_sid]
            return
        elif intent == "question":
            tone_params, response, is_farewell = await openai_handler.get_ai_response(call_sid, "User asked a question.")
            call_manager.db.add_transcript(call_sid, "Assistant", response)
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, response, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            if is_farewell:
                call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                openai_handler.cleanup_conversation(call_sid)
                if call_sid in active_connections:
                    del active_connections[call_sid]
                return
            await asyncio.sleep(3)
            audio_buffer = bytearray()
            is_speaking = False
            silent_chunks = 0
            CHUNK_SIZE = int(8000 * 20 / 1000)
            SILENCE_THRESHOLD = 50
            start_time = asyncio.get_event_loop().time()
            while asyncio.get_event_loop().time() - start_time < 3.0:
                try:
                    message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02)
                    data = json.loads(message)
                    if data.get("event") == "media":
                        audio_data = base64.b64decode(data["media"]["payload"])
                        if len(audio_data) != 160:
                            continue
                        vad = webrtcvad.Vad(2)
                        is_speech = vad.is_speech(audio_data, 8000)
                        if is_speech:
                            is_speaking = True
                            audio_buffer.extend(audio_data)
                            silent_chunks = 0
                        else:
                            if is_speaking:
                                silent_chunks += 1
                                if silent_chunks >= SILENCE_THRESHOLD:
                                    is_speaking = False
                                    if len(audio_buffer) > 0:
                                        pcm_audio = audioop.ulaw2lin(audio_buffer, 2)
                                        audio_segment = pydub.AudioSegment(
                                            data=pcm_audio,
                                            sample_width=2,
                                            frame_rate=8000,
                                            channels=1
                                        )
                                        transcript = openai_handler.client.audio.transcriptions.create(
                                            model="whisper-1",
                                            file=BytesIO(audio_segment.export(format="wav").read()),
                                            response_format="text"
                                        )
                                        call_manager.db.add_transcript(call_sid, "User", transcript)
                                        new_intent, new_confidence = openai_handler.get_intent(transcript)
                                        await handle_user_response(websocket, stream_sid, call_sid, new_intent, new_confidence)
                                        return
                            else:
                                silent_chunks += 1
                except asyncio.TimeoutError:
                    silent_chunks += 1
            return
        elif intent in ["positive", "interested"]:
            tone_params, response, is_farewell = await openai_handler.get_ai_response(call_sid, "User is interested.")
            call_manager.db.add_transcript(call_sid, "Assistant", response)
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, response, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            if is_farewell:
                call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                openai_handler.cleanup_conversation(call_sid)
                if call_sid in active_connections:
                    del active_connections[call_sid]
            return
        elif intent in ["neutral", "confused"]:
            message = "This is a quick call about satellite-based mineral exploration. Do you hold a license?"
            tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
            call_manager.db.add_transcript(call_sid, "Assistant", message)
            openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message})
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            await asyncio.sleep(3)
            audio_buffer = bytearray()
            is_speaking = False
            silent_chunks = 0
            CHUNK_SIZE = int(8000 * 20 / 1000)
            SILENCE_THRESHOLD = 50
            start_time = asyncio.get_event_loop().time()
            while asyncio.get_event_loop().time() - start_time < 3.0:
                try:
                    message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02)
                    data = json.loads(message)
                    if data.get("event") == "media":
                        audio_data = base64.b64decode(data["media"]["payload"])
                        if len(audio_data) != 160:
                            continue
                        vad = webrtcvad.Vad(2)
                        is_speech = vad.is_speech(audio_data, 8000)
                        if is_speech:
                            is_speaking = True
                            audio_buffer.extend(audio_data)
                            silent_chunks = 0
                        else:
                            if is_speaking:
                                silent_chunks += 1
                                if silent_chunks >= SILENCE_THRESHOLD:
                                    is_speaking = False
                                    if len(audio_buffer) > 0:
                                        pcm_audio = audioop.ulaw2lin(audio_buffer, 2)
                                        audio_segment = pydub.AudioSegment(
                                            data=pcm_audio,
                                            sample_width=2,
                                            frame_rate=8000,
                                            channels=1
                                        )
                                        transcript = openai_handler.client.audio.transcriptions.create(
                                            model="whisper-1",
                                            file=BytesIO(audio_segment.export(format="wav").read()),
                                            response_format="text"
                                        )
                                        call_manager.db.add_transcript(call_sid, "User", transcript)
                                        new_intent, new_confidence = openai_handler.get_intent(transcript)
                                        await handle_user_response(websocket, stream_sid, call_sid, new_intent, new_confidence)
                                        return
                            else:
                                silent_chunks += 1
                except asyncio.TimeoutError:
                    silent_chunks += 1
            return
        else:
            message = "I’ll follow up another time. Thanks!"
            tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75}
            call_manager.db.add_transcript(call_sid, "Assistant", message)
            openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": message})
            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                if chunk and call_sid in active_connections:
                    await safe_websocket_send(websocket, {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": chunk}
                    }, call_sid)
            call_manager.db.update_call_status(call_sid, "completed", datetime.now())
            openai_handler.cleanup_conversation(call_sid)
            if call_sid in active_connections:
                del active_connections[call_sid]
            return
    except Exception as e:
        logger.error(f"Error in handle_user_response for {call_sid}: {e}")
        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
        openai_handler.cleanup_conversation(call_sid)
        if call_sid in active_connections:
            del active_connections[call_sid]
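
# For orientation: the /stream WebSocket below consumes Twilio Media Streams JSON
# frames. The shapes sketched here are inferred from the fields the handler reads
# (illustrative values, not captured traffic):
#
#   {"event": "connected"}
#   {"event": "start", "streamSid": "MZ...", "start": {"callSid": "CA..."}}
#   {"event": "media", "media": {"payload": "<base64 8 kHz mu-law audio>"}}
#   {"event": "stop"}
#
# Outbound frames use {"event": "media", "streamSid": ..., "media": {"payload": ...}}
# to play audio and {"event": "clear", "streamSid": ...} to interrupt playback.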

@app.websocket("/stream")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    call_sid = None
    stream_sid = None
    try:
        vad = webrtcvad.Vad(2)
        audio_buffer = bytearray()
        is_speaking = False
        silent_chunks = 0
        message_playing = False
        CHUNK_DURATION_MS = 20
        SAMPLE_RATE = 8000
        CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000)
        SILENCE_THRESHOLD = 50  # 1 second
        LONG_SILENCE_THRESHOLD = 150  # 3 seconds
        first_message_sent = False
        intro_played = False
        while True:
            try:
                message = await websocket.receive_text()
                data = json.loads(message)
                event = data.get("event")
                if event == "connected":
                    logger.info("WebSocket connected")
                    continue
                if event == "start":
                    stream_sid = data["streamSid"]
                    call_sid = data["start"]["callSid"]
                    active_connections[call_sid] = True
                    logger.info(f"Media stream started: call_sid={call_sid}, stream_sid={stream_sid}")
                    silent_chunks = 0
                    is_speaking = False
                    if not first_message_sent and call_sid:
                        await asyncio.sleep(1)
                        hardcoded_message = "Hi, this is Joanne from Farmonaut. I'm reaching out because we help teams like yours with mineral detection using satellite data. This is out of the blue, but may I have a moment to explain?"
                        call_manager.db.add_transcript(call_sid, "Assistant", hardcoded_message)
                        openai_handler.conversation_history[call_sid].append({"role": "assistant", "content": hardcoded_message})
                        openai_handler.intro_playing[call_sid] = True
                        try:
                            async for chunk in openai_handler.stream_local_audio(call_sid, "initial_message.mp3"):
                                if chunk and call_sid in active_connections:
                                    if openai_handler.is_speaking.get(call_sid, False):
                                        success = await safe_websocket_send(websocket, {
                                            "event": "media",
                                            "streamSid": stream_sid,
                                            "media": {"payload": chunk}
                                        }, call_sid)
                                        if not success:
                                            break
                                        await asyncio.sleep(0.02)
                                    else:
                                        break  # Stop if interrupted
                            intro_played = True
                            first_message_sent = True
                            openai_handler.intro_playing[call_sid] = False
                        except Exception as e:
                            logger.error(f"Error streaming initial audio for {call_sid}: {e}")
                            openai_handler.intro_playing[call_sid] = False
                        await asyncio.sleep(3)
                    continue
                if event == "media" and call_sid in active_connections:
                    audio_data = base64.b64decode(data["media"]["payload"])
                    if len(audio_data) != 160:
                        continue
                    if openai_handler.intro_playing.get(call_sid, False) or openai_handler.is_speaking.get(call_sid, False):
                        if vad.is_speech(audio_data, SAMPLE_RATE):
                            openai_handler.is_speaking[call_sid] = False
                            openai_handler.intro_playing[call_sid] = False
                            await safe_websocket_send(websocket, {
                                "event": "clear",
                                "streamSid": stream_sid
                            }, call_sid)
                            audio_buffer.clear()
                            audio_buffer.extend(audio_data)
                            is_speaking = True
                            silent_chunks = 0
                        continue
                    pcm_audio = audioop.ulaw2lin(audio_data, 2)
                    audio_segment = pydub.AudioSegment(
                        data=pcm_audio,
                        sample_width=2,
                        frame_rate=8000,
                        channels=1
                    )
                    if await detect_voicemail(audio_segment):
                        message = "Voicemail detected. Exiting."
                        call_manager.db.add_transcript(call_sid, "System", message)
                        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                        openai_handler.cleanup_conversation(call_sid)
                        if call_sid in active_connections:
                            del active_connections[call_sid]
                        return
                    if await detect_background_noise(audio_segment):
                        transcript = openai_handler.client.audio.transcriptions.create(
                            model="whisper-1",
                            file=BytesIO(audio_segment.export(format="wav").read()),
                            response_format="text"
                        )
                        if not transcript.strip():  # Unintelligible
                            message = "It's a bit noisy—should I try you back later?"
                            tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
                            call_manager.db.add_transcript(call_sid, "Assistant", message)
                            async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                                if chunk and call_sid in active_connections:
                                    await safe_websocket_send(websocket, {
                                        "event": "media",
                                        "streamSid": stream_sid,
                                        "media": {"payload": chunk}
                                    }, call_sid)
                            await asyncio.sleep(3)
                            audio_buffer = bytearray()
                            is_speaking = False
                            silent_chunks = 0
                            start_time = asyncio.get_event_loop().time()
                            while asyncio.get_event_loop().time() - start_time < 3.0:
                                try:
                                    message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02)
                                    data = json.loads(message)
                                    if data.get("event") == "media":
                                        audio_data = base64.b64decode(data["media"]["payload"])
                                        if len(audio_data) != 160:
                                            continue
                                        vad = webrtcvad.Vad(2)
                                        is_speech = vad.is_speech(audio_data, 8000)
                                        if is_speech:
                                            is_speaking = True
                                            audio_buffer.extend(audio_data)
                                            silent_chunks = 0
                                        else:
                                            if is_speaking:
                                                silent_chunks += 1
                                                if silent_chunks >= SILENCE_THRESHOLD:
                                                    is_speaking = False
                                                    if len(audio_buffer) > 0:
                                                        pcm_audio = audioop.ulaw2lin(audio_buffer, 2)
                                                        audio_segment = pydub.AudioSegment(
                                                            data=pcm_audio,
                                                            sample_width=2,
                                                            frame_rate=8000,
                                                            channels=1
                                                        )
                                                        transcript = openai_handler.client.audio.transcriptions.create(
                                                            model="whisper-1",
                                                            file=BytesIO(audio_segment.export(format="wav").read()),
                                                            response_format="text"
                                                        )
                                                        call_manager.db.add_transcript(call_sid, "User", transcript)
                                                        intent, confidence = openai_handler.get_intent(transcript)
                                                        await handle_user_response(websocket, stream_sid, call_sid, intent, confidence)
                                                        return
                                            else:
                                                silent_chunks += 1
                                except asyncio.TimeoutError:
                                    silent_chunks += 1
                                    if silent_chunks >= SILENCE_THRESHOLD * 3:
                                        call_manager.db.add_transcript(call_sid, "System", "No response after noise prompt.")
                                        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                                        openai_handler.cleanup_conversation(call_sid)
                                        if call_sid in active_connections:
                                            del active_connections[call_sid]
                                        return
                            return
                    if await detect_echo(audio_segment):
                        message = "I’m hearing an echo—can you hear me okay?"
                        tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
                        call_manager.db.add_transcript(call_sid, "Assistant", message)
                        async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, message, tone_params):
                            if chunk and call_sid in active_connections:
                                await safe_websocket_send(websocket, {
                                    "event": "media",
                                    "streamSid": stream_sid,
                                    "media": {"payload": chunk}
                                }, call_sid)
                        await asyncio.sleep(3)
                        audio_buffer = bytearray()
                        start_time = asyncio.get_event_loop().time()
                        while asyncio.get_event_loop().time() - start_time < 3.0:
                            try:
                                message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02)
                                data = json.loads(message)
                                if data.get("event") == "media":
                                    audio_data = base64.b64decode(data["media"]["payload"])
                                    if len(audio_data) != 160:
                                        continue
                                    pcm_audio = audioop.ulaw2lin(audio_data, 2)
                                    audio_segment = pydub.AudioSegment(
                                        data=pcm_audio,
                                        sample_width=2,
                                        frame_rate=8000,
                                        channels=1
                                    )
                                    if await detect_echo(audio_segment):
                                        call_manager.db.add_transcript(call_sid, "System", "Echo persists. Exiting.")
                                        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                                        openai_handler.cleanup_conversation(call_sid)
                                        if call_sid in active_connections:
                                            del active_connections[call_sid]
                                        return
                            except asyncio.TimeoutError:
                                pass
                        continue
                    is_speech = vad.is_speech(audio_data, SAMPLE_RATE)
                    if is_speech:
                        if not is_speaking:
                            await safe_websocket_send(websocket, {
                                "event": "clear",
                                "streamSid": stream_sid
                            }, call_sid)
                            logger.info(f"User speech detected, clearing audio for {call_sid}")
                            is_speaking = True
                            audio_buffer.clear()
                            message_playing = False
                        audio_buffer.extend(audio_data)
                        silent_chunks = 0
                    else:
                        if is_speaking:
                            silent_chunks += 1
                            if silent_chunks >= SILENCE_THRESHOLD:
                                is_speaking = False
                                if len(audio_buffer) > 0:
                                    message_playing = True
                                    pcm_audio = audioop.ulaw2lin(audio_buffer, 2)
                                    audio_segment = pydub.AudioSegment(
                                        data=pcm_audio,
                                        sample_width=2,
                                        frame_rate=8000,
                                        channels=1
                                    )
                                    transcript = openai_handler.client.audio.transcriptions.create(
                                        model="whisper-1",
                                        file=BytesIO(audio_segment.export(format="wav").read()),
                                        response_format="text"
                                    )
                                    call_manager.db.add_transcript(call_sid, "User", transcript)
                                    intent, confidence = openai_handler.get_intent(transcript)
                                    await handle_user_response(websocket, stream_sid, call_sid, intent, confidence)
                                    await asyncio.sleep(2)
                                    message_playing = False
                                    audio_buffer.clear()
                                    silent_chunks = 0
                        else:
                            if not message_playing and first_message_sent:
                                silent_chunks += 1
                                if silent_chunks >= LONG_SILENCE_THRESHOLD:
                                    message_playing = True
                                    if call_sid not in openai_handler.silent_attempts:
                                        openai_handler.silent_attempts[call_sid] = 0
                                    openai_handler.silent_attempts[call_sid] += 1
                                    tone_params, text_response, is_farewell = openai_handler.get_silent_response(
                                        call_sid, openai_handler.silent_attempts[call_sid]
                                    )
                                    call_manager.db.add_transcript(call_sid, "Assistant", text_response)
                                    async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_sid, text_response, tone_params):
                                        if chunk and call_sid in active_connections:
                                            success = await safe_websocket_send(websocket, {
                                                "event": "media",
                                                "streamSid": stream_sid,
                                                "media": {"payload": chunk}
                                            }, call_sid)
                                            if not success or is_farewell:
                                                break
                                    if is_farewell:
                                        call_manager.db.add_transcript(call_sid, "System", "Silent after prompt. Exiting.")
                                        call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                                        openai_handler.cleanup_conversation(call_sid)
                                        if call_sid in active_connections:
                                            del active_connections[call_sid]
                                        return
                                    await asyncio.sleep(2)
                                    message_playing = False
                                    silent_chunks = 0
                    continue
                if event == "stop":
                    logger.info(f"Media stream stopped for {call_sid}")
                    call_manager.db.add_transcript(call_sid, "System", "Call dropped unexpectedly.")
                    break
            except WebSocketDisconnect:
                logger.info(f"WebSocket disconnected for {call_sid}")
                call_manager.db.add_transcript(call_sid, "System", "Call dropped unexpectedly.")
                break
            except Exception as e:
                logger.error(f"Error in WebSocket message processing for {call_sid}: {e}")
                continue
    except Exception as e:
        logger.error(f"Error in WebSocket endpoint for {call_sid}: {e}")
    finally:
        if call_sid:
            if call_manager and openai_handler:
                call_manager.db.update_call_status(call_sid, "completed", datetime.now())
                openai_handler.cleanup_conversation(call_sid)
            if call_sid in active_connections:
                del active_connections[call_sid]
        try:
            await websocket.close()
        except:
            pass


def initialize_system():
    global call_manager, openai_handler
    twilio_sid = os.getenv('TWILIO_ACCOUNT_SID')
    twilio_token = os.getenv('TWILIO_AUTH_TOKEN')
    twilio_number = os.getenv('TWILIO_PHONE_NUMBER')
    openai_key = os.getenv('OPENAI_API_KEY')
    elevenlabs_api_key = os.getenv('ELEVENLABS_API_KEY')
    if not all([twilio_sid, twilio_token, twilio_number, openai_key, elevenlabs_api_key]):
        raise ValueError("Missing required environment variables")
    call_manager = TwilioCallManager(twilio_sid, twilio_token, twilio_number)
    openai_handler = OpenAIHandler(openai_key, elevenlabs_api_key, kb_file_path="kb_articles.json")
    call_manager.set_openai_handler(openai_handler)
    logger.info("System initialized successfully")


def example_usage():
    initialize_system()
    script_template = """
    You are an AI Sales Representative making a cold outbound call.
    The recipient has no prior knowledge of your company and there is no prior conversation.
    Your role is to deliver a friendly, concise introduction that:
    * Builds quick rapport
    * Clearly states who you are and why you're calling
    * Keeps things low-pressure and empathetic

    ## Your message should:
    1. Greet the recipient warmly
    2. State your name and the reason for calling
    3. Acknowledge that this is unexpected
    4. Invite a quick moment of their time

    ## Examples:
    * "Hi, this is Joanne from Farmonaut—just reaching out because we help teams like yours with mineral detection using satellite data."
    * "Hey there, I know this is out of the blue—mind if I take 20 seconds to explain why I'm calling?"

    ## Output Rules:
    * Your message must be natural, human-like, and under 50 words
    * Do not include any follow-up questions or requests for info in this first message
    * This is just your intro line—keep it brief and warm
    * Do not reference any prior interaction or knowledge
    """
    config = CallConfig(
        phone_number="+16696666882",
        campaign_name="Simple Promotion Call",
        script_template=script_template
    )
    webhook_url = os.getenv('PUBLIC_URL', "https://d4c3b5c10eed.ngrok-free.app")
    call_sid = call_manager.make_outbound_call(config, webhook_url)
    if call_sid:
        print(f"Call initiated successfully: {call_sid}")
    else:
        print("Failed to initiate call")


if __name__ == '__main__':
    example_usage()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
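
# Illustrative .env sketch (values are placeholders; the variable names are the ones
# read by initialize_system(), /twiml, and example_usage()):
#
#   TWILIO_ACCOUNT_SID=ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
#   TWILIO_AUTH_TOKEN=your_twilio_auth_token
#   TWILIO_PHONE_NUMBER=+15551234567
#   OPENAI_API_KEY=sk-...
#   ELEVENLABS_API_KEY=your_elevenlabs_key
#   PUBLIC_URL=https://your-public-host.example.com
#
# Assumption not shown in the original: google.cloud.storage.Client() relies on
# Application Default Credentials, so GOOGLE_APPLICATION_CREDENTIALS (or another
# ADC source) must also be configured for upload_to_gcs() to work.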