1. Freeplay & LLM Initialization
Initialize the Freeplay client, get the unformatted prompt (optionally binding variables to it), and initialize the LLM service for the pipeline using the prompt info returned by Freeplay.
2. AudioBuffer
Add the audio buffer processor; this lets us capture the right audio to log to Freeplay.
3. Add the FreeplayObserver to PipelineTask
4. Callbacks to log to Freeplay
Add callbacks to handle user and bot audio; once you have the right details, log to Freeplay.
5. FreeplayObserver
The entire FreeplayObserver class
Examples
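The example below reads its configuration from environment variables loaded with python-dotenv (all names taken from the code): FREEPLAY_API_KEY, FREEPLAY_API_BASE, FREEPLAY_PROJECT_ID, PROMPT_NAME, FREEPLAY_ENVIRONMENT, TWILIO_ACCOUNT_SID, TWILIO_AUTH_TOKEN, OPENAI_API_KEY, DEEPGRAM_API_KEY, CARTESIA_API_KEY, and CARTESIA_VOICE_ID.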
from helpers.freeplay_observer import FreeplayObserver
from freeplay import Freeplay, SessionInfo
import os
import sys
from dotenv import load_dotenv
from fastapi import WebSocket
from loguru import logger
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)
load_dotenv(override=True)
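
# NOTE: select_meeting_details() is referenced below as an example of pulling prompt
# variables from your own business logic. This stub is a hypothetical stand-in so the
# snippet runs end to end; replace it with your real data source.
def select_meeting_details() -> str:
    return "30-minute intro call, weekdays between 9am and 5pm ET"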
async def run_bot(
    websocket_client: WebSocket,
    stream_sid: str,
    call_sid: str,
    testing: bool,
    session: SessionInfo,
):
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
    )

    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=serializer,
        ),
    )
    # Example of getting variables from a local function
    required_information = select_meeting_details()

    # Initialize Freeplay client
    fp_client = Freeplay(
        freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
        api_base=os.getenv("FREEPLAY_API_BASE"),
    )

    # Get the unformatted prompt from Freeplay
    unformatted_prompt = fp_client.prompts.get(
        project_id=os.getenv("FREEPLAY_PROJECT_ID"),
        template_name=os.getenv("PROMPT_NAME"),
        environment="latest",
    )
    formatted_prompt = unformatted_prompt.bind(
        variables={"required_information": required_information},
        history=[],
    ).format()

    # Pass the formatted prompt to the LLM
    llm = OpenAILLMService(
        model=unformatted_prompt.prompt_info.model,
        tools=(
            unformatted_prompt.tool_schema if unformatted_prompt.tool_schema else None
        ),
        api_key=os.getenv("OPENAI_API_KEY"),
        **unformatted_prompt.prompt_info.model_parameters,
    )

    # Pass the Deepgram API key to the DeepgramSTTService
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True
    )

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id=os.getenv("CARTESIA_VOICE_ID"),
        push_silence_after_stop=testing,
    )

    context = OpenAILLMContext(formatted_prompt.llm_prompt)
    context_aggregator = llm.create_context_aggregator(context)

    # NOTE: This buffer is what allows us to capture the audio from the user and bot
    # using the callback handlers.
    # NOTE: Watch out! This will save all the conversation in memory. You can
    # pass `buffer_size` to get periodic callbacks.
    audiobuffer = AudioBufferProcessor(
        sample_rate=16000,  # Optional: desired output sample rate
        num_channels=1,  # 1 for mono, 2 for stereo
        buffer_size=0,  # Size in bytes to trigger buffer callbacks
        user_continuous_stream=False,
        enable_turn_audio=True,
    )
    freeplay_observer = FreeplayObserver(
        fp_client=fp_client,
        unformatted_prompt=unformatted_prompt,
        environment=os.getenv("FREEPLAY_ENVIRONMENT"),
        variables={
            "required_information": required_information,
        },
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            transport.output(),  # Websocket output to client
            audiobuffer,  # Used to buffer the audio in the pipeline
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,  # This is used to track the usage of the LLM
        ),
        observers=[
            freeplay_observer
        ],  # Use the FreeplayObserver to record the audio to Freeplay
    )
    # Additional handlers to support the FreeplayObserver

    # Save audio bytes from the user and store them on freeplay_observer
    @audiobuffer.event_handler("on_user_turn_audio_data")
    async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
        if audio and not freeplay_observer._bot_audio:
            # Aggregate user audio because this event could fire multiple times
            # before the bot responds
            freeplay_observer._turn_user_audio.extend(audio)
            freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
                freeplay_observer._turn_user_audio,
                sample_rate,
                "user",
                prepend_silence_secs=1,
            )
        elif audio and freeplay_observer._bot_audio:
            freeplay_observer._turn_user_audio.extend(audio)
            freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
                freeplay_observer._turn_user_audio,
                sample_rate,
                "user",
                prepend_silence_secs=1,
            )
            await freeplay_observer.record_to_freeplay()
    # Save audio bytes from the bot and store them on freeplay_observer
    @audiobuffer.event_handler("on_bot_turn_audio_data")
    async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
        # This assumes the user always speaks first and would cut off
        # the first turn of the bot
        if audio and not freeplay_observer._user_audio:
            # Aggregate bot audio because this event could fire multiple times
            # before the user responds
            freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
                audio, sample_rate, "bot", prepend_silence_secs=1
            )
        elif audio and freeplay_observer._user_audio:
            freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
                audio, sample_rate, "bot", prepend_silence_secs=1
            )
            await freeplay_observer.record_to_freeplay()
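
    # NOTE (assumption): the original snippet imports PipelineRunner but does not show
    # the task being started; the usual Pipecat pattern would be to end run_bot with:
    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)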
##############################################################################
# FreeplayObserver.py
##############################################################################
import os
import io
import wave
import time
import base64
import datetime
import uuid
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection
from freeplay import Freeplay, RecordPayload, CallInfo, SessionInfo
from dotenv import load_dotenv
import asyncio
import functools
from pipecat.frames.frames import (
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.services.openai.base_llm import BaseOpenAILLMService
load_dotenv(override=True)
class FreeplayObserver(BaseObserver):
    def __init__(
        self,
        fp_client: Freeplay,
        unformatted_prompt=None,  # Prompt template object returned by fp_client.prompts.get()
        template_name: str = os.getenv("PROMPT_NAME") or None,
        environment: str = "latest",
        variables: dict = {},
    ):
        super().__init__()
        self.start_llm_interaction = 0
        self.end_llm_interaction = 0
        self.llm_completion_latency = 0
        self.call_id = str(uuid.uuid4())  # Has to be str to record to Freeplay

        # Audio related properties
        self.sample_width = 2
        self.num_channels = 1
        self.sample_rate = 16000
        self._bot_audio = bytearray()
        self._user_audio = bytearray()
        self._turn_user_audio = bytearray()
        self.user_speaking = False
        self.bot_speaking = False

        self.fp_client = fp_client
        self.session = self.fp_client.sessions.create()

        # Freeplay Params
        self.template_name = template_name
        self.environment = environment
        self.unformatted_prompt = unformatted_prompt
        self.variables = variables

        # Conversation Params
        self.conversation_id = self._new_conv_id()
        self.conversation_history = []
        self.most_recent_user_message = None
        self.most_recent_completion = None

    def _new_conv_id(self) -> str:
        """Generate a new conversation ID based on the current timestamp (this represents a customer ID or similar)."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def _reset_recent_messages(self):
        """Reset all temporary message and audio storage."""
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self._turn_user_audio = bytearray()
        # self.llm_completion_latency = 0
    async def record_to_freeplay(self):
        """Record the current interaction to Freeplay as a new trace."""
        # Create a new trace for this interaction
        trace = self.session.create_trace(
            input=self.most_recent_user_message,
            custom_metadata={
                "conversation_id": str(self.conversation_id),
            },
        )

        # Add user message to conversation history
        self.conversation_history.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": self.most_recent_user_message},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": base64.b64encode(self._user_audio).decode("utf-8"),
                            "format": "wav",
                        },
                    },
                ],
            },
        )

        # Bind the variables to the prompt
        if self.unformatted_prompt:
            formatted = self.unformatted_prompt.bind(
                variables=self.variables,
                history=self.conversation_history,
            ).format()
        else:
            # Run in executor to avoid blocking; run_in_executor does not accept
            # keyword arguments, so wrap the call in functools.partial.
            loop = asyncio.get_running_loop()
            formatted = await loop.run_in_executor(
                None,
                functools.partial(
                    self.fp_client.prompts.get_formatted,
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    template_name=self.template_name,
                    environment=self.environment,
                    history=self.conversation_history,
                    variables=self.variables,
                ),
            )

        # Calculate latency for the LLM interaction
        # Convert nanoseconds to seconds for proper timing
        latency_seconds = self.get_llm_response_latency_seconds()
        end = time.time()
        start = end - latency_seconds

        try:
            print(f"_____* self._bot_audio: {len(self._bot_audio)}")
            # Prepare metadata and record payload
            custom_metadata = {"caller_id": self.call_id}

            # Prepare the assistant's response message (mimicking the format of the
            # LLM provider's message)
            assistant_msg = {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": self.most_recent_completion},
                ],
                "audio": {
                    "id": self.conversation_id,
                    "data": base64.b64encode(self._bot_audio).decode("utf-8"),
                    "expires_at": 1729234747,  # Placeholder expiry timestamp
                    "transcript": self.most_recent_completion,
                },
            }

            # Add assistant's response to conversation history
            self.conversation_history.append(assistant_msg)

            record = RecordPayload(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                all_messages=[
                    *formatted.llm_prompt,
                    assistant_msg,  # Add the assistant's response to the record call
                ],
                session_info=SessionInfo(
                    self.session.session_id, custom_metadata=custom_metadata
                ),
                inputs={},
                prompt_version_info=formatted.prompt_info,
                call_info=CallInfo.from_prompt_info(formatted.prompt_info, start, end),
                trace_info=trace,
            )

            # Create recording in Freeplay
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.fp_client.recordings.create, record)

            # Record output to trace
            await loop.run_in_executor(
                None,
                functools.partial(
                    trace.record_output,
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    output=self.most_recent_completion,
                    eval_results={},
                ),
            )

            print(
                f"✅ Recorded interaction #{len(self.conversation_history) // 2} to Freeplay - LLM response time: {self.get_llm_response_latency_seconds():.3f}s",
                flush=True,
            )

            # Reset only audio and current message data, keep conversation history
            self._reset_recent_messages()
        except Exception as e:
            print(f"❌ Error recording to Freeplay: {e}", flush=True)
            # Still reset audio buffers to prevent accumulation
            # (audio buffers are overwritten in the event handlers)
            self._reset_recent_messages()
    async def make_wav_bytes(
        self, pcm: bytes, sample_rate: int, voice: str, prepend_silence_secs: int = 1
    ) -> bytes:
        """Convert PCM audio data to WAV format with optional silence prepend."""
        if prepend_silence_secs > 0:
            silence_samples = int(
                self.sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            silence = b"\x00" * silence_samples
            pcm = silence + pcm
        with io.BytesIO() as buf:
            with wave.open(buf, "wb") as wf:
                wf.setnchannels(self.num_channels)
                wf.setsampwidth(self.sample_width)
                wf.setframerate(sample_rate)
                wf.writeframes(pcm)
            return buf.getvalue()
    async def on_push_frame(self, data: FramePushed):
        src = data.source
        dst = data.destination
        frame = data.frame
        direction = data.direction
        timestamp = data.timestamp

        # Create direction arrow
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, LLMFullResponseStartFrame):
            print(f"LLMFullResponseFrame: START {src} {arrow} {dst}", flush=True)
        elif isinstance(frame, LLMFullResponseEndFrame):
            print(f"LLMFullResponseFrame: END {src} {arrow} {dst}", flush=True)
        elif isinstance(frame, TranscriptionFrame):
            # Capture user if bot talks first
            if self.most_recent_user_message is None:
                self.most_recent_user_message = frame.text
        elif isinstance(frame, OpenAILLMContextFrame):
            messages = frame.context.messages
            # Extract user message and completion from context
            # NOTE: this replaces the TranscriptionFrame results, as it maps exactly
            # to what the LLM received.
            user_messages = [m for m in messages if m.get("role") == "user"]
            if user_messages:
                self.most_recent_user_message = user_messages[-1].get("content")
            completions = [m for m in messages if m.get("role") == "assistant"]
            if completions:
                self.most_recent_completion = completions[-1].get("content")
            # if (
            #     self.llm_completion_latency
            #     and self.most_recent_user_message
            #     and self.most_recent_completion
            # ):
            #     # reset latency
            #     self.llm_completion_latency = 0

        # Get relevant latency metrics for the LLM interaction
        if (
            isinstance(frame, OpenAILLMContextFrame)
            and isinstance(src, LLMUserContextAggregator)
            and isinstance(dst, BaseOpenAILLMService)
        ):
            self.start_llm_interaction = timestamp
            print(f"_____freeplay-observer.py OpenAILLMContextFrame START: {timestamp}")
        elif isinstance(frame, LLMFullResponseEndFrame) and isinstance(
            src, BaseOpenAILLMService
        ):
            self.end_llm_interaction = timestamp
            # Update latency tally
            self.llm_completion_latency = (
                self.end_llm_interaction - self.start_llm_interaction
            )
            print(
                f"_____freeplay-observer.py * set self.llm_completion_latency: {self.llm_completion_latency} ({self.get_llm_response_latency_seconds():.3f}s)"
            )

    def get_llm_response_latency_seconds(self):
        """Convert the raw nanosecond LLM response latency to seconds."""
        return self.llm_completion_latency / 1_000_000_000
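
For context, here is a minimal sketch of how run_bot might be wired into a FastAPI websocket endpoint for Twilio Media Streams. This wiring is not part of the original example: the endpoint path, the "start" event parsing, and creating a Freeplay session per call are assumptions to adapt to your own setup.

import json
import os

from fastapi import FastAPI, WebSocket
from freeplay import Freeplay

app = FastAPI()


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    # Twilio Media Streams sends a "connected" event, then a "start" event whose
    # payload carries the stream and call identifiers.
    start_data = None
    while start_data is None:
        message = json.loads(await websocket.receive_text())
        if message.get("event") == "start":
            start_data = message["start"]

    fp_client = Freeplay(
        freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
        api_base=os.getenv("FREEPLAY_API_BASE"),
    )

    await run_bot(
        websocket_client=websocket,
        stream_sid=start_data["streamSid"],
        call_sid=start_data["callSid"],
        testing=False,
        session=fp_client.sessions.create(),  # run_bot's session parameter; the observer also creates its own session internally
    )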

