"""Outbound AI sales-call service.

Places calls through Twilio, bridges the call audio over a Twilio Media
Stream WebSocket, transcribes the caller with OpenAI Whisper, generates
replies with an OpenAI chat model and speaks them with ElevenLabs.
"""

import asyncio
import audioop  # For μ-law conversion
import base64
import functools
import json
import logging
import os
import sqlite3
import urllib.parse
import uuid
from contextlib import closing
from dataclasses import dataclass
from datetime import datetime
from io import BytesIO
from typing import AsyncIterator, Optional

import numpy as np
import pydub
import requests
import webrtcvad
from dotenv import load_dotenv
from fastapi import FastAPI, Form, Response, WebSocket, WebSocketDisconnect
from google.cloud import storage
from openai import OpenAI
from twilio.rest import Client
from twilio.twiml.voice_response import VoiceResponse, Connect, Stream

load_dotenv()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Tunables ---------------------------------------------------------------

CHAT_MODEL = "gpt-4.1-mini-2025-04-14"
ELEVENLABS_TTS_URL = (
    "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM/stream"
)
ELEVENLABS_TIMEOUT_SECONDS = 30
GCS_BUCKET_NAME = 'farmbase-b2f7e.appspot.com'

DEFAULT_TONE = {"rate": "medium", "pitch": "high", "stability": 0.5, "similarity_boost": 0.75}
FAREWELL_TONE = {"rate": "slow", "pitch": "high", "stability": 0.7, "similarity_boost": 0.75}

SAMPLE_RATE = 8000          # Twilio media streams carry 8 kHz audio
PCM_SAMPLE_WIDTH = 2        # bytes per sample once μ-law is decoded to PCM16
CHUNK_DURATION_MS = 20      # duration of one inbound media frame
TTS_CHUNK_BYTES = 8000      # 1 second of μ-law audio at 8 kHz
SILENCE_THRESHOLD = 10      # silent frames (200 ms) that end an utterance
# The original idle prompt fired after SILENCE_THRESHOLD * 2 frames (400 ms)
# and never reset its counter, so calls were terminated almost immediately.
IDLE_PROMPT_SECONDS = 8
IDLE_PROMPT_CHUNKS = IDLE_PROMPT_SECONDS * 1000 // CHUNK_DURATION_MS
# Utterances shorter than 0.1 s are treated as VAD noise and not transcribed.
MIN_UTTERANCE_BYTES = SAMPLE_RATE * PCM_SAMPLE_WIDTH // 10

RESPONSE_MARK = "response_end"
FAREWELL_MARK = "farewell"
# Final CallStatus values Twilio may report with the "completed" callback.
TERMINAL_CALL_STATUSES = frozenset(
    {"completed", "busy", "failed", "no-answer", "canceled"}
)

SALES_SYSTEM_PROMPT = '''
You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company. Your tone should be friendly, empathetic, and concise.

## Guidelines:
- Start warmly: Greet and explain why you're calling.
- Acknowledge hesitation: Ease pressure if they seem surprised.
- Set boundaries: Clarify what you can help with.
- Offer value: Ask for relevant details if they’re open.
- Confirm follow-up info: Verify name and email if provided.
- Close professionally: End politely if uninterested.

## Response Rules:
- Keep replies under 50 words in natural, conversational language
- Ask clarifying questions if unclear
- If uninterested, end politely
- If engaged, ask a relevant follow-up or confirm info
'''


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor so the event loop stays responsive."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


def _db_timestamp(value):
    """Convert datetimes to the ISO text sqlite3 used to produce implicitly.

    The implicit datetime adapter is deprecated since Python 3.12, so the
    conversion is done explicitly; other values pass through unchanged.
    """
    if isinstance(value, datetime):
        return value.isoformat(" ")
    return value


