"""Pipeline that turns transcript messages into RAG insights on a queue."""

import asyncio
import logging
from typing import Tuple

from src.llm.models import ContextUpdate
from src.rag.manager import RAGManager

logger = logging.getLogger(__name__)


class ContextPipeline:
    """Feed transcript messages through a ``RAGManager`` and queue the results.

    ``run`` consumes ``(speaker, text)`` items from a transcript queue and, for
    each one, ``process_message`` performs a summarized retrieval and pushes
    every returned insight onto a context queue.
    """

    # Maximum time (seconds) ``run`` waits for a transcript item before it
    # re-checks ``stop_event``; bounds how long a stop request can go unnoticed
    # while the queue is idle.
    STOP_POLL_INTERVAL = 0.5

    # Pause (seconds) after a loop error so a persistent failure cannot spin
    # the event loop.
    ERROR_BACKOFF = 0.1

    def __init__(self, rag_manager: RAGManager):
        """Store the ``rag_manager`` used for all lookups."""
        self.rag_manager = rag_manager

    async def process_message(
        self, speaker: str, text: str, context_queue: asyncio.Queue
    ):
        """Look up insights for one message and push them to ``context_queue``.

        Args:
            speaker: Who produced the message. Currently unused by the lookup.
            text: Message text passed to ``rag_manager.retrieve``.
            context_queue: Queue that receives each insight, one item per
                insight, in the order ``retrieve`` returned them.

        Errors are logged and swallowed (best effort): a failed lookup must not
        take down the caller's loop.
        """
        try:
            # ``retrieve`` is a synchronous call; run it in a worker thread so
            # it does not block the event loop.
            insights = await asyncio.to_thread(
                self.rag_manager.retrieve, text, summarize=True
            )

            if not insights:
                logger.debug("ContextPipeline: No insights found for text: %s", text)
                return

            logger.info(
                "ContextPipeline: Found %d insights for text: %s",
                len(insights),
                text,
            )
            for insight in insights:
                await context_queue.put(insight)
        except Exception as e:
            # logger.exception keeps the traceback, which the plain message lost.
            logger.exception("ContextPipeline error processing message: %s", e)

    async def run(
        self,
        transcript_queue: asyncio.Queue,
        context_queue: asyncio.Queue,
        stop_event: asyncio.Event,
    ):
        """Consume the transcript queue until ``stop_event`` is set.

        Args:
            transcript_queue: Source of ``(speaker, text)`` items.
            context_queue: Destination for insights (see ``process_message``).
            stop_event: Set this to make the loop exit. The loop notices it
                within ``STOP_POLL_INTERVAL`` seconds even when the transcript
                queue is empty.

        Every item taken from ``transcript_queue`` is acknowledged with
        ``task_done()``, including malformed ones, so ``transcript_queue.join()``
        cannot hang on an item that failed to process.
        """
        logger.info("Context Pipeline started.")

        while not stop_event.is_set():
            try:
                # Bounded wait: an unbounded ``get()`` would block forever on
                # an idle queue and never observe ``stop_event``.
                item = await asyncio.wait_for(
                    transcript_queue.get(), timeout=self.STOP_POLL_INTERVAL
                )
            except asyncio.TimeoutError:
                # Nothing arrived; loop around to re-check ``stop_event``.
                continue
            except Exception as e:
                logger.exception("Context Pipeline loop error: %s", e)
                await asyncio.sleep(self.ERROR_BACKOFF)
                continue

            try:
                speaker, text = item

                # For now, every message triggers a lookup. If performance
                # becomes an issue, a filter can be added here.
                await self.process_message(speaker, text, context_queue)
            except Exception as e:
                logger.exception("Context Pipeline loop error: %s", e)
                # Small sleep to avoid a tight loop on repeated failures.
                await asyncio.sleep(self.ERROR_BACKOFF)
            finally:
                # Always acknowledge the dequeued item, even on failure.
                transcript_queue.task_done()

        logger.info("Context Pipeline stopped.")