> ## Documentation Index
> Fetch the complete documentation index at: https://docs.freeplay.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Pipecat Processor

> Add Freeplay observability to Pipecat voice pipelines using the Processor pattern.

### 1. Initialize the Freeplay Client

### 2. Fetch your formatted prompt

### 3. Pass the prompt to Pipecat's LLM Service (Processor)

### 4. Initialize the FreeplayProcessor Service

### 5. Add freeplay\_processor to your Pipeline

### 6. FreeplayProcessor

The FreeplayProcessor is based on Pipecat's LLMLogObserver; however, it needs to be a service rather than a logger in order to log the correct number of completions.

## Examples

<CodeGroup>
  ```python Python theme={null}
  ###############################################################################
  # Note: You must modify the process_frame method in Pipecat's base_llm.py so
  # that it passes the OpenAILLMContextFrame along to downstream processors.
  # This makes the handling easier in FreeplayProcessor.process_frame.
  ###############################################################################

      async def process_frame(self, frame: Frame, direction: FrameDirection):
          # Patched copy of the LLM service's process_frame. The only change is
          # the extra push_frame call in the OpenAILLMContextFrame branch.
          await super().process_frame(frame, direction)

          # Each branch below either builds an LLM context from the frame,
          # applies a settings update, or forwards the frame unchanged.
          context = None
          if isinstance(frame, OpenAILLMContextFrame):
              context: OpenAILLMContext = frame.context
              await self.push_frame(frame, direction) # Added line: forward the context frame downstream so FreeplayProcessor can read it
          elif isinstance(frame, LLMMessagesFrame):
              context = OpenAILLMContext.from_messages(frame.messages)
          elif isinstance(frame, VisionImageRawFrame):
              context = OpenAILLMContext()
              context.add_image_frame_message(
                  format=frame.format, size=frame.size, image=frame.image, text=frame.text
              )
          elif isinstance(frame, LLMUpdateSettingsFrame):
              await self._update_settings(frame.settings)
          else:
              # Frames this service does not handle are passed along untouched
              await self.push_frame(frame, direction)
  	###############################################################################


  from helpers.freeplay_frame import FreeplayProcessor
  from freeplay import Freeplay, SessionInfo

  	fp_client = Freeplay(
          # API key and API base URL are read from environment variables
          freeplay_api_key=os.getenv("FREEPLAY_API_KEY"),
          api_base=os.getenv("FREEPLAY_API_BASE")
      )

      # Get the unformatted prompt from Freeplay
      # Get the unformatted prompt here and then bind it later on.
      # This reduces latency in the system.
      unformatted_prompt = fp_client.prompts.get(
          project_id=os.getenv("FREEPLAY_PROJECT_ID"),
          template_name=os.getenv("PROMPT_NAME"),
          environment="latest",
      )
      formatted_prompt = unformatted_prompt.bind(
          variables={<optional vars>},
          history=[],
      ).format()
      # Pass the formatted prompt to the LLM
      llm = OpenAILLMService(model=formatted_prompt.prompt_info.model,
                             tools=formatted_prompt.tool_schema if formatted_prompt.tool_schema else None,
                             api_key=os.getenv("OPENAI_API_KEY"),
                             **formatted_prompt.prompt_info.model_parameters)

      # Pass the Freeplay client to the FreeplayProcessor
      freeplay_processor = FreeplayProcessor(fp_client=fp_client, template_name="voice-assistant", session=session, debug=True)

   #....Additional Pipeline Configuration...

       pipeline = Pipeline(
          # Processors run in list order; frames flow from the first entry to the last.
          [
              transport.input(),  # Websocket input from client
              stt,  # Speech-To-Text
              context_aggregator.user(),  # Adds the user's message to the LLM context
              llm,  # LLM
              tts,  # Text-To-Speech
              freeplay_processor, # Freeplay Logger (after tts so it can capture assistant audio)
              transport.output(),  # Websocket output to client
              audiobuffer,  # Used to buffer the audio in the pipeline
              context_aggregator.assistant(),  # Adds the assistant's reply to the LLM context
          ]
      )

  #####################
  # FreeplayProcessor #
  #####################
  import os
  import io
  import wave
  import time
  import base64
  import datetime
  from pipecat.frames.frames import (
      Frame,
      LLMFullResponseStartFrame,
      LLMFullResponseEndFrame,
      UserStartedSpeakingFrame,
      UserStoppedSpeakingFrame,
      InputAudioRawFrame,
      BotStartedSpeakingFrame,
      BotStoppedSpeakingFrame,
      MetricsFrame,
      TTSAudioRawFrame,
  )
  from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContextFrame
  from pipecat.processors.frame_processor import FrameProcessor, FrameDirection
  from freeplay import (
      Freeplay,
      RecordPayload,
      CallInfo,
      SessionInfo,
  )
  from freeplay.resources.prompts import PromptInfo
  from pipecat.metrics.metrics import ProcessingMetricsData, TTFBMetricsData


  class FreeplayProcessor(FrameProcessor):
      """Pass-through frame processor that records each voice turn to Freeplay.

      Every frame is forwarded unchanged. While frames pass through, the
      processor collects what it needs for logging: user and bot audio, LLM and
      STT timing metrics, and the most recent user message and assistant
      completion found in the LLM context. As soon as both a user message and a
      completion are available, the turn is recorded to Freeplay and the
      per-turn state is cleared.
      """

      def __init__(
          self,
          fp_client: Freeplay,
          template_name: str,
          session: SessionInfo = None,
          required_information: str = None,
          unformatted_prompt: PromptInfo = None,
      ):
          """Set up Freeplay logging state and empty audio buffers.

          Args:
              fp_client: Freeplay client used to fetch prompts and create
                  recordings.
              template_name: Prompt template fetched from Freeplay when no
                  ``unformatted_prompt`` is supplied.
              session: Freeplay session for the conversation. It must provide
                  ``session_id`` and ``create_trace()``. Recording is skipped
                  (with a printed error) while it is ``None``.
              required_information: Value bound to the prompt's
                  ``required_information`` variable.
              unformatted_prompt: Pre-fetched prompt template exposing
                  ``bind(...).format()``. Supplying it avoids fetching the
                  prompt from Freeplay for every recorded turn.
          """
          super().__init__()
          self.fp_client = fp_client
          self.template_name = template_name
          self.conversation_id = self._new_conv_id()
          self.required_information = required_information

          # Audio format written into the WAV header for captured PCM.
          # NOTE(review): presumably these must match the transport's audio
          # format - confirm against the pipeline configuration.
          self.sample_width = 2
          self.sample_rate = 8000
          self.num_channels = 1
          self.user_speaking = False
          self.bot_speaking = False

          # Freeplay related properties
          self.conversation_history = []
          self.session = session
          self.unformatted_prompt = unformatted_prompt

          # Payload of the last MetricsFrame seen. Defined here so the attribute
          # exists even before the first MetricsFrame arrives.
          self.metrics = []

          # Creates the per-turn fields: most recent user message/completion,
          # audio buffers and timing counters.
          self.reset_recent_messages()

      def _new_conv_id(self) -> str:
          """Generate a new conversation ID based on the current timestamp (this represents a customer id or similar)."""
          return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

      def reset_recent_messages(self):
          """Reset all per-turn message, audio and timing state."""
          self.most_recent_user_message = None
          self.most_recent_completion = None
          self._user_audio = bytearray()
          self._bot_audio = bytearray()
          self.total_completion_time = 0
          self.deepgram_latency = 0

      async def process_frame(self, frame: Frame, direction: FrameDirection):
          """Inspect a frame for logging purposes, then forward it unchanged.

          Args:
              frame: The frame travelling through the pipeline.
              direction: Direction the frame is travelling; it is forwarded in
                  the same direction.
          """
          await super().process_frame(frame, direction)

          # Handle LLM response frames
          if isinstance(frame, (LLMFullResponseStartFrame, LLMFullResponseEndFrame)):
              event = "START" if isinstance(frame, LLMFullResponseStartFrame) else "END"
              print(f"LLMFullResponseFrame: {event}", flush=True)

          # Handle LLM context frame - this is where we log to Freeplay
          elif isinstance(frame, OpenAILLMContextFrame):
              messages = frame.context.messages

              # The last message of each role is the current turn's content
              user_messages = [m for m in messages if m.get("role") == "user"]
              if user_messages:
                  self.most_recent_user_message = user_messages[-1].get("content")

              completions = [m for m in messages if m.get("role") == "assistant"]
              if completions:
                  self.most_recent_completion = completions[-1].get("content")

              # Log to Freeplay when we have both user input and completion.
              # NOTE(review): this call is synchronous inside an async method;
              # if the Freeplay client performs blocking network I/O it will
              # stall the pipeline for that duration - confirm.
              if self.most_recent_user_message and self.most_recent_completion:
                  self._record_to_freeplay()

          # Handle audio state changes
          elif isinstance(frame, UserStartedSpeakingFrame):
              self.user_speaking = True
          elif isinstance(frame, UserStoppedSpeakingFrame):
              self.user_speaking = False
          elif isinstance(frame, BotStartedSpeakingFrame):
              self.bot_speaking = True
          elif isinstance(frame, BotStoppedSpeakingFrame):
              self.bot_speaking = False

          # Handle audio data: only keep audio while the speaker is active
          elif isinstance(frame, InputAudioRawFrame):
              if self.user_speaking:
                  self._user_audio.extend(frame.audio)
          elif isinstance(frame, TTSAudioRawFrame):
              if self.bot_speaking:
                  self._bot_audio.extend(frame.audio)

          # Handle metrics for LLM completion time and STT latency
          elif isinstance(frame, MetricsFrame):
              self.metrics = frame.data
              for metric in frame.data:
                  if isinstance(metric, ProcessingMetricsData):
                      if "LLMService" in metric.processor:
                          # Latest LLM processing time replaces the previous value
                          self.total_completion_time = metric.value
                  elif isinstance(metric, TTFBMetricsData):
                      if "DeepgramSTTService" in metric.processor:
                          # STT time-to-first-byte is summed over the turn
                          self.deepgram_latency += metric.value

          # Pass frame to next processor
          await self.push_frame(frame, direction)

      def _record_to_freeplay(self):
          """Record the current user/assistant turn to Freeplay.

          Creates a trace for the turn, appends the user message (text plus WAV
          audio) and the assistant message (text plus WAV audio) to the
          conversation history, formats the prompt with that history, and
          creates a Freeplay recording linked to the trace.

          Logging is best-effort: any exception is printed rather than raised so
          a Freeplay failure cannot interrupt the voice pipeline. The per-turn
          state is always reset afterwards, whether recording succeeded or not.
          """
          if self.session is None:
              # Without a session there is nothing to attach the trace or the
              # recording to, so skip this turn instead of raising.
              print("Error recording to Freeplay: no session was provided", flush=True)
              self.reset_recent_messages()
              return

          project_id = os.getenv("FREEPLAY_PROJECT_ID")
          variables = {"required_information": self.required_information}

          try:
              # Create a new trace for this interaction
              trace = self.session.create_trace(
                  input=self.most_recent_user_message,
                  custom_metadata={
                      "deepgram_latency": self.deepgram_latency,
                  },
              )

              # Add the user's turn (transcript and audio) to the history
              self.conversation_history.append(
                  {
                      "role": "user",
                      "content": [
                          {"type": "text", "text": self.most_recent_user_message},
                          {
                              "type": "input_audio",
                              "input_audio": {
                                  "data": base64.b64encode(
                                      self._make_wav_bytes(
                                          self._user_audio, prepend_silence_secs=1
                                      )
                                  ).decode("utf-8"),
                                  "format": "wav",
                              },
                          },
                      ],
                  },
              )

              # Bind the variables to the prompt
              if self.unformatted_prompt:
                  formatted = self.unformatted_prompt.bind(
                      variables=variables,
                      history=self.conversation_history,
                  ).format()
              else:
                  # Get formatted prompt. Note this adds latency to the pipeline
                  formatted = self.fp_client.prompts.get_formatted(
                      project_id=project_id,
                      template_name=self.template_name,
                      environment="latest",
                      history=self.conversation_history,
                      variables=variables,
                  )

              # The completion has already finished, so the call window ends
              # now and started total_completion_time seconds ago.
              end = time.time()
              start = end - self.total_completion_time

              custom_metadata = {
                  "conversation_id": str(self.conversation_id),
              }

              # Add assistant's response to conversation history
              last_message = {
                  "role": "assistant",
                  "content": [
                      {"type": "text", "text": self.most_recent_completion},
                  ],
                  "audio": {
                      "id": self.conversation_id,
                      "data": base64.b64encode(
                          self._make_wav_bytes(self._bot_audio, prepend_silence_secs=1)
                      ).decode("utf-8"),
                      # NOTE(review): fixed timestamp, presumably a placeholder
                      # for the audio expiry - confirm.
                      "expires_at": 1729234747,
                      "transcript": self.most_recent_completion,
                  },
              }
              self.conversation_history.append(last_message)

              # Create recording in Freeplay
              self.fp_client.recordings.create(
                  RecordPayload(
                      project_id=project_id,
                      all_messages=[
                          *formatted.llm_prompt,
                          last_message,  # Add the last message to the record call
                      ],
                      session_info=SessionInfo(
                          self.session.session_id, custom_metadata=custom_metadata
                      ),
                      inputs=variables,
                      prompt_version_info=formatted.prompt_info,
                      call_info=CallInfo.from_prompt_info(
                          formatted.prompt_info, start, end
                      ),
                      trace_info=trace,
                  )
              )

              # Record output to trace
              trace.record_output(
                  project_id,
                  self.most_recent_completion,
              )

              print(
                  f"Successfully recorded to Freeplay - completion time: {self.total_completion_time}s",
                  flush=True,
              )
          except Exception as e:
              print(f"Error recording to Freeplay: {e}", flush=True)
          finally:
              # Start the next turn from a clean slate in every case
              self.reset_recent_messages()

      def _make_wav_bytes(self, pcm: bytes, prepend_silence_secs: int = 1) -> bytes:
          """Wrap raw PCM audio in a WAV container, optionally prefixed by silence.

          Args:
              pcm: Raw PCM audio using this processor's sample width, sample
                  rate and channel count.
              prepend_silence_secs: Seconds of silence to insert before the
                  audio; values of 0 or less add none.

          Returns:
              The complete WAV file contents as bytes.
          """
          if prepend_silence_secs > 0:
              # Silence length in bytes: rate * bytes-per-sample * channels * seconds
              silence_bytes = int(
                  self.sample_rate
                  * self.sample_width
                  * self.num_channels
                  * prepend_silence_secs
              )
              pcm = bytes(silence_bytes) + pcm

          buf = io.BytesIO()
          with wave.open(buf, "wb") as wf:
              wf.setnchannels(self.num_channels)
              wf.setsampwidth(self.sample_width)
              wf.setframerate(self.sample_rate)
              wf.writeframes(pcm)

          return buf.getvalue()

  ```
</CodeGroup>
