From a1dcaf9a749c07cde0b5ed9e467ef4e2f8f9a2c2 Mon Sep 17 00:00:00 2001 From: Steve W Date: Thu, 9 Apr 2026 18:14:11 +0000 Subject: [PATCH 1/4] Add enriched classification output and Todoist dedupe sync --- README.md | 95 ++++++++++++++++++--------- app/classifier.py | 88 +++++++++++++++++++------ app/dedupe_store.py | 106 ++++++++++++++++++++++++++++++ app/models.py | 28 ++++++++ app/prompts.py | 82 +++++++++-------------- app/sync.py | 154 ++++++++++++++++++++++++++++++++++++++++++++ app/todoist.py | 48 ++++++++++++++ pyproject.toml | 1 + 8 files changed, 502 insertions(+), 100 deletions(-) create mode 100644 app/dedupe_store.py create mode 100644 app/sync.py create mode 100644 app/todoist.py diff --git a/README.md b/README.md index aff2d1c..34a722c 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,10 @@ # email-classifier -FastAPI service that classifies email using a configurable LLM backend. - -## What changed - -The classifier no longer hardcodes a single Ollama + OpenAI-compatible endpoint. -It now supports: -- OpenAI-compatible APIs -- Anthropic-compatible APIs -- per-request overrides for provider, model, endpoint, and temperature -- global defaults through environment variables - -This makes it suitable for local Ollama, hosted OpenAI-compatible services, and MiniMax's recommended Anthropic-compatible API. +FastAPI service that classifies email using a configurable LLM backend, enriches the output for human review, and can upsert Todoist tasks without creating duplicates. ## Environment configuration -Defaults are loaded from environment variables: +LLM defaults: ```bash export LLM_PROVIDER=openai @@ -27,9 +16,7 @@ export LLM_TIMEOUT_SECONDS=60 export LLM_MAX_RETRIES=3 ``` -### MiniMax example - -MiniMax recommends Anthropic-compatible integration. +MiniMax via Anthropic-compatible API: ```bash export LLM_PROVIDER=anthropic @@ -38,11 +25,21 @@ export LLM_API_KEY=your_minimax_key export LLM_MODEL=MiniMax-M2.7 ``` +Optional Todoist sync: + +```bash +export TODOIST_API_KEY=your_todoist_token +export TODOIST_PROJECT_ID=optional_project_id +export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db +``` + ## API ### POST /classify -Request body: +Backward-compatible top-level response fields are preserved. + +Optional request metadata for dedupe and richer sync: ```json { @@ -50,16 +47,17 @@ Request body: "subject": "Can you review this by Friday?", "body": "Hi Daniel, please review the attached budget proposal." }, + "message_id": "", + "thread_id": "thread-789", + "from_address": "sender@example.com", + "received_at": "2026-04-09T12:55:00Z", "provider": "anthropic", "base_url": "https://api.minimax.io/anthropic", - "model": "MiniMax-M2.7", - "temperature": 0.1 + "model": "MiniMax-M2.7" } ``` -All override fields are optional. If omitted, the service uses the global env config. - -Response shape: +Response now includes optional enrichment and Todoist sync info: ```json { @@ -68,21 +66,56 @@ Response shape: "priority": "high", "task_description": "Review the budget proposal and respond by Friday", "reasoning": "Direct request with a deadline requires follow-up", - "confidence": 0.91 + "confidence": 0.91, + "details": { + "summary": "Budget proposal review requested with Friday deadline.", + "suggested_title": "Review budget proposal and respond by Friday", + "suggested_notes": "Requester asked for feedback on attached budget proposal before Friday.", + "deadline": "Friday", + "people": ["Daniel"], + "organizations": [], + "attachments_referenced": ["budget proposal"], + "next_steps": ["Review attachment", "Reply with feedback"], + "key_points": ["Deadline is Friday"], + "source_signals": ["request", "deadline"], + "dedupe_key": "..." + }, + "todoist": { + "status": "created", + "task_id": "1234567890", + "comment_added": false, + "dedupe_match": "none", + "message": null + } } ``` +## Dedupe behavior + +When Todoist sync is enabled and `needs_action=true`: +- first match by `message_id` +- then by `thread_id` +- then by normalized content fingerprint fallback + +Behavior: +- no existing task: create Todoist task +- existing task, same classification: do not duplicate, mark `unchanged` +- existing task, changed classification/context: update task in place +- add a Todoist comment only when material context changed + ## Architecture -- `app/config.py`: global and per-request LLM settings +- `app/classifier.py`: classification orchestration and Todoist sync handoff +- `app/prompts.py`: richer extraction prompt +- `app/sync.py`: dedupe, task rendering, Todoist upsert logic +- `app/dedupe_store.py`: SQLite-backed mapping store +- `app/todoist.py`: Todoist REST client - `app/llm_adapters.py`: provider adapters -- `app/classifier.py`: classification orchestration, retries, normalization -- `app/prompts.py`: system prompt -- `app/routers/classify_email.py`: thin API route +- `app/config.py`: LLM settings ## Notes -- OpenAI-compatible providers use the OpenAI SDK. -- Anthropic-compatible providers use the Anthropic SDK. -- Per-request `api_key` is supported, but excluded from response serialization. -- The service normalizes malformed model output and falls back safely after retry exhaustion. +- `/classify` remains backward compatible at the top level. +- New request metadata fields are optional. +- Todoist sync safely no-ops when `TODOIST_API_KEY` is not configured. +- SQLite is used for lightweight production-safe dedupe tracking. diff --git a/app/classifier.py b/app/classifier.py index 4223d8d..d603ef7 100644 --- a/app/classifier.py +++ b/app/classifier.py @@ -5,7 +5,8 @@ from typing import Any from app.config import get_request_settings from app.llm_adapters import build_adapter, coerce_json_text -from app.models import ClassificationResult, ClassifyRequest, EmailData +from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData +from app.sync import build_fingerprint, sync_todoist VALID_CATEGORIES = { "action_required", @@ -21,7 +22,7 @@ VALID_PRIORITIES = {"high", "medium", "low"} async def classify_email(request: ClassifyRequest) -> ClassificationResult: - clean_email = _clean_email(request.email_data) + clean_email = _clean_email(request) settings = get_request_settings( provider=request.provider, model=request.model, @@ -32,40 +33,50 @@ async def classify_email(request: ClassifyRequest) -> ClassificationResult: adapter = build_adapter(settings) attempts = 0 + result: ClassificationResult | None = None while attempts < settings.max_retries: - raw_response = await adapter.classify(clean_email) + raw_response = await adapter.classify(clean_email.email_data) try: payload = json.loads(coerce_json_text(raw_response)) - result = _normalize_result(payload) + result = _normalize_result(payload, clean_email) if result.needs_action and not result.task_description: attempts += 1 continue - return result + break except (json.JSONDecodeError, ValueError, TypeError): attempts += 1 - return ClassificationResult( - needs_action=False, - category="uncategorized", - priority="low", - task_description=None, - reasoning="System failed to classify after multiple attempts.", - confidence=0.0, - ) + if result is None: + result = ClassificationResult( + needs_action=False, + category="uncategorized", + priority="low", + task_description=None, + reasoning="System failed to classify after multiple attempts.", + confidence=0.0, + details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)), + ) + + result.todoist = await sync_todoist(clean_email, result) + return result -def _clean_email(email: EmailData) -> EmailData: +def _clean_email(request: ClassifyRequest) -> ClassifyRequest: from app.helpers.clean_email_html import clean_email_html from app.helpers.extract_latest_message import extract_latest_message from app.helpers.remove_disclaimer import remove_disclaimer - return EmailData( - subject=email.subject, - body=remove_disclaimer(clean_email_html(extract_latest_message(email.body))), + return request.model_copy( + update={ + "email_data": EmailData( + subject=request.email_data.subject, + body=remove_disclaimer(clean_email_html(extract_latest_message(request.email_data.body))), + ) + } ) -def _normalize_result(data: dict[str, Any]) -> ClassificationResult: +def _normalize_result(data: dict[str, Any], request: ClassifyRequest) -> ClassificationResult: needs_action = bool(data.get("needs_action", False)) category = str(data.get("category", "uncategorized") or "uncategorized").lower() if category not in VALID_CATEGORIES: @@ -81,6 +92,24 @@ def _normalize_result(data: dict[str, Any]) -> ClassificationResult: reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided." confidence_raw = data.get("confidence", 0.0) confidence = max(0.0, min(1.0, float(confidence_raw))) + details_payload = data.get("details") or {} + details = ClassificationDetails( + summary=_clean_text(details_payload.get("summary")), + suggested_title=_clean_text(details_payload.get("suggested_title")), + suggested_notes=_clean_text(details_payload.get("suggested_notes")), + deadline=_clean_text(details_payload.get("deadline")), + people=_string_list(details_payload.get("people")), + organizations=_string_list(details_payload.get("organizations")), + attachments_referenced=_string_list(details_payload.get("attachments_referenced")), + next_steps=_string_list(details_payload.get("next_steps")), + key_points=_string_list(details_payload.get("key_points")), + source_signals=_string_list(details_payload.get("source_signals")), + dedupe_key=build_fingerprint(request), + ) + if needs_action and not details.suggested_title: + details.suggested_title = task_description + if not details.summary: + details.summary = reasoning return ClassificationResult( needs_action=needs_action, category=category, @@ -88,4 +117,27 @@ def _normalize_result(data: dict[str, Any]) -> ClassificationResult: task_description=task_description, reasoning=reasoning, confidence=confidence, + details=details, ) + + +def _clean_text(value: Any) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None + + +def _string_list(value: Any) -> list[str]: + if not value: + return [] + if isinstance(value, list): + items = value + else: + items = [value] + output = [] + for item in items: + text = str(item).strip() + if text and text not in output: + output.append(text) + return output diff --git a/app/dedupe_store.py b/app/dedupe_store.py new file mode 100644 index 0000000..32cb279 --- /dev/null +++ b/app/dedupe_store.py @@ -0,0 +1,106 @@ +from __future__ import annotations + +import json +import sqlite3 +from pathlib import Path +from typing import Any + + +class DedupeStore: + def __init__(self, db_path: str = ".data/email_classifier.db"): + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self._init_db() + + def _connect(self) -> sqlite3.Connection: + conn = sqlite3.connect(self.db_path) + conn.row_factory = sqlite3.Row + return conn + + def _init_db(self) -> None: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS todoist_sync ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + message_id TEXT, + thread_id TEXT, + fingerprint TEXT NOT NULL, + todoist_task_id TEXT NOT NULL, + classification_hash TEXT NOT NULL, + source_payload TEXT NOT NULL, + last_result TEXT NOT NULL, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ) + """ + ) + conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_message_id ON todoist_sync(message_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_thread_id ON todoist_sync(thread_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_fingerprint ON todoist_sync(fingerprint)") + + def find_existing(self, *, message_id: str | None, thread_id: str | None, fingerprint: str) -> dict[str, Any] | None: + queries = [] + if message_id: + queries.append(("SELECT * FROM todoist_sync WHERE message_id = ? ORDER BY id DESC LIMIT 1", (message_id,))) + if thread_id: + queries.append(("SELECT * FROM todoist_sync WHERE thread_id = ? ORDER BY id DESC LIMIT 1", (thread_id,))) + queries.append(("SELECT * FROM todoist_sync WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", (fingerprint,))) + with self._connect() as conn: + for sql, params in queries: + row = conn.execute(sql, params).fetchone() + if row: + data = dict(row) + data["source_payload"] = json.loads(data["source_payload"]) + data["last_result"] = json.loads(data["last_result"]) + return data + return None + + def upsert( + self, + *, + existing_id: int | None, + message_id: str | None, + thread_id: str | None, + fingerprint: str, + todoist_task_id: str, + classification_hash: str, + source_payload: dict[str, Any], + last_result: dict[str, Any], + ) -> None: + with self._connect() as conn: + if existing_id is None: + conn.execute( + """ + INSERT INTO todoist_sync (message_id, thread_id, fingerprint, todoist_task_id, classification_hash, source_payload, last_result) + VALUES (?, ?, ?, ?, ?, ?, ?) + """, + ( + message_id, + thread_id, + fingerprint, + todoist_task_id, + classification_hash, + json.dumps(source_payload, sort_keys=True), + json.dumps(last_result, sort_keys=True), + ), + ) + else: + conn.execute( + """ + UPDATE todoist_sync + SET message_id = ?, thread_id = ?, fingerprint = ?, todoist_task_id = ?, classification_hash = ?, + source_payload = ?, last_result = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """, + ( + message_id, + thread_id, + fingerprint, + todoist_task_id, + classification_hash, + json.dumps(source_payload, sort_keys=True), + json.dumps(last_result, sort_keys=True), + existing_id, + ), + ) diff --git a/app/models.py b/app/models.py index d0825eb..39a284f 100644 --- a/app/models.py +++ b/app/models.py @@ -17,6 +17,32 @@ class ClassifyRequest(BaseModel): base_url: str | None = None api_key: str | None = Field(default=None, exclude=True) temperature: float | None = None + message_id: str | None = None + thread_id: str | None = None + from_address: str | None = None + received_at: str | None = None + + +class ClassificationDetails(BaseModel): + summary: str | None = None + suggested_title: str | None = None + suggested_notes: str | None = None + deadline: str | None = None + people: list[str] = Field(default_factory=list) + organizations: list[str] = Field(default_factory=list) + attachments_referenced: list[str] = Field(default_factory=list) + next_steps: list[str] = Field(default_factory=list) + key_points: list[str] = Field(default_factory=list) + source_signals: list[str] = Field(default_factory=list) + dedupe_key: str | None = None + + +class TodoistSyncResult(BaseModel): + status: Literal["created", "updated", "unchanged", "disabled", "skipped", "error"] + task_id: str | None = None + comment_added: bool = False + dedupe_match: Literal["message_id", "thread_id", "fingerprint", "none"] = "none" + message: str | None = None class ClassificationResult(BaseModel): @@ -26,3 +52,5 @@ class ClassificationResult(BaseModel): task_description: str | None = None reasoning: str confidence: float + details: ClassificationDetails | None = None + todoist: TodoistSyncResult | None = None diff --git a/app/prompts.py b/app/prompts.py index 3cfcc95..8eb78f4 100644 --- a/app/prompts.py +++ b/app/prompts.py @@ -1,58 +1,38 @@ SYSTEM_PROMPT = """You are an email classification assistant. Your job is to analyze emails and determine if they need the user's attention and action. The user works in the I.T. department of the Grand Portage tribal government. +Return valid JSON only. + CLASSIFICATION RULES: +1. NEEDS ATTENTION if the email asks a direct question, requests action, contains a deadline, reports a relevant problem, proposes times needing confirmation, or is a relevant I.T. alert. +2. DOES NOT NEED ATTENTION if the email is marketing, newsletter, sales outreach, bulk/promotional, or simple acknowledgment with no response needed. +3. Scheduling questions and unresolved thread questions always need attention. -1. NEEDS ATTENTION (create todo) if the email: - - Asks a direct question that requires a response - - Contains scheduling questions like \"Does [day/time] work?\", \"Are you available?\", \"When can we meet?\" - - Requests the user to do something (review, approve, provide info, attend meeting) - - Contains a deadline or time-sensitive request - - Is from a colleague/client discussing active work - - Reports an issue or problem that needs addressing - - Proposes specific dates/times and needs confirmation - - Is an automated alert from a system relevant to I.T. - -2. DOES NOT NEED ATTENTION (skip) if the email: - - Is a newsletter, marketing email, or webinar invitation - - Is from a person and is an FYI/informational with no action required - - Is promotional content or sales outreach - - Contains unsubscribe links or bulk sender indicators - - Is a simple acknowledgment (\"got it\", \"thanks\", \"sounds good\") with no questions - -3. SPECIAL CASES: - - Even if an email says \"working on that\" or similar, if it ALSO contains a question or proposal that needs response, mark as needs_action=true - - \"Does [X] work?\" or \"When can you...?\" ALWAYS needs a response, regardless of other content - - RE: threads can still need action if they contain unanswered questions - -OUTPUT FORMAT: -You must respond with valid JSON only, no other text: +OUTPUT JSON SCHEMA: { - \"needs_action\": true or false, - \"category\": \"action_required\" | \"question\" | \"fyi\" | \"newsletter\" | \"promotional\" | \"automated\" | \"alert\" | \"uncategorized\", - \"priority\": \"high\" | \"medium\" | \"low\", - \"task_description\": \"Brief description of what to do (only if needs_action is true)\", - \"reasoning\": \"One sentence explaining your decision\", - \"confidence\": A number from 0 to 1 indicating how confident you are + "needs_action": true or false, + "category": "action_required" | "question" | "fyi" | "newsletter" | "promotional" | "automated" | "alert" | "uncategorized", + "priority": "high" | "medium" | "low", + "task_description": "short action-oriented description or null", + "reasoning": "one sentence", + "confidence": 0.0 to 1.0, + "details": { + "summary": "brief human-readable summary", + "suggested_title": "good Todoist/task title", + "suggested_notes": "useful multiline notes for a human reviewing or creating a ticket", + "deadline": "deadline/date/time if present, else null", + "people": ["people involved or referenced"], + "organizations": ["organizations, departments, vendors, teams"], + "attachments_referenced": ["attachment names or referenced docs if mentioned"], + "next_steps": ["specific next actions"], + "key_points": ["important context bullets"], + "source_signals": ["question", "deadline", "request", "alert", "followup", "attachment", "scheduling"] + } } -EXAMPLES: - -Email: \"Subject: Q4 Budget Review\nHi Daniel, can you review the attached budget proposal and let me know your thoughts by Friday?\" -Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"high\", \"task_description\": \"Review Q4 budget proposal and respond by Friday\", \"reasoning\": \"Direct request with deadline\", \"confidence\": 0.91} - -Email: \"Subject: RE: Meeting\nWorking on that. Does Tuesday or Wednesday work for you?\" -Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"medium\", \"task_description\": \"Respond with availability for Tuesday or Wednesday\", \"reasoning\": \"Scheduling question requires response\", \"confidence\": 0.85} - -Email: \"Subject: RE: Issue\nThanks, I'll look into it and get back to you.\" -Output: {\"needs_action\": false, \"category\": \"fyi\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Status update with no questions or action needed\", \"confidence\": 0.77} - -Email: \"Subject: Join us for our exclusive webinar on cloud security\nRegister now for our upcoming webinar series...\" -Output: {\"needs_action\": false, \"category\": \"promotional\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Marketing webinar invitation\", \"confidence\": 0.81} - -Email: \"Subject: Your order has shipped\nYour order #12345 has been dispatched and will arrive in 3-5 days.\" -Output: {\"needs_action\": false, \"category\": \"automated\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Automated shipping notification\", \"confidence\": 0.72} - -Email: \"Subject: Disk at 95 percent on hvs-internal-01\nThe hard disk on server hvs-internal-01 is at a critical level.\" -Output: {\"needs_action\": true, \"category\": \"alert\", \"priority\": \"medium\", \"task_description\": \"Investigate critical disk usage alert on hvs-internal-01\", \"reasoning\": \"Internal I.T. system alert requires follow-up\", \"confidence\": 0.91} - -Now classify the following email:""" +Rules for details: +- Be concrete and extract as much useful context as possible. +- suggested_notes should help a human create a ticket later. +- If a field is unknown, use null or empty list. +- Do not invent attachment names, people, or deadlines. +- If needs_action is true, task_description and suggested_title should be useful and specific. +""" diff --git a/app/sync.py b/app/sync.py new file mode 100644 index 0000000..fadba7b --- /dev/null +++ b/app/sync.py @@ -0,0 +1,154 @@ +from __future__ import annotations + +import hashlib +import json +import os +from typing import Any + +from app.dedupe_store import DedupeStore +from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, TodoistSyncResult +from app.todoist import TodoistClient + + +def build_fingerprint(request: ClassifyRequest) -> str: + subject = request.email_data.subject.strip().lower() + body = " ".join(request.email_data.body.split()).strip().lower() + seed = f"{request.from_address or ''}\n{subject}\n{body}" + return hashlib.sha256(seed.encode()).hexdigest() + + +def build_classification_hash(result: ClassificationResult) -> str: + payload = result.model_dump(exclude={"todoist"}, exclude_none=True) + return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest() + + +def render_task_content(result: ClassificationResult) -> str: + details = result.details or ClassificationDetails() + return details.suggested_title or result.task_description or details.summary or "Email follow-up" + + +def render_task_description(request: ClassifyRequest, result: ClassificationResult) -> str: + details = result.details or ClassificationDetails() + sections: list[str] = [] + if details.summary: + sections.append(f"Summary:\n{details.summary}") + if result.task_description: + sections.append(f"Action:\n{result.task_description}") + if details.suggested_notes: + sections.append(f"Notes:\n{details.suggested_notes}") + if details.deadline: + sections.append(f"Deadline:\n{details.deadline}") + if details.people: + sections.append("People:\n- " + "\n- ".join(details.people)) + if details.organizations: + sections.append("Organizations:\n- " + "\n- ".join(details.organizations)) + if details.attachments_referenced: + sections.append("Attachments referenced:\n- " + "\n- ".join(details.attachments_referenced)) + if details.next_steps: + sections.append("Next steps:\n- " + "\n- ".join(details.next_steps)) + if details.key_points: + sections.append("Key points:\n- " + "\n- ".join(details.key_points)) + metadata = [] + if request.message_id: + metadata.append(f"message_id: {request.message_id}") + if request.thread_id: + metadata.append(f"thread_id: {request.thread_id}") + if request.from_address: + metadata.append(f"from: {request.from_address}") + if request.received_at: + metadata.append(f"received_at: {request.received_at}") + if metadata: + sections.append("Source metadata:\n" + "\n".join(metadata)) + return "\n\n".join(sections).strip() + + +async def sync_todoist(request: ClassifyRequest, result: ClassificationResult) -> TodoistSyncResult: + if not result.needs_action: + return TodoistSyncResult(status="skipped", message="No action required.") + client = TodoistClient() + if not client.enabled: + return TodoistSyncResult(status="disabled", message="Todoist is not configured.") + + store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db")) + fingerprint = build_fingerprint(request) + existing = store.find_existing(message_id=request.message_id, thread_id=request.thread_id, fingerprint=fingerprint) + dedupe_match = "none" + if existing: + if request.message_id and existing.get("message_id") == request.message_id: + dedupe_match = "message_id" + elif request.thread_id and existing.get("thread_id") == request.thread_id: + dedupe_match = "thread_id" + else: + dedupe_match = "fingerprint" + + content = render_task_content(result) + description = render_task_description(request, result) + classification_hash = build_classification_hash(result) + + if not existing: + created = await client.create_task(content=content, description=description, due_string=(result.details.deadline if result.details else None)) + task_id = str(created.get("id")) + store.upsert( + existing_id=None, + message_id=request.message_id, + thread_id=request.thread_id, + fingerprint=fingerprint, + todoist_task_id=task_id, + classification_hash=classification_hash, + source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), + last_result=result.model_dump(exclude_none=True), + ) + return TodoistSyncResult(status="created", task_id=task_id, dedupe_match=dedupe_match) + + task_id = str(existing["todoist_task_id"]) + if existing.get("classification_hash") == classification_hash: + store.upsert( + existing_id=existing["id"], + message_id=request.message_id, + thread_id=request.thread_id, + fingerprint=fingerprint, + todoist_task_id=task_id, + classification_hash=classification_hash, + source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), + last_result=result.model_dump(exclude_none=True), + ) + return TodoistSyncResult(status="unchanged", task_id=task_id, dedupe_match=dedupe_match, message="Existing task already reflects this classification.") + + await client.update_task(task_id, content=content, description=description, due_string=(result.details.deadline if result.details else None)) + comment_added = False + previous_details = (existing.get("last_result") or {}).get("details") or {} + current_details = (result.details.model_dump(exclude_none=True) if result.details else {}) + if _material_context_changed(previous_details, current_details): + await client.add_comment(task_id, _build_update_comment(result)) + comment_added = True + + store.upsert( + existing_id=existing["id"], + message_id=request.message_id, + thread_id=request.thread_id, + fingerprint=fingerprint, + todoist_task_id=task_id, + classification_hash=classification_hash, + source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), + last_result=result.model_dump(exclude_none=True), + ) + return TodoistSyncResult(status="updated", task_id=task_id, comment_added=comment_added, dedupe_match=dedupe_match) + + +def _material_context_changed(previous: dict[str, Any], current: dict[str, Any]) -> bool: + keys = {"summary", "deadline", "attachments_referenced", "next_steps", "key_points", "people"} + return any(previous.get(k) != current.get(k) for k in keys) + + +def _build_update_comment(result: ClassificationResult) -> str: + details = result.details or ClassificationDetails() + parts = ["Email classifier update:"] + if details.summary: + parts.append(f"Summary: {details.summary}") + if details.deadline: + parts.append(f"Deadline: {details.deadline}") + if details.next_steps: + parts.append("Next steps: " + "; ".join(details.next_steps)) + if details.key_points: + parts.append("Key points: " + "; ".join(details.key_points[:4])) + return "\n".join(parts) diff --git a/app/todoist.py b/app/todoist.py new file mode 100644 index 0000000..8ca9257 --- /dev/null +++ b/app/todoist.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import os +from typing import Any + +import httpx + + +class TodoistClient: + def __init__(self, api_key: str | None = None, base_url: str | None = None): + self.api_key = api_key or os.getenv("TODOIST_API_KEY") + self.base_url = (base_url or os.getenv("TODOIST_API_BASE_URL") or "https://api.todoist.com/rest/v2").rstrip("/") + self.project_id = os.getenv("TODOIST_PROJECT_ID") + + @property + def enabled(self) -> bool: + return bool(self.api_key) + + def _headers(self) -> dict[str, str]: + if not self.api_key: + raise RuntimeError("TODOIST_API_KEY is not configured") + return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} + + async def create_task(self, *, content: str, description: str, due_string: str | None = None) -> dict[str, Any]: + payload: dict[str, Any] = {"content": content, "description": description} + if self.project_id: + payload["project_id"] = self.project_id + if due_string: + payload["due_string"] = due_string + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(f"{self.base_url}/tasks", headers=self._headers(), json=payload) + response.raise_for_status() + return response.json() + + async def update_task(self, task_id: str, *, content: str, description: str, due_string: str | None = None) -> None: + payload: dict[str, Any] = {"content": content, "description": description} + if due_string: + payload["due_string"] = due_string + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(f"{self.base_url}/tasks/{task_id}", headers=self._headers(), json=payload) + response.raise_for_status() + + async def add_comment(self, task_id: str, content: str) -> dict[str, Any]: + payload = {"task_id": task_id, "content": content} + async with httpx.AsyncClient(timeout=30) as client: + response = await client.post(f"{self.base_url}/comments", headers=self._headers(), json=payload) + response.raise_for_status() + return response.json() diff --git a/pyproject.toml b/pyproject.toml index 17c2c0b..d80affe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "anthropic>=0.57.1", "beautifulsoup4>=4.14.3", "fastapi>=0.128.0", + "httpx>=0.28.1", "openai>=2.16.0", "uvicorn>=0.40.0", ] -- 2.49.1 From 1b2c7db92493190eca78d6aa649a9257b1a0c468 Mon Sep 17 00:00:00 2001 From: Steve W Date: Thu, 9 Apr 2026 18:18:35 +0000 Subject: [PATCH 2/4] Refocus classifier on rich extraction and local dedupe only --- README.md | 61 ++++++++-------- app/classifier.py | 4 +- app/dedupe_store.py | 94 ++++++++++++------------ app/models.py | 16 ++-- app/sync.py | 173 ++++++++++++-------------------------------- app/todoist.py | 48 ------------ pyproject.toml | 1 - 7 files changed, 130 insertions(+), 267 deletions(-) delete mode 100644 app/todoist.py diff --git a/README.md b/README.md index 34a722c..353c9da 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # email-classifier -FastAPI service that classifies email using a configurable LLM backend, enriches the output for human review, and can upsert Todoist tasks without creating duplicates. +FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using fingerprint-based dedupe. ## Environment configuration @@ -25,11 +25,9 @@ export LLM_API_KEY=your_minimax_key export LLM_MODEL=MiniMax-M2.7 ``` -Optional Todoist sync: +Optional local dedupe store path: ```bash -export TODOIST_API_KEY=your_todoist_token -export TODOIST_PROJECT_ID=optional_project_id export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db ``` @@ -37,9 +35,9 @@ export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db ### POST /classify -Backward-compatible top-level response fields are preserved. +This overhaul is intended to return richer extraction. Top-level compatibility is not required. -Optional request metadata for dedupe and richer sync: +Request example: ```json { @@ -47,8 +45,6 @@ Optional request metadata for dedupe and richer sync: "subject": "Can you review this by Friday?", "body": "Hi Daniel, please review the attached budget proposal." }, - "message_id": "", - "thread_id": "thread-789", "from_address": "sender@example.com", "received_at": "2026-04-09T12:55:00Z", "provider": "anthropic", @@ -57,7 +53,7 @@ Optional request metadata for dedupe and richer sync: } ``` -Response now includes optional enrichment and Todoist sync info: +Response example: ```json { @@ -80,42 +76,43 @@ Response now includes optional enrichment and Todoist sync info: "source_signals": ["request", "deadline"], "dedupe_key": "..." }, - "todoist": { - "status": "created", - "task_id": "1234567890", - "comment_added": false, - "dedupe_match": "none", - "message": null + "dedupe": { + "status": "new", + "seen_count": 1, + "matched_on": "none", + "subject_key": "...", + "fingerprint": "..." } } ``` ## Dedupe behavior -When Todoist sync is enabled and `needs_action=true`: -- first match by `message_id` -- then by `thread_id` -- then by normalized content fingerprint fallback +The API does not create or update Todoist tasks. +It only returns richer extraction and local dedupe metadata for downstream automation like n8n. -Behavior: -- no existing task: create Todoist task -- existing task, same classification: do not duplicate, mark `unchanged` -- existing task, changed classification/context: update task in place -- add a Todoist comment only when material context changed +Matching strategy: +- normalized subject plus sender-derived `subject_key` +- full content fingerprint fallback based on sender + normalized subject + cleaned body + +Statuses: +- `new`: no prior similar email seen +- `duplicate`: same dedupe target and same extracted result as before +- `updated`: matched prior email, but extracted result changed + +This is intentionally heuristic, not perfect. ## Architecture -- `app/classifier.py`: classification orchestration and Todoist sync handoff +- `app/classifier.py`: classification orchestration and dedupe handoff - `app/prompts.py`: richer extraction prompt -- `app/sync.py`: dedupe, task rendering, Todoist upsert logic -- `app/dedupe_store.py`: SQLite-backed mapping store -- `app/todoist.py`: Todoist REST client +- `app/sync.py`: subject normalization, fingerprinting, dedupe application +- `app/dedupe_store.py`: SQLite-backed dedupe store - `app/llm_adapters.py`: provider adapters - `app/config.py`: LLM settings ## Notes -- `/classify` remains backward compatible at the top level. -- New request metadata fields are optional. -- Todoist sync safely no-ops when `TODOIST_API_KEY` is not configured. -- SQLite is used for lightweight production-safe dedupe tracking. +- No Todoist integration lives in this API. +- Dedupe is best-effort and designed to help downstream workflows avoid obvious duplicates. +- SQLite is used for lightweight local dedupe tracking. diff --git a/app/classifier.py b/app/classifier.py index d603ef7..39503d1 100644 --- a/app/classifier.py +++ b/app/classifier.py @@ -6,7 +6,7 @@ from typing import Any from app.config import get_request_settings from app.llm_adapters import build_adapter, coerce_json_text from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData -from app.sync import build_fingerprint, sync_todoist +from app.sync import apply_dedupe, build_fingerprint VALID_CATEGORIES = { "action_required", @@ -57,7 +57,7 @@ async def classify_email(request: ClassifyRequest) -> ClassificationResult: details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)), ) - result.todoist = await sync_todoist(clean_email, result) + result.dedupe = apply_dedupe(clean_email, result) return result diff --git a/app/dedupe_store.py b/app/dedupe_store.py index 32cb279..a94f121 100644 --- a/app/dedupe_store.py +++ b/app/dedupe_store.py @@ -21,86 +21,82 @@ class DedupeStore: with self._connect() as conn: conn.execute( """ - CREATE TABLE IF NOT EXISTS todoist_sync ( + CREATE TABLE IF NOT EXISTS classification_dedupe ( id INTEGER PRIMARY KEY AUTOINCREMENT, - message_id TEXT, - thread_id TEXT, + subject_key TEXT NOT NULL, fingerprint TEXT NOT NULL, - todoist_task_id TEXT NOT NULL, - classification_hash TEXT NOT NULL, - source_payload TEXT NOT NULL, - last_result TEXT NOT NULL, + result_hash TEXT NOT NULL, + request_payload TEXT NOT NULL, + result_payload TEXT NOT NULL, + seen_count INTEGER NOT NULL DEFAULT 1, created_at TEXT DEFAULT CURRENT_TIMESTAMP, updated_at TEXT DEFAULT CURRENT_TIMESTAMP ) """ ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_message_id ON todoist_sync(message_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_thread_id ON todoist_sync(thread_id)") - conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_fingerprint ON todoist_sync(fingerprint)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)") - def find_existing(self, *, message_id: str | None, thread_id: str | None, fingerprint: str) -> dict[str, Any] | None: - queries = [] - if message_id: - queries.append(("SELECT * FROM todoist_sync WHERE message_id = ? ORDER BY id DESC LIMIT 1", (message_id,))) - if thread_id: - queries.append(("SELECT * FROM todoist_sync WHERE thread_id = ? ORDER BY id DESC LIMIT 1", (thread_id,))) - queries.append(("SELECT * FROM todoist_sync WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", (fingerprint,))) + def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None: with self._connect() as conn: - for sql, params in queries: - row = conn.execute(sql, params).fetchone() - if row: - data = dict(row) - data["source_payload"] = json.loads(data["source_payload"]) - data["last_result"] = json.loads(data["last_result"]) - return data - return None + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", + (fingerprint,), + ).fetchone() + if row is None: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1", + (subject_key,), + ).fetchone() + if not row: + return None + data = dict(row) + data["request_payload"] = json.loads(data["request_payload"]) + data["result_payload"] = json.loads(data["result_payload"]) + return data - def upsert( + def insert_or_update( self, *, existing_id: int | None, - message_id: str | None, - thread_id: str | None, + subject_key: str, fingerprint: str, - todoist_task_id: str, - classification_hash: str, - source_payload: dict[str, Any], - last_result: dict[str, Any], + result_hash: str, + request_payload: dict[str, Any], + result_payload: dict[str, Any], + seen_count: int, ) -> None: with self._connect() as conn: if existing_id is None: conn.execute( """ - INSERT INTO todoist_sync (message_id, thread_id, fingerprint, todoist_task_id, classification_hash, source_payload, last_result) - VALUES (?, ?, ?, ?, ?, ?, ?) + INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count) + VALUES (?, ?, ?, ?, ?, ?) """, ( - message_id, - thread_id, + subject_key, fingerprint, - todoist_task_id, - classification_hash, - json.dumps(source_payload, sort_keys=True), - json.dumps(last_result, sort_keys=True), + result_hash, + json.dumps(request_payload, sort_keys=True), + json.dumps(result_payload, sort_keys=True), + seen_count, ), ) else: conn.execute( """ - UPDATE todoist_sync - SET message_id = ?, thread_id = ?, fingerprint = ?, todoist_task_id = ?, classification_hash = ?, - source_payload = ?, last_result = ?, updated_at = CURRENT_TIMESTAMP + UPDATE classification_dedupe + SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, + seen_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( - message_id, - thread_id, + subject_key, fingerprint, - todoist_task_id, - classification_hash, - json.dumps(source_payload, sort_keys=True), - json.dumps(last_result, sort_keys=True), + result_hash, + json.dumps(request_payload, sort_keys=True), + json.dumps(result_payload, sort_keys=True), + seen_count, existing_id, ), ) diff --git a/app/models.py b/app/models.py index 39a284f..9badd5f 100644 --- a/app/models.py +++ b/app/models.py @@ -17,8 +17,6 @@ class ClassifyRequest(BaseModel): base_url: str | None = None api_key: str | None = Field(default=None, exclude=True) temperature: float | None = None - message_id: str | None = None - thread_id: str | None = None from_address: str | None = None received_at: str | None = None @@ -37,12 +35,12 @@ class ClassificationDetails(BaseModel): dedupe_key: str | None = None -class TodoistSyncResult(BaseModel): - status: Literal["created", "updated", "unchanged", "disabled", "skipped", "error"] - task_id: str | None = None - comment_added: bool = False - dedupe_match: Literal["message_id", "thread_id", "fingerprint", "none"] = "none" - message: str | None = None +class DedupeResult(BaseModel): + status: Literal["new", "duplicate", "updated"] + seen_count: int = 1 + matched_on: Literal["none", "subject", "fingerprint"] = "none" + subject_key: str + fingerprint: str class ClassificationResult(BaseModel): @@ -53,4 +51,4 @@ class ClassificationResult(BaseModel): reasoning: str confidence: float details: ClassificationDetails | None = None - todoist: TodoistSyncResult | None = None + dedupe: DedupeResult | None = None diff --git a/app/sync.py b/app/sync.py index fadba7b..2f5e29f 100644 --- a/app/sync.py +++ b/app/sync.py @@ -3,152 +3,73 @@ from __future__ import annotations import hashlib import json import os -from typing import Any +import re from app.dedupe_store import DedupeStore -from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, TodoistSyncResult -from app.todoist import TodoistClient +from app.models import ClassificationResult, ClassifyRequest, DedupeResult + + +def normalize_subject(subject: str) -> str: + value = subject.strip().lower() + value = re.sub(r"^(re|fw|fwd)\s*:\s*", "", value) + value = re.sub(r"\s+", " ", value) + return value + + +def build_subject_key(request: ClassifyRequest) -> str: + subject = normalize_subject(request.email_data.subject) + sender = (request.from_address or "").strip().lower() + return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest() def build_fingerprint(request: ClassifyRequest) -> str: - subject = request.email_data.subject.strip().lower() + subject = normalize_subject(request.email_data.subject) body = " ".join(request.email_data.body.split()).strip().lower() - seed = f"{request.from_address or ''}\n{subject}\n{body}" + seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}" return hashlib.sha256(seed.encode()).hexdigest() -def build_classification_hash(result: ClassificationResult) -> str: - payload = result.model_dump(exclude={"todoist"}, exclude_none=True) +def build_result_hash(result: ClassificationResult) -> str: + payload = result.model_dump(exclude={"dedupe"}, exclude_none=True) return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest() -def render_task_content(result: ClassificationResult) -> str: - details = result.details or ClassificationDetails() - return details.suggested_title or result.task_description or details.summary or "Email follow-up" - - -def render_task_description(request: ClassifyRequest, result: ClassificationResult) -> str: - details = result.details or ClassificationDetails() - sections: list[str] = [] - if details.summary: - sections.append(f"Summary:\n{details.summary}") - if result.task_description: - sections.append(f"Action:\n{result.task_description}") - if details.suggested_notes: - sections.append(f"Notes:\n{details.suggested_notes}") - if details.deadline: - sections.append(f"Deadline:\n{details.deadline}") - if details.people: - sections.append("People:\n- " + "\n- ".join(details.people)) - if details.organizations: - sections.append("Organizations:\n- " + "\n- ".join(details.organizations)) - if details.attachments_referenced: - sections.append("Attachments referenced:\n- " + "\n- ".join(details.attachments_referenced)) - if details.next_steps: - sections.append("Next steps:\n- " + "\n- ".join(details.next_steps)) - if details.key_points: - sections.append("Key points:\n- " + "\n- ".join(details.key_points)) - metadata = [] - if request.message_id: - metadata.append(f"message_id: {request.message_id}") - if request.thread_id: - metadata.append(f"thread_id: {request.thread_id}") - if request.from_address: - metadata.append(f"from: {request.from_address}") - if request.received_at: - metadata.append(f"received_at: {request.received_at}") - if metadata: - sections.append("Source metadata:\n" + "\n".join(metadata)) - return "\n\n".join(sections).strip() - - -async def sync_todoist(request: ClassifyRequest, result: ClassificationResult) -> TodoistSyncResult: - if not result.needs_action: - return TodoistSyncResult(status="skipped", message="No action required.") - client = TodoistClient() - if not client.enabled: - return TodoistSyncResult(status="disabled", message="Todoist is not configured.") - +def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult: store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db")) + subject_key = build_subject_key(request) fingerprint = build_fingerprint(request) - existing = store.find_existing(message_id=request.message_id, thread_id=request.thread_id, fingerprint=fingerprint) - dedupe_match = "none" - if existing: - if request.message_id and existing.get("message_id") == request.message_id: - dedupe_match = "message_id" - elif request.thread_id and existing.get("thread_id") == request.thread_id: - dedupe_match = "thread_id" - else: - dedupe_match = "fingerprint" - - content = render_task_content(result) - description = render_task_description(request, result) - classification_hash = build_classification_hash(result) + result_hash = build_result_hash(result) + existing = store.find_existing(subject_key=subject_key, fingerprint=fingerprint) if not existing: - created = await client.create_task(content=content, description=description, due_string=(result.details.deadline if result.details else None)) - task_id = str(created.get("id")) - store.upsert( + store.insert_or_update( existing_id=None, - message_id=request.message_id, - thread_id=request.thread_id, + subject_key=subject_key, fingerprint=fingerprint, - todoist_task_id=task_id, - classification_hash=classification_hash, - source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), - last_result=result.model_dump(exclude_none=True), + result_hash=result_hash, + request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), + result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True), + seen_count=1, ) - return TodoistSyncResult(status="created", task_id=task_id, dedupe_match=dedupe_match) + return DedupeResult(status="new", seen_count=1, matched_on="none", subject_key=subject_key, fingerprint=fingerprint) - task_id = str(existing["todoist_task_id"]) - if existing.get("classification_hash") == classification_hash: - store.upsert( - existing_id=existing["id"], - message_id=request.message_id, - thread_id=request.thread_id, - fingerprint=fingerprint, - todoist_task_id=task_id, - classification_hash=classification_hash, - source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), - last_result=result.model_dump(exclude_none=True), - ) - return TodoistSyncResult(status="unchanged", task_id=task_id, dedupe_match=dedupe_match, message="Existing task already reflects this classification.") - - await client.update_task(task_id, content=content, description=description, due_string=(result.details.deadline if result.details else None)) - comment_added = False - previous_details = (existing.get("last_result") or {}).get("details") or {} - current_details = (result.details.model_dump(exclude_none=True) if result.details else {}) - if _material_context_changed(previous_details, current_details): - await client.add_comment(task_id, _build_update_comment(result)) - comment_added = True - - store.upsert( + matched_on = "fingerprint" if existing.get("fingerprint") == fingerprint else "subject" + previous_hash = existing.get("result_hash") + seen_count = int(existing.get("seen_count", 1)) + 1 + status = "duplicate" if previous_hash == result_hash else "updated" + store.insert_or_update( existing_id=existing["id"], - message_id=request.message_id, - thread_id=request.thread_id, + subject_key=subject_key, + fingerprint=fingerprint, + result_hash=result_hash, + request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), + result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True), + seen_count=seen_count, + ) + return DedupeResult( + status=status, + seen_count=seen_count, + matched_on=matched_on, + subject_key=subject_key, fingerprint=fingerprint, - todoist_task_id=task_id, - classification_hash=classification_hash, - source_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), - last_result=result.model_dump(exclude_none=True), ) - return TodoistSyncResult(status="updated", task_id=task_id, comment_added=comment_added, dedupe_match=dedupe_match) - - -def _material_context_changed(previous: dict[str, Any], current: dict[str, Any]) -> bool: - keys = {"summary", "deadline", "attachments_referenced", "next_steps", "key_points", "people"} - return any(previous.get(k) != current.get(k) for k in keys) - - -def _build_update_comment(result: ClassificationResult) -> str: - details = result.details or ClassificationDetails() - parts = ["Email classifier update:"] - if details.summary: - parts.append(f"Summary: {details.summary}") - if details.deadline: - parts.append(f"Deadline: {details.deadline}") - if details.next_steps: - parts.append("Next steps: " + "; ".join(details.next_steps)) - if details.key_points: - parts.append("Key points: " + "; ".join(details.key_points[:4])) - return "\n".join(parts) diff --git a/app/todoist.py b/app/todoist.py deleted file mode 100644 index 8ca9257..0000000 --- a/app/todoist.py +++ /dev/null @@ -1,48 +0,0 @@ -from __future__ import annotations - -import os -from typing import Any - -import httpx - - -class TodoistClient: - def __init__(self, api_key: str | None = None, base_url: str | None = None): - self.api_key = api_key or os.getenv("TODOIST_API_KEY") - self.base_url = (base_url or os.getenv("TODOIST_API_BASE_URL") or "https://api.todoist.com/rest/v2").rstrip("/") - self.project_id = os.getenv("TODOIST_PROJECT_ID") - - @property - def enabled(self) -> bool: - return bool(self.api_key) - - def _headers(self) -> dict[str, str]: - if not self.api_key: - raise RuntimeError("TODOIST_API_KEY is not configured") - return {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} - - async def create_task(self, *, content: str, description: str, due_string: str | None = None) -> dict[str, Any]: - payload: dict[str, Any] = {"content": content, "description": description} - if self.project_id: - payload["project_id"] = self.project_id - if due_string: - payload["due_string"] = due_string - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/tasks", headers=self._headers(), json=payload) - response.raise_for_status() - return response.json() - - async def update_task(self, task_id: str, *, content: str, description: str, due_string: str | None = None) -> None: - payload: dict[str, Any] = {"content": content, "description": description} - if due_string: - payload["due_string"] = due_string - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/tasks/{task_id}", headers=self._headers(), json=payload) - response.raise_for_status() - - async def add_comment(self, task_id: str, content: str) -> dict[str, Any]: - payload = {"task_id": task_id, "content": content} - async with httpx.AsyncClient(timeout=30) as client: - response = await client.post(f"{self.base_url}/comments", headers=self._headers(), json=payload) - response.raise_for_status() - return response.json() diff --git a/pyproject.toml b/pyproject.toml index d80affe..17c2c0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ dependencies = [ "anthropic>=0.57.1", "beautifulsoup4>=4.14.3", "fastapi>=0.128.0", - "httpx>=0.28.1", "openai>=2.16.0", "uvicorn>=0.40.0", ] -- 2.49.1 From c6ee7359497b670c20ce72bb192e63acbdba533e Mon Sep 17 00:00:00 2001 From: Steve W Date: Thu, 9 Apr 2026 18:26:37 +0000 Subject: [PATCH 3/4] Use Outlook ids for classifier dedupe precedence --- README.md | 97 +++++++++++++++++++++++++-------------------- app/dedupe_store.py | 57 ++++++++++++++++---------- app/models.py | 16 ++++++-- app/sync.py | 36 ++++++++++------- 4 files changed, 127 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 353c9da..c483937 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # email-classifier -FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using fingerprint-based dedupe. +FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using Outlook-aware dedupe. ## Environment configuration @@ -31,88 +31,101 @@ Optional local dedupe store path: export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db ``` -## API +## Input shape -### POST /classify - -This overhaul is intended to return richer extraction. Top-level compatibility is not required. - -Request example: +Designed around real Outlook message payloads. Relevant fields: ```json { + "id": "AAMk...", + "internetMessageId": "<...@...>", + "conversationId": "AAQk...", + "subject": "MB Printer", + "bodyPreview": "Good morning, ...", + "receivedDateTime": "2026-02-19T15:27:35Z", + "sentDateTime": "2026-02-19T15:27:32Z", + "hasAttachments": false, + "importance": "normal", + "isRead": false, + "body": { + "contentType": "html", + "content": "..." + } +} +``` + +API request example: + +```json +{ + "id": "AAMk...", + "internetMessageId": "<...@...>", + "conversationId": "AAQk...", + "bodyPreview": "Good morning, ...", + "receivedDateTime": "2026-02-19T15:27:35Z", + "sentDateTime": "2026-02-19T15:27:32Z", + "hasAttachments": false, + "importance": "normal", + "isRead": false, "email_data": { - "subject": "Can you review this by Friday?", - "body": "Hi Daniel, please review the attached budget proposal." + "subject": "MB Printer", + "body": "..." }, - "from_address": "sender@example.com", - "received_at": "2026-04-09T12:55:00Z", "provider": "anthropic", "base_url": "https://api.minimax.io/anthropic", "model": "MiniMax-M2.7" } ``` -Response example: +## Response example ```json { "needs_action": true, "category": "question", "priority": "high", - "task_description": "Review the budget proposal and respond by Friday", - "reasoning": "Direct request with a deadline requires follow-up", + "task_description": "Investigate MB Printer issue and reply", + "reasoning": "The email appears to describe an issue requiring action.", "confidence": 0.91, "details": { - "summary": "Budget proposal review requested with Friday deadline.", - "suggested_title": "Review budget proposal and respond by Friday", - "suggested_notes": "Requester asked for feedback on attached budget proposal before Friday.", - "deadline": "Friday", - "people": ["Daniel"], + "summary": "Printer issue reported in the MB area.", + "suggested_title": "Handle MB Printer issue", + "suggested_notes": "Review the printer problem, identify urgency, and reply with next steps.", + "deadline": null, + "people": [], "organizations": [], - "attachments_referenced": ["budget proposal"], - "next_steps": ["Review attachment", "Reply with feedback"], - "key_points": ["Deadline is Friday"], - "source_signals": ["request", "deadline"], + "attachments_referenced": [], + "next_steps": ["Review issue", "Respond to sender"], + "key_points": ["Printer issue reported"], + "source_signals": ["request"], "dedupe_key": "..." }, "dedupe": { "status": "new", "seen_count": 1, "matched_on": "none", - "subject_key": "...", + "message_id": "AAMk...", + "conversation_id": "AAQk...", "fingerprint": "..." } } ``` -## Dedupe behavior +## Dedupe precedence -The API does not create or update Todoist tasks. -It only returns richer extraction and local dedupe metadata for downstream automation like n8n. - -Matching strategy: -- normalized subject plus sender-derived `subject_key` -- full content fingerprint fallback based on sender + normalized subject + cleaned body +1. `id` for exact Outlook message match +2. `conversationId` for thread grouping +3. normalized subject + preview/body fingerprint fallback Statuses: - `new`: no prior similar email seen - `duplicate`: same dedupe target and same extracted result as before - `updated`: matched prior email, but extracted result changed -This is intentionally heuristic, not perfect. - -## Architecture - -- `app/classifier.py`: classification orchestration and dedupe handoff -- `app/prompts.py`: richer extraction prompt -- `app/sync.py`: subject normalization, fingerprinting, dedupe application -- `app/dedupe_store.py`: SQLite-backed dedupe store -- `app/llm_adapters.py`: provider adapters -- `app/config.py`: LLM settings +This is intentionally heuristic for the fallback path. ## Notes - No Todoist integration lives in this API. -- Dedupe is best-effort and designed to help downstream workflows avoid obvious duplicates. +- Dedupe is local and intended to help downstream workflows avoid obvious duplicates. - SQLite is used for lightweight local dedupe tracking. diff --git a/app/dedupe_store.py b/app/dedupe_store.py index a94f121..c96aeca 100644 --- a/app/dedupe_store.py +++ b/app/dedupe_store.py @@ -23,7 +23,8 @@ class DedupeStore: """ CREATE TABLE IF NOT EXISTS classification_dedupe ( id INTEGER PRIMARY KEY AUTOINCREMENT, - subject_key TEXT NOT NULL, + outlook_id TEXT, + conversation_id TEXT, fingerprint TEXT NOT NULL, result_hash TEXT NOT NULL, request_payload TEXT NOT NULL, @@ -34,32 +35,46 @@ class DedupeStore: ) """ ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)") - def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None: + def find_existing(self, *, outlook_id: str | None, conversation_id: str | None, fingerprint: str) -> tuple[dict[str, Any] | None, str]: with self._connect() as conn: + if outlook_id: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1", + (outlook_id,), + ).fetchone() + if row: + return self._decode(row), "id" + if conversation_id: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1", + (conversation_id,), + ).fetchone() + if row: + return self._decode(row), "conversation" row = conn.execute( "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", (fingerprint,), ).fetchone() - if row is None: - row = conn.execute( - "SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1", - (subject_key,), - ).fetchone() - if not row: - return None - data = dict(row) - data["request_payload"] = json.loads(data["request_payload"]) - data["result_payload"] = json.loads(data["result_payload"]) - return data + if row: + return self._decode(row), "fingerprint" + return None, "none" + + def _decode(self, row: sqlite3.Row) -> dict[str, Any]: + data = dict(row) + data["request_payload"] = json.loads(data["request_payload"]) + data["result_payload"] = json.loads(data["result_payload"]) + return data def insert_or_update( self, *, existing_id: int | None, - subject_key: str, + outlook_id: str | None, + conversation_id: str | None, fingerprint: str, result_hash: str, request_payload: dict[str, Any], @@ -70,11 +85,12 @@ class DedupeStore: if existing_id is None: conn.execute( """ - INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO classification_dedupe (outlook_id, conversation_id, fingerprint, result_hash, request_payload, result_payload, seen_count) + VALUES (?, ?, ?, ?, ?, ?, ?) """, ( - subject_key, + outlook_id, + conversation_id, fingerprint, result_hash, json.dumps(request_payload, sort_keys=True), @@ -86,12 +102,13 @@ class DedupeStore: conn.execute( """ UPDATE classification_dedupe - SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, + SET outlook_id = ?, conversation_id = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, seen_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( - subject_key, + outlook_id, + conversation_id, fingerprint, result_hash, json.dumps(request_payload, sort_keys=True), diff --git a/app/models.py b/app/models.py index 9badd5f..6c5c0b0 100644 --- a/app/models.py +++ b/app/models.py @@ -17,8 +17,17 @@ class ClassifyRequest(BaseModel): base_url: str | None = None api_key: str | None = Field(default=None, exclude=True) temperature: float | None = None + + id: str | None = None + internetMessageId: str | None = None + conversationId: str | None = None + bodyPreview: str | None = None + receivedDateTime: str | None = None + sentDateTime: str | None = None + hasAttachments: bool | None = None + importance: str | None = None + isRead: bool | None = None from_address: str | None = None - received_at: str | None = None class ClassificationDetails(BaseModel): @@ -38,8 +47,9 @@ class ClassificationDetails(BaseModel): class DedupeResult(BaseModel): status: Literal["new", "duplicate", "updated"] seen_count: int = 1 - matched_on: Literal["none", "subject", "fingerprint"] = "none" - subject_key: str + matched_on: Literal["none", "id", "conversation", "fingerprint"] = "none" + message_id: str | None = None + conversation_id: str | None = None fingerprint: str diff --git a/app/sync.py b/app/sync.py index 2f5e29f..86f6f00 100644 --- a/app/sync.py +++ b/app/sync.py @@ -16,16 +16,12 @@ def normalize_subject(subject: str) -> str: return value -def build_subject_key(request: ClassifyRequest) -> str: - subject = normalize_subject(request.email_data.subject) - sender = (request.from_address or "").strip().lower() - return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest() - - def build_fingerprint(request: ClassifyRequest) -> str: subject = normalize_subject(request.email_data.subject) body = " ".join(request.email_data.body.split()).strip().lower() - seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}" + preview = " ".join((request.bodyPreview or "").split()).strip().lower() + sender = (request.from_address or "").strip().lower() + seed = f"{sender}\n{subject}\n{preview}\n{body[:2000]}" return hashlib.sha256(seed.encode()).hexdigest() @@ -36,30 +32,41 @@ def build_result_hash(result: ClassificationResult) -> str: def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult: store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db")) - subject_key = build_subject_key(request) fingerprint = build_fingerprint(request) + existing, matched_on = store.find_existing( + outlook_id=request.id, + conversation_id=request.conversationId, + fingerprint=fingerprint, + ) result_hash = build_result_hash(result) - existing = store.find_existing(subject_key=subject_key, fingerprint=fingerprint) if not existing: store.insert_or_update( existing_id=None, - subject_key=subject_key, + outlook_id=request.id, + conversation_id=request.conversationId, fingerprint=fingerprint, result_hash=result_hash, request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True), seen_count=1, ) - return DedupeResult(status="new", seen_count=1, matched_on="none", subject_key=subject_key, fingerprint=fingerprint) + return DedupeResult( + status="new", + seen_count=1, + matched_on="none", + message_id=request.id, + conversation_id=request.conversationId, + fingerprint=fingerprint, + ) - matched_on = "fingerprint" if existing.get("fingerprint") == fingerprint else "subject" previous_hash = existing.get("result_hash") seen_count = int(existing.get("seen_count", 1)) + 1 status = "duplicate" if previous_hash == result_hash else "updated" store.insert_or_update( existing_id=existing["id"], - subject_key=subject_key, + outlook_id=request.id or existing.get("outlook_id"), + conversation_id=request.conversationId or existing.get("conversation_id"), fingerprint=fingerprint, result_hash=result_hash, request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), @@ -70,6 +77,7 @@ def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> Dedu status=status, seen_count=seen_count, matched_on=matched_on, - subject_key=subject_key, + message_id=request.id or existing.get("outlook_id"), + conversation_id=request.conversationId or existing.get("conversation_id"), fingerprint=fingerprint, ) -- 2.49.1 From 8c49ce21e01ee84cd15d062e86a597d6d497e106 Mon Sep 17 00:00:00 2001 From: Steve W Date: Thu, 9 Apr 2026 18:32:32 +0000 Subject: [PATCH 4/4] Accept native Outlook message shape in classifier request --- README.md | 57 +++++++++++++++++++++++++++++++++++---------------- app/models.py | 47 ++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index c483937..b5195a0 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,11 @@ export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db ## Input shape -Designed around real Outlook message payloads. Relevant fields: +The request model accepts either: +- simplified input via `email_data` +- or native Outlook-style fields directly + +Full Outlook-shaped example: ```json { @@ -42,38 +46,55 @@ Designed around real Outlook message payloads. Relevant fields: "conversationId": "AAQk...", "subject": "MB Printer", "bodyPreview": "Good morning, ...", + "body": { + "contentType": "html", + "content": "...(full HTML body)" + }, + "sender": { + "emailAddress": { + "name": "Bobbi Johnson", + "address": "bobbi.johnson@grandportage.com" + } + }, + "from": { + "emailAddress": { + "name": "Bobbi Johnson", + "address": "bobbi.johnson@grandportage.com" + } + }, + "toRecipients": [ + { + "emailAddress": { + "name": "IT Helpdesk Mail", + "address": "helpdeskmail@grandportage.com" + } + } + ], + "ccRecipients": [], + "bccRecipients": [], + "replyTo": [], "receivedDateTime": "2026-02-19T15:27:35Z", "sentDateTime": "2026-02-19T15:27:32Z", "hasAttachments": false, "importance": "normal", "isRead": false, - "body": { - "contentType": "html", - "content": "..." - } + "flag": { "flagStatus": "notFlagged" }, + "provider": "anthropic", + "base_url": "https://api.minimax.io/anthropic", + "model": "MiniMax-M2.7" } ``` -API request example: +Simplified request example: ```json { - "id": "AAMk...", - "internetMessageId": "<...@...>", - "conversationId": "AAQk...", - "bodyPreview": "Good morning, ...", - "receivedDateTime": "2026-02-19T15:27:35Z", - "sentDateTime": "2026-02-19T15:27:32Z", - "hasAttachments": false, - "importance": "normal", - "isRead": false, "email_data": { "subject": "MB Printer", "body": "..." }, - "provider": "anthropic", - "base_url": "https://api.minimax.io/anthropic", - "model": "MiniMax-M2.7" + "id": "AAMk...", + "conversationId": "AAQk..." } ``` diff --git a/app/models.py b/app/models.py index 6c5c0b0..e3b1a44 100644 --- a/app/models.py +++ b/app/models.py @@ -2,7 +2,7 @@ from __future__ import annotations from typing import Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, model_validator class EmailData(BaseModel): @@ -10,8 +10,26 @@ class EmailData(BaseModel): body: str +class EmailAddress(BaseModel): + name: str | None = None + address: str | None = None + + +class Recipient(BaseModel): + emailAddress: EmailAddress | None = None + + +class EmailBody(BaseModel): + contentType: str | None = None + content: str | None = None + + +class Flag(BaseModel): + flagStatus: str | None = None + + class ClassifyRequest(BaseModel): - email_data: EmailData + email_data: EmailData | None = None provider: Literal["openai", "anthropic"] | None = None model: str | None = None base_url: str | None = None @@ -21,14 +39,39 @@ class ClassifyRequest(BaseModel): id: str | None = None internetMessageId: str | None = None conversationId: str | None = None + subject: str | None = None bodyPreview: str | None = None + body: EmailBody | None = None + sender: Recipient | None = None + from_: Recipient | None = Field(default=None, alias="from") + toRecipients: list[Recipient] = Field(default_factory=list) + ccRecipients: list[Recipient] = Field(default_factory=list) + bccRecipients: list[Recipient] = Field(default_factory=list) + replyTo: list[Recipient] = Field(default_factory=list) receivedDateTime: str | None = None sentDateTime: str | None = None hasAttachments: bool | None = None importance: str | None = None isRead: bool | None = None + flag: Flag | None = None from_address: str | None = None + @model_validator(mode="after") + def populate_email_data(self) -> "ClassifyRequest": + subject = self.email_data.subject if self.email_data else self.subject + body = self.email_data.body if self.email_data else (self.body.content if self.body and self.body.content else None) + if not subject or not body: + raise ValueError("Request must include either email_data or Outlook subject/body.content fields") + self.email_data = EmailData(subject=subject, body=body) + if not self.from_address: + self.from_address = ( + (self.from_.emailAddress.address if self.from_ and self.from_.emailAddress else None) + or (self.sender.emailAddress.address if self.sender and self.sender.emailAddress else None) + ) + return self + + model_config = {"populate_by_name": True} + class ClassificationDetails(BaseModel): summary: str | None = None -- 2.49.1