From da5ab1bb4489be5589113b8e06deaa87d9960294 Mon Sep 17 00:00:00 2001 From: charles Date: Sun, 31 May 2026 15:04:41 -0700 Subject: [PATCH] Refactor STT pipeline and CLI documentation Split the STT worker into a collector and a transcription worker to offload heavy processing to a background thread. Add the `--whisper-model` flag and implement LLM latency logging. Expand the README with comprehensive CLI usage instructions. --- README.md | 50 ++++++++++++++++++++++- main.py | 10 ++++- src/llm/processor.py | 11 +++++- src/pipeline/orchestrator.py | 77 +++++++++++++++++++++++++++++------- src/ui/tui.py | 24 +++-------- 5 files changed, 136 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 298b21e..37385cf 100644 --- a/README.md +++ b/README.md @@ -28,8 +28,54 @@ Distill long sessions into concise highlights. Use LLMs to summarize recorded tr ## Interface & Usage -- **CLI**: The primary interface for confirming automated updates and querying current game state. -- **Text Editors**: Since data is stored in Markdown and JSON, you can use any editor (VS Code, Vim, Obsidian) to manually refine your campaign data. +### CLI + +The primary interface for confirming automated updates and querying current game state. + +#### Command Line Arguments + +Use these flags to manage data ingestion and run the live capture pipeline. + +##### RAG Ingestion +Use these flags to add external documents to the RAG (Retrieval-Augmented Generation) system. + +| Flag | Description | +| :--- | :--- | +| `--ingest-pdf <path>` | Path to a PDF file to ingest | +| `--ingest-file <path>` | Path to a markdown file to ingest | +| `--ingest-dir <path>` | Path to a directory of markdown files to ingest | + +##### LLM Configuration +These flags allow you to override the environment variables for the LLM backend. 
+ +| Flag | Description | +| :--- | :--- | +| `--llm-backend <backend>` | Backend to use (`openai`, `ollama`, or `vllm`) | +| `--llm-model <model>` | The model name to use | +| `--llm-api-key <key>` | API key for the LLM backend | +| `--llm-base-url <url>` | Base URL for the LLM backend | + +##### Pipeline Execution +| Flag | Description | +| :--- | :--- | +| `--run-pipeline` | Starts the main orchestration pipeline (TUI + STT + LLM) | + +##### Example Command + +To run the live orchestration pipeline using the configuration specified in your `env.sh`, you can use: + +```bash +python main.py --run-pipeline \ + --llm-backend vllm \ + --llm-model google/gemma-4-26b-a4b-it \ + --llm-api-key no-key-required \ + --whisper-model medium \ + --llm-base-url https://vllm.tipsy.codes/v1 +``` + +### Text Editors + +Since data is stored in Markdown and JSON, you can use any editor (VS Code, Vim, Obsidian) to manually refine your campaign data. ## Technical Stack diff --git a/main.py b/main.py index c233993..13c8c97 100644 --- a/main.py +++ b/main.py @@ -53,6 +53,14 @@ def main(): help="Base URL for the LLM backend", ) + # STT Configuration Arguments + parser.add_argument( + "--whisper-model", + type=str, + default=os.environ.get("WHISPER_MODEL", "base"), + help="The Whisper model to use for STT", + ) + # Pipeline Execution Argument parser.add_argument( "--run-pipeline", @@ -75,7 +83,7 @@ def main(): if args.run_pipeline: async def run_pipeline(): loop = asyncio.get_event_loop() - orchestrator = PipelineOrchestrator(loop, llm_config=llm_config) + orchestrator = PipelineOrchestrator(loop, llm_config=llm_config, whisper_model=args.whisper_model) try: await orchestrator.run() except KeyboardInterrupt: diff --git a/src/llm/processor.py b/src/llm/processor.py index 336e6b2..81bbe2e 100644 --- a/src/llm/processor.py +++ b/src/llm/processor.py @@ -1,5 +1,6 @@ import logging import os +import time from posix import system from this import s from typing import Any, Dict, Optional @@ -109,12 +110,20 @@ class 
LLMProcessor: logger.debug("--- LLM CALL END ---") try: + start_time = time.perf_counter() response = self.client.chat.completions.create( model=self.model, messages=messages, response_format=response_format, - extra_body={"enable_thinking": False}, + extra_body={ + "chat_template_kwargs": { + "enable_thinking": False + } + }, ) + elapsed_time = time.perf_counter() - start_time + logger.info(f"LLM request completed in {elapsed_time:.2f}s") + content = response.choices[0].message.content # Debugging: Dump outputs diff --git a/src/pipeline/orchestrator.py b/src/pipeline/orchestrator.py index cb0817a..8e1ebab 100644 --- a/src/pipeline/orchestrator.py +++ b/src/pipeline/orchestrator.py @@ -39,13 +39,18 @@ logger = logging.getLogger(__name__) class PipelineOrchestrator: - def __init__(self, loop: asyncio.AbstractEventLoop, llm_config: Optional[dict] = None): + def __init__( + self, + loop: asyncio.AbstractEventLoop, + llm_config: Optional[dict] = None, + whisper_model: str = "base", + ): self.loop = loop self.llm_config = llm_config or {} # Modules self.listener = AudioListener(loop=self.loop) - self.transcriber = Transcriber(model_size="base", device="cuda") + self.transcriber = Transcriber(model_size=whisper_model, device="cuda") self.processor = LLMProcessor(**self.llm_config) self.rag_manager = RAGManager(llm_config=self.llm_config) @@ -57,6 +62,9 @@ class PipelineOrchestrator: self.log_queue = asyncio.Queue() self.persistence_queue = asyncio.Queue() + # Synchronization + self.transcription_event = asyncio.Event() + self.is_running = False # Conversation history for context @@ -84,11 +92,12 @@ class PipelineOrchestrator: return f"Conversation History:\n{context_text}\n\n" - async def stt_worker(self): + async def stt_collector_worker(self): """ - Worker that handles STT: Audio -> Text. + Worker that handles STT Collection: Audio -> Buffer. + This task is highly responsive and only manages the buffer. 
""" - logger.info("STT Worker started.") + logger.info("STT Collector Worker started.") while self.is_running: try: # Get audio chunk from listener @@ -105,33 +114,68 @@ class PipelineOrchestrator: ): self.audio_buffer.pop(0) - # Concatenate buffer for transcription - full_audio = np.concatenate(self.audio_buffer) + # Signal the transcription worker that new data is available + self.transcription_event.set() - # Transcribe (WhisperX now returns a list of (speaker, text, start, end)) + except Exception as e: + logger.error(f"STT Collector Worker error: {e}") + + # Small sleep to prevent tight loop + await asyncio.sleep(0.01) + + async def stt_transcription_worker(self): + """ + Worker that handles STT Transcription: Buffer -> Text. + This task handles the heavy lifting in a separate thread. + """ + logger.info("STT Transcription Worker started.") + while self.is_running: + try: + # Wait for a signal that new data is available + await self.transcription_event.wait() + self.transcription_event.clear() + + # 1. Take a snapshot of the current buffer to avoid race conditions + # while the collector is appending new chunks. + buffer_snapshot = list(self.audio_buffer) + if not buffer_snapshot: + continue + + # 2. Perform transcription in a separate thread. + # We pass the snapshot to the helper which handles concatenation and transcription. results = await asyncio.to_thread( - self.transcriber.transcribe, full_audio + self._transcribe_buffer_snapshot, buffer_snapshot ) - # Filter for only new segments that start after the last processed segment + # 3. 
Filter for only new segments that start after the last processed segment new_segments = [ res for res in results if res[2] >= self.last_processed_end_time ] if new_segments: for speaker, text, start, end in new_segments: - logger.info(f"Transcribed: [{speaker}] {text}") + logger.info(f"STT Raw Transcription: [{speaker}] {text}") + # Push raw transcription to log queue for UI visibility + await self.log_queue.put(f"[{speaker}] {text}") await self.stt_to_clean_queue.put((speaker, text)) self.last_processed_end_time = max( self.last_processed_end_time, end ) except Exception as e: - logger.error(f"STT Worker error: {e}") + logger.error(f"STT Transcription Worker error: {e}") - # Small sleep to prevent tight loop if get_chunk is fast + # Small sleep to prevent tight loop await asyncio.sleep(0.1) + def _transcribe_buffer_snapshot(self, buffer_snapshot): + """ + Helper method to be run in a thread. + Concatenates the buffer snapshot and transcribes it. + """ + full_audio = np.concatenate(buffer_snapshot) + return self.transcriber.transcribe(full_audio) + async def clean_worker(self): """ Worker that handles Text Cleaning: Raw STT -> Filtered Text. 
@@ -204,6 +248,7 @@ class PipelineOrchestrator: while self.is_running: try: + logger.info("LLM Worker: Waiting for input...") speaker, text = await internal_queue.get() logger.info(f"LLM Worker: Processing text from {speaker}: {text}") @@ -213,6 +258,9 @@ class PipelineOrchestrator: # Log the text sent to the LLM for UI affordance await self.log_queue.put(f"[{speaker}] {text}") + # Log the filtered message being sent to the LLM + logger.info(f"LLM Worker: Sending filtered message to LLM: {text}") + # Structured extraction using the processor extraction_result = await asyncio.to_thread( self.processor.extract_structured_data, @@ -301,7 +349,8 @@ class PipelineOrchestrator: # Start workers as background tasks tasks = [ - asyncio.create_task(self.stt_worker()), + asyncio.create_task(self.stt_collector_worker()), + asyncio.create_task(self.stt_transcription_worker()), asyncio.create_task(self.clean_worker()), asyncio.create_task(self.llm_worker()), asyncio.create_task(self.persistence_worker()), diff --git a/src/ui/tui.py b/src/ui/tui.py index 6f08213..2de243a 100644 --- a/src/ui/tui.py +++ b/src/ui/tui.py @@ -76,32 +76,27 @@ class ConfirmationApp(App): } #pending-facts-table { - height: 40%; + height: 30%; border: solid white; } #llm-input-container { height: 10%; border: solid white; - padding: 1; + padding: 0; } #context-pane { - height: 50%; + height: 60%; border: solid white; } #log-pane { - height: 30%; + height: 100%; border: solid white; background: #111; } - #log-footer { - height: 70%; - border: solid white; - } - #modal-container { width: 60%; height: auto; @@ -163,18 +158,11 @@ class ConfirmationApp(App): Horizontal( Vertical( DataTable(id="pending-facts-table"), - Vertical( - Input(placeholder="Message LLM...", id="llm-input"), - id="llm-input-container", - ), + Input(placeholder="Message LLM...", id="llm-input"), ListView(id="context-pane"), id="left-pane", ), - Vertical( - ListView(id="log-pane"), - Static("LATEST LLM INPUTS", id="log-footer"), - 
id="right-pane", - ), + ListView(id="log-pane"), id="content-wrapper", ), id="main-container",