def _unit_float(value, default: float) -> float:
    """Return ``value`` as a float within [0.0, 1.0], or ``default`` if it is not one."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if 0.0 <= number <= 1.0 else default


@dataclass
class CallConfig:
    """Parameters for one outbound call."""

    phone_number: str
    campaign_name: str
    script_template: str
    max_duration: int = 300  # 5 minutes


class DatabaseManager:
    """Thin SQLite persistence layer for call records and transcripts."""

    def __init__(self, db_path: str = "calls.db"):
        self.db_path = db_path
        self.init_database()

    def _execute(self, *statements):
        """Run ``(sql, params)`` statements in one transaction and always close the connection."""
        with closing(sqlite3.connect(self.db_path)) as conn:
            with conn:  # commits on success, rolls back on error
                for sql, params in statements:
                    conn.execute(sql, params)

    def init_database(self):
        """Create the ``calls`` and ``call_transcripts`` tables if they do not exist."""
        self._execute(
            ('''
                CREATE TABLE IF NOT EXISTS calls (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phone_number TEXT NOT NULL,
                    campaign_name TEXT,
                    call_sid TEXT UNIQUE,
                    status TEXT,
                    start_time TIMESTAMP,
                    end_time TIMESTAMP,
                    conversation_log TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''', ()),
            ('''
                CREATE TABLE IF NOT EXISTS call_transcripts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    call_sid TEXT,
                    speaker TEXT,
                    message TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (call_sid) REFERENCES calls (call_sid)
                )
            ''', ()),
        )

    def create_call_record(self, phone_number: str, campaign_name: str, call_sid: str):
        """Insert a new call row with status ``initiated`` and the current start time."""
        self._execute(
            ('''
                INSERT INTO calls (phone_number, campaign_name, call_sid, status, start_time)
                VALUES (?, ?, ?, ?, ?)
            ''', (phone_number, campaign_name, call_sid, 'initiated',
                  _db_timestamp(datetime.now()))),
        )
        logger.info(f"Call record created: {call_sid}")

    def update_call_status(self, call_sid: str, status: str, end_time=None):
        """Set the status of a call, and its end time when one is supplied."""
        if end_time:
            self._execute(
                ('UPDATE calls SET status=?, end_time=? WHERE call_sid=?',
                 (status, _db_timestamp(end_time), call_sid)),
            )
        else:
            self._execute(
                ('UPDATE calls SET status=? WHERE call_sid=?', (status, call_sid)),
            )

    def add_transcript(self, call_sid: str, speaker: str, message: str):
        """Append one utterance to the transcript of a call."""
        self._execute(
            ('''
                INSERT INTO call_transcripts (call_sid, speaker, message)
                VALUES (?, ?, ?)
            ''', (call_sid, speaker, message)),
        )


class OpenAIHandler:
    """Conversation logic: chat completions, tone analysis and ElevenLabs speech."""

    def __init__(self, openai_api_key: str, elevenlabs_api_key: str,
                 kb_file_path: str = "kb_articles.json"):
        self.client = OpenAI(api_key=openai_api_key)
        self.elevenlabs_api_key = elevenlabs_api_key
        self.conversation_history = {}  # call_sid -> list of chat messages
        self.silent_attempts = {}       # call_sid -> idle prompts already played
        self.gcs_client = storage.Client()
        self.knowledge_base = self.load_knowledge_base(kb_file_path)

    def load_knowledge_base(self, kb_file_path: str) -> list:
        """Load knowledge-base articles from a JSON file; return ``[]`` on any read/parse error."""
        try:
            with open(kb_file_path, 'r', encoding='utf-8') as file:
                kb = json.load(file)
        except (OSError, ValueError) as e:
            logger.error(f"Error loading knowledge base: {e}")
            return []
        logger.info(f"Loaded knowledge base from {kb_file_path}")
        return kb

    def search_knowledge_base(self, user_input: str) -> str:
        """Return the content of every article with a keyword found in ``user_input``."""
        user_input_lower = user_input.lower()
        relevant_content = [
            article['content']
            for article in self.knowledge_base
            if any(keyword.lower() in user_input_lower
                   for keyword in article.get('keywords', []))
        ]
        return "\n".join(relevant_content)

    def initialize_conversation(self, call_sid: str, script_template: str):
        """Start a history for ``call_sid`` seeded with the script and the whole knowledge base."""
        kb_context = "\n".join(article['content'] for article in self.knowledge_base)
        system_message = f"{script_template}\n\nKnowledge Base Context:\n{kb_context}"
        self.conversation_history[call_sid] = [{"role": "system", "content": system_message}]
        self.silent_attempts[call_sid] = 0
        logger.info(f"Initialized conversation history for {call_sid}")

    def detect_conversation_end(self, call_sid: str, user_input: str) -> bool:
        """Ask the chat model whether the conversation should end; ``False`` on any error."""
        try:
            prompt = (
                f"Based on the conversation history and the latest user input: '{user_input}', "
                f"determine if the conversation should end. Return only 'true' or 'false'. "
                f"End the conversation if the user indicates they are done, "
                f"or if the conversation has naturally concluded."
            )
            # The probe is sent alongside a copy of the history instead of being
            # appended and popped, so a failed request cannot leave it behind.
            messages = self.conversation_history[call_sid] + [
                {"role": "user", "content": prompt}
            ]
            response = self.client.chat.completions.create(
                model=CHAT_MODEL,
                messages=messages,
                max_tokens=10,
                temperature=0.5
            )
            decision = response.choices[0].message.content.strip().lower()
            logger.info(f"Conversation end detection for {call_sid}: {decision}")
            return decision == 'true'
        except Exception as e:
            logger.error(f"Error in detect_conversation_end: {e}")
            return False

    def analyze_tone(self, call_sid: str, user_input: str) -> dict:
        """Ask the chat model for voice parameters matching the caller's tone.

        Returns a dict with ``rate``, ``pitch``, ``stability`` and
        ``similarity_boost``; invalid or missing fields, and any failure,
        fall back to the defaults.
        """
        default_params = dict(DEFAULT_TONE)
        try:
            prompt = (
                "Analyze the tone and emotion of the user’s latest input "
                "(`user_input`, quoted below), using the full context of the "
                "ongoing conversation.\n\n"
                f"user_input: {user_input!r}\n\n"
                "Return only a JSON object with the following fields:\n"
                '* `"rate"`: one of `"slow"`, `"medium"`, or `"fast"`\n'
                '* `"pitch"`: one of `"low"`, `"medium"`, or `"high"`\n'
                '* `"stability"`: a float between `0.0` and `1.0`\n'
                '* `"similarity_boost"`: a float between `0.0` and `1.0`\n'
            )
            # Sent with a copy of the history: nothing to pop afterwards, so the
            # error path can no longer remove a genuine message.
            messages = self.conversation_history[call_sid] + [
                {"role": "user", "content": prompt}
            ]
            response = self.client.chat.completions.create(
                model=CHAT_MODEL,
                messages=messages,
                max_tokens=100,
                temperature=0.5
            )
            raw_response = response.choices[0].message.content.strip()
            tone_params = json.loads(raw_response)
            if not isinstance(tone_params, dict):
                raise ValueError("Tone params is not a dictionary")
            if tone_params.get("rate") not in ["slow", "medium", "fast"]:
                tone_params["rate"] = default_params["rate"]
            if tone_params.get("pitch") not in ["low", "medium", "high"]:
                tone_params["pitch"] = default_params["pitch"]
            for key in ("stability", "similarity_boost"):
                tone_params[key] = _unit_float(tone_params.get(key), default_params[key])
            logger.info(f"Tone analysis for {call_sid}: {tone_params}")
            return tone_params
        except Exception as e:
            logger.error(f"Error in analyze_tone for {call_sid}: {e}")
            return default_params

    def _synthesize_ulaw(self, call_sid: str, text: str, tone_params: dict) -> Optional[bytes]:
        """Blocking helper: fetch speech from ElevenLabs and convert it to 8 kHz mono μ-law.

        Returns ``None`` when the API answers with a non-200 status.
        """
        headers = {"xi-api-key": self.elevenlabs_api_key}
        data = {
            "text": text,
            "model_id": "eleven_flash_v2_5",
            "voice_settings": {
                "stability": tone_params.get("stability", 0.5),
                "similarity_boost": tone_params.get("similarity_boost", 0.75),
                # NOTE(review): ElevenLabs documents this flag as
                # "use_speaker_boost" — confirm the key name.
                "speaker_boost": True,
                "style": 1
            }
        }
        response = requests.post(
            ELEVENLABS_TTS_URL, json=data, headers=headers,
            timeout=ELEVENLABS_TIMEOUT_SECONDS,
        )
        if response.status_code != 200:
            logger.error(
                f"ElevenLabs API error for {call_sid}: {response.status_code} - {response.text}"
            )
            return None
        audio = pydub.AudioSegment.from_file(BytesIO(response.content), format="mp3")
        audio = audio.set_frame_rate(SAMPLE_RATE).set_channels(1).set_sample_width(2)
        raw_audio = np.array(audio.get_array_of_samples(), dtype=np.int16)
        return audioop.lin2ulaw(raw_audio.tobytes(), 2)

    async def generate_elevenlabs_audio_stream(
        self, call_sid: str, text: str, tone_params: dict
    ) -> AsyncIterator[Optional[str]]:
        """Yield base64 μ-law chunks (one second each) of ``text`` spoken aloud.

        Yields a single ``None`` when synthesis fails, so consumers must skip
        falsy chunks. The HTTP request and MP3 decoding run in a worker thread.
        """
        try:
            ulaw_audio = await _run_blocking(
                self._synthesize_ulaw, call_sid, text, tone_params or {}
            )
        except Exception as e:
            logger.error(f"Error in generate_elevenlabs_audio_stream for {call_sid}: {e}")
            yield None
            return
        if ulaw_audio is None:
            yield None
            return
        for start in range(0, len(ulaw_audio), TTS_CHUNK_BYTES):
            chunk = ulaw_audio[start:start + TTS_CHUNK_BYTES]
            yield base64.b64encode(chunk).decode('utf-8')
        logger.info(f"Generated ElevenLabs audio stream for {call_sid}")

    def get_silent_response(self, call_sid: str, attempt: int) -> tuple:
        """Return ``(None, message, is_final)`` for the ``attempt``-th silence timeout.

        The first attempt asks whether the caller is still there; any later
        attempt says goodbye and is flagged as final.
        """
        is_final = attempt != 1
        if is_final:
            message = "It seems you're not there. Thanks for your time! Goodbye!"
            logger.info(f"Final silent response triggered for {call_sid}")
        else:
            message = "Are you still there? I'd love to help with any questions about our satellite technology."
        history = self.conversation_history.get(call_sid)
        if history is not None:  # the conversation may already have been cleaned up
            history.append({"role": "assistant", "content": message})
        return None, message, is_final

    async def get_ai_response(self, call_sid: str, user_input: str) -> tuple:
        """Produce the assistant's next line.

        Returns ``(tone_params, text, is_farewell)``. An empty ``user_input``
        asks the model to open the conversation. Any failure yields an
        apology flagged as a farewell.
        """
        try:
            if call_sid not in self.conversation_history:
                self.initialize_conversation(call_sid, "Default conversation initialization")
            history = self.conversation_history[call_sid]

            if not user_input:
                history.append({"role": "user", "content": "Start the conversation."})
            else:
                if await _run_blocking(self.detect_conversation_end, call_sid, user_input):
                    farewell = "Thank you for your time! Goodbye!"
                    history.append({"role": "assistant", "content": farewell})
                    return dict(FAREWELL_TONE), farewell, True
                kb_response = self.search_knowledge_base(user_input)
                content = user_input
                if kb_response:
                    content = f"{user_input}\n\nRelevant Knowledge Base Information:\n{kb_response}"
                history.append({"role": "user", "content": content})

            # The behavioural prompt accompanies each request but is not stored:
            # persisting it every turn grew the history without bound.
            response = await _run_blocking(
                self.client.chat.completions.create,
                model=CHAT_MODEL,
                messages=history + [{"role": "system", "content": SALES_SYSTEM_PROMPT}],
                max_tokens=150,
                temperature=0.7,
            )
            raw_response = response.choices[0].message.content.strip()
            # NOTE(review): apostrophes are stripped as in the original; this
            # looks like leftover markup escaping — confirm it is still wanted.
            raw_response = raw_response.replace("'", '')
            history.append({"role": "assistant", "content": raw_response})
            logger.info(f"Appended AI response to history for {call_sid}: {raw_response}")

            lowered = raw_response.lower()
            is_farewell = "thank you for your time" in lowered or "goodbye" in lowered
            tone_params = await _run_blocking(
                self.analyze_tone, call_sid, user_input or "Start the conversation."
            )
            return tone_params, raw_response, is_farewell
        except Exception as e:
            logger.error(f"Error in get_ai_response: {e}")
            error_message = "I'm sorry, I'm having trouble responding. We will call you back soon!"
            history = self.conversation_history.get(call_sid)
            if history is not None:
                history.append({"role": "assistant", "content": error_message})
            return dict(DEFAULT_TONE), error_message, True

    def upload_to_gcs(self, audio_stream, call_sid: str) -> str:
        """Upload an MP3 file object to the GCS bucket, make it public and return its URL."""
        key = f"audio/{call_sid}_{uuid.uuid4()}.mp3"
        blob = self.gcs_client.bucket(GCS_BUCKET_NAME).blob(key)
        blob.upload_from_file(audio_stream, content_type='audio/mpeg')
        blob.make_public()
        return blob.public_url

    def cleanup_conversation(self, call_sid: str):
        """Drop all in-memory state kept for ``call_sid``; safe to call repeatedly."""
        if self.conversation_history.pop(call_sid, None) is not None:
            logger.info(f"Cleaning up conversation history for {call_sid}")
        if self.silent_attempts.pop(call_sid, None) is not None:
            logger.info(f"Cleaning up silent attempts for {call_sid}")


class TwilioCallManager:
    """Places outbound calls through Twilio and records them in the database."""

    def __init__(self, account_sid: str, auth_token: str, from_number: str):
        self.client = Client(account_sid, auth_token)
        self.from_number = from_number
        self.db = DatabaseManager()
        self.openai_handler = None

    def set_openai_handler(self, openai_handler: OpenAIHandler):
        """Attach the handler whose conversation is primed when a call is placed."""
        self.openai_handler = openai_handler

    def make_outbound_call(self, config: CallConfig, webhook_url: str) -> Optional[str]:
        """Start a call described by ``config``; return its SID, or ``None`` on failure."""
        try:
            call = self.client.calls.create(
                to=config.phone_number,
                from_=self.from_number,
                url=f"{webhook_url}/twiml",
                method='POST',
                timeout=30,
                status_callback=f"{webhook_url}/call_status",
                status_callback_event=['initiated', 'ringing', 'answered', 'completed']
            )
            self.db.create_call_record(config.phone_number, config.campaign_name, call.sid)
            if self.openai_handler:
                self.openai_handler.initialize_conversation(call.sid, config.script_template)
            logger.info(f"Call initiated: {call.sid} to {config.phone_number}")
            return call.sid
        except Exception as e:
            logger.error(f"Error making call: {e}")
            return None


app = FastAPI()
call_manager = None
openai_handler = None


def _stream_url(public_url: str) -> str:
    """Build the ``wss://`` media-stream URL from the public base URL, whatever its scheme."""
    with_scheme = public_url if "://" in public_url else f"//{public_url}"
    parts = urllib.parse.urlsplit(with_scheme)
    base = f"{parts.netloc}{parts.path}".rstrip("/")
    return f"wss://{base}/stream"


