The Observer pattern in Pipecat allows non-intrusive monitoring of frames as they flow through the pipeline. Observers can watch frame traffic without affecting the pipeline’s core functionality.

Base Observer

All observers must inherit from BaseObserver and can implement these methods:
  • on_push_frame(data: FramePushed): Called when a frame is pushed from one processor to another
  • on_process_frame(data: FrameProcessed): Called when a frame is being processed by a processor
  • on_pipeline_started(): Called after the StartFrame has been processed by all processors in the pipeline
from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed

class CustomObserver(BaseObserver):
    async def on_push_frame(self, data: FramePushed):
        # Your frame observation logic here
        pass

    async def on_process_frame(self, data: FrameProcessed):
        # Your frame processing observation logic here
        pass

    async def on_pipeline_started(self):
        # Called when the pipeline has fully started
        pass

Available Observers

Pipecat provides several built-in observers:
  • LLMLogObserver: Logs LLM activity and responses
  • TranscriptionLogObserver: Logs speech-to-text transcription events
  • RTVIObserver: Converts internal frames to RTVI protocol messages for server-to-client messaging
  • StartupTimingObserver: Measures processor startup times and transport readiness
  • UserBotLatencyObserver: Measures user-to-bot response latency
  • TurnTrackingObserver: Tracks conversation turns and events

Using Multiple Observers

You can attach multiple observers to a pipeline task. Each observer will be notified of all frames:
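from pipecat.observers.loggers.llm_log_observer import LLMLogObserver
from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
from pipecat.pipeline.task import PipelineParams, PipelineTask

# Depending on your Pipecat version, observers may instead be passed as PipelineTask(..., observers=[...]).
task = PipelineTask(
    pipeline,
    params=PipelineParams(
        observers=[LLMLogObserver(), TranscriptionLogObserver(), CustomObserver()],
    ),
)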
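
To make "each observer will be notified of all frames" concrete, here is a minimal, self-contained sketch of the fan-out idea. It does not import Pipecat: `FakeFramePushed`, `RecordingObserver`, and `notify_all` are hypothetical stand-ins invented for illustration, and Pipecat's real dispatch logic may well differ.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class FakeFramePushed:
    """Hypothetical stand-in for Pipecat's FramePushed event data."""
    frame: str
    source: str
    destination: str


class RecordingObserver:
    """Hypothetical observer that records every frame it is shown."""

    def __init__(self, name: str):
        self.name = name
        self.seen: list[str] = []

    async def on_push_frame(self, data: FakeFramePushed):
        self.seen.append(data.frame)


async def notify_all(observers, data: FakeFramePushed):
    # Fan the same event out to every attached observer.
    await asyncio.gather(*(o.on_push_frame(data) for o in observers))


async def demo():
    observers = [RecordingObserver("a"), RecordingObserver("b")]
    for frame in ["StartFrame", "TextFrame"]:
        await notify_all(observers, FakeFramePushed(frame, "stt", "llm"))
    return [o.seen for o in observers]


fanout_result = asyncio.run(demo())
print(fanout_result)  # [['StartFrame', 'TextFrame'], ['StartFrame', 'TextFrame']]
```

Both observers end up with the same list of frames, which is the property to rely on when combining, say, a logging observer with a custom one: neither needs to know the other exists.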

Example: Debug Observer

Here’s an example observer that logs interruptions and bot speaking events:
from pipecat.observers.base_observer import BaseObserver, FramePushed
from pipecat.frames.frames import (
    InterruptionFrame,
    BotStartedSpeakingFrame,
    BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger

class DebugObserver(BaseObserver):
    """Observer to log interruptions and bot speaking events to the console.

    Logs all frame instances of:
    - InterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    This shows how these frames move from processor to processor through the pipeline.
    Log format: [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s
    (the arrow is ← for frames traveling upstream).
    """

    async def on_push_frame(self, data: FramePushed):
        # Dividing by 1e9 converts the timestamp from nanoseconds to seconds
        time_sec = data.timestamp / 1_000_000_000
        # The arrow shows whether the frame is traveling downstream or upstream
        arrow = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

        if isinstance(data.frame, InterruptionFrame):
            logger.info(f"⚡ INTERRUPTION START: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStartedSpeakingFrame):
            logger.info(f"🤖 BOT START SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
        elif isinstance(data.frame, BotStoppedSpeakingFrame):
            logger.info(f"🤖 BOT STOP SPEAKING: {data.source} {arrow} {data.destination} at {time_sec:.2f}s")
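
The log line built by DebugObserver can be checked in isolation. The sketch below pulls the formatting into a pure function so the timestamp conversion and arrow choice are easy to verify without running a pipeline. `format_push_event` is a hypothetical helper, not part of Pipecat, and it assumes the timestamp is in nanoseconds, as the division by 1_000_000_000 in the example implies.

```python
def format_push_event(label: str, source: str, destination: str,
                      timestamp_ns: int, downstream: bool = True) -> str:
    """Build one log line in the DebugObserver style (hypothetical helper)."""
    time_sec = timestamp_ns / 1_000_000_000  # assumed nanoseconds -> seconds
    arrow = "→" if downstream else "←"
    return f"{label}: {source} {arrow} {destination} at {time_sec:.2f}s"


line = format_push_event("BOT START SPEAKING", "tts", "transport", 2_500_000_000)
print(line)  # BOT START SPEAKING: tts → transport at 2.50s
```

Keeping the formatting separate from on_push_frame also makes it simple to unit-test the output before attaching the observer to a live pipeline.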

Common Use Cases

Observers are particularly useful for:
  • Debugging frame flow
  • Logging specific events
  • Monitoring pipeline behavior
  • Collecting metrics
  • Converting internal frames to external messages
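
As an illustration of the metrics use case, the following sketch counts pushed frames by class name. It runs without Pipecat installed: `DemoTextFrame`, `DemoAudioFrame`, and `FakePushEvent` are hypothetical stand-ins, and in a real pipeline `FrameCountObserver` would presumably inherit from BaseObserver and receive FramePushed data instead.

```python
import asyncio
from collections import Counter
from dataclasses import dataclass


class DemoTextFrame:
    """Hypothetical stand-in frame type (not a Pipecat class)."""


class DemoAudioFrame:
    """Hypothetical stand-in frame type (not a Pipecat class)."""


@dataclass
class FakePushEvent:
    """Hypothetical stand-in for the data passed to on_push_frame."""
    frame: object


class FrameCountObserver:
    """Counts pushed frames by class name."""

    def __init__(self):
        self.counts = Counter()

    async def on_push_frame(self, data: FakePushEvent):
        self.counts[type(data.frame).__name__] += 1


async def run_demo():
    observer = FrameCountObserver()
    for frame in (DemoTextFrame(), DemoAudioFrame(), DemoTextFrame()):
        await observer.on_push_frame(FakePushEvent(frame))
    return observer.counts


frame_counts = asyncio.run(run_demo())
print(dict(frame_counts))  # {'DemoTextFrame': 2, 'DemoAudioFrame': 1}
```

Because on_push_frame is called each time a frame is pushed from one processor to another, a counter like this would tally one entry per hop rather than one per unique frame.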