diff --git a/ObsidianRAGPipe.py b/ObsidianRAGPipe.py
index 6c32493..8770f2c 100644
--- a/ObsidianRAGPipe.py
+++ b/ObsidianRAGPipe.py
@@ -1,12 +1,14 @@
 """
 title: Obsidian RAG Pipeline
 author: Daniel Henry
-version: 0.15
+version: 0.17
+description: Updated for llama-swap with llama.cpp (OpenAI-compatible API)
 """
 
 import asyncio
 import json
 import time
+import math
 import urllib.parse
 from typing import AsyncGenerator
 
@@ -18,9 +20,8 @@ class Pipe:
 
     class Valves(BaseModel):
         # Endpoints
-        ollama_url: str = Field(default="http://ollama.internal.henryhosted.com:11434")
+        llamacpp_url: str = Field(default="http://ollama.internal.henryhosted.com:9292")
         qdrant_url: str = Field(default="http://app-01.internal.henryhosted.com:6333")
-        rerank_url: str = Field(default="http://ollama.internal.henryhosted.com:7997")
 
         # Qdrant
         collection_name: str = Field(default="obsidian_vault")
@@ -35,6 +36,16 @@ class Pipe:
        rerank_enabled: bool = Field(
             default=True, description="Set to False to skip reranking"
         )
+        rerank_logit: bool = Field(
+            default=False, description="Apply a sigmoid if the reranker returns raw logits"
+        )
+        rerank_debug: bool = Field(
+            default=False, description="Show every rerank score in the thinking block"
+        )
+        rerank_model: str = Field(
+            default="bge-reranker-v2-m3-q8_0",
+            description="Reranker model name",
+        )
         rerank_timeout: float = Field(default=60.0)
         min_rerank_score: float = Field(
             default=0.01, description="Minimum rerank score to keep"
@@ -44,9 +55,17 @@ class Pipe:
         )
 
         # LLM
-        embedding_model: str = Field(default="nomic-embed-text:latest")
-        llm_model: str = Field(default="llama3.2:3b")
-        llm_context_size: int = Field(default=8192)
+        embedding_model: str = Field(
+            default="nomic-embed-text-v1.5.f16",
+            description="Embedding model name",
+        )
+        llm_model: str = Field(
+            default="qwen2.5-3b-instruct-q4_k_m",
+            description="LLM model name",
+        )
+        llm_max_tokens: int = Field(
+            default=2048, description="Max tokens for LLM response"
+        )
         llm_timeout: float = Field(default=300.0)
         query_rewrite_model: str = Field(
             default="",
@@ -69,6 +88,10 @@ class Pipe:
     def __init__(self):
         self.valves = self.Valves()
 
+    def _estimate_tokens(self, text: str) -> int:
+        """Rough token estimate: ~4 chars per token for English text."""
+        return len(text) // 4
+
     async def pipe(self, body: dict) -> AsyncGenerator[str, None]:
         messages = body.get("messages", [])
         if not messages:
@@ -92,11 +115,12 @@
     ) -> AsyncGenerator[str, None]:
         think = self.valves.show_thinking
+        total_prompt_tokens = 0
 
-        # Start thinking block immediately
+        # Start thinking block
         if think:
             yield "<think>\n"
-            yield f"**LLM Model:** {self.valves.llm_model}\n\n"
+            yield f"**LLM Model:** {self.valves.llm_model}\n"
             yield f"**Query:** {query}\n\n"
 
         # ─────────────────────────────────────────────
@@ -107,22 +131,19 @@
             t0 = time.time()
             rewrite_model = self.valves.query_rewrite_model or self.valves.llm_model
+            current_question = messages[-1].get("content", "")
 
-            # Build conversation context for rewriting
+            # Build conversation context for rewriting (only if there's prior conversation)
             conversation_for_rewrite = []
-            for m in messages[:-1]:  # All messages except the last one
+            for m in messages[:-1]:
                 role = m.get("role", "")
                 content = m.get("content", "")
                 if role == "user":
                     conversation_for_rewrite.append(f"User: {content}")
                 elif role == "assistant":
-                    # Truncate assistant responses to avoid bloat
                     truncated = content[:500] + "..." if len(content) > 500 else content
                     conversation_for_rewrite.append(f"Assistant: {truncated}")
 
-            current_question = messages[-1].get("content", "")
-
-            # If there's prior conversation, rewrite the query
             if conversation_for_rewrite:
                 rewrite_prompt = f"""Do not interpret or answer the question. Simply add enough context from the conversation so the question makes sense on its own.
@@ -135,18 +156,23 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
                 try:
                     async with session.post(
-                        f"{self.valves.ollama_url}/api/generate",
+                        f"{self.valves.llamacpp_url}/v1/chat/completions",
                         json={
                             "model": rewrite_model,
-                            "prompt": rewrite_prompt,
+                            "messages": [{"role": "user", "content": rewrite_prompt}],
                             "stream": False,
+                            "max_tokens": 256,
                         },
                         timeout=aiohttp.ClientTimeout(total=30),
                     ) as resp:
                         if resp.status == 200:
                             data = await resp.json()
-                            rewritten = data.get("response", "").strip()
-                            # Sanity check - if rewrite is empty or way too long, use original
+                            rewritten = (
+                                (data.get("choices") or [{}])[0]
+                                .get("message", {})
+                                .get("content", "")
+                                .strip()
+                            )
                             if rewritten and len(rewritten) < 1000:
                                 search_query = rewritten
                             else:
@@ -158,7 +184,6 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
                         yield f" ⚠ Rewrite failed: {e}, using original query\n"
                     search_query = current_question
             else:
-                # No prior conversation, use the question as-is
                 search_query = current_question
 
         if think:
@@ -176,20 +201,26 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
         try:
             async with session.post(
-                f"{self.valves.ollama_url}/api/embeddings",
+                f"{self.valves.llamacpp_url}/v1/embeddings",
                 json={
                     "model": self.valves.embedding_model,
-                    "prompt": search_query,
-                    "options": {"num_ctx": 8192},
+                    "input": search_query,
                 },
-                timeout=aiohttp.ClientTimeout(total=15),
+                timeout=aiohttp.ClientTimeout(total=30),
             ) as resp:
                 if resp.status != 200:
+                    error_text = await resp.text()
                     if think:
-                        yield f" ✗ HTTP {resp.status}\n</think>\n\n"
+                        yield f" ✗ HTTP {resp.status}: {error_text}\n</think>\n\n"
                     yield f"Embedding failed: HTTP {resp.status}"
                     return
-                embedding = (await resp.json()).get("embedding")
+                data = await resp.json()
+                embedding = (data.get("data") or [{}])[0].get("embedding")
+                if not embedding:
+                    if think:
+                        yield " ✗ No embedding in response\n</think>\n\n"
+                    yield "Embedding failed: No embedding returned"
+                    return
         except Exception as e:
             if think:
                 yield f" ✗ {e}\n</think>\n\n"
@@ -238,7 +269,6 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
                 yield "No relevant notes found for this query."
                 return
 
-            # Show top 5
             if think:
                 yield " Top 5:\n"
                 for i, r in enumerate(qdrant_results[:5]):
@@ -253,6 +283,7 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
         if self.valves.rerank_enabled:
             if think:
                 yield "**Step 4: Reranking**\n"
+                yield f"**Rerank Model:** {self.valves.rerank_model}\n"
             t0 = time.time()
 
             docs_for_rerank = [
@@ -261,26 +292,31 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
             try:
                 async with session.post(
-                    f"{self.valves.rerank_url}/rerank",
+                    f"{self.valves.llamacpp_url}/v1/rerank",
                     json={
+                        "model": self.valves.rerank_model,
                         "query": search_query,
                         "documents": docs_for_rerank,
-                        "return_documents": False,
                     },
                     timeout=aiohttp.ClientTimeout(total=self.valves.rerank_timeout),
                 ) as resp:
                     if resp.status != 200:
+                        error_text = await resp.text()
                         if think:
-                            yield f" ⚠ Reranker failed: HTTP {resp.status}, using Qdrant order\n\n"
+                            yield f" ⚠ Reranker failed: HTTP {resp.status} - {error_text}, using Qdrant order\n\n"
                         chunks = qdrant_results[: self.valves.final_top_k]
                     else:
-                        rerank_results = (await resp.json()).get("results", [])
+                        rerank_data = await resp.json()
+                        rerank_results = rerank_data.get("results", [])
 
-                        # Apply rerank scores and filter
                         scored = []
                         for item in rerank_results:
                             idx = item["index"]
                             score = item["relevance_score"]
+                            if self.valves.rerank_logit:
+                                # Map a raw logit onto (0, 1) with a sigmoid
+                                score = 1 / (1 + math.exp(-score))
+                            if think and self.valves.rerank_debug:
+                                yield f" • Debug: Doc {idx} score: {score:.4f}\n"
                             if score >= self.valves.min_rerank_score:
                                 chunk = qdrant_results[idx].copy()
                                 chunk["rerank_score"] = score
@@ -291,7 +327,6 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
                         if think:
                             yield f" ✓ Kept {len(chunks)} chunks ({time.time() - t0:.2f}s)\n"
-
                             if chunks:
                                 yield " Top 5 after rerank:\n"
                                 for i, c in enumerate(chunks[:5]):
@@ -304,7 +339,6 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
                 if think:
                     yield f" ⚠ Reranker error: {e}, using Qdrant order\n\n"
                 chunks = qdrant_results[: self.valves.final_top_k]
-
         else:
             if think:
                 yield "**Step 4: Reranking** (disabled)\n\n"
@@ -329,22 +363,20 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
             content = payload.get("content", "").strip()
             source = payload.get("source", "")
 
-            part = f"### Note Name {i}: {file_name}\n"
+            # Bracketed IDs let the LLM cite chunks as [1], [2], ...
+            part = f"[{i}] File: {file_name}\n"
             if source:
-                part += f"Original source: {source}\n"
+                part += f"Source: {source}\n"
             part += f"\n{content}"
             context_parts.append(part)
 
         context = "\n\n---\n\n".join(context_parts)
-        context_chars = len(context)
-        estimated_tokens = context_chars // 4
+        context_tokens = self._estimate_tokens(context)
 
         if think:
-            yield f" ✓ {len(chunks)} chunks, {context_chars:,} chars (~{estimated_tokens:,} tokens)\n"
-
-            if estimated_tokens > self.valves.token_warning_threshold:
-                yield f" ⚠ Warning: approaching context limit ({self.valves.llm_context_size})\n"
-
+            yield f" ✓ {len(chunks)} chunks, ~{context_tokens:,} tokens\n"
+            if context_tokens > self.valves.token_warning_threshold:
+                yield " ⚠ Warning: large context may affect quality\n"
             yield "\n"
 
        # ─────────────────────────────────────────────
@@ -354,91 +386,77 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question,
         if think:
             yield "**Step 6: Generate Response**\n"
             yield "</think>\n\n"
 
-        system_prompt = f"""
-        ### ROLE
-        You are the user's "Knowledge Partner." You are warm, enthusiastic, and helpful. You love the user's notes and want to help them connect ideas.
-
-        ### THE GOLDEN RULE (HARD WALL)
-        Your knowledge is strictly limited to the provided <SOURCE NOTES>.
-        - IF the answer is in the notes: Synthesize it warmly and cite it.
-        - IF the answer is NOT in the notes: You must admit it. Say: "I checked your notes, but I couldn't find info on that."
-        - Be honest with the user. The user does not want blind support. You are a friendly research assistant not an overly supportive friend.
-
-        **EXCEPTION:** ONLY if the user explicitly types the trigger phrase "System: Add Context" are you allowed to use outside knowledge.
-
-        ### INSTRUCTIONS
-        1. **Search First:** Look through the <SOURCE NOTES> to find the answer.
-        2. **Synthesize:** You may combine facts from different notes to build a complete answer.
-        3. **Cite Everything:** Every single statement of fact must end with a citation in this format: `[Note Name]`.
-        4. **Tone:** Be conversational but professional. Avoid robotic phrases like "According to the provided text." Instead, say "Your note on [Topic] mentions..."
-        5. **Additional:** Avoid asking follow up questions at the end of your output.
-
-        ### EXAMPLES (Follow this pattern)
-
-        **User:** "What did I write about the project deadline?"
-        **You:** "I looked through your project logs! It seems you set the final submission date for October 15th [Project_Alpha_Log]. You also noted that the design phase needs to wrap up by the 1st [Design_Team_Meeting]."
-
-        **User:** "Who is the president of France?" (Note: This is NOT in your notes)
-        **You:** "I checked your notes, but I don't see any mention of the current president of France. Would you like me to use outside knowledge? If so, just say 'System: Add Context'."
-
-        ### SOURCE NOTES
-        <SOURCE NOTES>
-        {context}
-        </SOURCE NOTES>
-        """
+        system_prompt = f"""You are a helpful assistant. Use the provided notes to answer the user's question.
 
-        # Only keep user/assistant messages
-        conversation = [m for m in messages if m.get("role") in ("user", "assistant")]
+RULES:
+1. Use the <SOURCE NOTES> as your source of truth.
+2. Cite facts using the bracketed ID number [1].
+3. SYNTHESIS: You are encouraged to draw connections between different notes to form a complete answer.
+4. INFERENCE: If the answer is not explicitly written but can be logically inferred from the notes, you may answer, but please use phrases like "The notes imply..." or "Based on [1], it suggests..."
+5. If the answer is completely absent, say "I couldn't find that in your notes."
 
-        # UPDATED: Robustly strip previous "Sources" to prevent pattern matching
+<SOURCE NOTES>
+{context}
+</SOURCE NOTES>
+"""
+
+        # Build conversation history, stripping previous "Sources" footers
+        # from assistant messages so the model doesn't imitate the pattern
         conversation = []
         for m in messages:
-            if m.get("role") not in ("user", "assistant"):
+            role = m.get("role")
+            if role not in ("user", "assistant"):
                 continue
-
-            msg = m.copy()
-            if msg["role"] == "assistant":
-                content = msg.get("content", "")
-                # Split on "**Sources:**" which is the visible header.
-                # This catches it even if the newlines/separators are slightly different.
- if "**Sources:**" in content: - msg["content"] = content.split("**Sources:**")[0].strip() - + msg = {"role": role, "content": m.get("content", "")} + if role == "assistant" and "**Sources:**" in msg["content"]: + msg["content"] = msg["content"].split("**Sources:**")[0].strip() conversation.append(msg) + llm_messages = [{"role": "system", "content": system_prompt}] + conversation + + # Estimate prompt tokens + prompt_text = system_prompt + "".join(m["content"] for m in conversation) + total_prompt_tokens = self._estimate_tokens(prompt_text) + llm_payload = { "model": self.valves.llm_model, - "messages": [ - {"role": "system", "content": system_prompt}, - *conversation, - ], + "messages": llm_messages, "stream": True, - "options": {"num_ctx": self.valves.llm_context_size}, + "max_tokens": self.valves.llm_max_tokens, } - # Stream LLM response - prompt_tokens = 0 completion_tokens = 0 + completion_text = "" try: async with session.post( - f"{self.valves.ollama_url}/api/chat", + f"{self.valves.llamacpp_url}/v1/chat/completions", json=llm_payload, timeout=aiohttp.ClientTimeout(total=self.valves.llm_timeout), ) as resp: if resp.status != 200: - yield f"LLM error: HTTP {resp.status}" + error_text = await resp.text() + yield f"LLM error: HTTP {resp.status} - {error_text}" return async for line in resp.content: if not line: continue + + line_str = line.decode("utf-8").strip() + if not line_str or line_str.startswith(":"): + continue + + if line_str.startswith("data: "): + line_str = line_str[6:] + + if line_str == "[DONE]": + break + try: - data = json.loads(line) - if text := data.get("message", {}).get("content"): - yield text - if data.get("done"): - prompt_tokens = data.get("prompt_eval_count", 0) - completion_tokens = data.get("eval_count", 0) + data = json.loads(line_str) + delta = data.get("choices", [{}])[0].get("delta", {}) + if content := delta.get("content"): + yield content + completion_text += content except json.JSONDecodeError: continue @@ -449,32 +467,47 @@ Rewrite the question to be standalone (respond with ONLY the rewritten question, yield f"\n\nLLM error: {e}" return + # Estimate completion tokens + completion_tokens = self._estimate_tokens(completion_text) + # ───────────────────────────────────────────── # Sources # ───────────────────────────────────────────── if self.valves.show_sources: - # Dedupe by file path, count chunks + # We now track 'indices' list along with the count source_counts: dict[str, dict] = {} - for chunk in chunks: + + # 'chunks' is still available from Step 4/Step 3 + for i, chunk in enumerate(chunks, 1): payload = chunk.get("payload", {}) path = payload.get("filePath", "") name = payload.get("fileName", "Unknown") + if path in source_counts: source_counts[path]["count"] += 1 + source_counts[path]["indices"].append(i) else: - source_counts[path] = {"name": name, "path": path, "count": 1} + source_counts[path] = { + "name": name, + "path": path, + "count": 1, + "indices": [i], + } yield "\n\n---\n**Sources:**\n" for src in source_counts.values(): vault = urllib.parse.quote(self.valves.vault_name) path = urllib.parse.quote(src["path"]) uri = f"obsidian://open?vault={vault}&file={path}" - count_str = f" ({src['count']} chunks)" if src["count"] > 1 else "" - yield f"- [{src['name']}]({uri}){count_str}\n" + + # Format indices like: [1, 2, 5] + indices_str = ", ".join(map(str, src["indices"])) + + yield f"- [{src['name']}]({uri}) (Chunks: {indices_str})\n" # ───────────────────────────────────────────── # Stats # ───────────────────────────────────────────── 
         if self.valves.show_stats:
-            yield f"\n*{prompt_tokens:,} in / {completion_tokens:,} out*"
+            yield f"\n*~{total_prompt_tokens:,} in / ~{completion_tokens:,} out (estimated)*"
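
# Appendix (not part of the diff): a minimal, standalone sketch of what the
# new `rerank_logit` valve does. Rerankers such as bge-reranker-v2-m3 can
# return raw logits rather than probabilities; a sigmoid squashes them into
# (0, 1) so the `min_rerank_score` cutoff keeps its meaning. The document
# names and logit values below are made up for illustration.
import math


def logit_to_score(logit: float) -> float:
    """Squash a raw reranker logit into (0, 1) with a sigmoid."""
    return 1 / (1 + math.exp(-logit))


for doc, logit in [("doc-a", 4.2), ("doc-b", -1.3), ("doc-c", 0.1)]:
    print(f"{doc}: logit={logit:+.1f} -> score={logit_to_score(logit):.3f}")
# doc-a: logit=+4.2 -> score=0.985
# doc-b: logit=-1.3 -> score=0.214
# doc-c: logit=+0.1 -> score=0.525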
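
# Appendix (not part of the diff): the SSE parsing the new streaming loop
# relies on, run against canned llama.cpp-style lines so it can be checked
# without a server. It assumes the OpenAI-compatible "data: {...}" framing
# with a ":" keep-alive comment and a trailing [DONE] sentinel; the payloads
# below are made up.
import json

sample = [
    b": ping",
    b'data: {"choices":[{"delta":{"content":"Hel"}}]}',
    b'data: {"choices":[{"delta":{"content":"lo"}}]}',
    b"data: [DONE]",
]

text = ""
for line in sample:
    line_str = line.decode("utf-8").strip()
    if not line_str or line_str.startswith(":"):  # keep-alive / comment
        continue
    if line_str.startswith("data: "):
        line_str = line_str[6:]
    if line_str == "[DONE]":
        break
    try:
        data = json.loads(line_str)
        # `or [{}]` guards against a final chunk with an empty choices list
        delta = (data.get("choices") or [{}])[0].get("delta", {})
        if content := delta.get("content"):
            text += content
    except json.JSONDecodeError:
        continue

print(text)  # -> Hello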