@app.post("/twiml")
async def twiml():
    """Return TwiML that connects the call to this server's media-stream WebSocket."""
    public_url = os.getenv('PUBLIC_URL')
    if not public_url:
        logger.error("PUBLIC_URL is not set; cannot build the media stream URL")
        return Response(status_code=500)
    response = VoiceResponse()
    connect = Connect()
    connect.append(Stream(url=_stream_url(public_url)))
    response.append(connect)
    logger.info(f"TwiML generated: {str(response)}")
    return Response(content=str(response), media_type="text/xml")


@app.post("/call_status")
async def call_status(CallSid: str = Form(...), CallStatus: str = Form(...)):
    """Record Twilio status callbacks; terminal statuses also release conversation state."""
    logger.info(f"Call {CallSid} status: {CallStatus}")
    if call_manager:
        if CallStatus in TERMINAL_CALL_STATUSES:
            call_manager.db.update_call_status(CallSid, CallStatus, datetime.now())
            if openai_handler:
                openai_handler.cleanup_conversation(CallSid)
        else:
            call_manager.db.update_call_status(CallSid, CallStatus)
    return Response(status_code=200)


def _transcribe(pcm_audio: bytes) -> str:
    """Blocking helper: wrap PCM16 audio in a WAV container and transcribe it with Whisper."""
    segment = pydub.AudioSegment(
        data=pcm_audio,
        sample_width=PCM_SAMPLE_WIDTH,
        frame_rate=SAMPLE_RATE,
        channels=1
    )
    wav_buffer = BytesIO()
    segment.export(wav_buffer, format="wav")
    wav_buffer.seek(0)
    wav_buffer.name = "speech.wav"  # the upload needs a filename to identify the format
    transcript = openai_handler.client.audio.transcriptions.create(
        model="whisper-1",
        file=wav_buffer,
        response_format="text"
    )
    return str(transcript).strip()


