1. Initialize the Freeplay Client
2. Fetch your formatted prompt
3. Pass the prompt to Pipecat's LLM Service (Processor)
4. Initialize the FreeplayProcessor Service
5. Add freeplay_logger to your Pipeline
6. FreeplayProcessor
The FreeplayProcessor is based on Pipecat's LLMLogObserver; however, it needs to be a service rather than a logger in order to log the correct number of completions.
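A service here means a Pipecat FrameProcessor: unlike an observer, which only watches frames, a processor sits in the pipeline and must push every frame downstream. A minimal sketch of that shape (illustrative only; the full implementation appears below):

from pipecat.frames.frames import Frame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection

class MinimalLoggingService(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)
        # ... inspect the frame and log it here ...
        await self.push_frame(frame, direction)  # keep the pipeline flowing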
Examples
###############################################################################
# Note: It is required to modify the process_frame function in pipecat's
# base_llm.py to pass along the OpenAILLMContext frame.
# This makes the handling easier in FreeplayProcessor.process_frame.
###############################################################################
async def process_frame(self, frame: Frame, direction: FrameDirection):
    await super().process_frame(frame, direction)

    context = None
    if isinstance(frame, OpenAILLMContextFrame):
        context: OpenAILLMContext = frame.context
        await self.push_frame(frame, direction)  # Add this line to pass the frame along
    elif isinstance(frame, LLMMessagesFrame):
        context = OpenAILLMContext.from_messages(frame.messages)
    elif isinstance(frame, VisionImageRawFrame):
        context = OpenAILLMContext()
        context.add_image_frame_message(
            format=frame.format, size=frame.size, image=frame.image, text=frame.text
        )
    elif isinstance(frame, LLMUpdateSettingsFrame):
        await self._update_settings(frame.settings)
    else:
        await self.push_frame(frame, direction)
###############################################################################
import os

from helpers.freeplay_frame import FreeplayProcessor
from freeplay import Freeplay, SessionInfo

fp_client = Freeplay(
    freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
    api_base=os.getenv("FREEPLAY_API_BASE"),
)
# Fetch the unformatted prompt from Freeplay up front and bind it later on.
# This reduces latency in the system.
unformatted_prompt = fp_client.prompts.get(
    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
    template_name=os.getenv("PROMPT_NAME"),
    environment="latest",
)

formatted_prompt = unformatted_prompt.bind(
    variables={<optional vars>},  # fill in your template variables
    history=[],
).format()
# Pass the formatted prompt to the LLM
llm = OpenAILLMService(
    model=formatted_prompt.prompt_info.model,
    tools=formatted_prompt.tool_schema if formatted_prompt.tool_schema else None,
    api_key=os.getenv("OPENAI_API_KEY"),
    **formatted_prompt.prompt_info.model_parameters,
)
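# The `session` passed to the processor below must already exist. A minimal
# sketch, assuming the Freeplay sessions API:
session = fp_client.sessions.create()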
# Pass the Freeplay client to the FreeplayProcessor
freeplay_processor = FreeplayProcessor(
    fp_client=fp_client,
    template_name="voice-assistant",
    session=session,
)
# ... Additional pipeline configuration ...
pipeline = Pipeline(
    [
        transport.input(),  # WebSocket input from the client
        stt,  # Speech-to-Text
        context_aggregator.user(),
        llm,  # LLM
        tts,  # Text-to-Speech
        freeplay_processor,  # Freeplay logger (after TTS so it can capture assistant audio)
        transport.output(),  # WebSocket output to the client
        audiobuffer,  # Buffers the audio in the pipeline
        context_aggregator.assistant(),
    ]
)
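# A minimal sketch of running the pipeline from your async entrypoint,
# assuming Pipecat's standard task/runner APIs:
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask

task = PipelineTask(pipeline)
runner = PipelineRunner()
await runner.run(task)  # call from within an async function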
#####################
# FreeplayProcessor #
#####################
import os
import io
import wave
import time
import base64
import datetime
from pipecat.frames.frames import (
    Frame,
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    InputAudioRawFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    MetricsFrame,
    TTSAudioRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from freeplay import (
    Freeplay,
    RecordPayload,
    CallInfo,
    SessionInfo,
)
from freeplay.resources.prompts import PromptInfo
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData
class FreeplayProcessor(FrameProcessor):
    """Logs LLM interactions and audio to Freeplay with simplified structure."""

    def __init__(
        self,
        fp_client: Freeplay,
        template_name: str,
        session: SessionInfo = None,
        required_information: str = None,
        unformatted_prompt: PromptInfo = None,
    ):
        super().__init__()
        self.fp_client = fp_client
        self.template_name = template_name
        self.conversation_id = self._new_conv_id()
        self.total_completion_time = 0
        self.required_information = required_information
        self.deepgram_latency = 0

        # Audio-related properties
        self.sample_width = 2
        self.sample_rate = 8000
        self.num_channels = 1
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.user_speaking = False
        self.bot_speaking = False

        # Freeplay-related properties
        self.conversation_history = []
        self.session = session
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self.unformatted_prompt = unformatted_prompt
        self.reset_recent_messages()

    def _new_conv_id(self) -> str:
        """Generate a new conversation ID from the current timestamp (this represents a customer ID or similar)."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def reset_recent_messages(self):
        """Reset all temporary message and audio storage."""
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.total_completion_time = 0
        self.deepgram_latency = 0
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle Freeplay logging."""
        await super().process_frame(frame, direction)

        # Handle LLM response frames
        if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
            event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
            print(f"LLMFullResponseFrame: {event}", flush=True)

        # Handle the LLM context frame - this is where we log to Freeplay
        elif isinstance(frame, OpenAILLMContextFrame):
            messages = frame.context.messages

            # Extract the user message and completion from the context
            user_messages = [m for m in messages if m.get("role") == "user"]
            if user_messages:
                self.most_recent_user_message = user_messages[-1].get("content")

            completions = [m for m in messages if m.get("role") == "assistant"]
            if completions:
                self.most_recent_completion = completions[-1].get("content")

            # Log to Freeplay when we have both user input and a completion
            if self.most_recent_user_message and self.most_recent_completion:
                self._record_to_freeplay()

        # Handle audio state changes
        elif isinstance(frame, UserStartedSpeakingFrame):
            self.user_speaking = True
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self.user_speaking = False
        elif isinstance(frame, BotStartedSpeakingFrame):
            self.bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self.bot_speaking = False

        # Handle audio data
        elif isinstance(frame, InputAudioRawFrame):
            if self.user_speaking:
                self._user_audio.extend(frame.audio)
        elif isinstance(frame, TTSAudioRawFrame):
            if self.bot_speaking:
                self._bot_audio.extend(frame.audio)

        # Handle metrics for LLM completion time
        elif isinstance(frame, MetricsFrame):
            self.metrics = frame.data
            for metric in frame.data:
                if isinstance(metric, ProcessingMetricsData):
                    if "LLMService" in metric.processor:
                        self.total_completion_time = metric.value
                elif isinstance(metric, TTFBMetricsData):
                    if "DeepgramSTTService" in metric.processor:
                        self.deepgram_latency += metric.value

        # Pass the frame to the next processor
        await self.push_frame(frame, direction)
    def _record_to_freeplay(self):
        """Record the current conversation state to Freeplay."""
        # Create a new trace for this interaction
        trace = self.session.create_trace(
            input=self.most_recent_user_message,
            custom_metadata={
                "deepgram_latency": self.deepgram_latency,
            },
        )

        self.conversation_history.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": self.most_recent_user_message},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": base64.b64encode(
                                self._make_wav_bytes(
                                    self._user_audio, prepend_silence_secs=1
                                )
                            ).decode("utf-8"),
                            "format": "wav",
                        },
                    },
                ],
            },
        )

        # Bind the variables to the prompt
        if self.unformatted_prompt:
            formatted = self.unformatted_prompt.bind(
                variables={"required_information": self.required_information},
                history=self.conversation_history,
            ).format()
        else:
            # Fetch the formatted prompt. Note: this adds latency to the pipeline.
            formatted = self.fp_client.prompts.get_formatted(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                template_name=self.template_name,
                environment="latest",
                history=self.conversation_history,
                variables={"required_information": self.required_information},
            )

        # Calculate latency for the LLM interaction
        start, end = time.time(), time.time() + self.total_completion_time

        try:
            # Prepare metadata and the record payload
            custom_metadata = {
                "conversation_id": str(self.conversation_id),
            }

            # Add the assistant's response to the conversation history
            last_message = {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": self.most_recent_completion},
                ],
                "audio": {
                    "id": self.conversation_id,
                    "data": base64.b64encode(
                        self._make_wav_bytes(self._bot_audio, prepend_silence_secs=1)
                    ).decode("utf-8"),
                    "expires_at": 1729234747,  # example epoch timestamp
                    "transcript": self.most_recent_completion,
                },
            }
            self.conversation_history.append(last_message)

            # Create the recording in Freeplay
            self.fp_client.recordings.create(
                RecordPayload(
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    all_messages=[
                        *formatted.llm_prompt,
                        last_message,  # Add the last message to the record call
                    ],
                    session_info=SessionInfo(
                        self.session.session_id, custom_metadata=custom_metadata
                    ),
                    inputs={"required_information": self.required_information},
                    prompt_version_info=formatted.prompt_info,
                    call_info=CallInfo.from_prompt_info(
                        formatted.prompt_info, start, end
                    ),
                    trace_info=trace,
                )
            )

            # Record the output to the trace
            trace.record_output(
                os.getenv("FREEPLAY_PROJECT_ID"),
                self.most_recent_completion,
            )

            print(
                f"Successfully recorded to Freeplay - completion time: {self.total_completion_time}s",
                flush=True,
            )
            self.reset_recent_messages()
        except Exception as e:
            print(f"Error recording to Freeplay: {e}", flush=True)
            self.reset_recent_messages()
    def _make_wav_bytes(self, pcm: bytes, prepend_silence_secs: int = 1) -> bytes:
        """Convert PCM audio data to WAV format, optionally prepending silence."""
        buf = io.BytesIO()
        if prepend_silence_secs > 0:
            # Silence length in bytes: rate * bytes-per-sample * channels * seconds
            silence_bytes = (
                self.sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            pcm = b"\x00" * silence_bytes + pcm
        with wave.open(buf, "wb") as wf:
            wf.setnchannels(self.num_channels)
            wf.setsampwidth(self.sample_width)
            wf.setframerate(self.sample_rate)
            wf.writeframes(pcm)
        return buf.getvalue()
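To avoid the per-turn prompts.get_formatted call noted above, construct the processor with the prompt fetched once at startup. A minimal sketch, reusing the fp_client, session, and unformatted_prompt from the earlier snippet (the required_information value is illustrative):

freeplay_processor = FreeplayProcessor(
    fp_client=fp_client,
    template_name="voice-assistant",
    session=session,
    required_information="caller name and account number",  # illustrative value
    unformatted_prompt=unformatted_prompt,
)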

