1. Initialize the Freeplay Client

2. Fetch your formatted prompt

3. Pass the prompt to Pipecat's LLM Service (Processor)

4. Initialize the FreeplayProcessor Service

5. Add the freeplay_processor to your Pipeline

6. FreeplayProcessor

The FreeplayProcessor is based on Pipecat's LLMLogObserver; however, it needs to be a service (a frame processor in the pipeline) rather than a logger in order to log the correct number of completions.

Examples

###############################################################################
# Note: you must modify the process_frame function in Pipecat's base_llm.py
# to pass the OpenAILLMContext frame along; this makes the handling easier in
# FreeplayProcessor.process_frame.
###############################################################################

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        context = None
        if isinstance(frame, OpenAILLMContextFrame):
            context: OpenAILLMContext = frame.context
            await self.push_frame(frame, direction) # Add this line here to pass frame along
        elif isinstance(frame, LLMMessagesFrame):
            context = OpenAILLMContext.from_messages(frame.messages)
        elif isinstance(frame, VisionImageRawFrame):
            context = OpenAILLMContext()
            context.add_image_frame_message(
                format=frame.format, size=frame.size, image=frame.image, text=frame.text
            )
        elif isinstance(frame, LLMUpdateSettingsFrame):
            await self._update_settings(frame.settings)
        else:
            await self.push_frame(frame, direction)
###############################################################################


import os

from helpers.freeplay_frame import FreeplayProcessor
from freeplay import Freeplay, SessionInfo

    fp_client = Freeplay(
        freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
        api_base=os.getenv("FREEPLAY_API_BASE"),
    )
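
    # Create a Freeplay session to group this conversation's completions.
    # Minimal sketch: fp_client.sessions.create() is the standard Freeplay SDK
    # call; the resulting session is passed to the FreeplayProcessor below.
    session = fp_client.sessions.create()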

    # Fetch the unformatted prompt template from Freeplay up front and bind
    # the variables later; this reduces latency in the hot path.
    unformatted_prompt = fp_client.prompts.get(
        project_id=os.getenv("FREEPLAY_PROJECT_ID"),
        template_name=os.getenv("PROMPT_NAME"),
        environment="latest",
    )
    formatted_prompt = unformatted_prompt.bind(
        variables={},  # add any optional template variables here
        history=[],
    ).format()
    # Pass the formatted prompt to the LLM
    llm = OpenAILLMService(
        model=formatted_prompt.prompt_info.model,
        tools=formatted_prompt.tool_schema if formatted_prompt.tool_schema else None,
        api_key=os.getenv("OPENAI_API_KEY"),
        **formatted_prompt.prompt_info.model_parameters,
    )

    # Pass the Freeplay client to the FreeplayProcessor
    freeplay_processor = FreeplayProcessor(
        fp_client=fp_client, template_name="voice-assistant", session=session
    )

    # ... additional pipeline configuration ...

     pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            freeplay_processor,  # Freeplay logger (after tts so it can capture assistant audio)
            transport.output(),  # Websocket output to client
            audiobuffer,  # Used to buffer the audio in the pipeline
            context_aggregator.assistant(),
        ]
    )
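
With the pipeline assembled, it can be run with Pipecat's standard task and runner. A minimal sketch, assuming the transport and services above are already configured:

    from pipecat.pipeline.runner import PipelineRunner
    from pipecat.pipeline.task import PipelineTask

    task = PipelineTask(pipeline)
    runner = PipelineRunner()
    await runner.run(task)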

#####################
# FreeplayProcessor #
#####################
import os
import io
import wave
import time
import base64
import datetime
from pipecat.frames.frames import (
    Frame,
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    UserStartedSpeakingFrame,
    UserStoppedSpeakingFrame,
    InputAudioRawFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
    MetricsFrame,
    TTSAudioRawFrame,
)
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
from freeplay import (
    Freeplay,
    RecordPayload,
    CallInfo,
    SessionInfo,
)
from freeplay.resources.prompts import PromptInfo
from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData


class FreeplayProcessor(FrameProcessor):
    """Logs LLM interactions and audio to Freeplay with simplified structure."""

    def __init__(
        self,
        fp_client: Freeplay,
        template_name: str,
        session: SessionInfo = None,
        required_information: str = None,
        unformatted_prompt: PromptInfo = None,
    ):
        super().__init__()
        self.fp_client = fp_client
        self.template_name = template_name
        self.conversation_id = self._new_conv_id()
        self.total_completion_time = 0
        self.required_information = required_information
        self.deepgram_latency = 0

        # Audio related properties
        self.sample_width = 2
        self.sample_rate = 8000
        self.num_channels = 1
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.user_speaking = False
        self.bot_speaking = False

        # Freeplay related properties
        self.conversation_history = []
        self.session = session
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self.unformatted_prompt = unformatted_prompt
        self.reset_recent_messages()

    def _new_conv_id(self) -> str:
        """Generate a new conversation ID based on the current timestamp (this represents a customer id or similar)."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def reset_recent_messages(self):
        """Reset all temporary message and audio storage."""
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self.total_completion_time = 0
        self.deepgram_latency = 0

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Process incoming frames and handle Freeplay logging."""
        await super().process_frame(frame, direction)

        # Handle LLM response frames
        if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
            event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
            print(f"LLMFullResponseFrame: {event}", flush=True)

        # Handle LLM context frame - this is where we log to Freeplay
        elif isinstance(frame, OpenAILLMContextFrame):
            messages = frame.context.messages
            # Extract user message and completion from context
            user_messages = [m for m in messages if m.get("role") == "user"]
            if user_messages:
                self.most_recent_user_message = user_messages[-1].get("content")

            completions = [m for m in messages if m.get("role") == "assistant"]
            if completions:
                self.most_recent_completion = completions[-1].get("content")

            # Log to Freeplay when we have both user input and completion
            if self.most_recent_user_message and self.most_recent_completion:
                self._record_to_freeplay()

        # Handle audio state changes
        elif isinstance(frame, UserStartedSpeakingFrame):
            self.user_speaking = True
        elif isinstance(frame, UserStoppedSpeakingFrame):
            self.user_speaking = False
        elif isinstance(frame, BotStartedSpeakingFrame):
            self.bot_speaking = True
        elif isinstance(frame, BotStoppedSpeakingFrame):
            self.bot_speaking = False

        # Handle audio data
        elif isinstance(frame, InputAudioRawFrame):
            if self.user_speaking:
                self._user_audio.extend(frame.audio)
        elif isinstance(frame, TTSAudioRawFrame):
            if self.bot_speaking:
                self._bot_audio.extend(frame.audio)

        # Handle metrics for LLM completion time
        elif isinstance(frame, MetricsFrame):
            self.metrics = frame.data
            for metric in frame.data:
                if isinstance(metric, ProcessingMetricsData):
                    if "LLMService" in metric.processor:
                        self.total_completion_time = metric.value
                elif isinstance(metric, TTFBMetricsData):
                    if "DeepgramSTTService" in metric.processor:
                        self.deepgram_latency += metric.value

        # Pass frame to next processor
        await self.push_frame(frame, direction)

    def _record_to_freeplay(self):
        """Record the current conversation state to Freeplay."""
        # Create a new trace for this interaction
        trace = self.session.create_trace(
            input=self.most_recent_user_message,
            custom_metadata={
                "deepgram_latency": self.deepgram_latency,
            },
        )

        self.conversation_history.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": self.most_recent_user_message},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": base64.b64encode(
                                self._make_wav_bytes(
                                    self._user_audio, prepend_silence_secs=1
                                )
                            ).decode("utf-8"),
                            "format": "wav",
                        },
                    },
                ],
            },
        )

        # Bind the variables to the prompt
        if self.unformatted_prompt:
            formatted = self.unformatted_prompt.bind(
                variables={"required_information": self.required_information},
                history=self.conversation_history,
            ).format()
        else:
            # Get formatted prompt. Note this adds latency to the pipeline
            formatted = self.fp_client.prompts.get_formatted(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                template_name=self.template_name,
                environment="latest",
                history=self.conversation_history,
                variables={"required_information": self.required_information},
            )

        # Approximate start/end timestamps for the LLM call from the measured
        # completion time (exact call boundaries aren't available here)
        start, end = time.time(), time.time() + self.total_completion_time

        try:
            # Prepare metadata and record payload
            custom_metadata = {
                "conversation_id": str(self.conversation_id),
            }

            # Add assistant's response to conversation history
            last_message = {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": self.most_recent_completion},
                ],
                "audio": {
                    "id": self.conversation_id,
                    "data": base64.b64encode(
                        self._make_wav_bytes(self._bot_audio, prepend_silence_secs=1)
                    ).decode("utf-8"),
                    "expires_at": 1729234747,
                    "transcript": self.most_recent_completion,
                },
            }
            self.conversation_history.append(last_message)

            # Create recording in Freeplay
            self.fp_client.recordings.create(
                RecordPayload(
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    all_messages=[
                        *formatted.llm_prompt,
                        last_message,  # Add the last message to the record call
                    ],
                    session_info=SessionInfo(
                        self.session.session_id, custom_metadata=custom_metadata
                    ),
                    inputs={"required_information": self.required_information},
                    prompt_version_info=formatted.prompt_info,
                    call_info=CallInfo.from_prompt_info(
                        formatted.prompt_info, start, end
                    ),
                    trace_info=trace,
                )
            )

            # Record output to trace
            trace.record_output(
                os.getenv("FREEPLAY_PROJECT_ID"),
                self.most_recent_completion,
            )

            print(
                f"Successfully recorded to Freeplay - completion time: {self.total_completion_time}s",
                flush=True,
            )
            self.reset_recent_messages()

        except Exception as e:
            print(f"Error recording to Freeplay: {e}", flush=True)
            self.reset_recent_messages()

    def _make_wav_bytes(self, pcm: bytes, prepend_silence_secs: int = 1) -> bytes:
        """Convert PCM audio data to WAV format with optional silence prepend."""
        buf = io.BytesIO()
        if prepend_silence_secs > 0:
            # sample_rate * sample_width * num_channels gives bytes per second,
            # so this is the number of silence *bytes* to prepend
            silence_bytes = int(
                self.sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            pcm = b"\x00" * silence_bytes + pcm
        with wave.open(buf, "wb") as wf:
            wf.setnchannels(self.num_channels)
            wf.setsampwidth(self.sample_width)
            wf.setframerate(self.sample_rate)
            wf.writeframes(pcm)

        return buf.getvalue()
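
For reference, a quick sanity check of the WAV helper under the class defaults (8 kHz, mono, 16-bit). The standalone call below is hypothetical (it assumes the processor can be constructed outside a pipeline) and just verifies the prepended silence:

    # One second of silent 16-bit PCM in, a valid WAV out with one extra
    # second of prepended silence (8000 frames audio + 8000 frames silence).
    processor = FreeplayProcessor(fp_client=fp_client, template_name="voice-assistant")
    wav_bytes = processor._make_wav_bytes(b"\x00\x00" * 8000, prepend_silence_secs=1)
    with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
        assert wf.getframerate() == 8000
        assert wf.getnframes() == 16000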