async def _close_quietly(websocket: WebSocket):
    """Close the socket, ignoring the error raised when it is already closed."""
    try:
        await websocket.close()
    except RuntimeError:
        # Raised when the peer disconnected or close() was already sent.
        pass


class _CallSession:
    """State and behaviour of one Twilio media-stream connection."""

    def __init__(self, websocket: WebSocket):
        self.websocket = websocket
        self.call_sid = None
        self.stream_sid = None
        self.vad = webrtcvad.Vad(2)  # Adjusted for better sensitivity
        self.audio_buffer = bytearray()  # PCM16 speech frames of the current utterance
        self.is_speaking = False
        self.silent_chunks = 0
        self.pending_marks = 0   # responses sent whose playback has not finished
        self.hanging_up = False  # farewell sent; waiting for it to finish playing

    async def speak(self, text: str, tone_params: dict, is_farewell: bool = False):
        """Log ``text``, stream its audio to the caller and queue a playback mark."""
        call_manager.db.add_transcript(self.call_sid, "Assistant", text)
        stream = openai_handler.generate_elevenlabs_audio_stream(
            self.call_sid, text, tone_params
        )
        async for chunk in stream:
            if chunk:
                await self.websocket.send_json({
                    "event": "media",
                    "streamSid": self.stream_sid,
                    "media": {"payload": chunk}
                })
        # Twilio echoes the mark once the audio above has played (or was
        # cleared), which is how the end of playback is detected.
        await self.websocket.send_json({
            "event": "mark",
            "streamSid": self.stream_sid,
            "mark": {"name": FAREWELL_MARK if is_farewell else RESPONSE_MARK}
        })
        self.pending_marks += 1
        if is_farewell:
            self.hanging_up = True

    async def on_start(self, data: dict):
        """Handle the ``start`` event: remember the identifiers and speak the opening line."""
        self.stream_sid = data["streamSid"]
        self.call_sid = data["start"]["callSid"]
        logger.info(
            f"Media stream started: call_sid={self.call_sid}, stream_sid={self.stream_sid}"
        )
        tone_params, text_response, is_farewell = await openai_handler.get_ai_response(
            self.call_sid, ""
        )
        await self.speak(text_response, tone_params, is_farewell)

    def on_mark(self, name) -> bool:
        """Handle an echoed mark; return ``True`` when the farewell has finished playing."""
        self.pending_marks = max(0, self.pending_marks - 1)
        self.silent_chunks = 0  # idle time is measured from the end of playback
        return self.hanging_up and name == FAREWELL_MARK

    async def on_media(self, payload: str):
        """Handle one inbound audio frame: VAD, barge-in, end of utterance and idle timeout."""
        if self.hanging_up or self.stream_sid is None:
            return
        # Twilio sends 8-bit μ-law; webrtcvad and the WAV export need 16-bit PCM.
        pcm = audioop.ulaw2lin(base64.b64decode(payload), PCM_SAMPLE_WIDTH)
        if self.vad.is_speech(pcm, SAMPLE_RATE):
            if not self.is_speaking:
                # Barge-in: stop whatever the assistant is currently playing.
                await self.websocket.send_json({
                    "event": "clear",
                    "streamSid": self.stream_sid
                })
                logger.info(f"User speech detected, clearing audio for {self.call_sid}")
                self.is_speaking = True
                self.audio_buffer.clear()
            self.audio_buffer.extend(pcm)
            self.silent_chunks = 0
            return

        self.silent_chunks += 1
        if self.is_speaking:
            if self.silent_chunks >= SILENCE_THRESHOLD:
                self.is_speaking = False
                self.silent_chunks = 0
                utterance = bytes(self.audio_buffer)
                self.audio_buffer.clear()
                await self._handle_utterance(utterance)
        elif self.pending_marks == 0 and self.silent_chunks >= IDLE_PROMPT_CHUNKS:
            # Silence is only counted once the assistant has finished talking.
            self.silent_chunks = 0
            await self._handle_idle()

    async def _handle_utterance(self, pcm_audio: bytes):
        """Transcribe a finished utterance and speak the assistant's reply."""
        if len(pcm_audio) < MIN_UTTERANCE_BYTES:
            logger.info(f"Ignoring very short utterance for {self.call_sid}")
            return
        try:
            transcript = await _run_blocking(_transcribe, pcm_audio)
        except Exception as e:
            logger.error(f"Transcription error for {self.call_sid}: {e}")
            error_message = "Sorry, I couldn't understand you. Let's try again."
            await self.speak(error_message, dict(DEFAULT_TONE))
            return
        if not transcript:
            # An empty transcript would be mistaken for "start the conversation".
            logger.info(f"Empty transcript for {self.call_sid}; ignoring")
            return
        openai_handler.silent_attempts[self.call_sid] = 0  # the caller is present
        call_manager.db.add_transcript(self.call_sid, "User", transcript)
        logger.info(f"Transcribed user input for {self.call_sid}: {transcript}")
        tone_params, text_response, is_farewell = await openai_handler.get_ai_response(
            self.call_sid, transcript
        )
        await self.speak(text_response, tone_params, is_farewell)

    async def _handle_idle(self):
        """Play the next silence prompt; the second one ends the call."""
        attempts = openai_handler.silent_attempts.get(self.call_sid, 0) + 1
        openai_handler.silent_attempts[self.call_sid] = attempts
        _, text_response, is_farewell = openai_handler.get_silent_response(
            self.call_sid, attempts
        )
        await self.speak(text_response, dict(DEFAULT_TONE), is_farewell)


