import os import logging import re from datetime import datetime import time from dataclasses import dataclass from typing import Dict from fastapi import FastAPI, Form, Response, WebSocket, WebSocketDisconnect from twilio.rest import Client from twilio.twiml.voice_response import VoiceResponse, Connect, Stream from openai import OpenAI import sqlite3 from dotenv import load_dotenv import requests from io import BytesIO import uuid import json from google.cloud import storage import asyncio import base64 import webrtcvad import pydub import numpy as np import urllib.parse import audioop load_dotenv() logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class CallConfig: phone_number: str campaign_name: str script_template: str max_duration: int = 150 class DatabaseManager: def __init__(self, db_path: str = "calls.db"): self.db_path = db_path sqlite3.register_adapter(datetime, lambda val: val.isoformat()) self.init_database() def init_database(self): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' CREATE TABLE IF NOT EXISTS calls ( id INTEGER PRIMARY KEY AUTOINCREMENT, phone_number TEXT NOT NULL, campaign_name TEXT NOT NULL, call_sid TEXT UNIQUE, status TEXT NOT NULL, start_time TIMESTAMP, end_time TIMESTAMP, conversation_log TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') cursor.execute(''' CREATE TABLE IF NOT EXISTS call_transcripts ( id INTEGER PRIMARY KEY AUTOINCREMENT, call_sid TEXT, speaker TEXT NOT NULL, content TEXT NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (call_sid) REFERENCES calls (call_sid) ) ''') conn.commit() def create_call_record(self, phone_number: str, campaign_name: str, call_sid: str): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO calls (phone_number, campaign_name, call_sid, status, start_time) VALUES (?, ?, ?, ?, ?) ''', (phone_number, campaign_name, call_sid, 'initiated', datetime.now())) conn.commit() logger.info(f"Created call record for {call_sid}") def update_call_status(self, call_sid: str, status: str, end_time: datetime = None): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() if end_time: cursor.execute(''' UPDATE calls SET status=?, end_time=? WHERE call_id=? ''', (status, end_time, call_id)) else: cursor.execute(''' UPDATE calls SET status=? WHERE call_id=? ''', (status, call_id)) conn.commit() def add_transcript(self, call_id: str, speaker: str, content: str): with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(''' INSERT INTO call_transcripts (call_id, speaker, content) VALUES (?, ?, ?) ''', (call_id, speaker, content)) conn.commit() class OpenAIHandler: def __init__(self, openai_api_key: str, elevenlabs_api_key: str, kb_file_path: str = "kb_articles.json"): self.client = OpenAI(api_key=openai_api_key) self.elevenlabs_api_key = elevenlabs_api_key self.conversation_history: Dict[str, list] = {} self.silent_attempts: Dict[str, int] = {} self.gcs_client = storage.Client() self.knowledge_base = self.load_knowledge_base(kb_file_path) self.is_speaking: Dict[str, bool] = {} self.intro_playing: Dict[str, bool] = {} self.last_playback_time: Dict[str, float] = {} # Track last playback time for echo detection def load_knowledge_base(self, kb_file_path: str) -> list: try: with open(kb_file_path, 'r') as file: kb = json.load(file) logger.info(f"Loaded knowledge base from {kb_file_path}") return kb except Exception as e: logger.error(f"Error loading knowledge base: {e}") return [] def search_knowledge_base(self, query: str) -> str: query_lower = query.lower() relevant_content = [] for article in self.knowledge_base: if any(keyword.lower() in query_lower for keyword in article['keywords']): relevant_content.append(article['content']) return "\n".join(relevant_content) if relevant_content else "" def initialize_conversation(self, call_id: str, script_template: str): kb_context = "\n".join([article['content'] for article in self.knowledge_base]) system_message = f"{script_template}\n\nKnowledge Base Context:\n{kb_context}" self.conversation_history[call_id] = [{"role": "system", "content": system_message}] self.silent_attempts[call_id] = 0 self.is_speaking[call_id] = False self.intro_playing[call_id] = False self.last_playback_time[call_id] = 0.0 logger.info(f"Initialized conversation for {call_id}") def detect_conversation_end(self, call_id: str, user_input: str) -> bool: try: prompt = ( f"Based on the conversation history and the latest user input: '{user_input}', " f"determine if the conversation should end. Return only 'true' or 'false'. " f"End the conversation if the user indicates they are done, " f"or if the conversation has naturally concluded." ) temp_history = self.conversation_history[call_id].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=10, temperature=0.5 ) decision = response.choices[0].message.content.strip().lower() logger.info(f"Conversation end detection for {call_id}: {decision}") return decision == 'true' except Exception as e: logger.error(f"Error in detect_conversation_end: {e}") return False def get_intent(self, user_input: str) -> tuple: try: prompt = ( f"Analyze the following user input: '{user_input}'.\n" f"If the input is empty, unclear, or invalid, return {{\"intent\": \"neutral\", \"confidence\": 0.5}}.\n" f"Otherwise, return a JSON object with 'intent' (one of: 'not_interested', 'question', 'positive', 'interested', 'neutral', 'confused') " f"and 'confidence' (float between 0.0 and 1.0). Ensure the response is raw JSON without Markdown or code block formatting." ) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=[{"role": "user", "content": prompt}], max_tokens=100, temperature=0.5 ) raw_response = response.choices[0].message.content.strip() logger.debug(f"Raw intent response for input '{user_input}': {raw_response}") cleaned_response = re.sub(r'```json\s*|\s*```|.*?(?=\{)', '', raw_response, flags=re.DOTALL).strip() try: result = json.loads(cleaned_response) intent = result.get("intent", "neutral") confidence = float(result.get("confidence", 0.5)) if intent not in ['not_interested', 'question', 'positive', 'interested', 'neutral', 'confused']: logger.warning(f"Invalid intent received: {intent}, defaulting to 'neutral'") intent = "neutral" if not (0.0 <= confidence <= 1.0): logger.warning(f"Invalid confidence received: {confidence}, defaulting to 0.5") confidence = 0.5 logger.info(f"Intent analysis: intent={intent}, confidence={confidence}") return intent, confidence except json.JSONDecodeError as e: logger.error(f"JSON parsing error in get_intent: {e}, raw response: {raw_response}") return "neutral", 0.5 except Exception as e: logger.error(f"Error in get_intent: {e}, input: {user_input}") return "neutral", 0.5 def analyze_tone(self, call_id: str, user_input: str) -> dict: default_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} try: prompt = ( ''' Analyze the tone and emotion of the user's latest input, using the full context of the ongoing conversation. Return only a JSON object with the following fields: * "rate": one of "slow", "medium", or "fast" * "pitch": one of "low", "medium", or "high" * "stability": a float between 0.0 and 1.0 * "similarity_boost": a float between 0.0 and 1.0 Ensure the response is raw JSON without Markdown or code block formatting. ''' ) temp_history = self.conversation_history[call_id].copy() temp_history.append({"role": "user", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=100, temperature=0.5 ) raw_response = response.choices[0].message.content.strip() tone_params = json.loads(raw_response) if not isinstance(tone_params, dict): raise ValueError("Tone params is not a dictionary") if tone_params.get("rate") not in ["slow", "medium", "fast"]: tone_params["rate"] = default_params["rate"] if tone_params.get("pitch") not in ["low", "medium", "high"]: tone_params["pitch"] = default_params["pitch"] if not (0.0 <= tone_params.get("stability", 0.5) <= 1.0): tone_params["stability"] = default_params["stability"] if not (0.0 <= tone_params.get("similarity_boost", 0.75) <= 1.0): tone_params["similarity_boost"] = default_params["similarity_boost"] logger.info(f"Tone analysis for {call_id}: {tone_params}") return tone_params except Exception as e: logger.error(f"Error in analyze_tone for {call_id}: {e}") return default_params async def generate_elevenlabs_audio_stream(self, call_id: str, text: str, tone_params: dict) -> tuple: self.is_speaking[call_id] = True self.last_playback_time[call_id] = time.time() try: url = "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM/stream" headers = {"xi-api-key": self.elevenlabs_api_key} data = { "text": text, "model_id": "eleven_flash_v2_5", "voice_settings": { "stability": tone_params.get("stability", 0.5), "similarity_boost": tone_params.get("similarity_boost", 0.75), "speaker_boost": True, "style": 1 } } response = requests.post(url, json=data, headers=headers, stream=True) if response.status_code == 200: audio = pydub.AudioSegment.from_file(BytesIO(response.content), format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Generated ElevenLabs audio stream for {call_id}") else: logger.error(f"ElevenLabs API error for {call_id}: {response.status_code} - {response.text}") yield None except Exception as e: logger.error(f"Error in generate_elevenlabs_audio_stream for {call_id}: {e}") yield None finally: self.is_speaking[call_id] = False self.last_playback_time[call_id] = time.time() async def stream_local_audio(self, call_id: str, audio_path: str = "initial_message.mp3") -> tuple: self.is_speaking[call_id] = True self.intro_playing[call_id] = True self.last_playback_time[call_id] = time.time() try: audio = pydub.AudioSegment.from_file(audio_path, format="mp3") audio = audio.set_frame_rate(8000).set_channels(1).set_sample_width(2) raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16) ulaw_audio = audioop.lin2ulaw(raw_audio.tobytes(), 2) chunk_size = 8000 for i in range(0, len(ulaw_audio), chunk_size): chunk = ulaw_audio[i:i+chunk_size] b64_chunk = base64.b64encode(chunk).decode('utf-8') yield b64_chunk logger.info(f"Streamed local audio for {call_id} from {audio_path}") except Exception as e: logger.error(f"Error streaming local audio for {call_id}: {e}") yield None finally: self.is_speaking[call_id] = False self.intro_playing[call_id] = False self.last_playback_time[call_id] = time.time() def get_silent_response(self, call_id: str, attempt: int) -> tuple: if attempt == 1: message = "Just checking if you can hear me?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} else: message = "It seems you're not there. Thanks for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} logger.info(f"Final silent response triggered for {call_id}") self.conversation_history[call_id].append({"role": "assistant", "content": message}) return tone_params, message, True self.conversation_history[call_id].append({"role": "assistant", "content": message}) return tone_params, message, False async def get_ai_response(self, call_id: str, user_input: str) -> tuple: try: if call_id not in self.conversation_history: self.initialize_conversation(call_id, "Default conversation initialization") if not user_input: self.conversation_history[call_id].append({"role": "user", "content": "Start the conversation."}) else: if self.detect_conversation_end(call_id, user_input): farewell = "Thank you for your time! Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} self.conversation_history[call_id].append({"role": "assistant", "content": farewell}) return tone_params, farewell, True kb_response = self.search_knowledge_base(user_input) if kb_response: user_input = f"{user_input}\n\nRelevant Knowledge Base Information:\n{kb_response}" self.conversation_history[call_id].append({"role": "user", "content": user_input}) prompt = ( ''' You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company. Your tone should be friendly, empathetic, and concise. ## Guidelines: - Start warmly: Greet and explain why you're calling. - Acknowledge hesitation: Ease pressure if they seem surprised. - Set boundaries: Clarify what you can help with. - Offer value: Ask for relevant details if they're open. - Confirm follow-up info: Verify name and email if provided. - Close professionally: End politely if uninterested. ## Response Rules: - Keep replies under 50 words in natural, conversational language - Ask clarifying questions if unclear - If uninterested, end politely - If engaged, ask a relevant follow-up or confirm info ''' ) temp_history = self.conversation_history[call_id].copy() temp_history.append({"role": "system", "content": prompt}) response = self.client.chat.completions.create( model="gpt-4o-mini", messages=temp_history, max_tokens=150, temperature=0.7 ) raw_response = response.choices[0].message.content.strip() raw_response = raw_response.replace('&', '&').replace('<', '<').replace('>', '>').replace('"', '"').replace("'", ''') self.conversation_history[call_id].append({"role": "assistant", "content": raw_response}) logger.info(f"Appended AI response to history for {call_id}: {raw_response}") is_farewell = "thank you for your time" in raw_response.lower() or "goodbye" in raw_response.lower() tone_params = self.analyze_tone(call_id, user_input or "Start the conversation.") return tone_params, raw_response, is_farewell except Exception as e: logger.error(f"Error in get_ai_response: {e}") error_message = "I'm sorry, I'm having trouble responding. We will call you back soon!" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} self.conversation_history[call_id].append({"role": "assistant", "content": error_message}) return tone_params, error_message, True def upload_to_gcs(self, audio_stream, call_id: str) -> str: bucket_name = 'farmbase-b2f7e.appspot.com' key = f"audio/{call_id}_{uuid.uuid4()}.mp3" bucket = self.gcs_client.bucket(bucket_name) blob = bucket.blob(key) blob.upload_from_file(audio_stream, content_type='audio/mpeg') blob.make_public() return blob.public_url def cleanup_conversation(self, call_id: str): if call_id in self.conversation_history: logger.info(f"Cleaning up conversation history for {call_id}") del self.conversation_history[call_id] if call_id in self.silent_attempts: logger.info(f"Cleaning up silent attempts for {call_id}") del self.silent_attempts[call_id] if call_id in self.is_speaking: del self.is_speaking[call_id] if call_id in self.intro_playing: del self.intro_playing[call_id] if call_id in self.last_playback_time: del self.last_playback_time[call_id] class TwilioCallManager: def __init__(self, account_id: str, auth_token: str, from_number: str): self.client = Client(account_id, auth_token) self.from_number = from_number self.db = DatabaseManager() self.openai_handler = None def set_openai_handler(self, openai_handler: OpenAIHandler): self.openai_handler = openai_handler def make_outbound_call(self, config: CallConfig, webhook_url: str) -> str: try: call = self.client.calls.create( to=config.phone_number, from_=self.from_number, url=f"{webhook_url}/twiml", method='POST', timeout=30, status_callback=f"{webhook_url}/call_status", status_callback_event=['initiated', 'ringing', 'answered', 'completed'] ) self.db.create_call_record(config.phone_number, config.campaign_name, call.id) if self.openai_handler: self.openai_handler.initialize_conversation(call.id, config.script_template) logger.info(f"Call initiated: {call.id} to {config.phone_number}") return call.id except Exception as e: logger.error(f"Error making call: {e}") return None app = FastAPI() call_manager = None openai_handler = None active_connections = {} @app.post("/twiml") async def twiml(): response = VoiceResponse() connect = Connect() stream = Stream(url=f"wss://{os.getenv('PUBLIC_URL').replace('https://', '')}/stream") connect.append(stream) response.append(connect) logger.info(f"TwiML generated: {str(response)}") return Response(content=str(response), media_type="text/xml") @app.post("/call_status") async def call_status(CallId: str = Form(...), CallStatus: str = Form(...)): logger.info(f"Call {CallId} status: {CallStatus}") if call_manager: if CallStatus == 'completed': call_manager.db.update_call_status(CallId, CallStatus, datetime.now()) if openai_handler: openai_handler.cleanup_conversation(CallId) if CallId in active_connections: del active_connections[CallId] else: call_manager.db.update_call_status(CallId, CallStatus) return Response(status_code=200) async def safe_websocket_send(websocket: WebSocket, data: dict, call_id: str = None): try: if call_id and call_id not in active_connections: logger.warning(f"Attempted to send data to inactive connection: {call_id}") return False await websocket.send_json(data) return True except Exception as e: logger.error(f"Error sending WebSocket data for {call_id}: {e}") if call_id and call_id in active_connections: del active_connections[call_id] return False async def detect_voicemail(audio_segment: pydub.AudioSegment, call_id: str) -> bool: try: if audio_segment.duration_seconds < 0.1: logger.warning(f"Audio segment too short for voicemail detection: {audio_segment.duration_seconds}s") return False rms = audio_segment.rms ENERGY_THRESHOLD = 200 # Increased to filter out echo/noise if rms < ENERGY_THRESHOLD: logger.info(f"Audio energy too low for voicemail detection: RMS={rms}") return False debug_file = f"voicemail_debug_{call_id}_{uuid.uuid4()}.wav" audio_segment.export(debug_file, format="wav", codec="pcm_s16le") logger.info(f"Saved voicemail audio for debugging: {debug_file}") wav_buffer = BytesIO() audio_segment = audio_segment.set_frame_rate(16000).set_channels(1).set_sample_width(2) audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text", prompt="Expect clear speech or voicemail indicators like 'leave a message' or 'not available'." ) logger.info(f"Voicemail detection transcript: {transcript}") if not transcript.strip() or len(transcript.strip()) < 3: logger.info(f"Invalid or short transcript: {transcript}") return False voicemail_indicators = ["leave a message", "voicemail", "not available", "after the tone", "beep"] is_voicemail = any(indicator in transcript.lower() for indicator in voicemail_indicators) logger.info(f"Voicemail detection result: {is_voicemail}") return is_voicemail except Exception as e: logger.error(f"Error in voicemail detection: {e}") return False async def detect_background_noise(audio_segment: pydub.AudioSegment) -> bool: try: rms = audio_segment.rms NOISE_THRESHOLD = 200 return rms > NOISE_THRESHOLD except Exception as e: logger.error(f"Error in background noise detection: {e}") return False async def detect_echo(audio_segment: pydub.AudioSegment, call_id: str) -> bool: try: current_time = time.time() last_playback = openai_handler.last_playback_time.get(call_id, 0.0) ECHO_WINDOW = 2.0 # Seconds to consider for echo detection if current_time - last_playback < ECHO_WINDOW: logger.info(f"Possible echo detected for {call_id}: audio received within {ECHO_WINDOW}s of playback") return True rms = audio_segment.rms if rms < 200: # Low energy likely indicates echo or noise logger.info(f"Low energy audio detected, possible echo: RMS={rms}") return True return False except Exception as e: logger.error(f"Error in echo detection: {e}") return False async def handle_user_response(websocket: WebSocket, stream_id: str, call_id: str, intent: str, confidence: float, failed_attempts: int = 0): MAX_FAILED_ATTEMPTS = 3 try: if failed_attempts >= MAX_FAILED_ATTEMPTS: message = "I'm sorry, I'm having trouble understanding. We'll follow up later. Goodbye!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_id, "Assistant", message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, message, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return if confidence < 0.5: message = "Sorry, I didn’t catch that—could you repeat it?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} call_manager.db.add_transcript(call_id, "Assistant", message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, message, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(3) is_speech = vad.is_speech(audio_data, 8000) pcm_audio = audioop.ulaw2lin(audio_data, 2) temp_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if await detect_echo(temp_segment, call_id): logger.info(f"Echo detected, ignoring audio chunk for {call_id}") continue if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1 and audio_segment.rms >= 200: debug_file = f"user_speech_{call_id}_{uuid.uuid4()}.wav" audio_segment.export(debug_file, format="wav", codec="pcm_s16le") logger.info(f"Saved user speech audio for debugging: {debug_file}") wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text", prompt="Expect clear speech or voicemail indicators." ) logger.info(f"User speech transcript: {transcript}") if not transcript.strip() or len(transcript.strip()) < 3: logger.info(f"Invalid or short transcript: {transcript}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return call_manager.db.add_transcript(call_id, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_id, call_id, intent, confidence, failed_attempts + (1 if confidence < 0.5 else 0)) return else: logger.warning(f"Audio too short or low energy: duration={audio_segment.duration_seconds}s, RMS={audio_segment.rms}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD * 6: call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return return if intent == "not_interested": message = "No problem—thank you for your time!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_id, "Assistant", message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, message, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return elif intent == "question": tone_params, response, is_farewell = await openai_handler.get_ai_response(call_id, "User asked a question.") call_manager.db.add_transcript(call_id, "Assistant", response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, response, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) if is_farewell: call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(3) is_speech = vad.is_speech(audio_data, 8000) pcm_audio = audioop.ulaw2lin(audio_data, 2) temp_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if await detect_echo(temp_segment, call_id): logger.info(f"Echo detected, ignoring audio chunk for {call_id}") continue if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1 and audio_segment.rms >= 200: debug_file = f"user_speech_{call_id}_{uuid.uuid4()}.wav" audio_segment.export(debug_file, format="wav", codec="pcm_s16le") logger.info(f"Saved user speech audio for debugging: {debug_file}") wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text", prompt="Expect clear speech or voicemail indicators." ) logger.info(f"User speech transcript: {transcript}") if not transcript.strip() or len(transcript.strip()) < 3: logger.info(f"Invalid or short transcript: {transcript}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return call_manager.db.add_transcript(call_id, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_id, call_id, intent, confidence) return else: logger.warning(f"Audio too short or low energy: duration={audio_segment.duration_seconds}s, RMS={audio_segment.rms}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 return elif intent in ["positive", "interested"]: tone_params, response, is_farewell = await openai_handler.get_ai_response(call_id, "User is interested.") call_manager.db.add_transcript(call_id, "Assistant", response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, response, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) if is_farewell: call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return elif intent in ["neutral", "confused"]: message = "This is a quick call about satellite-based mineral exploration. Do you hold a license?" tone_params = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75} call_manager.db.add_transcript(call_id, "Assistant", message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, message, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) await asyncio.sleep(3) audio_buffer = bytearray() is_speaking = False silent_chunks = 0 CHUNK_SIZE = int(8000 * 20 / 1000) SILENCE_THRESHOLD = 25 start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < 3.0: try: message = await asyncio.wait_for(websocket.receive_text(), timeout=0.02) data = json.loads(message) if data.get("event") == "media": audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue vad = webrtcvad.Vad(3) is_speech = vad.is_speech(audio_data, 8000) pcm_audio = audioop.ulaw2lin(audio_data, 2) temp_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if await detect_echo(temp_segment, call_id): logger.info(f"Echo detected, ignoring audio chunk for {call_id}") continue if is_speech: is_speaking = True audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if audio_segment.duration_seconds >= 0.1 and audio_segment.rms >= 200: debug_file = f"user_speech_{call_id}_{uuid.uuid4()}.wav" audio_segment.export(debug_file, format="wav", codec="pcm_s16le") logger.info(f"Saved user speech audio for debugging: {debug_file}") wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text", prompt="Expect clear speech or voicemail indicators." ) logger.info(f"User speech transcript: {transcript}") if not transcript.strip() or len(transcript.strip()) < 3: logger.info(f"Invalid or short transcript: {transcript}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return call_manager.db.add_transcript(call_id, "User", transcript) intent, confidence = openai_handler.get_intent(transcript) await handle_user_response(websocket, stream_id, call_id, intent, confidence) return else: logger.warning(f"Audio too short or low energy: duration={audio_segment.duration_seconds}s, RMS={audio_segment.rms}") await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, failed_attempts + 1) return else: silent_chunks += 1 except asyncio.TimeoutError: silent_chunks += 1 return else: message = "I’ll follow up another time. Thanks!" tone_params = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75} call_manager.db.add_transcript(call_id, "Assistant", message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": message}) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, message, tone_params): if chunk and call_id in active_connections: await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return except Exception as e: logger.error(f"Error in handle_user_response for {call_id}: {e}") call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] @app.websocket("/stream") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() call_id = None stream_id = None try: vad = webrtcvad.Vad(3) audio_buffer = bytearray() voicemail_buffer = bytearray() is_speaking = False silent_chunks = 0 message_playing = False voicemail_check_counter = 0 initial_phase = True call_start_time = asyncio.get_event_loop().time() CHUNK_DURATION_MS = 20 SAMPLE_RATE = 8000 CHUNK_SIZE = int(SAMPLE_RATE * CHUNK_DURATION_MS / 1000) SILENCE_THRESHOLD = 25 LONG_SILENCE_THRESHOLD = 250 # Increased to 5 seconds MIN_VOICEMAIL_DURATION_MS = 500 VOICEMAIL_CHECK_INTERVAL = 50 INITIAL_PHASE_DURATION = 10 MAX_CALL_DURATION = 150 first_message_sent = False intent_failed_attempts = 0 voicemail_failed_attempts = 0 MAX_INTENT_FAILED_ATTEMPTS = 3 MAX_VOICEMAIL_FAILED_ATTEMPTS = 5 initial_message_delivered = False playback_buffer_time = 2.0 # Buffer to avoid echo after playback while asyncio.get_event_loop().time() - call_start_time < MAX_CALL_DURATION: try: message = await websocket.receive_text() data = json.loads(message) event = data.get("event") if event == "connected": logger.info("WebSocket connected") continue if event == "start": stream_id = data["streamId"] call_id = data["start"]["callId"] active_connections[call_id] = True logger.info(f"Media stream started: call_id={call_id}, stream_id={stream_id}") silent_chunks = 0 is_speaking = False if not first_message_sent and call_id: await asyncio.sleep(1) hardcoded_message = "Hi, this is Joanne from Farmonaut. I'm reaching out because we help teams like yours with mineral detection using satellite data. This is out of the blue, but may I have a moment to explain?" call_manager.db.add_transcript(call_id, "Assistant", hardcoded_message) openai_handler.conversation_history[call_id].append({"role": "assistant", "content": hardcoded_message}) openai_handler.intro_playing[call_id] = True try: async for chunk in openai_handler.stream_local_audio(call_id, "initial_message.mp3"): if chunk and call_id in active_connections: if openai_handler.is_speaking.get(call_id, False): success = await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) if not success: break await asyncio.sleep(0.02) else: break first_message_sent = True initial_message_delivered = True openai_handler.intro_playing[call_id] = False logger.info(f"Initial message delivered for {call_id}") except Exception as e: logger.error(f"Error streaming initial audio for {call_id}: {e}") openai_handler.intro_playing[call_id] = False initial_message_delivered = False await asyncio.sleep(playback_buffer_time) # Wait to avoid echo continue if event == "media" and call_id in active_connections: audio_data = base64.b64decode(data["media"]["payload"]) if len(audio_data) != 160: continue if asyncio.get_event_loop().time() - call_start_time > INITIAL_PHASE_DURATION: initial_phase = False if openai_handler.intro_playing.get(call_id, False) or openai_handler.is_speaking.get(call_id, False) or (time.time() - openai_handler.last_playback_time.get(call_id, 0.0) < playback_buffer_time): pcm_audio = audioop.ulaw2lin(audio_data, 2) temp_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if await detect_echo(temp_segment, call_id): logger.info(f"Echo detected during playback, ignoring audio for {call_id}") continue if vad.is_speech(audio_data, SAMPLE_RATE): logger.info(f"User interruption detected for {call_id}") openai_handler.is_speaking[call_id] = False openai_handler.intro_playing[call_id] = False await safe_websocket_send(websocket, {"event": "clear", "streamId": stream_id}, call_id) audio_buffer.clear() voicemail_buffer.clear() audio_buffer.extend(audio_data) is_speaking = True silent_chunks = 0 intent_failed_attempts = 0 voicemail_failed_attempts = 0 initial_message_delivered = True continue if initial_phase and not initial_message_delivered: logger.info(f"Skipping voicemail check during initial message playback for {call_id}") continue if initial_phase: voicemail_buffer.extend(audio_data) voicemail_check_counter += 1 if voicemail_check_counter >= VOICEMAIL_CHECK_INTERVAL: voicemail_check_counter = 0 pcm_audio = audioop.ulaw2lin(voicemail_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=SAMPLE_RATE, channels=1 ) if await detect_echo(audio_segment, call_id): logger.info(f"Echo detected in voicemail buffer for {call_id}") voicemail_buffer.clear() continue if audio_segment.duration_seconds >= MIN_VOICEMAIL_DURATION_MS / 1000: is_speech = False for i in range(0, len(voicemail_buffer), CHUNK_SIZE): if vad.is_speech(voicemail_buffer[i:i+CHUNK_SIZE], SAMPLE_RATE): is_speech = True break if is_speech: if await detect_voicemail(audio_segment, call_id): message = "Voicemail detected. Exiting." call_manager.db.add_transcript(call_id, "System", message) call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return voicemail_failed_attempts += 1 if voicemail_failed_attempts >= MAX_VOICEMAIL_FAILED_ATTEMPTS: logger.info(f"Max voicemail failed attempts reached for {call_id}, stopping voicemail checks") initial_phase = False else: logger.info(f"No speech detected in voicemail buffer for {call_id}, RMS={audio_segment.rms}") voicemail_failed_attempts += 1 if voicemail_failed_attempts >= MAX_VOICEMAIL_FAILED_ATTEMPTS: logger.info(f"Max voicemail failed attempts reached for {call_id}, stopping voicemail checks") initial_phase = False voicemail_buffer.clear() await asyncio.sleep(0.2) # Increased rate limit continue pcm_audio = audioop.ulaw2lin(audio_data, 2) temp_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=8000, channels=1 ) if await detect_echo(temp_segment, call_id): logger.info(f"Echo detected, ignoring audio chunk for {call_id}") continue is_speech = vad.is_speech(audio_data, SAMPLE_RATE) if is_speech and initial_message_delivered: if not is_speaking: logger.info(f"User speech detected for {call_id}") is_speaking = True audio_buffer.clear() audio_buffer.extend(audio_data) silent_chunks = 0 else: if is_speaking: silent_chunks += 1 if silent_chunks >= SILENCE_THRESHOLD: is_speaking = False if len(audio_buffer) > 0: message_playing = True pcm_audio = audioop.ulaw2lin(audio_buffer, 2) audio_segment = pydub.AudioSegment( data=pcm_audio, sample_width=2, frame_rate=SAMPLE_RATE, channels=1 ) if await detect_echo(audio_segment, call_id): logger.info(f"Echo detected in user speech buffer for {call_id}") audio_buffer.clear() message_playing = False silent_chunks = 0 continue if audio_segment.duration_seconds >= 0.1 and audio_segment.rms >= 200: debug_file = f"user_speech_{call_id}_{uuid.uuid4()}.wav" audio_segment.export(debug_file, format="wav", codec="pcm_s16le") logger.info(f"Saved user speech audio for debugging: {debug_file}") wav_buffer = BytesIO() audio_segment.export(wav_buffer, format="wav", codec="pcm_s16le") wav_buffer.seek(0) wav_buffer.name = "audio.wav" transcript = openai_handler.client.audio.transcriptions.create( model="whisper-1", file=wav_buffer, response_format="text", prompt="Expect clear speech or voicemail indicators." ) logger.info(f"User speech transcript: {transcript}") if not transcript.strip() or len(transcript.strip()) < 3: logger.info(f"Invalid or short transcript: {transcript}") intent_failed_attempts += 1 await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, intent_failed_attempts) audio_buffer.clear() message_playing = False silent_chunks = 0 continue call_manager.db.add_transcript(call_id, "User", transcript) voicemail_indicators = ["leave a message", "voicemail", "not available", "after the tone", "beep"] if any(indicator in transcript.lower() for indicator in voicemail_indicators): message = "Voicemail detected. Exiting." call_manager.db.add_transcript(call_id, "System", message) call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return intent, confidence = openai_handler.get_intent(transcript) if intent == "neutral" and confidence == 0.5: intent_failed_attempts += 1 else: intent_failed_attempts = 0 await handle_user_response(websocket, stream_id, call_id, intent, confidence, intent_failed_attempts) audio_buffer.clear() message_playing = False silent_chunks = 0 else: logger.warning(f"User audio too short or low energy: duration={audio_segment.duration_seconds}s, RMS={audio_segment.rms}") intent_failed_attempts += 1 await handle_user_response(websocket, stream_id, call_id, "neutral", 0.5, intent_failed_attempts) audio_buffer.clear() message_playing = False silent_chunks = 0 else: if not message_playing and first_message_sent and initial_message_delivered: silent_chunks += 1 if silent_chunks >= LONG_SILENCE_THRESHOLD and time.time() - openai_handler.last_playback_time.get(call_id, 0.0) >= playback_buffer_time: message_playing = True if call_id not in openai_handler.silent_attempts: openai_handler.silent_attempts[call_id] = 0 openai_handler.silent_attempts[call_id] += 1 tone_params, text_response, is_farewell = openai_handler.get_silent_response( call_id, openai_handler.silent_attempts[call_id] ) call_manager.db.add_transcript(call_id, "Assistant", text_response) async for chunk in openai_handler.generate_elevenlabs_audio_stream(call_id, text_response, tone_params): if chunk and call_id in active_connections: success = await safe_websocket_send(websocket, { "event": "media", "streamId": stream_id, "media": {"payload": chunk} }, call_id) if not success or is_farewell: break if is_farewell: call_manager.db.add_transcript(call_id, "System", "Silent after prompt. Exiting.") call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] return await asyncio.sleep(3) message_playing = False silent_chunks = 0 intent_failed_attempts = 0 voicemail_failed_attempts = 0 continue if event == "stop": logger.info(f"Media stream stopped for {call_id}") call_manager.db.add_transcript(call_id, "System", "Call dropped unexpectedly.") break except WebSocketDisconnect: logger.info(f"WebSocket disconnected for {call_id}") call_manager.db.add_transcript(call_id, "System", "Call dropped unexpectedly.") break except Exception as e: logger.error(f"Error in WebSocket message processing for {call_id}: {e}") break except Exception as e: logger.error(f"Error in WebSocket endpoint for {call_id}: {e}") finally: if call_id: if call_manager and openai_handler: call_manager.db.update_call_status(call_id, "completed", datetime.now()) openai_handler.cleanup_conversation(call_id) if call_id in active_connections: del active_connections[call_id] try: await websocket.close() except: pass def initialize_system(): global call_manager, openai_handler twilio_id = os.getenv('TWILIO_ACCOUNT_ID') twilio_token = os.getenv('TWILIO_AUTH_TOKEN') twilio_number = os.getenv('TWILIO_PHONE_NUMBER') openai_key = os.getenv('OPENAI_API_KEY') elevenlabs_api_key = os.getenv('ELEVENLABS_API_KEY') if not all([twilio_id, twilio_token, twilio_number, openai_key, elevenlabs_api_key]): raise ValueError("Missing required environment variables") call_manager = TwilioCallManager(twilio_id, twilio_token, twilio_number) openai_handler = OpenAIHandler(openai_key, elevenlabs_api_key, kb_file_path="kb_articles.json") call_manager.set_openai_handler(openai_handler) logger.info("System initialized successfully") def example_usage(): initialize_system() script_template = """ You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company and there is no prior conversation. Your role is to deliver a friendly, concise introduction that: * Builds quick rapport * Clearly states who you are and why you're calling * Keeps things low-pressure and empathetic ## Your message should: 1. Greet the recipient warmly 2. State your name and the reason for calling 3. Acknowledge that this is unexpected 4. Invite a quick moment of their time ## Examples: * "Hi, this is Joanne from Farmonaut—just reaching out because we help teams like yours with mineral detection using satellite data." * "Hey there, I know this is out of the blue—mind if I take 20 seconds to explain why I'm calling?" ## Output Rules: * Your message must be natural, human-like, and under 50 words * Do not include any follow-up questions or requests for info in this first message * This is just your intro line—keep it brief and warm * Do not reference any prior interaction or knowledge """ config = CallConfig( phone_number="+16696666882", campaign_name="Simple Promotion Call", script_template=script_template ) webhook_url = os.getenv('PUBLIC_URL', "https://d4c3b5c10eed.ngrok-free.app") call_id = call_manager.make_outbound_call(config, webhook_url) if call_id: print(f"Call initiated successfully: {call_id}") else: print("Failed to initiate call") if __name__ == '__main__': example_usage() import uvicorn uvicorn.run(app, host="0.0.0.0", port=8080)