From c6ee7359497b670c20ce72bb192e63acbdba533e Mon Sep 17 00:00:00 2001 From: Steve W Date: Thu, 9 Apr 2026 18:26:37 +0000 Subject: [PATCH] Use Outlook ids for classifier dedupe precedence --- README.md | 97 +++++++++++++++++++++++++-------------------- app/dedupe_store.py | 57 ++++++++++++++++---------- app/models.py | 16 ++++++-- app/sync.py | 36 ++++++++++------- 4 files changed, 127 insertions(+), 79 deletions(-) diff --git a/README.md b/README.md index 353c9da..c483937 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # email-classifier -FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using fingerprint-based dedupe. +FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using Outlook-aware dedupe. ## Environment configuration @@ -31,88 +31,101 @@ Optional local dedupe store path: export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db ``` -## API +## Input shape -### POST /classify - -This overhaul is intended to return richer extraction. Top-level compatibility is not required. - -Request example: +Designed around real Outlook message payloads. Relevant fields: ```json { + "id": "AAMk...", + "internetMessageId": "<...@...>", + "conversationId": "AAQk...", + "subject": "MB Printer", + "bodyPreview": "Good morning, ...", + "receivedDateTime": "2026-02-19T15:27:35Z", + "sentDateTime": "2026-02-19T15:27:32Z", + "hasAttachments": false, + "importance": "normal", + "isRead": false, + "body": { + "contentType": "html", + "content": "..." + } +} +``` + +API request example: + +```json +{ + "id": "AAMk...", + "internetMessageId": "<...@...>", + "conversationId": "AAQk...", + "bodyPreview": "Good morning, ...", + "receivedDateTime": "2026-02-19T15:27:35Z", + "sentDateTime": "2026-02-19T15:27:32Z", + "hasAttachments": false, + "importance": "normal", + "isRead": false, "email_data": { - "subject": "Can you review this by Friday?", - "body": "Hi Daniel, please review the attached budget proposal." + "subject": "MB Printer", + "body": "..." }, - "from_address": "sender@example.com", - "received_at": "2026-04-09T12:55:00Z", "provider": "anthropic", "base_url": "https://api.minimax.io/anthropic", "model": "MiniMax-M2.7" } ``` -Response example: +## Response example ```json { "needs_action": true, "category": "question", "priority": "high", - "task_description": "Review the budget proposal and respond by Friday", - "reasoning": "Direct request with a deadline requires follow-up", + "task_description": "Investigate MB Printer issue and reply", + "reasoning": "The email appears to describe an issue requiring action.", "confidence": 0.91, "details": { - "summary": "Budget proposal review requested with Friday deadline.", - "suggested_title": "Review budget proposal and respond by Friday", - "suggested_notes": "Requester asked for feedback on attached budget proposal before Friday.", - "deadline": "Friday", - "people": ["Daniel"], + "summary": "Printer issue reported in the MB area.", + "suggested_title": "Handle MB Printer issue", + "suggested_notes": "Review the printer problem, identify urgency, and reply with next steps.", + "deadline": null, + "people": [], "organizations": [], - "attachments_referenced": ["budget proposal"], - "next_steps": ["Review attachment", "Reply with feedback"], - "key_points": ["Deadline is Friday"], - "source_signals": ["request", "deadline"], + "attachments_referenced": [], + "next_steps": ["Review issue", "Respond to sender"], + "key_points": ["Printer issue reported"], + "source_signals": ["request"], "dedupe_key": "..." }, "dedupe": { "status": "new", "seen_count": 1, "matched_on": "none", - "subject_key": "...", + "message_id": "AAMk...", + "conversation_id": "AAQk...", "fingerprint": "..." } } ``` -## Dedupe behavior +## Dedupe precedence -The API does not create or update Todoist tasks. -It only returns richer extraction and local dedupe metadata for downstream automation like n8n. - -Matching strategy: -- normalized subject plus sender-derived `subject_key` -- full content fingerprint fallback based on sender + normalized subject + cleaned body +1. `id` for exact Outlook message match +2. `conversationId` for thread grouping +3. normalized subject + preview/body fingerprint fallback Statuses: - `new`: no prior similar email seen - `duplicate`: same dedupe target and same extracted result as before - `updated`: matched prior email, but extracted result changed -This is intentionally heuristic, not perfect. - -## Architecture - -- `app/classifier.py`: classification orchestration and dedupe handoff -- `app/prompts.py`: richer extraction prompt -- `app/sync.py`: subject normalization, fingerprinting, dedupe application -- `app/dedupe_store.py`: SQLite-backed dedupe store -- `app/llm_adapters.py`: provider adapters -- `app/config.py`: LLM settings +This is intentionally heuristic for the fallback path. ## Notes - No Todoist integration lives in this API. -- Dedupe is best-effort and designed to help downstream workflows avoid obvious duplicates. +- Dedupe is local and intended to help downstream workflows avoid obvious duplicates. - SQLite is used for lightweight local dedupe tracking. diff --git a/app/dedupe_store.py b/app/dedupe_store.py index a94f121..c96aeca 100644 --- a/app/dedupe_store.py +++ b/app/dedupe_store.py @@ -23,7 +23,8 @@ class DedupeStore: """ CREATE TABLE IF NOT EXISTS classification_dedupe ( id INTEGER PRIMARY KEY AUTOINCREMENT, - subject_key TEXT NOT NULL, + outlook_id TEXT, + conversation_id TEXT, fingerprint TEXT NOT NULL, result_hash TEXT NOT NULL, request_payload TEXT NOT NULL, @@ -34,32 +35,46 @@ class DedupeStore: ) """ ) - conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)") + conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)") - def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None: + def find_existing(self, *, outlook_id: str | None, conversation_id: str | None, fingerprint: str) -> tuple[dict[str, Any] | None, str]: with self._connect() as conn: + if outlook_id: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1", + (outlook_id,), + ).fetchone() + if row: + return self._decode(row), "id" + if conversation_id: + row = conn.execute( + "SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1", + (conversation_id,), + ).fetchone() + if row: + return self._decode(row), "conversation" row = conn.execute( "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1", (fingerprint,), ).fetchone() - if row is None: - row = conn.execute( - "SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1", - (subject_key,), - ).fetchone() - if not row: - return None - data = dict(row) - data["request_payload"] = json.loads(data["request_payload"]) - data["result_payload"] = json.loads(data["result_payload"]) - return data + if row: + return self._decode(row), "fingerprint" + return None, "none" + + def _decode(self, row: sqlite3.Row) -> dict[str, Any]: + data = dict(row) + data["request_payload"] = json.loads(data["request_payload"]) + data["result_payload"] = json.loads(data["result_payload"]) + return data def insert_or_update( self, *, existing_id: int | None, - subject_key: str, + outlook_id: str | None, + conversation_id: str | None, fingerprint: str, result_hash: str, request_payload: dict[str, Any], @@ -70,11 +85,12 @@ class DedupeStore: if existing_id is None: conn.execute( """ - INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO classification_dedupe (outlook_id, conversation_id, fingerprint, result_hash, request_payload, result_payload, seen_count) + VALUES (?, ?, ?, ?, ?, ?, ?) """, ( - subject_key, + outlook_id, + conversation_id, fingerprint, result_hash, json.dumps(request_payload, sort_keys=True), @@ -86,12 +102,13 @@ class DedupeStore: conn.execute( """ UPDATE classification_dedupe - SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, + SET outlook_id = ?, conversation_id = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?, seen_count = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, ( - subject_key, + outlook_id, + conversation_id, fingerprint, result_hash, json.dumps(request_payload, sort_keys=True), diff --git a/app/models.py b/app/models.py index 9badd5f..6c5c0b0 100644 --- a/app/models.py +++ b/app/models.py @@ -17,8 +17,17 @@ class ClassifyRequest(BaseModel): base_url: str | None = None api_key: str | None = Field(default=None, exclude=True) temperature: float | None = None + + id: str | None = None + internetMessageId: str | None = None + conversationId: str | None = None + bodyPreview: str | None = None + receivedDateTime: str | None = None + sentDateTime: str | None = None + hasAttachments: bool | None = None + importance: str | None = None + isRead: bool | None = None from_address: str | None = None - received_at: str | None = None class ClassificationDetails(BaseModel): @@ -38,8 +47,9 @@ class ClassificationDetails(BaseModel): class DedupeResult(BaseModel): status: Literal["new", "duplicate", "updated"] seen_count: int = 1 - matched_on: Literal["none", "subject", "fingerprint"] = "none" - subject_key: str + matched_on: Literal["none", "id", "conversation", "fingerprint"] = "none" + message_id: str | None = None + conversation_id: str | None = None fingerprint: str diff --git a/app/sync.py b/app/sync.py index 2f5e29f..86f6f00 100644 --- a/app/sync.py +++ b/app/sync.py @@ -16,16 +16,12 @@ def normalize_subject(subject: str) -> str: return value -def build_subject_key(request: ClassifyRequest) -> str: - subject = normalize_subject(request.email_data.subject) - sender = (request.from_address or "").strip().lower() - return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest() - - def build_fingerprint(request: ClassifyRequest) -> str: subject = normalize_subject(request.email_data.subject) body = " ".join(request.email_data.body.split()).strip().lower() - seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}" + preview = " ".join((request.bodyPreview or "").split()).strip().lower() + sender = (request.from_address or "").strip().lower() + seed = f"{sender}\n{subject}\n{preview}\n{body[:2000]}" return hashlib.sha256(seed.encode()).hexdigest() @@ -36,30 +32,41 @@ def build_result_hash(result: ClassificationResult) -> str: def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult: store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db")) - subject_key = build_subject_key(request) fingerprint = build_fingerprint(request) + existing, matched_on = store.find_existing( + outlook_id=request.id, + conversation_id=request.conversationId, + fingerprint=fingerprint, + ) result_hash = build_result_hash(result) - existing = store.find_existing(subject_key=subject_key, fingerprint=fingerprint) if not existing: store.insert_or_update( existing_id=None, - subject_key=subject_key, + outlook_id=request.id, + conversation_id=request.conversationId, fingerprint=fingerprint, result_hash=result_hash, request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True), seen_count=1, ) - return DedupeResult(status="new", seen_count=1, matched_on="none", subject_key=subject_key, fingerprint=fingerprint) + return DedupeResult( + status="new", + seen_count=1, + matched_on="none", + message_id=request.id, + conversation_id=request.conversationId, + fingerprint=fingerprint, + ) - matched_on = "fingerprint" if existing.get("fingerprint") == fingerprint else "subject" previous_hash = existing.get("result_hash") seen_count = int(existing.get("seen_count", 1)) + 1 status = "duplicate" if previous_hash == result_hash else "updated" store.insert_or_update( existing_id=existing["id"], - subject_key=subject_key, + outlook_id=request.id or existing.get("outlook_id"), + conversation_id=request.conversationId or existing.get("conversation_id"), fingerprint=fingerprint, result_hash=result_hash, request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True), @@ -70,6 +77,7 @@ def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> Dedu status=status, seen_count=seen_count, matched_on=matched_on, - subject_key=subject_key, + message_id=request.id or existing.get("outlook_id"), + conversation_id=request.conversationId or existing.get("conversation_id"), fingerprint=fingerprint, )