
1. Freeplay & LLM Initialization

Initialize the Freeplay client and fetch the unformatted prompt, optionally binding variables to it. Then initialize the LLM service for the pipeline using the model info returned from Freeplay.
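
Condensed from the full example below (required_information comes from your own lookup logic; environment variable names are the ones used there):

fp_client = Freeplay(
    freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
    api_base=os.getenv("FREEPLAY_API_BASE"),
)
unformatted_prompt = fp_client.prompts.get(
    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
    template_name=os.getenv("PROMPT_NAME"),
    environment="latest",
)
formatted_prompt = unformatted_prompt.bind(
    variables={"required_information": required_information},
    history=[],
).format()

llm = OpenAILLMService(
    model=unformatted_prompt.prompt_info.model,
    api_key=os.getenv("OPENAI_API_KEY"),
    **unformatted_prompt.prompt_info.model_parameters,
)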

2. AudioBuffer

Add the audio buffer processor; this lets us capture the audio we need to log to Freeplay.
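
Condensed from the full example below; the per-turn audio events it emits are what the Freeplay callbacks hook into:

audiobuffer = AudioBufferProcessor(
    sample_rate=16000,            # desired output sample rate
    num_channels=1,               # 1 for mono, 2 for stereo
    buffer_size=0,                # 0 = no periodic callbacks; audio is kept in memory
    user_continuous_stream=False,
    enable_turn_audio=True,       # emit per-turn audio events
)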

3. Add the FreeplayObserver to PipelineTask
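
Construct the observer and pass it in the observers list of the PipelineTask (condensed from the full example below):

freeplay_observer = FreeplayObserver(
    fp_client=fp_client,
    unformatted_prompt=unformatted_prompt,
    environment=os.getenv("FREEPLAY_ENVIRONMENT"),
    variables={"required_information": required_information},
)

task = PipelineTask(
    pipeline,
    params=PipelineParams(...),
    observers=[freeplay_observer],  # record each turn to Freeplay
)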

4. Callbacks to log to Freeplay

Add callbacks to handle user and bot audio. Once you have both sides of a turn, log it to Freeplay.
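
The callbacks hook the audio buffer's per-turn events; they are shown in full in the example below:

@audiobuffer.event_handler("on_user_turn_audio_data")
async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
    ...  # accumulate the user's turn audio; once bot audio is also present, record to Freeplay

@audiobuffer.event_handler("on_bot_turn_audio_data")
async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
    ...  # capture the bot's turn audio; once user audio is also present, record to Freeplay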

5. FreeplayObserver

The entire FreeplayObserver class
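
At a glance, the class has three responsibilities (the full implementation follows in the example):

class FreeplayObserver(BaseObserver):
    # Watches frames flowing through the pipeline to capture the latest user
    # message, assistant completion, and LLM latency.
    async def on_push_frame(self, data: FramePushed): ...

    # Wraps raw PCM audio into WAV bytes, optionally prepending silence.
    async def make_wav_bytes(self, pcm, sample_rate, voice, prepend_silence_secs=1): ...

    # Creates a trace on the Freeplay session and records the full turn
    # (prompt, messages, audio, latency) via the recordings API.
    async def record_to_freeplay(self): ...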

Examples

from helpers.freeplay_observer import FreeplayObserver
from freeplay import Freeplay, SessionInfo
import os
import sys

from dotenv import load_dotenv
from fastapi import WebSocket
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
from pipecat.serializers.twilio import TwilioFrameSerializer
from pipecat.services.cartesia.tts import CartesiaTTSService
from pipecat.services.deepgram.stt import DeepgramSTTService
from pipecat.services.openai.llm import OpenAILLMService
from pipecat.transports.network.fastapi_websocket import (
    FastAPIWebsocketParams,
    FastAPIWebsocketTransport,
)

load_dotenv(override=True)


async def run_bot(
    websocket_client: WebSocket,
    stream_sid: str,
    call_sid: str,
    testing: bool,
    session: SessionInfo,
):
    serializer = TwilioFrameSerializer(
        stream_sid=stream_sid,
        call_sid=call_sid,
        account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
        auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
    )

    transport = FastAPIWebsocketTransport(
        websocket=websocket_client,
        params=FastAPIWebsocketParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            add_wav_header=False,
            vad_enabled=True,
            vad_analyzer=SileroVADAnalyzer(),
            vad_audio_passthrough=True,
            serializer=serializer,
        ),
    )

    # Example of getting variables from a local function
    # (select_meeting_details is a placeholder; substitute your own lookup logic)
    required_information = select_meeting_details()

    # Initialize Freeplay client
    fp_client = Freeplay(
        freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
        api_base=os.getenv("FREEPLAY_API_BASE"),
    )
    # Get the unformatted prompt from Freeplay
    unformatted_prompt = fp_client.prompts.get(
        project_id=os.getenv("FREEPLAY_PROJECT_ID"),
        template_name=os.getenv("PROMPT_NAME"),
        environment="latest",
    )
    formatted_prompt = unformatted_prompt.bind(
        variables={"required_information": required_information},
        history=[],
    ).format()

    # Pass the formatted prompt to the LLM
    llm = OpenAILLMService(
        model=unformatted_prompt.prompt_info.model,
        tools=(
            unformatted_prompt.tool_schema if unformatted_prompt.tool_schema else None
        ),
        api_key=os.getenv("OPENAI_API_KEY"),
        **unformatted_prompt.prompt_info.model_parameters,
    )

    # Pass the Deepgram API key to the DeepgramSTTService
    stt = DeepgramSTTService(
        api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True
    )

    tts = CartesiaTTSService(
        api_key=os.getenv("CARTESIA_API_KEY"),
        voice_id=os.getenv("CARTESIA_VOICE_ID"),
        push_silence_after_stop=testing,
    )
    context = OpenAILLMContext(formatted_prompt.llm_prompt)
    context_aggregator = llm.create_context_aggregator(context)

    # NOTE: This buffer is what allows us to capture the audio from the user and bot using the callback handlers.
    # NOTE: Watch out! This will save all the conversation in memory. You can
    # pass `buffer_size` to get periodic callbacks.
    audiobuffer = AudioBufferProcessor(
        sample_rate=16000,  # Optional: desired output sample rate
        num_channels=1,  # 1 for mono, 2 for stereo
        buffer_size=0,  # Size in bytes to trigger buffer callbacks
        user_continuous_stream=False,
        enable_turn_audio=True,
    )

    freeplay_observer = FreeplayObserver(
        fp_client=fp_client,
        unformatted_prompt=unformatted_prompt,
        environment=os.getenv("FREEPLAY_ENVIRONMENT"),
        variables={
            "required_information": required_information,
        },
    )

    pipeline = Pipeline(
        [
            transport.input(),  # Websocket input from client
            stt,  # Speech-To-Text
            context_aggregator.user(),
            llm,  # LLM
            tts,  # Text-To-Speech
            transport.output(),  # Websocket output to client
            audiobuffer,  # Used to buffer the audio in the pipeline
            context_aggregator.assistant(),
        ]
    )

    task = PipelineTask(
        pipeline,
        params=PipelineParams(
            audio_in_sample_rate=8000,
            audio_out_sample_rate=8000,
            allow_interruptions=True,
            enable_metrics=True,
            enable_usage_metrics=True,  # This is used to track the usage of the LLM
        ),
        observers=[
            freeplay_observer
        ],  # Use the FreeplayObserver to record the audio to Freeplay
    )

    # Additional handlers to support Freeplay Observer
    # save audio bytes from user and store in freeplay_observer
    @audiobuffer.event_handler("on_user_turn_audio_data")
    async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
        if audio and not freeplay_observer._bot_audio:
            # aggregate user audio because this event could fire multiple times
            # before bot responds
            freeplay_observer._user_audio = freeplay_observer._turn_user_audio.extend(
                audio
            )
            freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
                freeplay_observer._turn_user_audio,
                sample_rate,
                "user",
                prepend_silence_secs=1,
            )
        elif audio and freeplay_observer._bot_audio:
            freeplay_observer._user_audio = freeplay_observer._turn_user_audio.extend(
                audio
            )
            freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
                freeplay_observer._turn_user_audio,
                sample_rate,
                "user",
                prepend_silence_secs=1,
            )
            await freeplay_observer.record_to_freeplay()

    # save audio bytes from bot and store in freeplay_observer
    @audiobuffer.event_handler("on_bot_turn_audio_data")
    async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
        # NOTE: this assumes the user always speaks first; otherwise the bot's
        # first turn would be cut off.
        if audio:
            # Keep the most recent bot turn audio (this event may fire more
            # than once before the user responds; the latest capture wins).
            freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
                audio, sample_rate, "bot", prepend_silence_secs=1
            )
            # Once the user's side of the turn has also been captured, log it.
            if freeplay_observer._user_audio:
                await freeplay_observer.record_to_freeplay()
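
    # NOTE: The example above stops without actually running the pipeline.
    # A minimal sketch of what typically follows (assumed, not part of the
    # original example): start the audio buffer when a client connects so the
    # turn-audio events above fire, then run the task with a PipelineRunner.
    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        await audiobuffer.start_recording()

    runner = PipelineRunner(handle_sigint=False)
    await runner.run(task)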

##############################################################################
# helpers/freeplay_observer.py
##############################################################################
import os

import io
import wave
import time
import base64
import datetime
import uuid
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
from pipecat.processors.frame_processor import FrameDirection
from freeplay import Freeplay, RecordPayload, CallInfo, SessionInfo

from dotenv import load_dotenv
import asyncio
import functools

from pipecat.frames.frames import (
    LLMFullResponseStartFrame,
    LLMFullResponseEndFrame,
    TranscriptionFrame,
)
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
from pipecat.services.openai.base_llm import BaseOpenAILLMService

load_dotenv(override=True)


class FreeplayObserver(BaseObserver):
    def __init__(
        self,
        fp_client: Freeplay,
        unformatted_prompt=None,  # prompt template object returned by fp_client.prompts.get()
        template_name: str = os.getenv("PROMPT_NAME"),
        environment: str = "latest",
        variables: dict | None = None,  # avoid a mutable default argument
    ):
        super().__init__()
        self.start_llm_interaction = 0
        self.end_llm_interaction = 0
        self.llm_completion_latency = 0
        self.call_id = str(uuid.uuid4())  # Has to be str to record to Freeplay

        # Audio related properties
        self.sample_width = 2
        self.num_channels = 1
        self.sample_rate = 16000
        self._bot_audio = bytearray()
        self._user_audio = bytearray()
        self._turn_user_audio = bytearray()
        self.user_speaking = False
        self.bot_speaking = False
        self.fp_client = fp_client
        self.session = self.fp_client.sessions.create()

        # Freeplay Params
        self.template_name = template_name
        self.environment = environment
        self.unformatted_prompt = unformatted_prompt
        self.variables = variables or {}
        # Conversation Params
        self.conversation_id = self._new_conv_id()
        self.conversation_history = []
        self.most_recent_user_message = None
        self.most_recent_completion = None

    def _new_conv_id(self) -> str:
        """Generate a new conversation ID based on the current timestamp (this represents a customer id or similar)."""
        return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    def _reset_recent_messages(self):
        """Reset all temporary message and audio storage."""
        self.most_recent_user_message = None
        self.most_recent_completion = None
        self._user_audio = bytearray()
        self._bot_audio = bytearray()
        self._turn_user_audio = bytearray()
        # self.llm_completion_latency = 0

    async def record_to_freeplay(self):
        """Record the current interaction to Freeplay as a new trace."""
        # Create a new trace for this interaction
        trace = self.session.create_trace(
            input=self.most_recent_user_message,
            custom_metadata={
                "conversation_id": str(self.conversation_id),
            },
        )

        # Add user message to conversation history
        self.conversation_history.append(
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": self.most_recent_user_message},
                    {
                        "type": "input_audio",
                        "input_audio": {
                            "data": base64.b64encode(self._user_audio).decode("utf-8"),
                            "format": "wav",
                        },
                    },
                ],
            },
        )

        # Bind the variables to the prompt
        if self.unformatted_prompt:
            formatted = self.unformatted_prompt.bind(
                variables=self.variables,
                history=self.conversation_history,
            ).format()
        else:
            # Run in executor to avoid blocking the event loop.
            # run_in_executor does not accept keyword arguments for the target
            # callable, so wrap the call in functools.partial.
            loop = asyncio.get_running_loop()
            formatted = await loop.run_in_executor(
                None,
                functools.partial(
                    self.fp_client.prompts.get_formatted,
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    template_name=self.template_name,
                    environment=self.environment,
                    history=self.conversation_history,
                    variables=self.variables,
                ),
            )

        # Calculate latency for the LLM interaction
        # Convert nanoseconds to seconds for proper timing
        latency_seconds = self.get_llm_response_latency_seconds()
        end = time.time()
        start = end - latency_seconds
        try:
            print(f"_____* self._bot_audio: {len(self._bot_audio)}")

            # Prepare metadata and record payload
            custom_metadata = {"caller_id": self.call_id}

            # Prepare the assistant's response message (mimicking the LLM provider's message format)
            assistant_msg = {
                "role": "assistant",
                "content": [
                    {"type": "text", "text": self.most_recent_completion},
                ],
                "audio": {
                    "id": self.conversation_id,
                    "data": base64.b64encode(self._bot_audio).decode("utf-8"),
                    "expires_at": 1729234747,
                    "transcript": self.most_recent_completion,
                },
            }

            # Add assistant's response to conversation history
            self.conversation_history.append(assistant_msg)

            record = RecordPayload(
                project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                all_messages=[
                    *formatted.llm_prompt,
                    assistant_msg,  # Add the assistant's response to the record call
                ],
                session_info=SessionInfo(
                    self.session.session_id, custom_metadata=custom_metadata
                ),
                inputs={},
                prompt_version_info=formatted.prompt_info,
                call_info=CallInfo.from_prompt_info(formatted.prompt_info, start, end),
                trace_info=trace,
            )

            # Create recording in Freeplay
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self.fp_client.recordings.create, record)

            # Record output to trace

            await loop.run_in_executor(
                None,
                functools.partial(
                    trace.record_output,
                    project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                    output=self.most_recent_completion,
                    eval_results={},
                ),
            )

            print(
                f"✅ Recorded interaction #{len(self.conversation_history) // 2} to Freeplay - LLM response time: {self.get_llm_response_latency_seconds():.3f}s",
                flush=True,
            )

            # Reset only audio and current message data, keep conversation history
            self._reset_recent_messages()

        except Exception as e:
            print(f"❌ Error recording to Freeplay: {e}", flush=True)
            # Still reset audio buffers to prevent accumulation
            # audio buffers are overwritten in event handler
            self._reset_recent_messages()

    async def make_wav_bytes(
        self, pcm: bytes, sample_rate: int, voice: str, prepend_silence_secs: int = 1
    ) -> bytes:
        """Convert PCM audio data to WAV format with optional silence prepend."""
        if prepend_silence_secs > 0:
            # Silence length in bytes: sample rate * bytes per sample * channels * seconds
            silence_bytes = int(
                sample_rate
                * self.sample_width
                * self.num_channels
                * prepend_silence_secs
            )
            silence = b"\x00" * silence_bytes
            pcm = silence + pcm
        with io.BytesIO() as buf:
            with wave.open(buf, "wb") as wf:
                wf.setnchannels(self.num_channels)
                wf.setsampwidth(self.sample_width)
                wf.setframerate(sample_rate)
                wf.writeframes(pcm)
            return buf.getvalue()

    async def on_push_frame(self, data: FramePushed):
        src = data.source
        dst = data.destination
        frame = data.frame
        direction = data.direction
        timestamp = data.timestamp

        # Create direction arrow
        arrow = "→" if direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(frame, LLMFullResponseStartFrame):
            print(f"LLMFullResponseFrame: START {src} {arrow} {dst}", flush=True)
        elif isinstance(frame, LLMFullResponseEndFrame):
            print(f"LLMFullResponseFrame: END {src} {arrow} {dst}", flush=True)
        elif isinstance(frame, TranscriptionFrame):
            # Fall back to the raw transcription if we don't already have a user message
            if self.most_recent_user_message is None:
                self.most_recent_user_message = frame.text
        elif isinstance(frame, OpenAILLMContextFrame):
            messages = frame.context.messages
            # Extract user message and completion from context
            # NOTE: this replaces the TranscriptionFrame result, since it matches exactly what the LLM received.
            user_messages = [m for m in messages if m.get("role") == "user"]
            if user_messages:
                self.most_recent_user_message = user_messages[-1].get("content")
            completions = [m for m in messages if m.get("role") == "assistant"]
            if completions:
                self.most_recent_completion = completions[-1].get("content")

            # if (
            #     self.llm_completion_latency
            #     and self.most_recent_user_message
            #     and self.most_recent_completion
            # ):
            #     # reset latency
            #     self.llm_completion_latency = 0

        # Get relevant latency metrics for the LLM interaction
        if (
            isinstance(frame, OpenAILLMContextFrame)
            and isinstance(src, LLMUserContextAggregator)
            and isinstance(dst, BaseOpenAILLMService)
        ):
            self.start_llm_interaction = timestamp
            print(f"_____freeplay-observer.py OpenAILLMContextFrame START: {timestamp}")
        elif isinstance(frame, LLMFullResponseEndFrame) and isinstance(
            src, BaseOpenAILLMService
        ):
            self.end_llm_interaction = timestamp

            # update latency tally
            self.llm_completion_latency = (
                self.end_llm_interaction - self.start_llm_interaction
            )
            print(
                f"_____freeplay-observer.py * set self.llm_completion_latency: {self.llm_completion_latency} ({self.get_llm_response_latency_seconds():.3f}s)"
            )

    def get_llm_response_latency_seconds(self):
        """Convert the raw nanosecond LLM response latency to seconds."""
        return self.llm_completion_latency / 1_000_000_000