1. Initialize the Freeplay Client
2. Fetch your formatted prompt
3. Pass the prompt to Pipecat's LLM Service (Processor)
4. Initialize the FreeplayProcessor Service
5. Add freeplay_logger to your Pipeline
6. FreeplayProcessor
The FreeplayProcessor is based on Pipecat's LLMLogObserver; however, it needs to be a service rather than a logger in order to log the correct number of completions.
Examples
###############################################################################
# Note: you must modify the process_frame function in Pipecat's
# base_llm.py so that it passes the OpenAILLMContext frame along;
# this makes the handling easier in FreeplayProcessor.process_frame.
###############################################################################
async def process_frame(self, frame: Frame, direction: FrameDirection):
    await super().process_frame(frame, direction)

    context = None
    if isinstance(frame, OpenAILLMContextFrame):
        context: OpenAILLMContext = frame.context
        await self.push_frame(frame, direction)  # Add this line to pass the frame along
    elif isinstance(frame, LLMMessagesFrame):
        context = OpenAILLMContext.from_messages(frame.messages)
    elif isinstance(frame, VisionImageRawFrame):
        context = OpenAILLMContext()
        context.add_image_frame_message(
            format=frame.format, size=frame.size, image=frame.image, text=frame.text
        )
    elif isinstance(frame, LLMUpdateSettingsFrame):
        await self._update_settings(frame.settings)
    else:
        await self.push_frame(frame, direction)

    if context:
        await self._process_context(context)
###############################################################################
from helpers.freeplay_frame import FreeplayProcessor
from freeplay import Freeplay, SessionInfo

fp_client = Freeplay(
    freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
    api_base=os.getenv("FREEPLAY_API_BASE"),
)

# Get the unformatted prompt from Freeplay.
# Fetch it once here and bind it later on; this reduces latency in the system.
unformatted_prompt = fp_client.prompts.get(
    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
    template_name=os.getenv("PROMPT_NAME"),
    environment="latest",
)

formatted_prompt = unformatted_prompt.bind(
    variables={<optional vars>},
    history=[],
).format()

# Pass the formatted prompt to the LLM
llm = OpenAILLMService(
    model=formatted_prompt.prompt_info.model,
    tools=formatted_prompt.tool_schema if formatted_prompt.tool_schema else None,
    api_key=os.getenv("OPENAI_API_KEY"),
    **formatted_prompt.prompt_info.model_parameters,
)
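
# Create a Freeplay session to group this conversation's recordings.
# (Sketch: `session` is used below but never created in the original example;
# sessions.create() is the assumed SDK call - adjust for your SDK version.)
session = fp_client.sessions.create()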
# Pass the Freeplay client to the FreeplayProcessor
freeplay_processor = FreeplayProcessor(
    fp_client=fp_client, template_name="voice-assistant", session=session
)
# ... Additional pipeline configuration ...
pipeline = Pipeline(
    [
        transport.input(),  # WebSocket input from client
        stt,  # Speech-to-text
        context_aggregator.user(),
        llm,  # LLM
        tts,  # Text-to-speech
        freeplay_processor,  # Freeplay logger (after tts so it can capture assistant audio)
        transport.output(),  # WebSocket output to client
        audiobuffer,  # Used to buffer the audio in the pipeline
        context_aggregator.assistant(),
    ]
)
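
# Run the pipeline - a minimal sketch of an entry point using Pipecat's
# standard task/runner API (an assumption about your app's structure, not
# part of the Freeplay integration itself):
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask

async def run_bot():
    task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
    await PipelineRunner().run(task)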
#####################
# FreeplayProcessor #
#####################
import os
import io
import wave
import time
import base64
import datetime

from pipecat.frames.frames import (
    Frame,
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    InputAudioRawFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    MetricsFrame,
    TTSAudioRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData

from freeplay import (
    Freeplay,
    RecordPayload,
    CallInfo,
    SessionInfo,
)
from freeplay.resources.prompts import PromptInfo
class FreeplayProcessor(FrameProcessor):
    """Logs LLM interactions and audio to Freeplay with simplified structure."""

    def __init__(
        self,
        fp_client: Freeplay,
        template_name: str,
        session: SessionInfo = None,
        required_information: str = None,
        unformatted_prompt: PromptInfo = None,
    ):
        super().__init__()
        self.fp_client = fp_client
        self.template_name = template_name
        self.conversation_id = self._new_conv_id()
        self.total_completion_time = 0
        self.required_information = required_information
        self.deepgram_latency = 0
        # Audio-related properties
        self.sample_width = 2
        self.sample_rate = 8000
        self.num_channels = 1
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.user_speaking = False
        self.bot_speaking = False
        # Freeplay-related properties
        self.conversation_history = []
        self.session = session
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self.unformatted_prompt = unformatted_prompt
        self.reset_recent_messages()

    def _new_conv_id(self) -> str:
        """Generate a new conversation ID from the current timestamp (this represents a customer ID or similar)."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def reset_recent_messages(self):
        """Reset all temporary message and audio storage."""
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.total_completion_time = 0
        self.deepgram_latency = 0
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle Freeplay logging."""
        await super().process_frame(frame, direction)

        # Handle LLM response frames
        if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
            event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
            print(f"LLMFullResponseFrame: {event}", flush=True)
        # Handle the LLM context frame - this is where we log to Freeplay
        elif isinstance(frame, OpenAILLMContextFrame):
            messages = frame.context.messages
            # Extract the user message and completion from the context
            user_messages = [m for m in messages if m.get("role") == "user"]
            if user_messages:
                self.most_recent_user_message = user_messages[-1].get("content")
            completions = [m for m in messages if m.get("role") == "assistant"]
            if completions:
                self.most_recent_completion = completions[-1].get("content")
            # Log to Freeplay when we have both user input and completion
            if self.most_recent_user_message and self.most_recent_completion:
                self._record_to_freeplay()
        # Handle audio state changes
        elif isinstance(frame, UserStartedSpeakingFrame):
            self.user_speaking = True
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self.user_speaking = False
        elif isinstance(frame, BotStartedSpeakingFrame):
            self.bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self.bot_speaking = False
        # Handle audio data
        elif isinstance(frame, InputAudioRawFrame):
            if self.user_speaking:
                self._user_audio.extend(frame.audio)
        elif isinstance(frame, TTSAudioRawFrame):
            if self.bot_speaking:
                self._bot_audio.extend(frame.audio)
        # Handle metrics for LLM completion time and STT latency
        elif isinstance(frame, MetricsFrame):
            self.metrics = frame.data
            for metric in frame.data:
                if isinstance(metric, ProcessingMetricsData):
                    if "LLMService" in metric.processor:
                        self.total_completion_time = metric.value
                elif isinstance(metric, TTFBMetricsData):
                    if "DeepgramSTTService" in metric.processor:
                        self.deepgram_latency += metric.value

        # Pass the frame to the next processor
        await self.push_frame(frame, direction)
    def _record_to_freeplay(self):
        """Record the current conversation state to Freeplay."""
        # Create a new trace for this interaction
        trace = self.session.create_trace(
            input=self.most_recent_user_message,
            custom_metadata={
                "deepgram_latency": self.deepgram_latency,
            },
        )

        self.conversation_history.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": self.most_recent_user_message},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": base64.b64encode(
                                self._make_wav_bytes(
                                    self._user_audio, prepend_silence_secs=1
                                )
                            ).decode("utf-8"),
                            "format": "wav",
                        },
                    },
                ],
            },
        )

        # Bind the variables to the prompt
        if self.unformatted_prompt:
            formatted = self.unformatted_prompt.bind(
                variables={"required_information": self.required_information},
                history=self.conversation_history,
            ).format()
        else:
            # Fetch the formatted prompt. Note this network call adds latency to the pipeline.
            formatted = self.fp_client.prompts.get_formatted(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                template_name=self.template_name,
                environment="latest",
                history=self.conversation_history,
                variables={"required_information": self.required_information},
            )

        # Approximate the LLM call window from the measured completion time
        start, end = time.time(), time.time() + self.total_completion_time

        try:
            # Prepare metadata and the record payload
            custom_metadata = {
                "conversation_id": str(self.conversation_id),
            }

            # Add the assistant's response to the conversation history
            last_message = {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": self.most_recent_completion},
                ],
                "audio": {
                    "id": self.conversation_id,
                    "data": base64.b64encode(
                        self._make_wav_bytes(self._bot_audio, prepend_silence_secs=1)
                    ).decode("utf-8"),
                    "expires_at": 1729234747,  # placeholder Unix timestamp
                    "transcript": self.most_recent_completion,
                },
            }
            self.conversation_history.append(last_message)

            # Create the recording in Freeplay
            self.fp_client.recordings.create(
                RecordPayload(
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    all_messages=[
                        *formatted.llm_prompt,
                        last_message,  # Add the last message to the record call
                    ],
                    session_info=SessionInfo(
                        self.session.session_id, custom_metadata=custom_metadata
                    ),
                    inputs={"required_information": self.required_information},
                    prompt_version_info=formatted.prompt_info,
                    call_info=CallInfo.from_prompt_info(
                        formatted.prompt_info, start, end
                    ),
                    trace_info=trace,
                )
            )

            # Record the output to the trace
            trace.record_output(
                os.getenv("FREEPLAY_PROJECT_ID"),
                self.most_recent_completion,
            )
            print(
                f"Successfully recorded to Freeplay - completion time: {self.total_completion_time}s",
                flush=True,
            )
            self.reset_recent_messages()
        except Exception as e:
            print(f"Error recording to Freeplay: {e}", flush=True)
            self.reset_recent_messages()
    def _make_wav_bytes(self, pcm: bytes, prepend_silence_secs: int = 1) -> bytes:
        """Convert PCM audio data to WAV format, optionally prepending silence."""
        buf = io.BytesIO()
        if prepend_silence_secs > 0:
            silence_samples = int(
                self.sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            silence = b"\x00" * silence_samples
            pcm = silence + pcm
        with wave.open(buf, "wb") as wf:
            wf.setnchannels(self.num_channels)
            wf.setsampwidth(self.sample_width)
            wf.setframerate(self.sample_rate)
            wf.writeframes(pcm)
        return buf.getvalue()
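
To avoid the per-turn prompts.get_formatted network call noted in _record_to_freeplay, you can pass the prompt you already fetched at startup into the processor so it binds and formats locally. A minimal sketch, reusing the fp_client, unformatted_prompt, and session objects from the setup code above; the required_information value here is a hypothetical example:
freeplay_processor = FreeplayProcessor(
    fp_client=fp_client,
    template_name="voice-assistant",
    session=session,
    required_information="caller name and reason for calling",  # hypothetical value
    unformatted_prompt=unformatted_prompt,
)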

