Refactor STT pipeline and CLI documentation

Split the STT worker into a lightweight collector and a separate
transcription worker so audio collection no longer waits while
transcription runs in a background thread. Add the
`--whisper-model` flag and implement LLM latency logging. Expand
the README with comprehensive CLI usage instructions.
This commit is contained in:
2026-05-31 15:04:41 -07:00
parent 71ecdb3468
commit da5ab1bb44
5 changed files with 136 additions and 36 deletions
+10 -1
View File
@@ -1,5 +1,6 @@
import logging
import os
import time
from posix import system
from this import s
from typing import Any, Dict, Optional
@@ -109,12 +110,20 @@ class LLMProcessor:
logger.debug("--- LLM CALL END ---")
try:
start_time = time.perf_counter()
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
response_format=response_format,
extra_body={"enable_thinking": False},
extra_body={
"chat_template_kwargs": {
"enable_thinking": False
}
},
)
elapsed_time = time.perf_counter() - start_time
logger.info(f"LLM request completed in {elapsed_time:.2f}s")
content = response.choices[0].message.content
# Debugging: Dump outputs
+63 -14
View File
@@ -39,13 +39,18 @@ logger = logging.getLogger(__name__)
class PipelineOrchestrator:
def __init__(self, loop: asyncio.AbstractEventLoop, llm_config: Optional[dict] = None):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
llm_config: Optional[dict] = None,
whisper_model: str = "base",
):
self.loop = loop
self.llm_config = llm_config or {}
# Modules
self.listener = AudioListener(loop=self.loop)
self.transcriber = Transcriber(model_size="base", device="cuda")
self.transcriber = Transcriber(model_size=whisper_model, device="cuda")
self.processor = LLMProcessor(**self.llm_config)
self.rag_manager = RAGManager(llm_config=self.llm_config)
@@ -57,6 +62,9 @@ class PipelineOrchestrator:
self.log_queue = asyncio.Queue()
self.persistence_queue = asyncio.Queue()
# Synchronization
self.transcription_event = asyncio.Event()
self.is_running = False
# Conversation history for context
@@ -84,11 +92,12 @@ class PipelineOrchestrator:
return f"Conversation History:\n{context_text}\n\n"
async def stt_worker(self):
async def stt_collector_worker(self):
"""
Worker that handles STT: Audio -> Text.
Worker that handles STT Collection: Audio -> Buffer.
This task is highly responsive and only manages the buffer.
"""
logger.info("STT Worker started.")
logger.info("STT Collector Worker started.")
while self.is_running:
try:
# Get audio chunk from listener
@@ -105,33 +114,68 @@ class PipelineOrchestrator:
):
self.audio_buffer.pop(0)
# Concatenate buffer for transcription
full_audio = np.concatenate(self.audio_buffer)
# Signal the transcription worker that new data is available
self.transcription_event.set()
# Transcribe (WhisperX now returns a list of (speaker, text, start, end))
except Exception as e:
logger.error(f"STT Collector Worker error: {e}")
# Small sleep to prevent tight loop
await asyncio.sleep(0.01)
# NOTE(review): this span is a rendered diff hunk, so a few statements appear
# twice (what looks like the pre-change line directly followed by its
# replacement). Comments below describe the replacement lines.
async def stt_transcription_worker(self):
"""
Worker that handles STT Transcription: Buffer -> Text.
This task handles the heavy lifting in a separate thread.
"""
logger.info("STT Transcription Worker started.")
# NOTE(review): is_running is only re-checked between iterations; a task that
# is parked on wait() below stays parked until the event is set again or the
# task is cancelled — confirm the shutdown path does one of the two.
while self.is_running:
try:
# Wait for a signal that new data is available
await self.transcription_event.wait()
# Cleared before the snapshot is taken, so a set() that arrives while the
# transcription below is still running is kept and triggers another pass.
self.transcription_event.clear()
# 1. Take a snapshot of the current buffer to avoid race conditions
# while the collector is appending new chunks.
# list() is a shallow copy: the list is new, the chunk objects are shared.
buffer_snapshot = list(self.audio_buffer)
if not buffer_snapshot:
continue
# 2. Perform transcription in a separate thread.
# We pass the snapshot to the helper which handles concatenation and transcription.
# NOTE(review): of the two argument lines inside the call, the first appears
# to be the removed target (transcriber.transcribe on full_audio) and the
# second its replacement (the snapshot helper).
results = await asyncio.to_thread(
self.transcriber.transcribe, full_audio
self._transcribe_buffer_snapshot, buffer_snapshot
)
# Filter for only new segments that start after the last processed segment
# 3. Filter for only new segments that start after the last processed segment
# res[2] is the segment start (each result is unpacked below as
# speaker, text, start, end).
# NOTE(review): presumably start/end are measured from the beginning of the
# concatenated snapshot; if so, trimming chunks off the front of
# audio_buffer shifts them relative to last_processed_end_time — TODO confirm.
new_segments = [
res for res in results if res[2] >= self.last_processed_end_time
]
if new_segments:
for speaker, text, start, end in new_segments:
logger.info(f"Transcribed: [{speaker}] {text}")
logger.info(f"STT Raw Transcription: [{speaker}] {text}")
# Push raw transcription to log queue for UI visibility
await self.log_queue.put(f"[{speaker}] {text}")
# Hand the same (speaker, text) pair to the cleaning stage's queue.
await self.stt_to_clean_queue.put((speaker, text))
# Advance the watermark used by the filter above; max() keeps it from
# moving backwards.
self.last_processed_end_time = max(
self.last_processed_end_time, end
)
# Any failure in one pass is logged and the loop carries on.
except Exception as e:
logger.error(f"STT Worker error: {e}")
logger.error(f"STT Transcription Worker error: {e}")
# Small sleep to prevent tight loop if get_chunk is fast
# Small sleep to prevent tight loop
await asyncio.sleep(0.1)
def _transcribe_buffer_snapshot(self, buffer_snapshot):
    """
    Join a snapshot of buffered audio chunks and transcribe the result.

    Synchronous on purpose: the transcription worker runs it through
    asyncio.to_thread so that both the concatenation and the transcriber
    call happen off the event loop.

    buffer_snapshot: non-empty sequence of audio chunks accepted by
    np.concatenate.
    Returns whatever self.transcriber.transcribe returns for the joined audio.
    """
    # One expression: concatenate the chunks, then hand the array straight over.
    return self.transcriber.transcribe(np.concatenate(buffer_snapshot))
