from helpers.freeplay_observer import FreeplayObserver
from freeplay import Freeplay, SessionInfo
import os
import sys
from dotenv import load_dotenv
from fastapi import WebSocket
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
FastAPIWebsocketParams,
FastAPIWebsocketTransport,
)
load_dotenv(override=True)
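# Environment variables read below via os.getenv:
#   TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN,
#   FREEPLAY_API_KEY, FREEPLAY_API_BASE, FREEPLAY_PROJECT_ID,
#   FREEPLAY_ENVIRONMENT, PROMPT_NAME,
#   OPENAI_API_KEY, DEEPGRAM_API_KEY, CARTESIA_API_KEY, CARTESIA_VOICE_ID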
async def run_bot(
websocket_client: WebSocket,
stream_sid: str,
call_sid: str,
testing: bool,
session: SessionInfo,
):
serializer = TwilioFrameSerializer(
stream_sid=stream_sid,
call_sid=call_sid,
account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
)
transport = FastAPIWebsocketTransport(
websocket=websocket_client,
params=FastAPIWebsocketParams(
audio_in_enabled=True,
audio_out_enabled=True,
add_wav_header=False,
vad_enabled=True,
vad_analyzer=SileroVADAnalyzer(),
vad_audio_passthrough=True,
serializer=serializer,
),
)
# Example of getting variables from a local function
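    # Placeholder sketch (hypothetical): select_meeting_details is not shown in this
    # snippet; the real implementation would return whatever details the bot must collect.
    def select_meeting_details() -> str:
        return "caller name, preferred meeting date, and preferred meeting time"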
required_information = select_meeting_details()
# Initialize Freeplay client
fp_client = Freeplay(
freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
api_base=os.getenv("FREEPLAY_API_BASE"),
)
# Get the unformatted prompt from Freeplay
unformatted_prompt = fp_client.prompts.get(
project_id=os.getenv("FREEPLAY_PROJECT_ID"),
template_name=os.getenv("PROMPT_NAME"),
environment="latest",
)
formatted_prompt = unformatted_prompt.bind(
variables={"required_information": required_information},
history=[],
).format()
# Pass the formatted prompt to the LLM
llm = OpenAILLMService(
model=unformatted_prompt.prompt_info.model,
tools=(
unformatted_prompt.tool_schema if unformatted_prompt.tool_schema else None
),
api_key=os.getenv("OPENAI_API_KEY"),
**unformatted_prompt.prompt_info.model_parameters,
)
# Pass the Deepgram API key to the DeepgramSTTService
stt = DeepgramSTTService(
api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True
)
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id=os.getenv("CARTESIA_VOICE_ID"),
push_silence_after_stop=testing,
)
context = OpenAILLMContext(formatted_prompt.llm_prompt)
context_aggregator = llm.create_context_aggregator(context)
    # NOTE: This buffer is what lets us capture the user's and bot's audio via the callback handlers below.
    # NOTE: Watch out! This keeps the entire conversation in memory. You can
    # pass `buffer_size` to get periodic callbacks instead.
audiobuffer = AudioBufferProcessor(
sample_rate=16000, # Optional: desired output sample rate
num_channels=1, # 1 for mono, 2 for stereo
        buffer_size=0,  # Size in bytes that triggers buffer callbacks (0 = only when recording stops)
user_continuous_stream=False,
enable_turn_audio=True,
)
freeplay_observer = FreeplayObserver(
fp_client=fp_client,
unformatted_prompt=unformatted_prompt,
environment=os.getenv("FREEPLAY_ENVIRONMENT"),
variables={
"required_information": required_information,
},
)
pipeline = Pipeline(
[
transport.input(), # Websocket input from client
stt, # Speech-To-Text
context_aggregator.user(),
llm, # LLM
tts, # Text-To-Speech
transport.output(), # Websocket output to client
audiobuffer, # Used to buffer the audio in the pipeline
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
audio_in_sample_rate=8000,
audio_out_sample_rate=8000,
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True, # This is used to track the usage of the LLM
),
observers=[
freeplay_observer
], # Use the FreeplayObserver to record the audio to Freeplay
)
# Additional handlers to support Freeplay Observer
# save audio bytes from user and store in freeplay_observer
@audiobuffer.event_handler("on_user_turn_audio_data")
async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
if audio and not freeplay_observer._bot_audio:
# aggregate user audio because this event could fire multiple times
# before bot responds
freeplay_observer._user_audio = freeplay_observer._turn_user_audio.extend(
audio
)
freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
freeplay_observer._turn_user_audio,
sample_rate,
"user",
prepend_silence_secs=1,
)
elif audio and freeplay_observer._bot_audio:
freeplay_observer._user_audio = freeplay_observer._turn_user_audio.extend(
audio
)
freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
freeplay_observer._turn_user_audio,
sample_rate,
"user",
prepend_silence_secs=1,
)
await freeplay_observer.record_to_freeplay()
# save audio bytes from bot and store in freeplay_observer
@audiobuffer.event_handler("on_bot_turn_audio_data")
async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
# this assumes the user always speaks first and would cut off
# the first turn of the bot
if audio and not freeplay_observer._user_audio:
# aggregate bot audio because this event could fire multiple times
# before user responds
freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
audio, sample_rate, "bot", prepend_silence_secs=1
)
elif audio and freeplay_observer._user_audio:
freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
audio, sample_rate, "bot", prepend_silence_secs=1
)
await freeplay_observer.record_to_freeplay()
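    # A minimal sketch (assuming standard Pipecat APIs: transport client events,
    # AudioBufferProcessor.start_recording, and the imported PipelineRunner) of
    # how the pipeline would typically be started and torn down:
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        # Start buffering audio once the Twilio client is connected.
        await audiobuffer.start_recording()

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        # Stop the pipeline when the caller hangs up.
        await task.cancel()

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)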
##############################################################################
# FreeplayObserver.py
##############################################################################
import os
import io
import wave
import time
import base64
import datetime
import uuid
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection
from freeplay import Freeplay, RecordPayload, CallInfo, SessionInfo
from dotenv import load_dotenv
import asyncio
import functools
from pipecat.frames.frames import (
LLMFullResponseStartFrame,
LLMFullResponseEndFrame,
TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.services.openai.base_llm import BaseOpenAILLMService
load_dotenv(override=True)
class FreeplayObserver(BaseObserver):
def __init__(
self,
fp_client: Freeplay,
        unformatted_prompt=None,  # prompt template object returned by fp_client.prompts.get()
        template_name: str = os.getenv("PROMPT_NAME") or None,
        environment: str = "latest",
        variables: dict | None = None,  # avoid a shared mutable default argument
):
super().__init__()
self.start_llm_interaction = 0
self.end_llm_interaction = 0
self.llm_completion_latency = 0
self.call_id = str(uuid.uuid4()) # Has to be str to record to Freeplay
# Audio related properties
self.sample_width = 2
self.num_channels = 1
self.sample_rate = 16000
self._bot_audio = bytearray()
self._user_audio = bytearray()
self._turn_user_audio = bytearray()
self.user_speaking = False
self.bot_speaking = False
self.fp_client = fp_client
self.session = self.fp_client.sessions.create()
# Freeplay Params
self.template_name = template_name
self.environment = environment
self.unformatted_prompt = unformatted_prompt
        self.variables = variables or {}
# Conversation Params
self.conversation_id = self._new_conv_id()
self.conversation_history = []
self.most_recent_user_message = None
self.most_recent_completion = None
def _new_conv_id(self) -> str:
"""Generate a new conversation ID based on the current timestamp (this represents a customer id or similar)."""
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def _reset_recent_messages(self):
"""Reset all temporary message and audio storage."""
self.most_recent_user_message = None
self.most_recent_completion = None
self._user_audio = bytearray()
self._bot_audio = bytearray()
self._turn_user_audio = bytearray()
# self.llm_completion_latency = 0
async def record_to_freeplay(self):
"""Record the current interaction to Freeplay as a new trace."""
# Create a new trace for this interaction
trace = self.session.create_trace(
input=self.most_recent_user_message,
custom_metadata={
"conversation_id": str(self.conversation_id),
},
)
# Add user message to conversation history
self.conversation_history.append(
{
"role": "user",
"content": [
{"type": "text", "text": self.most_recent_user_message},
{
"type": "input_audio",
"input_audio": {
"data": base64.b64encode(self._user_audio).decode("utf-8"),
"format": "wav",
},
},
],
},
)
# Bind the variables to the prompt
if self.unformatted_prompt:
formatted = self.unformatted_prompt.bind(
variables=self.variables,
history=self.conversation_history,
).format()
else:
            # Run in an executor to avoid blocking the event loop.
            # NOTE: run_in_executor only accepts positional arguments, so keyword
            # arguments must be wrapped with functools.partial.
            loop = asyncio.get_running_loop()
            formatted = await loop.run_in_executor(
                None,
                functools.partial(
                    self.fp_client.prompts.get_formatted,
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    template_name=self.template_name,
                    environment=self.environment,
                    history=self.conversation_history,
                    variables=self.variables,
                ),
            )
# Calculate latency for the LLM interaction
# Convert nanoseconds to seconds for proper timing
latency_seconds = self.get_llm_response_latency_seconds()
end = time.time()
start = end - latency_seconds
try:
print(f"_____* self._bot_audio: {len(self._bot_audio)}")
# Prepare metadata and record payload
custom_metadata = {"caller_id": self.call_id}
            # Prepare the assistant's response message (mimicking the LLM provider's message format)
assistant_msg = {
"role": "assistant",
"content": [
{"type": "text", "text": self.most_recent_completion},
],
"audio": {
"id": self.conversation_id,
"data": base64.b64encode(self._bot_audio).decode("utf-8"),
"expires_at": 1729234747,
"transcript": self.most_recent_completion,
},
}
# Add assistant's response to conversation history
self.conversation_history.append(assistant_msg)
record = RecordPayload(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
all_messages=[
*formatted.llm_prompt,
assistant_msg, # Add the assistant's response to the record call
],
session_info=SessionInfo(
self.session.session_id, custom_metadata=custom_metadata
),
inputs={},
prompt_version_info=formatted.prompt_info,
call_info=CallInfo.from_prompt_info(formatted.prompt_info, start, end),
trace_info=trace,
)
# Create recording in Freeplay
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self.fp_client.recordings.create, record)
# Record output to trace
await loop.run_in_executor(
None,
functools.partial(
trace.record_output,
project_id=os.getenv("FREEPLAY_PROJECT_ID"),
output=self.most_recent_completion,
eval_results={},
),
)
print(
f"✅ Recorded interaction #{len(self.conversation_history) // 2} to Freeplay - LLM response time: {self.get_llm_response_latency_seconds():.3f}s",
flush=True,
)
# Reset only audio and current message data, keep conversation history
self._reset_recent_messages()
except Exception as e:
print(f"❌ Error recording to Freeplay: {e}", flush=True)
# Still reset audio buffers to prevent accumulation
# audio buffers are overwritten in event handler
self._reset_recent_messages()
async def make_wav_bytes(
self, pcm: bytes, sample_rate: int, voice: str, prepend_silence_secs: int = 1
) -> bytes:
"""Convert PCM audio data to WAV format with optional silence prepend."""
        if prepend_silence_secs > 0:
            # Silence length in bytes = sample rate * bytes per sample * channels * seconds.
            # Use the sample_rate argument so the silence duration matches the WAV header below.
            silence_bytes = int(
                sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            silence = b"\x00" * silence_bytes
            pcm = silence + pcm
with io.BytesIO() as buf:
with wave.open(buf, "wb") as wf:
wf.setnchannels(self.num_channels)
wf.setsampwidth(self.sample_width)
wf.setframerate(sample_rate)
wf.writeframes(pcm)
return buf.getvalue()
async def on_push_frame(self, data: FramePushed):
src = data.source
dst = data.destination
frame = data.frame
direction = data.direction
timestamp = data.timestamp
# Create direction arrow
arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"
if isinstance(frame, LLMFullResponseStartFrame):
print(f"LLMFullResponseFrame: START {src} {arrow} {dst}", flush=True)
elif isinstance(frame, LLMFullResponseEndFrame):
print(f"LLMFullResponseFrame: END {src} {arrow} {dst}", flush=True)
elif isinstance(frame, TranscriptionFrame):
            # Fall back to the raw transcription text if no user message has been
            # captured from the LLM context yet (e.g. when the bot talks first)
            if self.most_recent_user_message is None:
                self.most_recent_user_message = frame.text
elif isinstance(frame, OpenAILLMContextFrame):
messages = frame.context.messages
# Extract user message and completion from context
            # NOTE: this replaces the TranscriptionFrame result, since it maps exactly to what the LLM received.
user_messages = [m for m in messages if m.get("role") == "user"]
if user_messages:
self.most_recent_user_message = user_messages[-1].get("content")
completions = [m for m in messages if m.get("role") == "assistant"]
if completions:
self.most_recent_completion = completions[-1].get("content")
# if (
# self.llm_completion_latency
# and self.most_recent_user_message
# and self.most_recent_completion
# ):
# # reset latency
# self.llm_completion_latency = 0
# Get relevant latency metrics for the LLM interaction
if (
isinstance(frame, OpenAILLMContextFrame)
and isinstance(src, LLMUserContextAggregator)
and isinstance(dst, BaseOpenAILLMService)
):
self.start_llm_interaction = timestamp
print(f"_____freeplay-observer.py OpenAILLMContextFrame START: {timestamp}")
elif isinstance(frame, LLMFullResponseEndFrame) and isinstance(
src, BaseOpenAILLMService
):
self.end_llm_interaction = timestamp
# update latency tally
self.llm_completion_latency = (
self.end_llm_interaction - self.start_llm_interaction
)
print(
f"_____freeplay-observer.py * set self.llm_completion_latency: {self.llm_completion_latency} ({self.get_llm_response_latency_seconds():.3f}s)"
)
def get_llm_response_latency_seconds(self):
"""Convert the raw nanosecond LLM response latency to seconds."""
return self.llm_completion_latency / 1_000_000_000