from pipecat.observers.base_observer import BaseObserver, FramePushed, FrameProcessed
from pipecat.frames.frames import (
InterruptionFrame,
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
)
from pipecat.processors.frame_processor import FrameDirection
from loguru import logger
class DebugObserver(BaseObserver):
    """Observer that logs interruption and bot-speaking frames to the console.

    Every pushed frame that is an instance of one of the following types
    produces one ``logger.info`` line:

    - InterruptionFrame
    - BotStartedSpeakingFrame
    - BotStoppedSpeakingFrame

    All other frames are ignored. This makes it possible to follow these
    frames as they travel from processor to processor through the pipeline.

    Log format:
        [EVENT TYPE]: [source processor] → [destination processor] at [timestamp]s

    The arrow is "→" when the frame direction is ``FrameDirection.DOWNSTREAM``
    and "←" for any other direction.
    """

    async def on_push_frame(self, data: FramePushed):
        """Log the push event if it carries one of the watched frame types.

        Args:
            data: The push event; this method reads its ``frame``, ``source``,
                ``destination``, ``direction`` and ``timestamp`` attributes.
        """
        # The timestamp is divided by 1e9 and printed with an "s" suffix,
        # so it is presumably expressed in nanoseconds -- confirm upstream.
        seconds = data.timestamp / 1_000_000_000
        direction_glyph = "→" if data.direction == FrameDirection.DOWNSTREAM else "←"

        # Checked in order; only the first matching type is logged.
        headings = (
            (InterruptionFrame, "⚡ INTERRUPTION START"),
            (BotStartedSpeakingFrame, "🤖 BOT START SPEAKING"),
            (BotStoppedSpeakingFrame, "🤖 BOT STOP SPEAKING"),
        )
        for frame_type, heading in headings:
            if isinstance(data.frame, frame_type):
                logger.info(
                    f"{heading}: {data.source} {direction_glyph} {data.destination} at {seconds:.2f}s"
                )
                break