async def clean_worker(self):
"""
Worker that handles Text Cleaning: Raw STT -> Filtered Text.
@@ -204,6 +248,7 @@ class PipelineOrchestrator:
while self.is_running:
try:
logger.info("LLM Worker: Waiting for input...")
speaker, text = await internal_queue.get()
logger.info(f"LLM Worker: Processing text from {speaker}: {text}")
@@ -213,6 +258,9 @@ class PipelineOrchestrator:
# Log the text sent to the LLM for UI affordance
await self.log_queue.put(f"[{speaker}] {text}")
# Log the filtered message being sent to the LLM
logger.info(f"LLM Worker: Sending filtered message to LLM: {text}")
# Structured extraction using the processor
extraction_result = await asyncio.to_thread(
self.processor.extract_structured_data,
@@ -301,7 +349,8 @@ class PipelineOrchestrator:
# Start workers as background tasks
tasks = [
asyncio.create_task(self.stt_worker()),
asyncio.create_task(self.stt_collector_worker()),
asyncio.create_task(self.stt_transcription_worker()),
asyncio.create_task(self.clean_worker()),
asyncio.create_task(self.llm_worker()),
asyncio.create_task(self.persistence_worker()),
+6 -18
View File
@@ -76,32 +76,27 @@ class ConfirmationApp(App):
}
#pending-facts-table {
height: 40%;
height: 30%;
border: solid white;
}
#llm-input-container {
height: 10%;
border: solid white;
padding: 1;
padding: 0;
}
#context-pane {
height: 50%;
height: 60%;
border: solid white;
}
#log-pane {
height: 30%;
height: 100%;
border: solid white;
background: #111;
}
#log-footer {
height: 70%;
border: solid white;
}
#modal-container {
width: 60%;
height: auto;
@@ -163,18 +158,11 @@ class ConfirmationApp(App):
Horizontal(
Vertical(
DataTable(id="pending-facts-table"),
Vertical(
Input(placeholder="Message LLM...", id="llm-input"),
id="llm-input-container",
),
Input(placeholder="Message LLM...", id="llm-input"),
ListView(id="context-pane"),
id="left-pane",
),
Vertical(
ListView(id="log-pane"),
Static("LATEST LLM INPUTS", id="log-footer"),
id="right-pane",
),
ListView(id="log-pane"),
id="content-wrapper",
),
id="main-container",