@app.websocket("/stream")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one Twilio media stream until the call stops or the farewell has played."""
    await websocket.accept()
    if call_manager is None or openai_handler is None:
        logger.error("Media stream rejected: system is not initialized")
        await _close_quietly(websocket)
        return

    session = _CallSession(websocket)
    final_status = "completed"
    try:
        while True:
            message = await websocket.receive_text()
            data = json.loads(message)
            event = data.get("event")
            if event == "connected":
                logger.info("WebSocket connected")
            elif event == "start":
                await session.on_start(data)
            elif event == "media":
                await session.on_media(data["media"]["payload"])
            elif event == "mark":
                if session.on_mark(data.get("mark", {}).get("name")):
                    logger.info(f"Farewell played, ending stream for {session.call_sid}")
                    break
            elif event == "stop":
                logger.info(f"Media stream stopped for {session.call_sid}")
                break
    except WebSocketDisconnect:
        # The peer closing the socket is a normal way for a call to end.
        logger.info(f"WebSocket disconnected for {session.call_sid}")
    except Exception as e:
        logger.error(f"Error in WebSocket for {session.call_sid}: {e}")
        final_status = "failed"
    finally:
        try:
            if session.call_sid:
                call_manager.db.update_call_status(
                    session.call_sid, final_status, datetime.now()
                )
                openai_handler.cleanup_conversation(session.call_sid)
        finally:
            await _close_quietly(websocket)


def initialize_system():
    """Build the global call manager and OpenAI handler from environment variables.

    Raises:
        ValueError: if any required environment variable is missing.
    """
    global call_manager, openai_handler
    twilio_sid = os.getenv('TWILIO_ACCOUNT_SID')
    twilio_token = os.getenv('TWILIO_AUTH_TOKEN')
    twilio_number = os.getenv('TWILIO_PHONE_NUMBER')
    openai_key = os.getenv('OPENAI_API_KEY')
    elevenlabs_api_key = os.getenv('ELEVENLABS_API_KEY')
    if not all([twilio_sid, twilio_token, twilio_number, openai_key, elevenlabs_api_key]):
        raise ValueError("Missing required environment variables")
    call_manager = TwilioCallManager(twilio_sid, twilio_token, twilio_number)
    openai_handler = OpenAIHandler(openai_key, elevenlabs_api_key, kb_file_path="kb_articles.json")
    call_manager.set_openai_handler(openai_handler)
    logger.info("System initialized successfully")


def example_usage():
    """Initialise the system and place one demonstration call."""
    initialize_system()
    script_template = """
