From 1b2c7db92493190eca78d6aa649a9257b1a0c468 Mon Sep 17 00:00:00 2001
From: Steve W
Date: Thu, 9 Apr 2026 18:18:35 +0000
Subject: [PATCH] Refocus classifier on rich extraction and local dedupe only

---
 README.md           |  61 ++++++++--------
 app/classifier.py   |   4 +-
 app/dedupe_store.py |  94 ++++++++++++------
 app/models.py       |  16 ++--
 app/sync.py         | 173 ++++++++++++--------------------------
 app/todoist.py      |  48 ------------
 pyproject.toml      |   1 -
 7 files changed, 130 insertions(+), 267 deletions(-)
 delete mode 100644 app/todoist.py

diff --git a/README.md b/README.md
index 34a722c..353c9da 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # email-classifier
 
-FastAPI service that classifies email using a configurable LLM backend, enriches the output for human review, and can upsert Todoist tasks without creating duplicates.
+FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using fingerprint-based dedupe.
 
 ## Environment configuration
 
@@ -25,11 +25,9 @@ export LLM_API_KEY=your_minimax_key
 export LLM_MODEL=MiniMax-M2.7
 ```
 
-Optional Todoist sync:
+Optional local dedupe store path:
 
 ```bash
-export TODOIST_API_KEY=your_todoist_token
-export TODOIST_PROJECT_ID=optional_project_id
 export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db
 ```
 
@@ -37,9 +35,9 @@ export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db
 
 ### POST /classify
 
-Backward-compatible top-level response fields are preserved.
+This endpoint now returns richer structured extraction; the previous top-level response shape is not preserved.
 
-Optional request metadata for dedupe and richer sync:
+Request example:
 
 ```json
 {
@@ -47,8 +45,6 @@
   "email_data": {
     "subject": "Can you review this by Friday?",
     "body": "Hi Daniel, please review the attached budget proposal."
   },
-  "message_id": "",
-  "thread_id": "thread-789",
   "from_address": "sender@example.com",
   "received_at": "2026-04-09T12:55:00Z",
   "provider": "anthropic",
@@ -57,7 +53,7 @@
 }
 ```
 
-Response now includes optional enrichment and Todoist sync info:
+Response example:
 
 ```json
 {
@@ -80,42 +76,43 @@
     "source_signals": ["request", "deadline"],
     "dedupe_key": "..."
   },
-  "todoist": {
-    "status": "created",
-    "task_id": "1234567890",
-    "comment_added": false,
-    "dedupe_match": "none",
-    "message": null
+  "dedupe": {
+    "status": "new",
+    "seen_count": 1,
+    "matched_on": "none",
+    "subject_key": "...",
+    "fingerprint": "..."
   }
 }
 ```
 
 ## Dedupe behavior
 
-When Todoist sync is enabled and `needs_action=true`:
-- first match by `message_id`
-- then by `thread_id`
-- then by normalized content fingerprint fallback
+The API does not create or update Todoist tasks.
+It only returns richer extraction and local dedupe metadata for downstream automation like n8n (see the sketch at the end of this section).
 
-Behavior:
-- no existing task: create Todoist task
-- existing task, same classification: do not duplicate, mark `unchanged`
-- existing task, changed classification/context: update task in place
-- add a Todoist comment only when material context changed
+Matching strategy:
+- normalized subject plus sender-derived `subject_key`
+- full content fingerprint fallback based on sender + normalized subject + cleaned body
+
+Statuses:
+- `new`: no prior similar email seen
+- `duplicate`: same dedupe target and same extracted result as before
+- `updated`: matched a prior email, but the extracted result changed
+
+This is intentionally heuristic, not perfect.
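+
+As a sketch of downstream use (not part of this service): an n8n step or any HTTP client can branch on `dedupe.status`. The URL, port, and `requests` dependency below are assumptions for illustration only:
+
+```python
+import requests  # illustrative client choice; any HTTP client works
+
+resp = requests.post(
+    "http://localhost:8000/classify",  # assumed local dev address
+    json={
+        "email_data": {"subject": "Re: budget", "body": "Please review."},
+        "from_address": "sender@example.com",
+    },
+    timeout=30,
+)
+resp.raise_for_status()
+dedupe = resp.json().get("dedupe") or {}
+
+if dedupe.get("status") == "new":
+    ...  # first sighting: create a task in your tool of choice
+elif dedupe.get("status") == "updated":
+    ...  # matched a prior email but extraction changed: update the task
+else:
+    ...  # "duplicate": safe to drop, nothing material changed
+```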
 
 ## Architecture
 
-- `app/classifier.py`: classification orchestration and Todoist sync handoff
+- `app/classifier.py`: classification orchestration and dedupe handoff
 - `app/prompts.py`: richer extraction prompt
-- `app/sync.py`: dedupe, task rendering, Todoist upsert logic
+- `app/sync.py`: subject normalization, fingerprinting, dedupe application
-- `app/dedupe_store.py`: SQLite-backed mapping store
+- `app/dedupe_store.py`: SQLite-backed dedupe store
-- `app/todoist.py`: Todoist REST client
 - `app/llm_adapters.py`: provider adapters
 - `app/config.py`: LLM settings
 
 ## Notes
 
-- `/classify` remains backward compatible at the top level.
-- New request metadata fields are optional.
-- Todoist sync safely no-ops when `TODOIST_API_KEY` is not configured.
-- SQLite is used for lightweight production-safe dedupe tracking.
+- No Todoist integration lives in this API.
+- Dedupe is best-effort and designed to help downstream workflows avoid obvious duplicates.
+- SQLite is used for lightweight local dedupe tracking.
diff --git a/app/classifier.py b/app/classifier.py
index d603ef7..39503d1 100644
--- a/app/classifier.py
+++ b/app/classifier.py
@@ -6,7 +6,7 @@ from typing import Any
 from app.config import get_request_settings
 from app.llm_adapters import build_adapter, coerce_json_text
 from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData
-from app.sync import build_fingerprint, sync_todoist
+from app.sync import apply_dedupe, build_fingerprint
 
 VALID_CATEGORIES = {
     "action_required",
@@ -57,7 +57,7 @@ async def classify_email(request: ClassifyRequest) -> ClassificationResult:
         details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)),
     )
 
-    result.todoist = await sync_todoist(clean_email, result)
+    result.dedupe = apply_dedupe(clean_email, result)
 
     return result
todoist_sync(fingerprint)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)") - def find_existing(self, *, message_id: str | None, thread_id: str | None, fingerprint: str) -> dict[str, Any] | None: - queries = [] - if message_id: - queries.append(("SELECT * FROM todoist_sync WHERE message_id = ? ORDER BY id DESC LIMIT 1", (message_id,))) - if thread_id: - queries.append(("SELECT * FROM todoist_sync WHERE thread_id = ? ORDER BY id DESC LIMIT 1", (thread_id,))) - queries.append(("SELECT * FROM todoist_sync WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", (fingerprint,))) + def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None: with self._connect() as conn: - for sql, params in queries: - row = conn.execute(sql, params).fetchone() - if row: - data = dict(row) - data["source_payload"] = json.loads(data["source_payload"]) - data["last_result"] = json.loads(data["last_result"]) - return data - return None + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", + (fingerprint,), + ).fetchone() + if row is None: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1", + (subject_key,), + ).fetchone() + if not row: + return None + data = dict(row) + data["request_payload"] = json.loads(data["request_payload"]) + data["result_payload"] = json.loads(data["result_payload"]) + return data - def upsert( + def insert_or_update( self, *, existing_id: int | None, - message_id: str | None, - thread_id: str | None, + subject_key: str, fingerprint: str, - todoist_task_id: str, - classification_hash: str, - source_payload: dict[str, Any], - last_result: dict[str, Any], + result_hash: str, + request_payload: dict[str, Any], + result_payload: dict[str, Any], + seen_count: int, ) -> None: with self._connect() as conn: if existing_id is None: conn.execute( """ - INSERT INTO todoist_sync (message_id, thread_id, fingerprint, todoist_task_id, classification_hash, source_payload, last_result) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count) + VALUES (?, ?, ?, ?, ?, ?) """, ( - message_id, - thread_id, + subject_key, fingerprint, - todoist_task_id, - classification_hash, - json.dumps(source_payload, sort_keys=True), - json.dumps(last_result, sort_keys=True), + result_hash, + json.dumps(request_payload, sort_keys=True), + json.dumps(result_payload, sort_keys=True), + seen_count, ), ) else: conn.execute( """ - UPDATE todoist_sync - SET message_id = ?, thread_id = ?, fingerprint = ?, todoist_task_id = ?, classification_hash = ?, - source_payload = ?, last_result = ?, updated_at = CURRENT_TIMESTAMP + UPDATE classification_dedupe + SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, + seen_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? 
""", ( - message_id, - thread_id, + subject_key, fingerprint, - todoist_task_id, - classification_hash, - json.dumps(source_payload, sort_keys=True), - json.dumps(last_result, sort_keys=True), + result_hash, + json.dumps(request_payload, sort_keys=True), + json.dumps(result_payload, sort_keys=True), + seen_count, existing_id, ), ) diff --git a/app/models.py b/app/models.py index 39a284f..9badd5f 100644 --- a/app/models.py +++ b/app/models.py @@ -17,8 +17,6 @@ class ClassifyRequest(BaseModel): base_url: str | None = None api_key: str | None = Field(default=None, exclude=True) temperature: float | None = None - message_id: str | None = None - thread_id: str | None = None from_address: str | None = None received_at: str | None = None @@ -37,12 +35,12 @@ class ClassificationDetails(BaseModel): dedupe_key: str | None = None -class TodoistSyncResult(BaseModel): - status: Literal["created", "updated", "unchanged", "disabled", "skipped", "error"] - task_id: str | None = None - comment_added: bool = False - dedupe_match: Literal["message_id", "thread_id", "fingerprint", "none"] = "none" - message: str | None = None +class DedupeResult(BaseModel): + status: Literal["new", "duplicate", "updated"] + seen_count: int = 1 + matched_on: Literal["none", "subject", "fingerprint"] = "none" + subject_key: str + fingerprint: str class ClassificationResult(BaseModel): @@ -53,4 +51,4 @@ class ClassificationResult(BaseModel): reasoning: str confidence: float details: ClassificationDetails | None = None - todoist: TodoistSyncResult | None = None + dedupe: DedupeResult | None = None diff --git a/app/sync.py b/app/sync.py index fadba7b..2f5e29f 100644 --- a/app/sync.py +++ b/app/sync.py @@ -3,152 +3,73 @@ from __future__ import annotations import hashlib import json import os -from typing import Any +import re from app.dedupe_store import DedupeStore -from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, TodoistSyncResult -from app.todoist import TodoistClient +from app.models import ClassificationResult, ClassifyRequest, DedupeResult + + +def normalize_subject(subject: str) -> str: + value = subject.strip().lower() + value = re.sub(r"^(re|fw|fwd)\s*:\s*", "", value) + value = re.sub(r"\s+", " ", value) + return value + + +def build_subject_key(request: ClassifyRequest) -> str: + subject = normalize_subject(request.email_data.subject) + sender = (request.from_address or "").strip().lower() + return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest() def build_fingerprint(request: ClassifyRequest) -> str: - subject = request.email_data.subject.strip().lower() + subject = normalize_subject(request.email_data.subject) body = " ".join(request.email_data.body.split()).strip().lower() - seed = f"{request.from_address or ''}\n{subject}\n{body}" + seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}" return hashlib.sha256(seed.encode()).hexdigest() -def build_classification_hash(result: ClassificationResult) -> str: - payload = result.model_dump(exclude={"todoist"}, exclude_none=True) +def build_result_hash(result: ClassificationResult) -> str: + payload = result.model_dump(exclude={"dedupe"}, exclude_none=True) return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest() -def render_task_content(result: ClassificationResult) -> str: - details = result.details or ClassificationDetails() - return details.suggested_title or result.task_description or details.summary or "Email follow-up" - - -def render_task_description(request: ClassifyRequest, result: 
diff --git a/app/sync.py b/app/sync.py
index fadba7b..2f5e29f 100644
--- a/app/sync.py
+++ b/app/sync.py
@@ -3,152 +3,73 @@
 import hashlib
 import json
 import os
-from typing import Any
+import re
 
 from app.dedupe_store import DedupeStore
-from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, TodoistSyncResult
-from app.todoist import TodoistClient
+from app.models import ClassificationResult, ClassifyRequest, DedupeResult
+
+
+def normalize_subject(subject: str) -> str:
+    value = subject.strip().lower()
+    value = re.sub(r"^((re|fw|fwd)\s*:\s*)+", "", value)
+    value = re.sub(r"\s+", " ", value)
+    return value
+
+
+def build_subject_key(request: ClassifyRequest) -> str:
+    subject = normalize_subject(request.email_data.subject)
+    sender = (request.from_address or "").strip().lower()
+    return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest()
 
 
 def build_fingerprint(request: ClassifyRequest) -> str:
-    subject = request.email_data.subject.strip().lower()
+    subject = normalize_subject(request.email_data.subject)
     body = " ".join(request.email_data.body.split()).strip().lower()
-    seed = f"{request.from_address or ''}\n{subject}\n{body}"
+    seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}"
     return hashlib.sha256(seed.encode()).hexdigest()
 
 
-def build_classification_hash(result: ClassificationResult) -> str:
-    payload = result.model_dump(exclude={"todoist"}, exclude_none=True)
+def build_result_hash(result: ClassificationResult) -> str:
+    payload = result.model_dump(exclude={"dedupe"}, exclude_none=True)
     return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
 
 
-def render_task_content(result: ClassificationResult) -> str:
-    details = result.details or ClassificationDetails()
-    return details.suggested_title or result.task_description or details.summary or "Email follow-up"
-
-
-def render_task_description(request: ClassifyRequest, result: ClassificationResult) -> str:
-    details = result.details or ClassificationDetails()
-    sections: list[str] = []
-    if details.summary:
-        sections.append(f"Summary:\n{details.summary}")
-    if result.task_description:
-        sections.append(f"Action:\n{result.task_description}")
-    if details.suggested_notes:
-        sections.append(f"Notes:\n{details.suggested_notes}")
-    if details.deadline:
-        sections.append(f"Deadline:\n{details.deadline}")
-    if details.people:
-        sections.append("People:\n- " + "\n- ".join(details.people))
-    if details.organizations:
-        sections.append("Organizations:\n- " + "\n- ".join(details.organizations))
-    if details.attachments_referenced:
-        sections.append("Attachments referenced:\n- " + "\n- ".join(details.attachments_referenced))
-    if details.next_steps:
-        sections.append("Next steps:\n- " + "\n- ".join(details.next_steps))
-    if details.key_points:
-        sections.append("Key points:\n- " + "\n- ".join(details.key_points))
-    metadata = []
-    if request.message_id:
-        metadata.append(f"message_id: {request.message_id}")
-    if request.thread_id:
-        metadata.append(f"thread_id: {request.thread_id}")
-    if request.from_address:
-        metadata.append(f"from: {request.from_address}")
-    if request.received_at:
-        metadata.append(f"received_at: {request.received_at}")
-    if metadata:
-        sections.append("Source metadata:\n" + "\n".join(metadata))
-    return "\n\n".join(sections).strip()
-
-
-async def sync_todoist(request: ClassifyRequest, result: ClassificationResult) -> TodoistSyncResult:
-    if not result.needs_action:
-        return TodoistSyncResult(status="skipped", message="No action required.")
-    client = TodoistClient()
-    if not client.enabled:
-        return TodoistSyncResult(status="disabled", message="Todoist is not configured.")
-
+def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult:
     store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db"))
+    subject_key = build_subject_key(request)
     fingerprint = build_fingerprint(request)
-    existing = store.find_existing(message_id=request.message_id, thread_id=request.thread_id, fingerprint=fingerprint)
-    dedupe_match = "none"
-    if existing:
-        if request.message_id and existing.get("message_id") == request.message_id:
-            dedupe_match = "message_id"
-        elif request.thread_id and existing.get("thread_id") == request.thread_id:
-            dedupe_match = "thread_id"
-        else:
-            dedupe_match = "fingerprint"
-
-    content = render_task_content(result)
-    description = render_task_description(request, result)
-    classification_hash = build_classification_hash(result)
+    result_hash = build_result_hash(result)
+    existing = store.find_existing(subject_key=subject_key, fingerprint=fingerprint)
 
     if not existing:
-        created = await client.create_task(content=content, description=description, due_string=(result.details.deadline if result.details else None))
-        task_id = str(created.get("id"))
-        store.upsert(
+        store.insert_or_update(
             existing_id=None,
-            message_id=request.message_id,
-            thread_id=request.thread_id,
+            subject_key=subject_key,
             fingerprint=fingerprint,
-            todoist_task_id=task_id,
-            classification_hash=classification_hash,
-            source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
-            last_result=result.model_dump(exclude_none=True),
+            result_hash=result_hash,
+            request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
+            result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
+            seen_count=1,
         )
-        return TodoistSyncResult(status="created", task_id=task_id, dedupe_match=dedupe_match)
+        return DedupeResult(status="new", seen_count=1, matched_on="none", subject_key=subject_key, fingerprint=fingerprint)
 
-    task_id = str(existing["todoist_task_id"])
-    if existing.get("classification_hash") == classification_hash:
-        store.upsert(
-            existing_id=existing["id"],
-            message_id=request.message_id,
-            thread_id=request.thread_id,
-            fingerprint=fingerprint,
-            todoist_task_id=task_id,
-            classification_hash=classification_hash,
-            source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
-            last_result=result.model_dump(exclude_none=True),
-        )
-        return TodoistSyncResult(status="unchanged", task_id=task_id, dedupe_match=dedupe_match, message="Existing task already reflects this classification.")
-
-    await client.update_task(task_id, content=content, description=description, due_string=(result.details.deadline if result.details else None))
-    comment_added = False
-    previous_details = (existing.get("last_result") or {}).get("details") or {}
-    current_details = (result.details.model_dump(exclude_none=True) if result.details else {})
-    if _material_context_changed(previous_details, current_details):
-        await client.add_comment(task_id, _build_update_comment(result))
-        comment_added = True
-
-    store.upsert(
+    matched_on = "fingerprint" if existing.get("fingerprint") == fingerprint else "subject"
+    previous_hash = existing.get("result_hash")
+    seen_count = int(existing.get("seen_count", 1)) + 1
+    status = "duplicate" if previous_hash == result_hash else "updated"
+    store.insert_or_update(
         existing_id=existing["id"],
-        message_id=request.message_id,
-        thread_id=request.thread_id,
+        subject_key=subject_key,
+        fingerprint=fingerprint,
+        result_hash=result_hash,
+        request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
+        result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
+        seen_count=seen_count,
+    )
+    return DedupeResult(
+        status=status,
+        seen_count=seen_count,
+        matched_on=matched_on,
+        subject_key=subject_key,
         fingerprint=fingerprint,
-        todoist_task_id=task_id,
-        classification_hash=classification_hash,
-        source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
-        last_result=result.model_dump(exclude_none=True),
     )
-    return TodoistSyncResult(status="updated", task_id=task_id, comment_added=comment_added, dedupe_match=dedupe_match)
-
-
-def _material_context_changed(previous: dict[str, Any], current: dict[str, Any]) -> bool:
-    keys = {"summary", "deadline", "attachments_referenced", "next_steps", "key_points", "people"}
-    return any(previous.get(k) != current.get(k) for k in keys)
-
-
-def _build_update_comment(result: ClassificationResult) -> str:
-    details = result.details or ClassificationDetails()
-    parts = ["Email classifier update:"]
-    if details.summary:
-        parts.append(f"Summary: {details.summary}")
-    if details.deadline:
-        parts.append(f"Deadline: {details.deadline}")
-    if details.next_steps:
-        parts.append("Next steps: " + "; ".join(details.next_steps))
-    if details.key_points:
-        parts.append("Key points: " + "; ".join(details.key_points[:4]))
-    return "\n".join(parts)
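A worked example of the normalization helpers above; it assumes `email_data` is the only required `ClassifyRequest` field besides those shown (the request example in the README suggests the rest are optional). Note the subject regex now strips stacked prefixes, so replies to forwards still collapse to the same key:

```python
from app.models import ClassifyRequest, EmailData
from app.sync import build_fingerprint, build_subject_key, normalize_subject

# Reply/forward prefixes are stripped and whitespace collapsed:
# "Re: Fwd: Budget  Proposal" -> "budget proposal"
print(normalize_subject("Re: Fwd: Budget  Proposal"))

req = ClassifyRequest(
    email_data=EmailData(subject="Re: Budget Proposal", body="Please review the attached."),
    from_address="Sender@Example.com",
)

# Same lowercased sender + normalized subject -> a stable subject_key across replies.
print(build_subject_key(req))
# The fingerprint additionally folds in the cleaned body (first 2000 chars).
print(build_fingerprint(req))
```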
diff --git a/app/todoist.py b/app/todoist.py
deleted file mode 100644
index 8ca9257..0000000
--- a/app/todoist.py
+++ /dev/null
@@ -1,48 +0,0 @@
-from __future__ import annotations
-
-import os
-from typing import Any
-
-import httpx
-
-
-class TodoistClient:
-    def __init__(self, api_key: str | None = None, base_url: str | None = None):
-        self.api_key = api_key or os.getenv("TODOIST_API_KEY")
-        self.base_url = (base_url or os.getenv("TODOIST_API_BASE_URL") or "https://api.todoist.com/rest/v2").rstrip("/")
-        self.project_id = os.getenv("TODOIST_PROJECT_ID")
-
-    @property
-    def enabled(self) -> bool:
-        return bool(self.api_key)
-
-    def _headers(self) -> dict[str, str]:
-        if not self.api_key:
-            raise RuntimeError("TODOIST_API_KEY is not configured")
-        return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
-
-    async def create_task(self, *, content: str, description: str, due_string: str | None = None) -> dict[str, Any]:
-        payload: dict[str, Any] = {"content": content, "description": description}
-        if self.project_id:
-            payload["project_id"] = self.project_id
-        if due_string:
-            payload["due_string"] = due_string
-        async with httpx.AsyncClient(timeout=30) as client:
-            response = await client.post(f"{self.base_url}/tasks", headers=self._headers(), json=payload)
-            response.raise_for_status()
-            return response.json()
-
-    async def update_task(self, task_id: str, *, content: str, description: str, due_string: str | None = None) -> None:
-        payload: dict[str, Any] = {"content": content, "description": description}
-        if due_string:
-            payload["due_string"] = due_string
-        async with httpx.AsyncClient(timeout=30) as client:
-            response = await client.post(f"{self.base_url}/tasks/{task_id}", headers=self._headers(), json=payload)
-            response.raise_for_status()
-
-    async def add_comment(self, task_id: str, content: str) -> dict[str, Any]:
-        payload = {"task_id": task_id, "content": content}
-        async with httpx.AsyncClient(timeout=30) as client:
-            response = await client.post(f"{self.base_url}/comments", headers=self._headers(), json=payload)
-            response.raise_for_status()
-            return response.json()
or os.getenv("TODOIST_API_BASE_URL") or "https://api.todoist.com/rest/v2").rstrip("/") - self.project_id = os.getenv("TODOIST_PROJECT_ID") - - @property - def enabled(self) -> bool: - return bool(self.api_key) - - def _headers(self) -> dict[str, str]: - if not self.api_key: - raise RuntimeError("TODOIST_API_KEY is not configured") - return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} - - async def create_task(self, *, content: str, description: str, due_string: str | None = None) -> dict[str, Any]: - payload: dict[str, Any] = {"content": content, "description": description} - if self.project_id: - payload["project_id"] = self.project_id - if due_string: - payload["due_string"] = due_string - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/tasks", headers=self._headers(), json=payload) - response.raise_for_status() - return response.json() - - async def update_task(self, task_id: str, *, content: str, description: str, due_string: str | None = None) -> None: - payload: dict[str, Any] = {"content": content, "description": description} - if due_string: - payload["due_string"] = due_string - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/tasks/{task_id}", headers=self._headers(), json=payload) - response.raise_for_status() - - async def add_comment(self, task_id: str, content: str) -> dict[str, Any]: - payload = {"task_id": task_id, "content": content} - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/comments", headers=self._headers(), json=payload) - response.raise_for_status() - return response.json() diff --git a/pyproject.toml b/pyproject.toml index d80affe..17c2c0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ dependencies = [ "anthropic>=0.57.1", "beautifulsoup4>=4.14.3", "fastapi>=0.128.0", - "httpx>=0.28.1", "openai>=2.16.0", "uvicorn>=0.40.0", ]