Compare commits

...

18 Commits

Author SHA1 Message Date
charles 284c50acd8 refactor(stt): remove speaker identification (diarization) from transcriber
Removes the speaker diarization pipeline and alignment model from the STT module to reduce resource usage and complexity.
The transcription API remains compatible by returning 'Unknown' as the speaker ID for all transcribed segments.

- Removed DiarizationPipeline and align_model from Transcriber
- Simplified transcribe method to return basic transcription segments
- Updated logging and docstrings to reflect changes
2026-06-06 20:52:04 -07:00
charles 01b049cf37 Update main.py 2026-06-05 23:10:39 -07:00
charles da5ab1bb44 Refactor STT pipeline and CLI documentation
Split the STT worker into a collector and a transcription worker
to offload heavy processing to a background thread. Add the
`--whisper-model` flag and implement LLM latency logging. Expand
the README with comprehensive CLI usage instructions.
2026-05-31 15:04:41 -07:00
charles 71ecdb3468 Add LLM configuration and pipeline execution 2026-05-31 14:13:58 -07:00
charles 2858c7e235 Update tui.py 2026-05-30 23:03:04 -07:00
charles 15dfbfb467 Add LLM backend support and improve debugging observability
- Add LLM_BACKEND to environment configuration
- Implement detailed debug logging for LLM request/response cycles
- Add missing llama-index dependencies for embeddings and chroma
- Update prompt constraints to prevent lore redundancy
- Enable CUDA for transcription and set logging to DEBUG level
- Add entry point for running the orchestrator directly
- Cleanup unused comment in TUI context updates
2026-05-28 23:06:25 -07:00
charles 49127d695a Small changes 2026-05-28 22:08:00 -07:00
charles 2363cde160 Refactor LLM processor and improve async handling
Move contextual information handling from noise filtering to extraction
and centralize LLM call logic. Wrap blocking transcription and state
update calls in asyncio.to_thread to prevent event loop blocking.
Update transcriber model size to base.
2026-05-28 18:54:09 -07:00
charles afa8d17f10 Mostly working 2026-05-28 00:08:52 -07:00
charles 1cfba3a0ae Add LLM input logging and UI log pane
- Add log_queue to PipelineOrchestrator and log LLM inputs to UI
- Use entity_name for lore update logs instead of topic
- Pass log_queue into ConfirmationApp to display logs in UI
- Introduce a log pane and left/right pane layout in the UI
- Poll and render log messages via a new poll_log_updates worker
- Run log polling with Textual workers to avoid GC issues
- Fix ListView insertion by wrapping ListItem in a list
- Relax RAG similarity threshold from 0.7 to 0.5
2026-05-27 23:09:11 -07:00
charles 1098bdb2f9 Stable state 2026-05-27 22:30:20 -07:00
charles 58f736a5f8 refactor(ui): rewrite ConfirmationApp with three-pane layout
- Implement Pending Facts, LLM Input, and Context Pane using Textual
- Add keyboard shortcuts for Accept, Reject, and Edit actions
2026-05-27 20:05:29 -07:00
charles b25f82cefc Implement RAG summarization and context pipeline
- Add ContextPipeline for async RAG lookups
- Implement RAG result summarization via LLMProcessor
- Add CLI flag for PDF ingestion
- Strip markdown code blocks from LLM responses
- Update TUI context display to use ListItems
2026-05-27 00:17:47 -07:00
charles b83d9b5e6a Update UI and prompts 2026-05-26 23:25:53 -07:00
charles 679eca3fef fix: suppress whisperx.asr warnings 2026-05-26 22:17:50 -07:00
charles 954f2f50d8 feat: implement RAG capabilities and Context Pane integration
- Add RAG capabilities using LlamaIndex and ChromaDB
- Implement RAGManager for PHB indexing and retrieval
- Integrate RAG pipeline into orchestrator to trigger queries based on extracted entities
- Update TUI to include a 3-column layout with a real-time Context Pane
- Define ContextUpdate data models in src/llm/models.py
- Update requirements.txt with new dependencies
2026-05-26 22:07:12 -07:00
charles f4c98fb2b9 Migrate to WhisperX for speaker diarization
Implement a sliding window audio buffer and update the transcriber to
use WhisperX for transcription, alignment, and speaker identification.
Update the pipeline to handle and store speaker-attributed transcripts.

Additionally, update the LLM processor's reasoning parameter to
"enable_thinking".
2026-05-26 21:48:30 -07:00
charles d0fcdfab01 Improvements 2026-05-26 21:07:58 -07:00
32 changed files with 1293 additions and 354 deletions
+2 -1
@@ -1,7 +1,8 @@
# D&D Helpers Configuration # D&D Helpers Configuration
OPENAI_API_KEY=no-key-required OPENAI_API_KEY=no-key-required
OPENAI_BASE_URL=https://vllm.tipsy.codes/v1 OPENAI_BASE_URL=https://vllm.tipsy.codes/v1
LLM_MODEL=Intel/gemma-4-31B-it-int4-AutoRound LLM_MODEL=google/gemma-4-26b-a4b-it
LLM_BACKEND=vllm
#LLM_BACKEND=ollama #LLM_BACKEND=ollama
#LLM_MODEL=gemma:2b #LLM_MODEL=gemma:2b
WHISPER_MODEL=base WHISPER_MODEL=base
+2 -1
@@ -1,2 +1,3 @@
artifacts/ artifacts/
__pycache__ **/__pycache__/
data
+1
@@ -0,0 +1 @@
3.12
+48 -2
@@ -28,8 +28,54 @@ Distill long sessions into concise highlights. Use LLMs to summarize recorded tr
## Interface & Usage ## Interface & Usage
- **CLI**: The primary interface for confirming automated updates and querying current game state. ### CLI
- **Text Editors**: Since data is stored in Markdown and JSON, you can use any editor (VS Code, Vim, Obsidian) to manually refine your campaign data.
The primary interface for confirming automated updates and querying current game state.
#### Command Line Arguments
Use these flags to manage data ingestion and run the live capture pipeline.
##### RAG Ingestion
Use these flags to add external documents to the RAG (Retrieval-Augmented Generation) system.
| Flag | Description |
| :--- | :--- |
| `--ingest-pdf <path>` | Path to a PDF file to ingest |
| `--ingest-file <path>` | Path to a markdown file to ingest |
| `--ingest-dir <path>` | Path to a directory of markdown files to ingest |
##### LLM Configuration
These flags allow you to override the environment variables for the LLM backend.
| Flag | Description |
| :--- | :--- |
| `--llm-backend <backend>` | Backend to use (`openai`, `ollama`, or `vllm`) |
| `--llm-model <model>` | The model name to use |
| `--llm-api-key <key>` | API key for the LLM backend |
| `--llm-base-url <url>` | Base URL for the LLM backend |
##### Pipeline Execution
| Flag | Description |
| :--- | :--- |
| `--run-pipeline` | Starts the main orchestration pipeline (TUI + STT + LLM) |
##### Example Command
To run the live orchestration pipeline using the configuration specified in your `env.sh`, you can use:
```bash
python main.py --run-pipeline \
--llm-backend vllm \
--llm-model google/gemma-4-26b-a4b-it \
--llm-api-key no-key-required \
--whisper-model medium \
--llm-base-url https://vllm.tipsy.codes/v1
```
### Text Editors
Since data is stored in Markdown and JSON, you can use any editor (VS Code, Vim, Obsidian) to manually refine your campaign data.
## Technical Stack ## Technical Stack
+117
@@ -0,0 +1,117 @@
import argparse
import asyncio
import os
from src.pipeline.orchestrator import PipelineOrchestrator
from src.rag.manager import RAGManager
def main():
parser = argparse.ArgumentParser(description="D&D Helpers CLI")
# RAG Ingestion Arguments
parser.add_argument(
"--ingest-pdf",
type=str,
help="Path to a PDF file to ingest into the RAG system",
)
parser.add_argument(
"--ingest-file",
type=str,
help="Path to a markdown file to ingest into the RAG system",
)
parser.add_argument(
"--ingest-dir",
type=str,
help="Path to a directory of markdown files to ingest into the RAG system",
)
# LLM Configuration Arguments
parser.add_argument(
"--llm-backend",
type=str,
choices=["openai", "ollama", "vllm"],
default=os.environ.get("LLM_BACKEND", "openai"),
help="LLM backend to use",
)
parser.add_argument(
"--llm-model",
type=str,
default=os.environ.get("LLM_MODEL", "gpt-4o"),
help="The model to use for processing",
)
parser.add_argument(
"--llm-api-key",
type=str,
default=os.environ.get("OPENAI_API_KEY"),
help="API key for the LLM backend",
)
parser.add_argument(
"--llm-base-url",
type=str,
default=os.environ.get("OPENAI_BASE_URL"),
help="Base URL for the LLM backend",
)
# STT Configuration Arguments
parser.add_argument(
"--whisper-model",
type=str,
default=os.environ.get("WHISPER_MODEL", "turbo"),
help="The Whisper model to use for STT",
)
# Pipeline Execution Argument
parser.add_argument(
"--run-pipeline",
action="store_true",
help="Start the main orchestration pipeline (TUI + STT + LLM)",
)
args = parser.parse_args()
llm_config = {
"backend": args.llm_backend,
"model": args.llm_model,
"api_key": args.llm_api_key,
"base_url": args.llm_base_url,
}
# Remove None values to allow defaults to take over if not provided
llm_config = {k: v for k, v in llm_config.items() if v is not None}
if args.run_pipeline:
async def run_pipeline():
loop = asyncio.get_running_loop()
orchestrator = PipelineOrchestrator(loop, llm_config=llm_config, whisper_model=args.whisper_model)
try:
await orchestrator.run()
except KeyboardInterrupt:
orchestrator.stop()
asyncio.run(run_pipeline())
return
rag_manager = RAGManager(llm_config=llm_config)
if args.ingest_pdf:
print(f"Ingesting PDF: {args.ingest_pdf}...")
rag_manager.ingest_pdf(args.ingest_pdf)
print("PDF ingestion complete.")
if args.ingest_file:
print(f"Ingesting File: {args.ingest_file}...")
rag_manager.ingest_file(args.ingest_file)
print("File ingestion complete.")
if args.ingest_dir:
print(f"Ingesting Directory: {args.ingest_dir}...")
rag_manager.ingest_directory(args.ingest_dir)
print("Directory ingestion complete.")
if not any([args.ingest_pdf, args.ingest_file, args.ingest_dir, args.run_pipeline]):
print("Hello from dnd-helpers! Use --help to see available commands.")
if __name__ == "__main__":
main()
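The argument defaults in this file are read from the environment, so an explicit flag takes precedence over an environment variable, which in turn takes precedence over the hard-coded fallback. A minimal sketch of that precedence follows. The helper names `build_parser` and `resolve_llm_config` and the injectable `env` mapping are assumptions made for illustration; they are not part of the commit.

```python
import argparse
import os


def build_parser(env=None):
    """Build a parser whose defaults come from `env` (hypothetical helper)."""
    env = os.environ if env is None else env
    parser = argparse.ArgumentParser(description="LLM flag precedence sketch")
    parser.add_argument(
        "--llm-backend",
        choices=["openai", "ollama", "vllm"],
        default=env.get("LLM_BACKEND", "openai"),
    )
    parser.add_argument("--llm-model", default=env.get("LLM_MODEL", "gpt-4o"))
    parser.add_argument("--llm-base-url", default=env.get("OPENAI_BASE_URL"))
    return parser


def resolve_llm_config(argv, env=None):
    """Parse argv and drop unset values so downstream defaults can apply."""
    args = build_parser(env).parse_args(argv)
    config = {
        "backend": args.llm_backend,
        "model": args.llm_model,
        "base_url": args.llm_base_url,
    }
    # Same idea as main(): keys that resolve to None are removed.
    return {key: value for key, value in config.items() if value is not None}
```

Passing the environment as a parameter is only a convenience for exercising the precedence without mutating `os.environ`.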
+7
@@ -0,0 +1,7 @@
[project]
name = "dnd-helpers"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = []
+6 -1
@@ -1,8 +1,13 @@
# Core dependencies for D&D Helpers # Core dependencies for D&D Helpers
faster-whisper whisperx
sounddevice sounddevice
pydantic pydantic
textual textual
typer typer
openai openai
python-dotenv python-dotenv
llama-index
chromadb
pdfplumber
llama-index-embeddings-huggingface
llama-index-vector_stores-chroma
Binary file not shown.
+21
@@ -44,6 +44,22 @@ class CharacterStateUpdate(BaseModel):
) )
class ContextUpdate(BaseModel):
query: str = Field(..., description="The search query used to retrieve the context")
snippet: str = Field(
..., description="The relevant text snippet retrieved from the source"
)
source: str = Field(
..., description="The source of the snippet (e.g., 'PHB p. 12')"
)
class FilterResult(BaseModel):
filtered_text: str = Field(
..., description="Cleaned transcript used for structured data extraction"
)
class ExtractionResult(BaseModel): class ExtractionResult(BaseModel):
lore_updates: List[LoreUpdate] = Field( lore_updates: List[LoreUpdate] = Field(
default_factory=list, description="List of discovered lore facts", alias="lore" default_factory=list, description="List of discovered lore facts", alias="lore"
@@ -58,6 +74,11 @@ class ExtractionResult(BaseModel):
description="List of significant plot points or events", description="List of significant plot points or events",
alias="events", alias="events",
) )
context_updates: List[ContextUpdate] = Field(
default_factory=list,
description="List of context updates",
alias="context",
)
class Config: class Config:
populate_by_name = True populate_by_name = True
+123 -51
@@ -1,11 +1,21 @@
import logging
import os import os
import time
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from openai import OpenAI from openai import OpenAI
from pydantic import ValidationError from pydantic import ValidationError
from .models import ExtractionResult from .models import ExtractionResult, FilterResult
from .prompts import EXTRACTION_SYSTEM_PROMPT, NOISE_FILTER_SYSTEM_PROMPT from .prompts import (
EXTRACTION_SYSTEM_PROMPT,
NOISE_FILTER_SYSTEM_PROMPT,
QUERY_ANSWER_SYSTEM_PROMPT,
)
logger = logging.getLogger(__name__)
class LLMProcessor: class LLMProcessor:
@@ -14,6 +24,7 @@ class LLMProcessor:
api_key: Optional[str] = None, api_key: Optional[str] = None,
base_url: Optional[str] = None, base_url: Optional[str] = None,
model: Optional[str] = None, model: Optional[str] = None,
backend: Optional[str] = None,
): ):
""" """
Initializes the LLMProcessor. Initializes the LLMProcessor.
@@ -21,14 +32,16 @@ class LLMProcessor:
:param api_key: OpenAI API key. If None, it looks for OPENAI_API_KEY in environment variables. :param api_key: OpenAI API key. If None, it looks for OPENAI_API_KEY in environment variables.
:param base_url: OpenAI-compatible base URL (e.g., for vLLM). :param base_url: OpenAI-compatible base URL (e.g., for vLLM).
:param model: The model to use for processing. If None, it looks for LLM_MODEL in environment variables. :param model: The model to use for processing. If None, it looks for LLM_MODEL in environment variables.
:param backend: The LLM backend to use (openai, ollama, or vllm).
""" """
backend = os.environ.get("LLM_BACKEND", "openai").lower() # Use provided backend or fallback to environment variable
backend_env = (backend or os.environ.get("LLM_BACKEND", "openai")).lower()
if backend == "ollama": if backend_env == "ollama":
# Ollama's OpenAI-compatible API # Ollama's OpenAI-compatible API
final_base_url = base_url or "http://localhost:11434/v1" final_base_url = base_url or "http://localhost:11434/v1"
final_api_key = api_key or "ollama" final_api_key = api_key or "ollama"
elif backend == "vllm": elif backend_env == "vllm":
# Remote vLLM server # Remote vLLM server
final_base_url = base_url or os.environ.get("OPENAI_BASE_URL") final_base_url = base_url or os.environ.get("OPENAI_BASE_URL")
final_api_key = api_key or os.environ.get("OPENAI_API_KEY") final_api_key = api_key or os.environ.get("OPENAI_API_KEY")
@@ -36,93 +49,152 @@ class LLMProcessor:
final_base_url = base_url or os.environ.get("OPENAI_BASE_URL") final_base_url = base_url or os.environ.get("OPENAI_BASE_URL")
final_api_key = api_key or os.environ.get("OPENAI_API_KEY") final_api_key = api_key or os.environ.get("OPENAI_API_KEY")
logger.info(f"Using LLM backend: {backend_env}")
try: try:
self.client = OpenAI( self.client = OpenAI(
api_key=final_api_key, api_key=final_api_key,
base_url=final_base_url, base_url=final_base_url,
) )
# Simple connectivity check for local backends # Simple connectivity check for local backends
if backend == "ollama": if backend_env == "ollama":
# We can't easily check connectivity without making a call, # We can't easily check connectivity without making a call,
# but we can ensure the client is initialized. # but we can ensure the client is initialized.
pass pass
except Exception as e: except Exception as e:
print(f"Error initializing LLM client for backend {backend}: {e}") logger.error(f"Error initializing LLM client for backend {backend_env}: {e}")
raise raise
self.model = model or os.environ.get("LLM_MODEL", "gpt-4o") self.model = model or os.environ.get("LLM_MODEL", "gpt-4o")
def _strip_markdown_code_blocks(self, content: str) -> str:
"""
Strips markdown code blocks (e.g., ```json ... ```) from the content.
"""
import re
# Remove opening and closing code blocks
content = re.sub(
r"^```(?:json)?\n?|```$", "", content, flags=re.MULTILINE
).strip()
return content
def _call_llm( def _call_llm(
self, self,
system_prompt: str, system_prompt: str,
user_prompt: str, user_prompt: str,
context: Optional[str] = None,
response_format: Optional[Any] = None, response_format: Optional[Any] = None,
) -> str: ) -> str:
""" """
Generic method to call the LLM. Generic method to call the LLM.
""" """
try:
response = self.client.chat.completions.create(
model=self.model,
messages = [ messages = [
{"role": "system", "content": system_prompt}, {"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}, ]
], if context:
response_format=response_format, messages.append(
extra_body={"include_reasoning": False}, {
"role": "system",
"content": f"Context from previous conversation:\n{context}",
}
) )
return response.choices[0].message.content
messages.append({"role": "user", "content": user_prompt})
# Debugging: Dump inputs
logger.debug("--- LLM CALL START ---")
logger.debug(f"Model: {self.model}")
logger.debug(f"Messages: {messages}")
if response_format:
logger.debug(f"Response Format: {response_format}")
logger.debug("--- LLM CALL END ---")
try:
start_time = time.perf_counter()
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
response_format=response_format,
extra_body={
"chat_template_kwargs": {
"enable_thinking": False
}
},
)
elapsed_time = time.perf_counter() - start_time
logger.info(f"LLM request completed in {elapsed_time:.2f}s")
content = response.choices[0].message.content
# Debugging: Dump outputs
logger.debug("--- LLM RESPONSE START ---")
logger.debug(f"Content: {content}")
logger.debug("--- LLM RESPONSE END ---")
return self._strip_markdown_code_blocks(content)
except Exception as e: except Exception as e:
print(f"LLM Error: {e}") logger.error(f"LLM Error: {e}")
return "" return ""
def filter_transcript(self, text: str) -> str: def generate_answer(self, query: str, context: str) -> str:
"""
Generates a natural language answer to a DM query.
"""
return self._call_llm(
QUERY_ANSWER_SYSTEM_PROMPT,
query,
context=context,
)
def filter_transcript(
self, text: str, context: Optional[str] = None
) -> FilterResult:
""" """
Stage 1: Raw Transcript -> Filtered Text. Stage 1: Raw Transcript -> Filtered Text.
""" """
result = self._call_llm(NOISE_FILTER_SYSTEM_PROMPT, text) result = self._call_llm(
print(f"LLM Processor (Filter): {text} -> {result}") NOISE_FILTER_SYSTEM_PROMPT,
return result text,
context=context,
def extract_structured_data(self, filtered_text: str) -> ExtractionResult:
"""
Stage 2: Filtered Text -> Structured Data.
"""
print(f"LLM Processor (Extract): Calling extraction for: {filtered_text}")
try:
# Using standard chat.completions.create with JSON mode for better compatibility with vLLM
print("LLM Processor (Extract): Sending request to backend...")
response = self.client.chat.completions.create(
model=self.model,
messages=[
{"role": "system", "content": EXTRACTION_SYSTEM_PROMPT},
{"role": "user", "content": filtered_text},
],
response_format={"type": "json_object"}, response_format={"type": "json_object"},
extra_body={"include_reasoning": False},
) )
print("LLM Processor (Extract): Response received from backend.") logger.info(f"LLM Processor (Filter): {text} -> {result}")
import json import json
content = response.choices[0].message.content try:
print(f"LLM Processor (Extract): Raw JSON response: {content}") data = json.loads(result)
data = json.loads(content) return FilterResult(**data)
except (json.JSONDecodeError, ValidationError) as e:
logger.error(f"Filter Parsing Error: {e}")
return FilterResult(filtered_text=result)
def extract_structured_data(
self, filtered_text: str, context: Optional[str] = None
) -> ExtractionResult:
"""
Stage 2: Filtered Text -> Structured Data.
"""
logger.info(f"LLM Processor (Extract): Calling extraction for: {filtered_text}")
try:
system_prompt = EXTRACTION_SYSTEM_PROMPT
if context:
system_prompt += f"\n{context}"
result = self._call_llm(
system_prompt=system_prompt,
user_prompt=filtered_text,
response_format={"type": "json_object"},
)
import json
data = json.loads(result)
# Map the JSON data to the Pydantic model # Map the JSON data to the Pydantic model
return ExtractionResult(**data) return ExtractionResult(**data)
except Exception as e: except Exception as e:
print(f"Extraction Error: {e}") logger.error(f"Extraction Error: {e}")
# Return an empty ExtractionResult if parsing fails # Return an empty ExtractionResult if parsing fails
return ExtractionResult() return ExtractionResult()
def process_pipeline(self, raw_text: str) -> ExtractionResult:
"""
Executes the two-stage pipeline: Raw Transcript -> Filtered Text -> Structured Data.
"""
filtered_text = self.filter_transcript(raw_text)
if not filtered_text:
return ExtractionResult()
return self.extract_structured_data(filtered_text)
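The filter stage in this file strips Markdown code fences from the model output, tries to parse the remainder as JSON, and falls back to treating the raw text as the filtered transcript when parsing fails. A standalone sketch of that flow, using only the standard library: the regular expression is the one from `_strip_markdown_code_blocks`, while `parse_filter_response` and its plain-dict return value are assumptions standing in for the `FilterResult` model.

```python
import json
import re

# Same pattern as _strip_markdown_code_blocks: an opening fence (optionally
# tagged "json") at the start of a line, or a closing fence at the end of one.
_FENCE_RE = re.compile(r"^```(?:json)?\n?|```$", flags=re.MULTILINE)


def strip_code_fences(content: str) -> str:
    """Remove Markdown code fences and surrounding whitespace."""
    return _FENCE_RE.sub("", content).strip()


def parse_filter_response(raw: str) -> dict:
    """Return {"filtered_text": ...}, falling back to the raw text.

    Hypothetical stand-in for building a FilterResult from the LLM reply.
    """
    cleaned = strip_code_fences(raw)
    try:
        data = json.loads(cleaned)
    except json.JSONDecodeError:
        # Not JSON at all: treat the whole reply as the filtered transcript.
        return {"filtered_text": cleaned}
    if isinstance(data, dict) and isinstance(data.get("filtered_text"), str):
        return {"filtered_text": data["filtered_text"]}
    # Valid JSON but not the expected shape: same fallback.
    return {"filtered_text": cleaned}
```

The fallback keeps the pipeline moving when a model ignores the JSON instruction, at the cost of passing unvalidated text to the extraction stage.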
+29 -7
@@ -1,8 +1,19 @@
# System prompts for the LLM pipeline QUERY_ANSWER_SYSTEM_PROMPT = """
You are a helpful D&D Game Master's assistant. Your goal is to provide accurate, concise, and helpful answers to the DM's questions based on the provided context (conversation history and RAG snippets).
Guidelines:
- Use the provided context as your primary source of truth.
- If the answer is not in the context, state that you don't have enough information, but feel free to provide general D&D 5e rules as a fallback.
- Keep responses natural and professional.
- Be concise.
"""
NOISE_FILTER_SYSTEM_PROMPT = """ NOISE_FILTER_SYSTEM_PROMPT = """
You are a D&D Game Master's assistant. Given a transcript, remove all out-of-character (OOC) chatter, logistical discussions (e.g., 'Where is my d20?'), and non-relevant noise. You are a D&D Game Master's assistant. Given a transcript, remove all out-of-character (OOC) chatter, logistical discussions (e.g., 'Where is my d20?'), and non-relevant noise.
Output only the in-character dialogue and game-relevant events.
You must output your response as a JSON object with the following keys:
- "filtered_text": The cleaned transcript. IMPORTANT: Keep all player questions, requests for rule clarifications, and mentions of spells, NPCs, or locations in this field, as they are used to trigger knowledge base lookups.
Keep the original speakers' names if they are present in the transcript. Keep the original speakers' names if they are present in the transcript.
Do not add any commentary or summaries. Just filter the text. Do not add any commentary or summaries. Just filter the text.
""" """
@@ -10,15 +21,14 @@ Do not add any commentary or summaries. Just filter the text.
EXTRACTION_SYSTEM_PROMPT = """ EXTRACTION_SYSTEM_PROMPT = """
You are a D&D session analyzer. Your goal is to extract structured data from a filtered transcript. You are a D&D session analyzer. Your goal is to extract structured data from a filtered transcript.
Extract any changes to character states (HP, status effects, inventory) and any new lore facts (NPCs, locations, world-building). Extract any changes to character states (HP, status effects, inventory) and any new lore facts (NPCs, locations, world-building).
In addition to extracting updates to character state and lore, look for the opportunity to provide useful context,
DO NOT THINK. such as the answer to a player's question or the resolution of a lore fact.
CONSTRAINTS: CONSTRAINTS:
- OUTPUT ONLY VALID JSON. - OUTPUT ONLY VALID JSON.
- DO NOT include any commentary, explanations, or "thought" blocks.
- DO NOT include any keys other than "lore", "character_state", and "events".
- If no relevant information is found, return empty lists for all keys. - If no relevant information is found, return empty lists for all keys.
- If a character name is not specified (e.g., "Your character"), use "Player Character". - If a character name is not specified (e.g., "Your character"), use "Player Character".
- Do not repeat lore if it is already known; only provide new or updated facts.
Strict Output Format: Strict Output Format:
Return a JSON object with exactly these keys: Return a JSON object with exactly these keys:
@@ -26,6 +36,7 @@ Return a JSON object with exactly these keys:
- "category": (string) 'NPC', 'Location', 'WorldBuilding', or 'Plot' - "category": (string) 'NPC', 'Location', 'WorldBuilding', or 'Plot'
- "entity_name": (string) The name of the NPC, Location, or entity - "entity_name": (string) The name of the NPC, Location, or entity
- "content": (string) The actual lore fact or description - "content": (string) The actual lore fact or description
- "context": (string, optional) Helpful information for the DM (e.g., descriptions of characters, spell details, game mechanics) discovered via the knowledge base or the transcript.
2. "character_state": A list of objects. Each object MUST have: 2. "character_state": A list of objects. Each object MUST have:
- "character_name": (string) Name of the character - "character_name": (string) Name of the character
- "hp_change": (integer, optional) Change in HP - "hp_change": (integer, optional) Change in HP
@@ -33,6 +44,10 @@ Return a JSON object with exactly these keys:
- "status_effects_removed": (list of strings) - "status_effects_removed": (list of strings)
- "inventory_changes": (list of objects with "item", "quantity", "action") - "inventory_changes": (list of objects with "item", "quantity", "action")
3. "events": A list of strings. Each string should be a concise description of a significant plot development. 3. "events": A list of strings. Each string should be a concise description of a significant plot development.
4. "context": A list of objects. Each object MUST have:
- "query": (string) The original query or topic this context relates to
- "snippet": (string) The contextual information or rule explanation
- "source": (string) The source of the context (e.g., "players handbook, page 68")
Example Output: Example Output:
{ {
@@ -40,7 +55,7 @@ Example Output:
{ {
"category": "NPC", "category": "NPC",
"entity_name": "Thorne", "entity_name": "Thorne",
"content": "A gruff dwarf who runs the local tavern." "content": "A gruff dwarf who runs the local tavern.",
} }
], ],
"character_state": [ "character_state": [
@@ -54,6 +69,13 @@ Example Output:
], ],
"events": [ "events": [
"The party discovered the secret entrance to the crypt." "The party discovered the secret entrance to the crypt."
],
"context": [
{
"query": "fireball",
"snippet": "fireball does 1d6 damage, with an area of effect of 10 feet circle",
"source": "players handbook, page 68"
}
] ]
} }
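The extraction prompt asks for a JSON object with the keys "lore", "character_state", "events", and "context", with empty lists when nothing is found. A small sketch of checking a response against that contract before it reaches the Pydantic models: `load_extraction_payload` and `EXPECTED_KEYS` are hypothetical names, not part of the commit. Python's `json` module is strict, so a reply with a trailing comma after the last member of an object is rejected rather than repaired.

```python
import json

# Top-level keys named in the extraction prompt.
EXPECTED_KEYS = ("lore", "character_state", "events", "context")


def load_extraction_payload(raw: str) -> dict:
    """Parse an extraction reply and normalise missing keys to empty lists.

    Hypothetical helper; raises ValueError (json.JSONDecodeError is a
    subclass) when the reply is not a JSON object.
    """
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object at the top level")
    return {key: data.get(key, []) for key in EXPECTED_KEYS}
```

Keys outside `EXPECTED_KEYS` are dropped in this sketch; whether to reject or ignore them is a design choice the prompt leaves open.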
Binary file not shown.
+67
@@ -0,0 +1,67 @@
import asyncio
import logging
from typing import Tuple
from src.llm.models import ContextUpdate
from src.rag.manager import RAGManager
logger = logging.getLogger(__name__)
class ContextPipeline:
def __init__(self, rag_manager: RAGManager):
self.rag_manager = rag_manager
async def process_message(
self, speaker: str, text: str, context_queue: asyncio.Queue
):
"""
Processes a single message and pushes summarized insights to the context queue.
"""
try:
# Use RAGManager.retrieve with summarize=True to get concise insights
# Run in a thread to avoid blocking the event loop
insights = await asyncio.to_thread(
self.rag_manager.retrieve, text, summarize=True
)
if insights:
logger.info(
f"ContextPipeline: Found {len(insights)} insights for text: {text}"
)
for insight in insights:
await context_queue.put(insight)
else:
logger.debug(f"ContextPipeline: No insights found for text: {text}")
except Exception as e:
logger.error(f"ContextPipeline error processing message: {e}")
async def run(
self,
transcript_queue: asyncio.Queue,
context_queue: asyncio.Queue,
stop_event: asyncio.Event,
):
"""
Main loop that listens to the transcript queue and triggers RAG lookups.
"""
logger.info("Context Pipeline started.")
while not stop_event.is_set():
try:
# Get raw text from transcript queue (speaker, text)
speaker, text = await transcript_queue.get()
# For now, implement the basic flow: every message triggers a lookup.
# If performance becomes an issue, a filter can be added here.
await self.process_message(speaker, text, context_queue)
# Mark the task as done
transcript_queue.task_done()
except Exception as e:
logger.error(f"Context Pipeline loop error: {e}")
# Small sleep to avoid tight loop
await asyncio.sleep(0.1)
logger.info("Context Pipeline stopped.")
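The loop in this file checks `stop_event` only between messages, so while `transcript_queue.get()` is waiting on an empty queue a stop request goes unnoticed until the next item arrives. One possible variation, sketched here under the assumption that a short polling timeout is acceptable, bounds the wait with `asyncio.wait_for`. The function name `consume`, the `handler` callback, and `poll_seconds` are illustrative names, not part of the commit.

```python
import asyncio


async def consume(queue, stop_event, handler, poll_seconds=0.5):
    """Drain `queue` until `stop_event` is set, running `handler` off-loop.

    `handler` is a blocking callable (for example a RAG lookup); it runs in
    a worker thread via asyncio.to_thread so the event loop stays responsive.
    """
    while not stop_event.is_set():
        try:
            # Bounded wait: wake up periodically to re-check stop_event.
            item = await asyncio.wait_for(queue.get(), timeout=poll_seconds)
        except asyncio.TimeoutError:
            continue
        try:
            await asyncio.to_thread(handler, item)
        finally:
            # Always balance get() with task_done() so queue.join() can return.
            queue.task_done()
```

Calling `task_done()` in a `finally` block also covers the case where the handler raises, which the original loop would skip.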
+303 -42
@@ -1,81 +1,280 @@
import asyncio import asyncio
import logging import logging
import os
from pathlib import Path
from typing import List, Optional
from src.llm.models import ExtractionResult import numpy as np
from src.llm.models import (
CharacterStateUpdate,
ContextUpdate,
ExtractionResult,
LoreUpdate,
)
from src.llm.processor import LLMProcessor from src.llm.processor import LLMProcessor
from src.llm.prompts import EXTRACTION_SYSTEM_PROMPT, NOISE_FILTER_SYSTEM_PROMPT
from src.persistence.characters import update_character_state
from src.persistence.lore import update_lore
from src.rag.manager import RAGManager
from src.stt.listener import AudioListener from src.stt.listener import AudioListener
from src.stt.transcriber import Transcriber from src.stt.transcriber import Transcriber
from src.ui.tui import ConfirmationApp from src.ui.tui import ConfirmationApp
logging.basicConfig(level=logging.INFO) # Configure logging to write to a file instead of stdout
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[
logging.FileHandler("pipeline.log"),
],
)
# Suppress verbose logging from STT libraries to keep the TUI clean
logging.getLogger("whisper").setLevel(logging.WARNING)
logging.getLogger("faster_whisper").setLevel(logging.WARNING)
logging.getLogger("pyannote").setLevel(logging.WARNING)
logging.getLogger("whisperx").setLevel(logging.ERROR)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PipelineOrchestrator: class PipelineOrchestrator:
def __init__(self, loop: asyncio.AbstractEventLoop): def __init__(
self,
loop: asyncio.AbstractEventLoop,
llm_config: Optional[dict] = None,
whisper_model: str = "base",
):
self.loop = loop self.loop = loop
self.llm_config = llm_config or {}
# Modules # Modules
self.listener = AudioListener(loop=self.loop) self.listener = AudioListener(loop=self.loop)
self.transcriber = Transcriber() self.transcriber = Transcriber(model_size=whisper_model, device="cuda")
self.processor = LLMProcessor() self.processor = LLMProcessor(**self.llm_config)
self.rag_manager = RAGManager(llm_config=self.llm_config)
# Queues # Queues
self.transcript_queue = asyncio.Queue() self.stt_to_clean_queue = asyncio.Queue()
self.proposal_queue = asyncio.Queue() self.ui_to_llm_queue = asyncio.Queue()
self.clean_to_llm_queue = asyncio.Queue()
self.llm_to_ui_queue = asyncio.Queue()
self.log_queue = asyncio.Queue()
self.persistence_queue = asyncio.Queue()
# Synchronization
self.transcription_event = asyncio.Event()
self.is_running = False self.is_running = False
async def stt_worker(self): # Conversation history for context
self.history = [] # List of strings (transcripts)
self.history_max_words = 1000
# STT Sliding Window Buffer
self.audio_buffer = [] # List of audio chunks
self.buffer_max_seconds = 30
self.sample_rate = 16000
self.buffer_max_samples = self.buffer_max_seconds * self.sample_rate
self.last_processed_end_time = 0.0
def _get_combined_context(self) -> str:
""" """
Worker that handles STT: Audio -> Text. Returns the trimmed conversation history as a context string.
""" """
logger.info("STT Worker started.") full_history_text = " ".join(self.history)
words = full_history_text.split()
if len(words) > self.history_max_words:
kept_words = words[-self.history_max_words :]
context_text = " ".join(kept_words)
else:
context_text = full_history_text
return f"Conversation History:\n{context_text}\n\n"
async def stt_collector_worker(self):
"""
Worker that handles STT Collection: Audio -> Buffer.
This task is highly responsive and only manages the buffer.
"""
logger.info("STT Collector Worker started.")
while self.is_running: while self.is_running:
try: try:
# Get audio chunk from listener # Get audio chunk from listener
audio_chunk = await self.listener.get_chunk() audio_chunk = await self.listener.get_chunk()
# Transcribe # Maintain sliding window buffer
text = self.transcriber.transcribe(audio_chunk) self.audio_buffer.append(audio_chunk)
current_buffer_samples = sum(len(c) for c in self.audio_buffer)
if text: if current_buffer_samples > self.buffer_max_samples:
logger.info(f"Transcribed: {text}") # Remove oldest chunks until we are within the buffer limit
await self.transcript_queue.put(text) while (
sum(len(c) for c in self.audio_buffer) > self.buffer_max_samples
):
self.audio_buffer.pop(0)
# Signal the transcription worker that new data is available
self.transcription_event.set()
except Exception as e: except Exception as e:
logger.error(f"STT Worker error: {e}") logger.error(f"STT Collector Worker error: {e}")
# Small sleep to prevent tight loop if get_chunk is fast # Small sleep to prevent tight loop
await asyncio.sleep(0.01)
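Review note: the collector recomputes `sum(len(c) for c in self.audio_buffer)` for every chunk and again on each eviction, and `list.pop(0)` shifts the whole list. A minimal sketch of the same eviction policy with a running sample count and a `deque` follows. `SlidingAudioBuffer` is a hypothetical class, not part of this change; it accepts any sized chunk (NumPy arrays included) and, like the loop above, evicts until the total is within the limit.

```python
from collections import deque
from typing import Deque, List, Sequence


class SlidingAudioBuffer:
    """Keeps the most recent chunks whose total length fits in max_samples."""

    def __init__(self, max_samples: int) -> None:
        self.max_samples = max_samples
        self.chunks: Deque[Sequence[float]] = deque()
        self.total_samples = 0  # running count, updated on append and evict

    def append(self, chunk: Sequence[float]) -> None:
        self.chunks.append(chunk)
        self.total_samples += len(chunk)
        # Evict the oldest chunks until the buffer is within the limit.
        while self.chunks and self.total_samples > self.max_samples:
            oldest = self.chunks.popleft()  # O(1), unlike list.pop(0)
            self.total_samples -= len(oldest)

    def snapshot(self) -> List[Sequence[float]]:
        """Return a shallow copy that is safe to hand to another thread."""
        return list(self.chunks)


buf = SlidingAudioBuffer(max_samples=5)
buf.append([0.0] * 3)
buf.append([0.0] * 3)  # total would be 6, so the first chunk is evicted
print(buf.total_samples, len(buf.snapshot()))
```

The `snapshot()` copy plays the same role as `list(self.audio_buffer)` in the transcription worker.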
async def stt_transcription_worker(self):
"""
Worker that handles STT Transcription: Buffer -> Text.
This task handles the heavy lifting in a separate thread.
"""
logger.info("STT Transcription Worker started.")
while self.is_running:
try:
# Wait for a signal that new data is available
await self.transcription_event.wait()
self.transcription_event.clear()
# 1. Take a snapshot of the current buffer to avoid race conditions
# while the collector is appending new chunks.
buffer_snapshot = list(self.audio_buffer)
if not buffer_snapshot:
continue
# 2. Perform transcription in a separate thread.
# We pass the snapshot to the helper which handles concatenation and transcription.
results = await asyncio.to_thread(
self._transcribe_buffer_snapshot, buffer_snapshot
)
# 3. Filter for only new segments that start after the last processed segment
new_segments = [
res for res in results if res[2] >= self.last_processed_end_time
]
if new_segments:
for speaker, text, start, end in new_segments:
logger.info(f"STT Raw Transcription: [{speaker}] {text}")
# Push raw transcription to log queue for UI visibility
await self.log_queue.put(f"[{speaker}] {text}")
await self.stt_to_clean_queue.put((speaker, text))
self.last_processed_end_time = max(
self.last_processed_end_time, end
)
except Exception as e:
logger.error(f"STT Transcription Worker error: {e}")
# Small sleep to prevent tight loop
await asyncio.sleep(0.1)
def _transcribe_buffer_snapshot(self, buffer_snapshot):
"""
Helper method to be run in a thread.
Concatenates the buffer snapshot and transcribes it.
"""
full_audio = np.concatenate(buffer_snapshot)
return self.transcriber.transcribe(full_audio)
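Review note: one thing to check in the segment filter above. If the transcriber reports `start` and `end` relative to the beginning of the audio it was given (an assumption, not confirmed by this diff), those times restart from zero each time the 30-second window slides, while `last_processed_end_time` keeps growing. Comparing the two directly could then drop new speech. The sketch below converts to absolute time first. `select_new_segments` and `window_start_s` are hypothetical names; the collector would need to track the window start, for example as evicted samples divided by the sample rate.

```python
from typing import List, Tuple

# (speaker, text, start, end), matching the tuples used by the worker
Segment = Tuple[str, str, float, float]


def select_new_segments(
    results: List[Segment], window_start_s: float, last_end_s: float
) -> Tuple[List[Segment], float]:
    """Shift window-relative times to absolute time, then keep unseen segments."""
    new_segments: List[Segment] = []
    new_last_end = last_end_s
    for speaker, text, start, end in results:
        abs_start = window_start_s + start
        abs_end = window_start_s + end
        # Same rule as the worker, but applied to absolute timestamps.
        if abs_start >= last_end_s:
            new_segments.append((speaker, text, abs_start, abs_end))
            new_last_end = max(new_last_end, abs_end)
    return new_segments, new_last_end


# First pass: the window starts at t = 0 s.
first, last_end = select_new_segments([("Unknown", "hello", 0.0, 1.5)], 0.0, 0.0)
# Later pass: the window has slid and now starts at t = 10 s.
second, last_end = select_new_segments([("Unknown", "again", 0.5, 2.0)], 10.0, last_end)
print(first, second, last_end)
```

In the second pass the relative start (0.5) is smaller than the previous end (1.5), so a comparison without the offset would have discarded the segment.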
async def clean_worker(self):
"""
Worker that handles Text Cleaning: Raw STT -> Filtered Text.
"""
logger.info("Clean Worker started.")
while self.is_running:
try:
# Get raw transcript from STT
speaker, raw_text = await self.stt_to_clean_queue.get()
logger.info(f"Clean Worker: Filtering text from {speaker}: {raw_text}")
# RAG Retrieval for context
context = await asyncio.to_thread(self.rag_manager.retrieve, raw_text)
# Filtering using the processor
filter_result = await asyncio.to_thread(
self.processor.filter_transcript,
raw_text,
context=context,
)
# Push filtered text to LLM queue
if filter_result.filtered_text:
await self.clean_to_llm_queue.put(
(speaker, filter_result.filtered_text)
)
logger.info("Clean Worker: Pushed filtered text to LLM queue.")
else:
logger.info("Clean Worker: No filtered text to push.")
except Exception as e:
logger.error(f"Clean Worker error: {e}")
# Small sleep to prevent tight loop
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
async def llm_worker(self): async def llm_worker(self):
""" """
Worker that handles LLM: Text -> Proposal. Worker that handles LLM: Filtered Text/UI Input -> Structured Data & UI Updates.
""" """
logger.info("LLM Worker started.") logger.info("LLM Worker started.")
# Internal queue to serialize processing from multiple sources
internal_queue = asyncio.Queue()
async def feed_clean():
while self.is_running: while self.is_running:
try: try:
# Get raw text from transcript queue item = await self.clean_to_llm_queue.get()
raw_text = await self.transcript_queue.get() await internal_queue.put(item)
except Exception as e:
logger.error(f"LLM Feeder (Clean) error: {e}")
logger.info(f"LLM Worker: Processing text: {raw_text}") async def feed_ui():
while self.is_running:
# Process via LLM (Filter -> Extract) try:
# Note: this is currently a synchronous call, which blocks the loop. item = await self.ui_to_llm_queue.get()
result = self.processor.process_pipeline(raw_text) if isinstance(item, (LoreUpdate, CharacterStateUpdate)):
await self.persistence_queue.put(item)
if (
result.lore_updates
or result.character_updates
or result.significant_events
):
logger.info(
f"LLM Worker: Proposal generated. Putting into proposal queue. (Lore: {len(result.lore_updates)}, Char: {len(result.character_updates)})"
)
await self.proposal_queue.put(result)
else: else:
logger.info("LLM Worker: No relevant game data extracted.") await internal_queue.put(("UI", item))
except Exception as e:
logger.error(f"LLM Feeder (UI) error: {e}")
# Start feeder tasks
feeders = [
asyncio.create_task(feed_clean()),
asyncio.create_task(feed_ui()),
]
while self.is_running:
try:
logger.info("LLM Worker: Waiting for input...")
speaker, text = await internal_queue.get()
logger.info(f"LLM Worker: Processing text from {speaker}: {text}")
# RAG Retrieval for context
context = await asyncio.to_thread(self.rag_manager.retrieve, text)
# Log the text sent to the LLM for UI affordance
await self.log_queue.put(f"[{speaker}] {text}")
# Log the filtered message being sent to the LLM
logger.info(f"LLM Worker: Sending filtered message to LLM: {text}")
# Structured extraction using the processor
extraction_result = await asyncio.to_thread(
self.processor.extract_structured_data,
text,
context=context,
)
# Send the entire result to UI for confirmation
await self.llm_to_ui_queue.put(extraction_result)
# UI Notification: Context Updates
for context_update in extraction_result.context_updates:
await self.llm_to_ui_queue.put(context_update)
logger.info("LLM Worker: Pushed context update to UI.")
except Exception as e: except Exception as e:
logger.error(f"LLM Worker error: {e}") logger.error(f"LLM Worker error: {e}")
@@ -83,18 +282,57 @@ class PipelineOrchestrator:
# Small sleep # Small sleep
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
# Clean up feeders
for f in feeders:
f.cancel()
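Review note: the feeder cleanup above only runs when the `while` loop exits normally. If `llm_worker` itself is cancelled (as `run()` does in its `finally` block), `CancelledError` is not caught by `except Exception`, so the cleanup loop would likely be skipped. A minimal sketch of the same fan-in pattern with `try/finally` follows; `forward` and `merge_demo` are hypothetical names used only for illustration.

```python
import asyncio
from typing import Any, List


async def forward(source: asyncio.Queue, merged: asyncio.Queue) -> None:
    """Move items from one source queue into the merged queue until cancelled."""
    while True:
        item = await source.get()
        await merged.put(item)


async def merge_demo() -> List[Any]:
    clean_q: asyncio.Queue = asyncio.Queue()
    ui_q: asyncio.Queue = asyncio.Queue()
    merged: asyncio.Queue = asyncio.Queue()

    feeders = [
        asyncio.create_task(forward(clean_q, merged)),
        asyncio.create_task(forward(ui_q, merged)),
    ]
    try:
        await clean_q.put(("Unknown", "the door creaks open"))
        await ui_q.put(("UI", "what is behind the door?"))
        received = [await merged.get(), await merged.get()]
    finally:
        # Runs on normal exit, on errors, and when this coroutine is cancelled.
        for task in feeders:
            task.cancel()
        # Wait for the cancellations to finish so no task is left pending.
        await asyncio.gather(*feeders, return_exceptions=True)
    return received


if __name__ == "__main__":
    print(asyncio.run(merge_demo()))
```

Gathering with `return_exceptions=True` keeps the expected `CancelledError` from each feeder from propagating out of the cleanup.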
async def persistence_worker(self):
"""
Worker that handles persistence: Confirmed updates -> Disk & RAG.
"""
logger.info("Persistence Worker started.")
while self.is_running:
try:
update = await self.persistence_queue.get()
if isinstance(update, LoreUpdate):
file_path = await asyncio.to_thread(update_lore, update)
await asyncio.to_thread(self.rag_manager.ingest_file, file_path)
logger.info(
f"Persistence Worker: Lore updated and ingested into RAG: {update.entity_name}"
)
elif isinstance(update, CharacterStateUpdate):
await asyncio.to_thread(update_character_state, update)
logger.info(
f"Persistence Worker: Character {update.character_name} state updated."
)
if hasattr(self.persistence_queue, "task_done"):
self.persistence_queue.task_done()
except Exception as e:
logger.error(f"Persistence Worker error: {e}")
await asyncio.sleep(0.1)
async def tui_worker(self): async def tui_worker(self):
""" """
Worker that handles TUI: Proposal -> Persistence. Worker that handles TUI: UI interactions.
""" """
logger.info("TUI Worker started.") logger.info("TUI Worker started.")
try: try:
# Launch TUI exactly once. # Launch TUI.
# Pass the proposal queue to the app. # Use the new queues for the TUI.
app = ConfirmationApp(proposal_queue=self.proposal_queue) app = ConfirmationApp(
ui_to_llm_queue=self.ui_to_llm_queue,
llm_to_ui_queue=self.llm_to_ui_queue,
log_queue=self.log_queue,
)
await app.run_async() await app.run_async()
self.stop()
except Exception as e: except Exception as e:
logger.error(f"TUI Worker error: {e}") logger.error(f"TUI Worker error: {e}")
self.stop()
except asyncio.CancelledError:
pass
async def run(self): async def run(self):
""" """
@@ -103,10 +341,19 @@ class PipelineOrchestrator:
self.is_running = True self.is_running = True
self.listener.start() self.listener.start()
# Initialize Context Pipeline
from src.pipeline.context_pipeline import ContextPipeline
self.context_pipeline = ContextPipeline(self.rag_manager)
stop_event = asyncio.Event()
# Start workers as background tasks # Start workers as background tasks
tasks = [ tasks = [
asyncio.create_task(self.stt_worker()), asyncio.create_task(self.stt_collector_worker()),
asyncio.create_task(self.stt_transcription_worker()),
asyncio.create_task(self.clean_worker()),
asyncio.create_task(self.llm_worker()), asyncio.create_task(self.llm_worker()),
asyncio.create_task(self.persistence_worker()),
asyncio.create_task(self.tui_worker()), asyncio.create_task(self.tui_worker()),
] ]
@@ -118,6 +365,7 @@ class PipelineOrchestrator:
pass pass
finally: finally:
self.is_running = False self.is_running = False
stop_event.set()
self.listener.stop() self.listener.stop()
for task in tasks: for task in tasks:
task.cancel() task.cancel()
@@ -127,6 +375,19 @@ class PipelineOrchestrator:
def stop(self): def stop(self):
""" """
Stops the pipeline. Stops.
""" """
self.is_running = False self.is_running = False
if __name__ == "__main__":
import asyncio
async def main():
loop = asyncio.get_running_loop()
orchestrator = PipelineOrchestrator(loop)
try:
await orchestrator.run()
except KeyboardInterrupt:
orchestrator.stop()
asyncio.run(main())
+194
@@ -0,0 +1,194 @@
import os
from typing import Any, List, Optional
import chromadb
import pdfplumber
from llama_index.core import Document, Settings, StorageContext, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.vector_stores.chroma import ChromaVectorStore
from src.llm.models import ContextUpdate
from src.llm.processor import LLMProcessor
class RAGManager:
def __init__(self, persist_dir: str = "data/rag_index", llm_config: Optional[dict] = None):
self.persist_dir = persist_dir
self.llm_config = llm_config or {}
self.db = chromadb.PersistentClient(path=self.persist_dir)
self.collection_name = "phb_collection"
# Initialize Chroma Vector Store
self.vector_store = ChromaVectorStore(
chroma_collection=self.db.get_or_create_collection(self.collection_name)
)
# Initialize Storage Context
self.storage_context = StorageContext.from_defaults(
vector_store=self.vector_store
)
# Use a local HuggingFace embedding model to avoid API key issues during verification
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
# Load index if it exists, otherwise initialize
try:
self.index = VectorStoreIndex.from_vector_store(
self.vector_store, storage_context=self.storage_context
)
except Exception:
self.index = None
def ingest_pdf(self, pdf_path: str):
"""
Parses a PDF, chunks it, and stores embeddings in ChromaDB.
"""
documents = []
with pdfplumber.open(pdf_path) as pdf:
for i, page in enumerate(pdf.pages):
text = page.extract_text()
if text:
# Create a document for each page
# In a real scenario, we might use a recursive character splitter
# but for PHB, page-level chunking is a good start.
doc = Document(
text=text, metadata={"source": f"PHB p. {i + 1}", "page": i + 1}
)
documents.append(doc)
if not documents:
print(f"No text extracted from {pdf_path}")
return
# Create index from documents
self.index = VectorStoreIndex.from_documents(
documents, storage_context=self.storage_context
)
print(f"Successfully ingested {pdf_path} into the vector store.")
def ingest_file(self, file_path: str):
"""
Loads a single markdown file into the index.
"""
with open(file_path, "r", encoding="utf-8") as f:
text = f.read()
# Use the filename as the source
source = os.path.basename(file_path)
doc = Document(text=text, metadata={"source": source})
# If index doesn't exist, initialize it
if not self.index:
self.index = VectorStoreIndex.from_documents(
[doc], storage_context=self.storage_context
)
else:
# Insert into existing index
self.index.insert(doc)
print(f"Successfully ingested {file_path} into the vector store.")
def ingest_directory(self, dir_path: str):
"""
Recursively loads all markdown files in a directory into the index.
"""
files_processed = 0
for root, _, files in os.walk(dir_path):
for file in files:
if file.endswith(".md"):
file_path = os.path.join(root, file)
self.ingest_file(file_path)
files_processed += 1
print(
f"Successfully ingested {files_processed} files from {dir_path} into the vector store."
)
def summarize_results(self, query: str, nodes: List[Any]) -> List[ContextUpdate]:
"""
Uses an LLM to transform raw snippets into concise "insights", filtering out irrelevant content.
"""
if not nodes:
return []
processor = LLMProcessor(**self.llm_config)
# Construct the context from retrieved nodes
context_text = "\n\n".join(
[
f"Source: {node.metadata.get('source', 'Unknown')}\nContent: {node.text}"
for node in nodes
]
)
system_prompt = (
"You are a precise research assistant. Your task is to analyze provided text snippets "
"and extract only the information that is directly relevant to the user's query. "
"1. If a snippet is irrelevant to the query, discard it completely. "
"2. For relevant information, synthesize it into a concise, single-sentence 'insight'. "
"3. Do not simply repeat the raw text; summarize it for clarity and brevity. "
"4. If no snippets are relevant to the query, return an empty list. "
"5. Be factual and do not hallucinate. Use only the provided snippets."
)
user_prompt = (
f"Query: {query}\n\n"
f"Snippets:\n{context_text}\n\n"
"Return a JSON object with a key 'insights' containing a list of objects, each with 'snippet' and 'source'."
)
result = processor._call_llm(
system_prompt,
user_prompt,
response_format={"type": "json_object"},
)
import json
try:
data = json.loads(result)
# Expecting a format like {"insights": [{"snippet": "...", "source": "..."}, ...]}
insights = data.get("insights", []) if isinstance(data, dict) else data
if not insights:
print(f"Summarization: No relevant insights found for query: {query}")
return [
ContextUpdate(
query=query, snippet=item["snippet"], source=item["source"]
)
for item in insights
]
except (json.JSONDecodeError, KeyError, TypeError) as e:
print(f"Summarization parsing error: {e}")
return []
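Review note: in `summarize_results`, a single malformed item raises `KeyError` inside the list comprehension and the whole batch is discarded. If keeping the valid items is preferable, the parsing step could be isolated as below. This is a sketch under that assumption; `parse_insights` is a hypothetical helper that returns plain dicts rather than `ContextUpdate` objects so it stays self-contained.

```python
import json
from typing import Dict, List


def parse_insights(raw: str) -> List[Dict[str, str]]:
    """Parse an LLM JSON reply and keep only well-formed insight items."""
    try:
        data = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        # Not JSON at all (or not a string): nothing to recover.
        return []

    # Accept either {"insights": [...]} or a bare list, as the original does.
    items = data.get("insights", []) if isinstance(data, dict) else data
    if not isinstance(items, list):
        return []

    valid: List[Dict[str, str]] = []
    for item in items:
        # Skip malformed entries instead of failing the whole batch.
        if isinstance(item, dict) and "snippet" in item and "source" in item:
            valid.append(
                {"snippet": str(item["snippet"]), "source": str(item["source"])}
            )
    return valid


reply = '{"insights": [{"snippet": "Goblins fear fire.", "source": "PHB p. 12"}, {"snippet": "no source"}]}'
print(parse_insights(reply))
```

Each returned dict can then be passed to `ContextUpdate(query=query, **item)` by the caller.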
def retrieve(
self, query: str, top_k: int = 5, summarize: bool = False
) -> List[ContextUpdate]:
"""
Retrieves the top-K most relevant snippets for a given query,
filtering for those with a similarity score >= 0.5.
"""
if not self.index:
print("Index not initialized. Please ingest documents first.")
return []
# Create a retriever
retriever = self.index.as_retriever(similarity_top_k=top_k)
nodes = retriever.retrieve(query)
# Filter nodes by similarity score (threshold >= 0.5)
nodes = [node for node in nodes if node.score >= 0.5]
if summarize:
return self.summarize_results(query, nodes)
results = []
for node in nodes:
# Extract metadata
source = node.metadata.get("source", "Unknown Source")
results.append(ContextUpdate(query=query, snippet=node.text, source=source))
return results
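Review note: the score filter in `retrieve` compares `node.score` directly against the threshold. If a retriever can return a node without a score (an assumption worth verifying against the LlamaIndex version in use), the comparison would raise `TypeError`. A defensive sketch follows; `ScoredNode` is a hypothetical stand-in for the retrieved node type, and `filter_by_score` is an illustrative helper.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ScoredNode:
    """Hypothetical stand-in for a retrieved node with an optional score."""

    text: str
    score: Optional[float]


def filter_by_score(nodes: List[ScoredNode], threshold: float = 0.5) -> List[ScoredNode]:
    """Keep nodes whose score is present and at least `threshold`."""
    return [n for n in nodes if n.score is not None and n.score >= threshold]


nodes = [
    ScoredNode("Fireball deals 8d6 fire damage.", 0.82),
    ScoredNode("Unrelated passage.", 0.31),
    ScoredNode("Node without a score.", None),
]
print([n.text for n in filter_by_score(nodes)])
```

Making the threshold a parameter also keeps the docstring and the code from drifting apart.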
Binary file not shown.
Binary file not shown.
Binary file not shown.
+1 -1
@@ -5,7 +5,7 @@ import numpy as np
import sounddevice as sd import sounddevice as sd
import torch import torch
logging.basicConfig(level=logging.INFO) # Do not call basicConfig here, as it's called in the orchestrator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
+37 -22
@@ -1,69 +1,84 @@
import logging import logging
from faster_whisper import WhisperModel import numpy as np
import whisperx
logging.basicConfig(level=logging.INFO) # Do not call basicConfig here, as it's called in the orchestrator
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Transcriber: class Transcriber:
""" """
Converts audio chunks (numpy arrays) into text using faster-whisper. Converts audio chunks (numpy arrays) into text using WhisperX.
""" """
def __init__(self, model_size="base", device="cpu", compute_type="int8"): def __init__(
self, model_size="base", device="cpu", compute_type="int8", language="en"
):
""" """
Initializes the faster-whisper model. Initializes the WhisperX model.
Args: Args:
model_size (str): The size of the model to use (e.g., "tiny", "base", "small"). model_size (str): The size of the model to use (e.g., "tiny", "base", "small").
device (str): The device to run the model on ("cpu" or "cuda"). device (str): The device to run the model on ("cpu" or "cuda").
compute_type (str): The compute type to use (e.g., "int8", "float16"). compute_type (str): The compute type to use (e.g., "int8", "float16").
language (str): The language code for alignment (e.g., "en").
""" """
self.device = device
self.compute_type = compute_type
self.language = language
logger.info( logger.info(
f"Loading faster-whisper model: {model_size} on {device} ({compute_type})..." f"Loading WhisperX model: {model_size} on {device} ({compute_type})..."
) )
try: try:
self.model = WhisperModel( # Load transcription model
self.model = whisperx.load_model(
model_size, device=device, compute_type=compute_type model_size, device=device, compute_type=compute_type
) )
logger.info("Model loaded successfully.")
logger.info("WhisperX model loaded successfully.")
except Exception as e: except Exception as e:
logger.error(f"Failed to load faster-whisper model: {e}") logger.error(f"Failed to load WhisperX models: {e}")
raise raise
def transcribe(self, audio_chunk): def transcribe(self, audio_chunk):
""" """
Transcribes a single audio chunk. Transcribes an audio chunk.
Args: Args:
audio_chunk (np.ndarray): The audio data as a numpy array. audio_chunk (np.ndarray): The audio data as a numpy array.
Returns: Returns:
str: The transcribed text. list: A list of tuples (speaker_id, text, start, end).
""" """
if audio_chunk is None: if audio_chunk is None:
return "" return []
try: try:
# faster-whisper expects audio in float32 and 1D array # WhisperX expects audio in float32 and 1D array
audio_data = audio_chunk.astype("float32").flatten() audio = audio_chunk.astype("float32").flatten()
# Transcribe the audio # 1. Perform transcription
segments, info = self.model.transcribe(audio_data, beam_size=5) # batch_size is set to 16 for efficiency; can be adjusted based on VRAM
result = self.model.transcribe(audio, batch_size=16)
# Combine segments into a single string # Extract ("Unknown", text, start, end) tuples from the transcription result
text = " ".join([segment.text.strip() for segment in segments]) output = []
for segment in result.get("segments", []):
text = segment.get("text", "").strip()
start = segment.get("start", 0.0)
end = segment.get("end", 0.0)
if text:
output.append(("Unknown", text, start, end))
return text.strip() return output
except Exception as e: except Exception as e:
logger.error(f"Transcription error: {e}") logger.error(f"Transcription error: {e}")
return "" return []
def close(self): def close(self):
""" """
Explicitly release model resources if necessary. Explicitly release model resources if necessary.
""" """
# faster-whisper's WhisperModel doesn't have a standard close(),
# but we'll provide this for consistency.
pass pass
Binary file not shown.
Binary file not shown.
Binary file not shown.
+245 -234
@@ -3,315 +3,326 @@ from typing import List, Optional, Union
from textual.app import App, ComposeResult from textual.app import App, ComposeResult
from textual.containers import Container, Horizontal, Vertical from textual.containers import Container, Horizontal, Vertical
from textual.widgets import Button, DataTable, Footer, Input, Label, Static from textual.message import Message
from textual.screen import ModalScreen
from textual.widgets import (
Button,
DataTable,
Footer,
Input,
Label,
ListItem,
ListView,
Static,
)
from src.llm.models import CharacterStateUpdate, ExtractionResult, LoreUpdate from src.llm.models import CharacterStateUpdate, ContextUpdate, ExtractionResult, LoreUpdate
from src.persistence.characters import update_character_state from src.persistence.characters import update_character_state
from src.persistence.lore import update_lore from src.persistence.lore import update_lore
class EditModal(ModalScreen):
def __init__(self, initial_text: str, initial_type: str, initial_target: str, on_save: callable):
super().__init__()
self.initial_text = initial_text
self.initial_type = initial_type
self.initial_target = initial_target
self.on_save = on_save
def compose(self) -> ComposeResult:
with Vertical(id="modal-container"):
yield Label("Type:")
yield Input(value=self.initial_type, id="edit-type")
yield Label("Target:")
yield Input(value=self.initial_target, id="edit-target")
yield Label("Content:")
yield Input(value=self.initial_text, id="edit-input")
with Horizontal(id="modal-actions"):
yield Button("Save", id="btn-save")
yield Button("Cancel", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
if event.button.id == "btn-save":
edit_input = self.query_one("#edit-input", Input)
type_input = self.query_one("#edit-type", Input)
target_input = self.query_one("#edit-target", Input)
self.on_save(edit_input.value, type_input.value, target_input.value)
self.dismiss()
elif event.button.id == "btn-cancel":
self.dismiss()
class ConfirmationApp(App): class ConfirmationApp(App):
CSS = """ CSS = """
Screen { #main-container {
layout: vertical;
height: 100%;
}
#content-wrapper {
layout: horizontal; layout: horizontal;
height: 100%;
} }
#left-pane { #left-pane {
width: 40%; width: 70%;
border: solid; layout: vertical;
padding: 1;
} }
#right-pane { #right-pane {
width: 60%; width: 30%;
border: solid;
padding: 1;
layout: vertical; layout: vertical;
border: solid white;
} }
#details-container { #pending-facts-table {
height: auto; height: 30%;
margin-bottom: 1; border: solid white;
} }
#actions-container { #llm-input-container {
height: 10%;
border: solid white;
padding: 0;
}
#context-pane {
height: 60%;
border: solid white;
}
#log-pane {
height: 100%;
border: solid white;
background: #111;
}
#modal-container {
width: 60%;
height: auto; height: auto;
layout: horizontal; border: double white;
background: #222;
padding: 2;
align: center middle; align: center middle;
} }
#edit-container { #modal-actions {
display: none;
height: auto; height: auto;
layout: vertical; margin-top: 1;
border: solid; align: right middle;
padding: 1;
} }
Button { #edit-input, #edit-type, #edit-target {
margin: 0 1; margin: 1 0;
}
#llm-input {
width: 100%;
}
ListItem Static {
border: solid grey;
margin: 1 0;
padding: 1;
} }
""" """
BINDINGS = [ BINDINGS = [
("q", "quit", "Quit"), ("q", "quit", "Quit"),
("a", "accept", "Accept"),
("r", "reject", "Reject"),
("e", "edit", "Edit"),
("enter", "send", "Send"),
] ]
def __init__( def __init__(
self, self,
result: Optional[ExtractionResult] = None, result: Optional[ExtractionResult] = None,
proposal_queue: Optional[asyncio.Queue] = None, ui_to_llm_queue: Optional[asyncio.Queue] = None,
llm_to_ui_queue: Optional[asyncio.Queue] = None,
log_queue: Optional[asyncio.Queue] = None,
): ):
super().__init__() super().__init__()
self.result = result self.result = result
self.proposal_queue = proposal_queue self.ui_to_llm_queue = ui_to_llm_queue
self.llm_to_ui_queue = llm_to_ui_queue
self.log_queue = log_queue
self.pending_updates: List[Union[LoreUpdate, CharacterStateUpdate]] = [] self.pending_updates: List[Union[LoreUpdate, CharacterStateUpdate]] = []
if result: if result:
# Populate pending updates from result
self.pending_updates.extend(result.lore_updates) self.pending_updates.extend(result.lore_updates)
self.pending_updates.extend(result.character_updates) self.pending_updates.extend(result.character_updates)
self.selected_index = -1
def compose(self) -> ComposeResult: def compose(self) -> ComposeResult:
yield Container( yield Vertical(
Horizontal( Horizontal(
Vertical( Vertical(
DataTable(id="update-table"), DataTable(id="pending-facts-table"),
Input(placeholder="Message LLM...", id="llm-input"),
ListView(id="context-pane"),
id="left-pane", id="left-pane",
), ),
Vertical( ListView(id="log-pane"),
Vertical( id="content-wrapper",
Label("Details:", id="details-label"),
Static("No update selected", id="details-text"),
id="details-container",
), ),
Vertical( id="main-container",
Label("Edit Value:"),
Input(id="edit-input"),
Button("Save Edit", id="save-edit"),
id="edit-container",
),
Horizontal(
Button("Accept", id="btn-accept"),
Button("Reject", id="btn-reject"),
Button("Edit", id="btn-edit"),
id="actions-container",
),
id="right-pane",
),
),
Footer(),
) )
yield Footer()
def on_mount(self) -> None: def on_mount(self) -> None:
table = self.query_one("#update-table", DataTable) table = self.query_one("#pending-facts-table", DataTable)
table.cursor_type = "row" table.cursor_type = "row"
table.add_columns("Type", "Target", "Update") table.add_columns("Type", "Target", "Content")
for i, update in enumerate(self.pending_updates): for i, update in enumerate(self.pending_updates):
if isinstance(update, LoreUpdate): self.add_update_to_table(update, i)
table.add_row(
"Lore", update.entity_name or "General", update.content, key=str(i)
)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += (
f", Removed: {', '.join(update.status_effects_removed)}"
)
table.add_row("Char", update.character_name, change_text, key=str(i))
if self.pending_updates: if self.ui_to_llm_queue:
self.handle_row_highlight(0) # We don't need a poller for this, just the action_send
self.query_one("#btn-accept", Button).focus()
if self.proposal_queue:
self.run_worker(self.poll_proposal_queue, thread=False)
async def poll_proposal_queue(self) -> None:
"""
Background worker that polls the proposal queue for new extraction results.
"""
while True:
try:
result = await self.proposal_queue.get()
self.add_result(result)
except Exception as e:
# Log error but keep the worker running
self.log(f"Error polling proposal queue: {e}")
finally:
# Signal that the item has been processed
if hasattr(self.proposal_queue, "task_done"):
self.proposal_queue.task_done()
def add_result(self, result: ExtractionResult) -> None:
"""
Adds results from the LLM processor to the TUI table.
"""
table = self.query_one("#update-table", DataTable)
start_index = len(self.pending_updates)
for update in result.lore_updates + result.character_updates:
self.pending_updates.append(update)
actual_index = len(self.pending_updates) - 1
if isinstance(update, LoreUpdate):
table.add_row(
"Lore",
update.entity_name or "General",
update.content,
key=str(actual_index),
)
elif isinstance(update, CharacterStateUpdate):
change_text = f"HP: {update.hp_change or 0}"
if update.status_effects_added:
change_text += f", Added: {', '.join(update.status_effects_added)}"
if update.status_effects_removed:
change_text += (
f", Removed: {', '.join(update.status_effects_removed)}"
)
table.add_row(
"Char", update.character_name, change_text, key=str(actual_index)
)
# If the table was previously empty and we added updates, focus the first one.
if start_index == 0 and self.pending_updates:
self.handle_row_highlight(0)
self.query_one("#btn-accept", Button).focus()
def on_data_table_row_highlighted(self, event: DataTable.RowHighlighted) -> None:
self.handle_row_highlight(event.cursor_row)
def handle_row_highlight(self, row: int) -> None:
self.selected_index = row
if self.selected_index < 0 or self.selected_index >= len(self.pending_updates):
return
update = self.pending_updates[self.selected_index]
details_text = self.query_one("#details-text", Static)
if isinstance(update, LoreUpdate):
details_text.update(
f"Category: {update.category}\nTarget: {update.entity_name}\nContent: {update.content}"
)
elif isinstance(update, CharacterStateUpdate):
details_text.update(
f"Character: {update.character_name}\nHP Change: {update.hp_change}\nAdded Effects: {update.status_effects_added}\nRemoved Effects: {update.status_effects_removed}"
)
# Reset to detail view
self.query_one("#edit-container", Vertical).styles.display = "none"
self.query_one("#details-container", Vertical).styles.display = "block"
def on_button_pressed(self, event: Button.Pressed) -> None:
if self.selected_index == -1:
return
update = self.pending_updates[self.selected_index]
if event.button.id == "btn-accept":
if isinstance(update, LoreUpdate):
update_lore(update)
elif isinstance(update, CharacterStateUpdate):
update_character_state(update)
self.remove_update(self.selected_index)
elif event.button.id == "btn-reject":
self.remove_update(self.selected_index)
elif event.button.id == "btn-edit":
self.show_edit_mode(update)
elif event.button.id == "save-edit":
self.save_edit(update)
def show_edit_mode(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
edit_input = self.query_one("#edit-input", Input)
if isinstance(update, LoreUpdate):
edit_input.value = update.content
elif isinstance(update, CharacterStateUpdate):
# For simplicity, only allow editing HP change in this TUI
edit_input.value = str(update.hp_change or 0)
self.query_one("#edit-container", Vertical).styles.display = "block"
self.query_one("#details-container", Vertical).styles.display = "none"
def save_edit(self, update: Union[LoreUpdate, CharacterStateUpdate]) -> None:
new_val = self.query_one("#edit-input", Input).value
if isinstance(update, LoreUpdate):
update.content = new_val
elif isinstance(update, CharacterStateUpdate):
try:
update.hp_change = int(new_val)
except ValueError:
# Ignore invalid integer input
pass pass
if self.llm_to_ui_queue:
# Use Textual workers so the task isn't garbage-collected and
# exceptions are surfaced via the worker manager.
self.run_worker(self.poll_llm_updates(), exclusive=False)
if self.log_queue:
self.run_worker(self.poll_log_updates(), exclusive=False)
# Refresh the table self.query_one("#llm-input", Input).focus()
table = self.query_one("#update-table", DataTable)
# Textual DataTable doesn't have a simple 'update_row', so we clear and refill
# or we can use update_cell.
# Update the table row def add_update_to_table(
+        self, update: Union[LoreUpdate, CharacterStateUpdate], index: int
+    ) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
         if isinstance(update, LoreUpdate):
-            table.update_cell(self.selected_index, 2, update.content)
+            table.add_row(
+                update.category, update.entity_name or "General", update.content, key=str(index)
+            )
         elif isinstance(update, CharacterStateUpdate):
             change_text = f"HP: {update.hp_change or 0}"
             if update.status_effects_added:
                 change_text += f", Added: {', '.join(update.status_effects_added)}"
             if update.status_effects_removed:
                 change_text += f", Removed: {', '.join(update.status_effects_removed)}"
-            table.update_cell(self.selected_index, 2, change_text)
-        self.show_edit_mode(update) # just to refresh the value maybe? No,
-        # Actually let's go back to detail view
-        self.query_one("#edit-container", Vertical).styles.display = "none"
-        self.query_one("#details-container", Vertical).styles.display = "block"
+            table.add_row("Char", update.character_name, change_text, key=str(index))

+    async def poll_llm_updates(self) -> None:
+        while True:
+            try:
+                update = await self.llm_to_ui_queue.get()
+                if isinstance(update, ExtractionResult):
+                    self.handle_proposal_result(update)
+                elif isinstance(update, ContextUpdate):
+                    display_text = f"Query: {update.query}\nSource: {update.source}\n\n{update.snippet}"
+                    context_list = self.query_one("#context-pane", ListView)
+                    # Insert at the top to show most recent first.
+                    await context_list.insert(0, [ListItem(Static(display_text))])
+                if hasattr(self.llm_to_ui_queue, "task_done"):
+                    self.llm_to_ui_queue.task_done()
+            except Exception as e:
+                self.log(f"Error polling LLM updates: {e}")

+    async def poll_log_updates(self) -> None:
+        while True:
+            try:
+                log_text = await self.log_queue.get()
+                log_list = self.query_one("#log-pane", ListView)
+                # See poll_llm_updates: wrap the ListItem in a list.
+                # Insert at the top to show most recent first.
+                await log_list.insert(0, [ListItem(Static(log_text))])
+                if hasattr(self.log_queue, "task_done"):
+                    self.log_queue.task_done()
+            except Exception as e:
+                self.log(f"Error polling log updates: {e}")

+    def handle_proposal_result(self, result: ExtractionResult) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
+        for update in result.lore_updates + result.character_updates:
+            index = len(self.pending_updates)
+            self.pending_updates.append(update)
+            self.add_update_to_table(update, index)

+    async def poll_context_queue(self) -> None:
+        # Obsolete
+        pass

+    async def poll_response_queue(self) -> None:
+        # Obsolete
+        pass

+    def on_input_submitted(self, event: Input.Submitted) -> None:
+        if event.input.id == "llm-input":
+            self.action_send()

+    def action_send(self) -> None:
+        input_widget = self.query_one("#llm-input", Input)
+        text = input_widget.value
+        if text and self.ui_to_llm_queue:
+            self.ui_to_llm_queue.put_nowait(text)
+            input_widget.value = ""

+    async def action_accept(self) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
+        row_index = table.cursor_row
+        if row_index < 0 or row_index >= len(self.pending_updates):
+            return
+        update = self.pending_updates[row_index]
+        if self.ui_to_llm_queue:
+            self.ui_to_llm_queue.put_nowait(update)
+        self.remove_update(row_index)

+    def action_reject(self) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
+        row_index = table.cursor_row
+        if row_index < 0 or row_index >= len(self.pending_updates):
+            return
+        self.remove_update(row_index)

+    def action_edit(self) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
+        row_index = table.cursor_row
+        if row_index < 0 or row_index >= len(self.pending_updates):
+            return
+        update = self.pending_updates[row_index]
+        initial_text = ""
+        initial_type = ""
+        initial_target = ""

-        # Update details text
-        details_text = self.query_one("#details-text", Static)
         if isinstance(update, LoreUpdate):
-            details_text.update(
-                f"Category: {update.category}\nTarget: {update.entity_name}\nContent: {update.content}"
-            )
+            initial_text = update.content
+            initial_type = update.category
+            initial_target = update.entity_name or ""
         elif isinstance(update, CharacterStateUpdate):
-            details_text.update(
-                f"Character: {update.character_name}\nHP Change: {update.hp_change}\nAdded Effects: {update.status_effects_added}\nRemoved Effects: {update.status_effects_removed}"
-            )
+            initial_text = str(update.hp_change or 0)
+            initial_type = "Char"
+            initial_target = update.character_name

+        def save_callback(new_text: str, new_type: str, new_target: str):
+            if isinstance(update, LoreUpdate):
+                update.content = new_text
+                update.category = new_type
+                update.entity_name = new_target if new_target else None
+            elif isinstance(update, CharacterStateUpdate):
+                try:
+                    update.hp_change = int(new_text)
+                except ValueError:
+                    pass
+                update.character_name = new_target
+            # Update the table
+            self.refresh_table()

+        self.push_screen(EditModal(initial_text, initial_type, initial_target, save_callback))

     def remove_update(self, index: int) -> None:
-        # Remove from the pending list
         del self.pending_updates[index]
+        self.refresh_table()

-        # Clear and refill the table
-        table = self.query_one("#update-table", DataTable)
+    def refresh_table(self) -> None:
+        table = self.query_one("#pending-facts-table", DataTable)
         table.clear()
         for i, update in enumerate(self.pending_updates):
-            if isinstance(update, LoreUpdate):
-                table.add_row(
-                    "Lore", update.entity_name or "General", update.content, key=str(i)
-                )
-            elif isinstance(update, CharacterStateUpdate):
-                change_text = f"HP: {update.hp_change or 0}"
-                if update.status_effects_added:
-                    change_text += f", Added: {', '.join(update.status_effects_added)}"
-                if update.status_effects_removed:
-                    change_text += (
-                        f", Removed: {', '.join(update.status_effects_removed)}"
-                    )
-                table.add_row("Char", update.character_name, change_text, key=str(i))
+            self.add_update_to_table(update, i)

-        if self.pending_updates:
-            self.handle_row_highlight(0)
-            self.query_one("#btn-accept", Button).focus()
-        else:
-            self.selected_index = -1
-            self.query_one("#details-text", Static).update("All updates processed.")
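
Note on the polling loops in this hunk: `poll_llm_updates` and `poll_log_updates` call `task_done()` only on the success path, so an exception raised while handling an item would leave that item counted as unfinished. The sketch below shows one possible alternative using `try`/`finally`. It assumes the queues are `asyncio.Queue` instances, which the diff does not confirm; `consume`, `handle`, and `demo` are hypothetical names for illustration and are not part of the repository.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple


async def consume(
    queue: asyncio.Queue,
    handle: Callable[[object], Awaitable[None]],
    errors: List[Exception],
) -> None:
    """Drain `queue` forever, marking every item done even if `handle` fails."""
    while True:
        item = await queue.get()
        try:
            await handle(item)
        except Exception as exc:
            # Record the failure and keep the loop alive, like the TUI pollers.
            errors.append(exc)
        finally:
            # Runs on success and on failure, so queue.join() cannot hang.
            queue.task_done()


async def demo() -> Tuple[List[object], List[Exception]]:
    queue: asyncio.Queue = asyncio.Queue()
    seen: List[object] = []
    errors: List[Exception] = []

    async def handle(item: object) -> None:
        # Hypothetical handler: one item fails to simulate a rendering error.
        if item == "bad":
            raise ValueError("cannot render item")
        seen.append(item)

    worker = asyncio.create_task(consume(queue, handle, errors))
    for item in ("first", "bad", "second"):
        queue.put_nowait(item)

    await queue.join()  # returns once all three items are marked done
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        pass
    return seen, errors


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With this shape, `queue.join()` still returns after a handler failure, which matters only if some caller waits on `join()`; if nothing does, the existing code behaves the same in practice.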
@@ -0,0 +1,98 @@
+import os

+from reportlab.pdfgen import canvas
+from src.rag.manager import RAGManager


+def create_dummy_phb(pdf_path: str):
+    """
+    Creates a dummy PDF file to simulate the Player's Handbook for verification.
+    """
+    print(f"Creating dummy PHB at {pdf_path}...")
+    c = canvas.Canvas(pdf_path)

+    # Page 1: Fireball
+    c.drawString(100, 750, "Fireball")
+    c.drawString(
+        100,
+        730,
+        "A bright streak flashes from your pointing finger to a point you choose within range.",
+    )
+    c.drawString(
+        100,
+        710,
+        "Each creature in a 20-foot-radius sphere centered on that point must make a Dexterity saving throw.",
+    )
+    c.showPage()

+    # Page 2: Grappling
+    c.drawString(100, 750, "Grappling")
+    c.drawString(
+        100,
+        730,
+        "Grappling is a special option available to any attack that hits with a melee attack.",
+    )
+    c.drawString(
+        100,
+        710,
+        "A creature is grappled if it is the target of the grapple attack and the attack hits.",
+    )
+    c.showPage()

+    # Page 3: General Rules
+    c.drawString(100, 750, "General Rules")
+    c.drawString(
+        100,
+        730,
+        "Combat is resolved in rounds, and each round represents 6 seconds of in-game time.",
+    )
+    c.showPage()

+    c.save()
+    print(f"Dummy PHB created successfully at {pdf_path}")


+def test_rag():
+    pdf_path = "data/phb_dummy.pdf"
+    create_dummy_phb(pdf_path)

+    # Initialize RAG Manager
+    rag = RAGManager(persist_dir="data/rag_index_test")

+    # Task 2.2: Ingest PDF
+    print("\nTesting Ingestion...")
+    rag.ingest_pdf(pdf_path)

+    # Task 2.3: Retrieve Logic
+    print("\nTesting Retrieval...")

+    # Test 1: Fireball
+    query1 = "What is Fireball?"
+    results1 = rag.retrieve(query1)
+    print(f"Query: {query1}")
+    for res in results1:
+        print(f"Source: {res.source} | Snippet: {res.snippet[:100]}...")

+    assert len(results1) > 0, "Should have retrieved at least one result for 'Fireball'"
+    assert "Fireball" in results1[0].snippet, "The top result should contain 'Fireball'"

+    # Test 2: Grappling
+    query2 = "How does grappling work?"
+    results2 = rag.retrieve(query2)
+    print(f"Query: {query2}")
+    for res in results2:
+        print(f"Source: {res.source} | Snippet: {res.snippet[:100]}...")

+    assert len(results2) > 0, (
+        "Should have retrieved at least one result for 'Grappling'"
+    )
+    assert "Grappling" in results2[0].snippet, (
+        "The top result should contain 'Grappling'"
+    )

+    print("\n✅ RAG Verification Successful!")


+if __name__ == "__main__":
+    test_rag()