You are an AI Sales Representative making a cold outbound call. The recipient has no prior knowledge of your company and there is no prior conversation. Your role is to deliver a friendly, concise introduction that:

* Builds quick rapport
* Clearly states who you are and why you're calling
* Keeps things low-pressure and empathetic

## Your message should:
1. Greet the recipient warmly
2. State your name and the reason for calling
3. Acknowledge that this is unexpected
4. Invite a quick moment of their time

## Examples:
* “Hi, this is Joanne from Farmonaut—just reaching out because we help teams like yours with mineral detection using satellite data.”
* “Hey there, I know this is out of the blue—mind if I take 20 seconds to explain why I’m calling?”

## Output Rules:
* Your message must be natural, human-like, and under 50 words
* Do not include any follow-up questions or requests for info in this first message
* This is just your intro line—keep it brief and warm
* Do not reference any prior interaction or knowledge
"""
    config = CallConfig(
        phone_number="+918149141050",
        campaign_name="Simple Promotion Call",
        script_template=script_template
    )
    webhook_url = os.getenv('PUBLIC_URL', "https://d5bb0fd567df.ngrok-free.app")
    call_sid = call_manager.make_outbound_call(config, webhook_url)
    if call_sid:
        print(f"Call initiated successfully: {call_sid}")
    else:
        print("Failed to initiate call")


if __name__ == '__main__':
    # NOTE(review): the call is placed before the server starts listening, so
    # Twilio's first webhook races uvicorn's startup — confirm this is acceptable.
    example_usage()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)