> ## Documentation Index
> Fetch the complete documentation index at: https://docs.freeplay.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipecat Observer

> Add Freeplay observability to Pipecat voice pipelines using the Observer pattern.

### 1. Freeplay & LLM Initialization

Initialize the Freeplay client and fetch the unformatted prompt template (optionally binding variables to it).

Initialize the pipeline's LLM service using the model, parameters, and tool schema from the Freeplay prompt.

### 2. AudioBuffer

Add the audio buffer processor. It captures per-turn user and bot audio so the right audio can be logged to Freeplay.

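### 3. Add the FreeplayObserver to PipelineTask

Pass the observer in the task's `observers` list so it sees the frames pushed through the pipeline.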

### 4. Callbacks to log to Freeplay

Add callbacks to handle user and bot turn audio. Once both the user and bot audio for a turn are available, log the interaction to Freeplay.

### 5. FreeplayObserver

The complete FreeplayObserver class.

## Examples

<CodeGroup>
  ```python Python theme={null}
  from helpers.freeplay_observer import FreeplayObserver
  from freeplay import Freeplay, SessionInfo
  import os
  import sys

  from dotenv import load_dotenv
  from fastapi import WebSocket
  from loguru import logger

  from pipecat.audio.vad.silero import SileroVADAnalyzer
  from pipecat.pipeline.pipeline import Pipeline
  from pipecat.pipeline.runner import PipelineRunner
  from pipecat.pipeline.task import PipelineParams, PipelineTask
  from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
  from pipecat.processors.audio.audio_buffer_processor import AudioBufferProcessor
  from pipecat.serializers.twilio import TwilioFrameSerializer
  from pipecat.services.cartesia.tts import CartesiaTTSService
  from pipecat.services.deepgram.stt import DeepgramSTTService
  from pipecat.services.openai.llm import OpenAILLMService
  from pipecat.transports.network.fastapi_websocket import (
      FastAPIWebsocketParams,
      FastAPIWebsocketTransport,
  )

  # Load variables from a local .env file into the environment; override=True
  # lets values from .env replace variables that are already set.
  load_dotenv(override=True)


  async def run_bot(
      websocket_client: WebSocket,
      stream_sid: str,
      call_sid: str,
      testing: bool,
      session: SessionInfo,
  ):
      """Run a Twilio voice bot whose conversation turns are logged to Freeplay.

      Builds a Pipecat pipeline (Twilio websocket in/out, Deepgram STT, an
      OpenAI LLM configured from a Freeplay prompt, Cartesia TTS), attaches a
      ``FreeplayObserver`` plus ``AudioBufferProcessor`` callbacks that record
      each user/bot turn to Freeplay, and runs it until the client disconnects.

      Args:
          websocket_client: FastAPI websocket connected to the Twilio media stream.
          stream_sid: Twilio media stream SID.
          call_sid: Twilio call SID.
          testing: Forwarded to Cartesia TTS as ``push_silence_after_stop``.
          session: Freeplay session info. Not used here; ``FreeplayObserver``
              creates its own session.
      """
      serializer = TwilioFrameSerializer(
          stream_sid=stream_sid,
          call_sid=call_sid,
          account_sid=os.getenv("TWILIO_ACCOUNT_SID", ""),
          auth_token=os.getenv("TWILIO_AUTH_TOKEN", ""),
      )

      transport = FastAPIWebsocketTransport(
          websocket=websocket_client,
          params=FastAPIWebsocketParams(
              audio_in_enabled=True,
              audio_out_enabled=True,
              add_wav_header=False,
              vad_enabled=True,
              vad_analyzer=SileroVADAnalyzer(),
              vad_audio_passthrough=True,
              serializer=serializer,
          ),
      )

      # Example of getting prompt variables from your own code;
      # select_meeting_details() is an application function not shown here.
      required_information = select_meeting_details()

      # Initialize Freeplay client
      fp_client = Freeplay(
          freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
          api_base=os.getenv("FREEPLAY_API_BASE"),
      )
      # Get the unformatted prompt from Freeplay
      unformatted_prompt = fp_client.prompts.get(
          project_id=os.getenv("FREEPLAY_PROJECT_ID"),
          template_name=os.getenv("PROMPT_NAME"),
          environment="latest",
      )
      formatted_prompt = unformatted_prompt.bind(
          variables={"required_information": required_information},
          history=[],
      ).format()

      # Configure the LLM from the Freeplay prompt. Pipecat takes sampling
      # parameters through InputParams rather than as constructor kwargs.
      llm = OpenAILLMService(
          model=unformatted_prompt.prompt_info.model,
          api_key=os.getenv("OPENAI_API_KEY"),
          params=OpenAILLMService.InputParams(
              **unformatted_prompt.prompt_info.model_parameters
          ),
      )

      # Pass the Deepgram API key to the DeepgramSTTService
      stt = DeepgramSTTService(
          api_key=os.getenv("DEEPGRAM_API_KEY"), audio_passthrough=True
      )

      tts = CartesiaTTSService(
          api_key=os.getenv("CARTESIA_API_KEY"),
          voice_id=os.getenv("CARTESIA_VOICE_ID"),
          push_silence_after_stop=testing,
      )

      # In Pipecat, tools are attached to the LLM context. Only pass them when
      # the Freeplay prompt actually defines a tool schema.
      context_kwargs = {}
      if unformatted_prompt.tool_schema:
          context_kwargs["tools"] = unformatted_prompt.tool_schema
      context = OpenAILLMContext(formatted_prompt.llm_prompt, **context_kwargs)
      context_aggregator = llm.create_context_aggregator(context)

      # NOTE: This buffer is what allows us to capture the audio from the user and bot using the callback handlers.
      # NOTE: Watch out! This will save all the conversation in memory. You can
      # pass `buffer_size` to get periodic callbacks.
      audiobuffer = AudioBufferProcessor(
          sample_rate=16000,  # Optional: desired output sample rate
          num_channels=1,  # 1 for mono, 2 for stereo
          buffer_size=0,  # Size in bytes to trigger buffer callbacks
          user_continuous_stream=False,
          enable_turn_audio=True,
      )

      freeplay_observer = FreeplayObserver(
          fp_client=fp_client,
          unformatted_prompt=unformatted_prompt,
          # Fall back to "latest" when the variable is unset.
          environment=os.getenv("FREEPLAY_ENVIRONMENT", "latest"),
          variables={
              "required_information": required_information,
          },
      )

      pipeline = Pipeline(
          [
              transport.input(),  # Websocket input from client
              stt,  # Speech-To-Text
              context_aggregator.user(),
              llm,  # LLM
              tts,  # Text-To-Speech
              transport.output(),  # Websocket output to client
              audiobuffer,  # Used to buffer the audio in the pipeline
              context_aggregator.assistant(),
          ]
      )

      task = PipelineTask(
          pipeline,
          params=PipelineParams(
              audio_in_sample_rate=8000,
              audio_out_sample_rate=8000,
              allow_interruptions=True,
              enable_metrics=True,
              enable_usage_metrics=True,  # This is used to track the usage of the LLM
          ),
          observers=[
              freeplay_observer
          ],  # Use the FreeplayObserver to record the audio to Freeplay
      )

      @transport.event_handler("on_client_connected")
      async def on_client_connected(transport, client):
          # The audio buffer only captures audio once recording has started.
          await audiobuffer.start_recording()

      @transport.event_handler("on_client_disconnected")
      async def on_client_disconnected(transport, client):
          await task.cancel()

      # Additional handlers to support Freeplay Observer
      # save audio bytes from user and store in freeplay_observer
      @audiobuffer.event_handler("on_user_turn_audio_data")
      async def on_user_turn_audio_data(buffer, audio, sample_rate, num_channels):
          if not audio:
              return
          # Aggregate user audio: this event can fire several times before the
          # bot responds. bytearray.extend mutates in place (returns None).
          freeplay_observer._turn_user_audio.extend(audio)
          freeplay_observer._user_audio = await freeplay_observer.make_wav_bytes(
              freeplay_observer._turn_user_audio,
              sample_rate,
              "user",
              prepend_silence_secs=1,
          )
          # If bot audio is already buffered, the turn pair is complete.
          if freeplay_observer._bot_audio:
              await freeplay_observer.record_to_freeplay()

      # save audio bytes from bot and store in freeplay_observer
      @audiobuffer.event_handler("on_bot_turn_audio_data")
      async def on_bot_turn_audio_data(buffer, audio, sample_rate, num_channels):
          if not audio:
              return
          # The latest bot turn replaces any earlier buffered bot audio (no
          # aggregation). Recording happens once user audio is also present,
          # which assumes the user speaks first.
          freeplay_observer._bot_audio = await freeplay_observer.make_wav_bytes(
              audio, sample_rate, "bot", prepend_silence_secs=1
          )
          if freeplay_observer._user_audio:
              await freeplay_observer.record_to_freeplay()

      # Run the pipeline until the task is cancelled (client disconnect).
      runner = PipelineRunner(handle_sigint=False)
      await runner.run(task)

  ##############################################################################
  # FreeplayObserver.py
  ##############################################################################
  import os

  import io
  import wave
  import time
  import base64
  import datetime
  import uuid
  from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
  from pipecat.processors.frame_processor import FrameDirection
  from freeplay import Freeplay, RecordPayload, CallInfo, SessionInfo

  from dotenv import load_dotenv
  import asyncio
  import functools

  from pipecat.frames.frames import (
      LLMFullResponseStartFrame,
      LLMFullResponseEndFrame,
      TranscriptionFrame,
  )
  from pipecat.observers.base_observer import BaseObserver, FramePushed
  from pipecat.processors.aggregators.llm_response import LLMUserContextAggregator
  from pipecat.services.openai.base_llm import BaseOpenAILLMService

  # Load variables from a local .env file into the environment (override=True
  # lets .env values win). This runs before the class below is defined.
  load_dotenv(override=True)


  class FreeplayObserver(BaseObserver):
      """Pipecat observer that records each user/bot turn to Freeplay.

      From the frames pushed through the pipeline it tracks the most recent
      user message, the most recent LLM completion and the LLM response
      latency. Per-turn audio is written into ``_turn_user_audio``,
      ``_user_audio`` and ``_bot_audio`` by external code (e.g. the
      ``AudioBufferProcessor`` event handlers), which then calls
      :meth:`record_to_freeplay` to log the turn.
      """

      def __init__(
          self,
          fp_client: Freeplay,
          unformatted_prompt=None,
          template_name: str = None,
          environment: str = "latest",
          variables: dict = None,
      ):
          """Create the observer and open a new Freeplay session.

          Args:
              fp_client: Initialized Freeplay client.
              unformatted_prompt: Prompt template object returned by
                  ``fp_client.prompts.get(...)``. When given, it is bound
                  locally on every turn. Otherwise the prompt is fetched with
                  ``fp_client.prompts.get_formatted`` using ``template_name``.
              template_name: Template name used when ``unformatted_prompt``
                  is not given. Defaults to the ``PROMPT_NAME`` env var.
              environment: Freeplay environment to fetch from. A falsy value
                  (e.g. an unset env var) falls back to "latest".
              variables: Template variables bound on every turn and recorded
                  as the call's inputs.
          """
          super().__init__()
          # LLM timing, populated by on_push_frame from frame timestamps.
          self.start_llm_interaction = 0
          self.end_llm_interaction = 0
          self.llm_completion_latency = 0
          self.call_id = str(uuid.uuid4())  # Has to be str to record to Freeplay

          # Audio format used by make_wav_bytes: 2-byte (16-bit) mono samples.
          self.sample_width = 2
          self.num_channels = 1
          self.sample_rate = 16000
          self._bot_audio = bytearray()
          self._user_audio = bytearray()
          self._turn_user_audio = bytearray()
          self.user_speaking = False
          self.bot_speaking = False

          self.fp_client = fp_client
          self.session = self.fp_client.sessions.create()

          # Freeplay params. Defaults are resolved here rather than in the
          # signature so env vars are read per instance and no mutable
          # default dict is shared between instances.
          self.template_name = (
              template_name
              if template_name is not None
              else (os.getenv("PROMPT_NAME") or None)
          )
          self.environment = environment or "latest"
          self.unformatted_prompt = unformatted_prompt
          self.variables = variables if variables is not None else {}

          # Conversation Params
          self.conversation_id = self._new_conv_id()
          self.conversation_history = []
          self.most_recent_user_message = None
          self.most_recent_completion = None

      def _new_conv_id(self) -> str:
          """Generate a new conversation ID based on the current timestamp (this represents a customer id or similar)."""
          return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

      def _reset_recent_messages(self):
          """Reset all temporary message and audio storage for the current turn."""
          self.most_recent_user_message = None
          self.most_recent_completion = None
          self._user_audio = bytearray()
          self._bot_audio = bytearray()
          self._turn_user_audio = bytearray()

      async def _format_prompt(self, history: list):
          """Return the Freeplay prompt formatted with ``self.variables`` and ``history``."""
          if self.unformatted_prompt:
              return self.unformatted_prompt.bind(
                  variables=self.variables,
                  history=history,
              ).format()
          # get_formatted is a synchronous SDK call. Run it in an executor to
          # avoid blocking the event loop. run_in_executor only forwards
          # positional args, so keyword args go through functools.partial.
          loop = asyncio.get_running_loop()
          return await loop.run_in_executor(
              None,
              functools.partial(
                  self.fp_client.prompts.get_formatted,
                  project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                  template_name=self.template_name,
                  environment=self.environment,
                  history=history,
                  variables=self.variables,
              ),
          )

      async def record_to_freeplay(self):
          """Record the current user/bot turn to Freeplay as a new trace.

          Formats the prompt with the conversation history plus this user turn
          (text and base64 WAV audio). It then creates a Freeplay recording
          whose messages end with the assistant's response, and records the
          completion as the trace output.

          Errors are printed rather than raised, so a Freeplay failure does not
          break the voice pipeline. The per-turn message and audio buffers are
          always reset afterwards; conversation history is kept.
          """
          user_msg = {
              "role": "user",
              "content": [
                  {"type": "text", "text": self.most_recent_user_message},
                  {
                      "type": "input_audio",
                      "input_audio": {
                          "data": base64.b64encode(self._user_audio).decode("utf-8"),
                          "format": "wav",
                      },
                  },
              ],
          }
          # Assistant response message, mimicking the LLM provider's audio
          # message format.
          assistant_msg = {
              "role": "assistant",
              "content": [
                  {"type": "text", "text": self.most_recent_completion},
              ],
              "audio": {
                  "id": self.conversation_id,
                  "data": base64.b64encode(self._bot_audio).decode("utf-8"),
                  # Fixed placeholder value; not derived from a real expiry.
                  "expires_at": 1729234747,
                  "transcript": self.most_recent_completion,
              },
          }

          try:
              trace = self.session.create_trace(
                  input=self.most_recent_user_message,
                  custom_metadata={
                      "conversation_id": str(self.conversation_id),
                  },
              )

              formatted = await self._format_prompt(
                  [*self.conversation_history, user_msg]
              )

              # Anchor the measured LLM latency (converted to seconds) to "now"
              # to produce the start/end pair CallInfo expects.
              latency_seconds = self.get_llm_response_latency_seconds()
              end = time.time()
              start = end - latency_seconds

              # Commit the turn only after formatting succeeded, so a failure
              # cannot leave an unpaired user message in the history.
              self.conversation_history.extend([user_msg, assistant_msg])

              record = RecordPayload(
                  project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                  all_messages=[
                      *formatted.llm_prompt,
                      assistant_msg,  # Add the assistant's response to the record call
                  ],
                  session_info=SessionInfo(
                      self.session.session_id,
                      custom_metadata={"caller_id": self.call_id},
                  ),
                  inputs=self.variables,
                  prompt_version_info=formatted.prompt_info,
                  call_info=CallInfo.from_prompt_info(formatted.prompt_info, start, end),
                  trace_info=trace,
              )

              # Both Freeplay SDK calls are synchronous; run them in an executor.
              loop = asyncio.get_running_loop()
              await loop.run_in_executor(None, self.fp_client.recordings.create, record)
              await loop.run_in_executor(
                  None,
                  functools.partial(
                      trace.record_output,
                      project_id=os.getenv("FREEPLAY_PROJECT_ID"),
                      output=self.most_recent_completion,
                      eval_results={},
                  ),
              )

              print(
                  f"✅ Recorded interaction #{len(self.conversation_history) // 2} to Freeplay - LLM response time: {self.get_llm_response_latency_seconds():.3f}s",
                  flush=True,
              )
          except Exception as e:
              print(f"❌ Error recording to Freeplay: {e}", flush=True)
          finally:
              # Reset per-turn data in every case to prevent audio accumulating.
              self._reset_recent_messages()

      async def make_wav_bytes(
          self, pcm: bytes, sample_rate: int, voice: str, prepend_silence_secs: int = 1
      ) -> bytes:
          """Wrap raw PCM audio in a WAV container, optionally prefixed with silence.

          Args:
              pcm: Raw PCM samples with ``self.sample_width`` bytes per sample
                  and ``self.num_channels`` channels.
              sample_rate: Sample rate of ``pcm`` in Hz. Used for the WAV header
                  and to size the prepended silence.
              voice: Speaker label ("user" or "bot"); currently unused.
              prepend_silence_secs: Seconds of silence to prepend (0 disables).

          Returns:
              The complete WAV file as bytes.
          """
          if prepend_silence_secs > 0:
              # Silence is sized from the audio's own sample rate so it lasts
              # exactly prepend_silence_secs. Length is in bytes.
              silence_len = int(
                  sample_rate
                  * self.sample_width
                  * self.num_channels
                  * prepend_silence_secs
              )
              pcm = b"\x00" * silence_len + pcm
          with io.BytesIO() as buf:
              with wave.open(buf, "wb") as wf:
                  wf.setnchannels(self.num_channels)
                  wf.setsampwidth(self.sample_width)
                  wf.setframerate(sample_rate)
                  wf.writeframes(pcm)
              return buf.getvalue()

      async def on_push_frame(self, data: FramePushed):
          """Track the latest user message, completion and LLM latency from pushed frames."""
          src = data.source
          dst = data.destination
          frame = data.frame
          timestamp = data.timestamp
          arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

          if isinstance(frame, LLMFullResponseStartFrame):
              print(f"LLMFullResponseFrame: START {src} {arrow} {dst}", flush=True)
          elif isinstance(frame, LLMFullResponseEndFrame):
              print(f"LLMFullResponseFrame: END {src} {arrow} {dst}", flush=True)
          elif isinstance(frame, TranscriptionFrame):
              # Fallback: keep the first STT transcript of the turn until an
              # OpenAILLMContextFrame supplies the text the LLM received.
              if self.most_recent_user_message is None:
                  self.most_recent_user_message = frame.text
          elif isinstance(frame, OpenAILLMContextFrame):
              # The context reflects exactly what the LLM received, so it
              # overrides the transcription captured above.
              messages = frame.context.messages
              last_user = next(
                  (m for m in reversed(messages) if m.get("role") == "user"), None
              )
              if last_user is not None:
                  self.most_recent_user_message = last_user.get("content")
              last_completion = next(
                  (m for m in reversed(messages) if m.get("role") == "assistant"), None
              )
              if last_completion is not None:
                  self.most_recent_completion = last_completion.get("content")

          # LLM latency: from the user context aggregator handing the context to
          # the OpenAI LLM service until that service pushes
          # LLMFullResponseEndFrame.
          if (
              isinstance(frame, OpenAILLMContextFrame)
              and isinstance(src, LLMUserContextAggregator)
              and isinstance(dst, BaseOpenAILLMService)
          ):
              self.start_llm_interaction = timestamp
          elif isinstance(frame, LLMFullResponseEndFrame) and isinstance(
              src, BaseOpenAILLMService
          ):
              self.end_llm_interaction = timestamp
              self.llm_completion_latency = (
                  self.end_llm_interaction - self.start_llm_interaction
              )

      def get_llm_response_latency_seconds(self):
          """Convert the raw LLM response latency to seconds.

          Assumes frame timestamps are in nanoseconds.
          """
          return self.llm_completion_latency / 1_000_000_000
  ```
</CodeGroup>
