from __future__ import annotations

import json
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any


class DedupeStore:
    """SQLite-backed store of classification results used for de-duplication.

    Rows live in the ``classification_dedupe`` table and can be looked up by
    Outlook id, conversation id, or fingerprint.  The database file's parent
    directory and the schema are created on construction if they are missing.
    """

    def __init__(self, db_path: str = ".data/email_classifier.db"):
        """Open (creating if needed) the database at *db_path*."""
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Return a new connection whose rows are ``sqlite3.Row`` objects.

        The caller owns the connection and is responsible for closing it;
        prefer :meth:`_session`, which does so automatically.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @contextmanager
    def _session(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection; commit or roll back on exit, then close it.

        ``with connection:`` on its own only manages the transaction and
        leaves the connection open, so every call would leak a file handle.
        This wrapper keeps the commit/rollback behaviour and adds the close.
        """
        conn = self._connect()
        try:
            with conn:  # commit on success, roll back on exception
                yield conn
        finally:
            conn.close()

    def _init_db(self) -> None:
        """Create the dedupe table and its lookup indexes if they are absent."""
        with self._session() as conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS classification_dedupe (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    outlook_id TEXT,
                    conversation_id TEXT,
                    fingerprint TEXT NOT NULL,
                    result_hash TEXT NOT NULL,
                    request_payload TEXT NOT NULL,
                    result_payload TEXT NOT NULL,
                    seen_count INTEGER NOT NULL DEFAULT 1,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)")

    def find_existing(
        self,
        *,
        outlook_id: str | None,
        conversation_id: str | None,
        fingerprint: str,
    ) -> tuple[dict[str, Any] | None, str]:
        """Return the most recent matching row and the key that matched.

        Lookups are tried in order: ``outlook_id`` (skipped when falsy),
        ``conversation_id`` (skipped when falsy), then ``fingerprint``.  The
        newest row (highest ``id``) for the first key that matches wins.

        Returns:
            ``(row, matched_by)`` where ``row`` is the decoded row dict and
            ``matched_by`` is ``"id"``, ``"conversation"`` or
            ``"fingerprint"``; ``(None, "none")`` when nothing matches.
        """
        with self._session() as conn:
            if outlook_id:
                row = conn.execute(
                    "SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1",
                    (outlook_id,),
                ).fetchone()
                if row:
                    return self._decode(row), "id"

            if conversation_id:
                row = conn.execute(
                    "SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1",
                    (conversation_id,),
                ).fetchone()
                if row:
                    return self._decode(row), "conversation"

            row = conn.execute(
                "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1",
                (fingerprint,),
            ).fetchone()
            if row:
                return self._decode(row), "fingerprint"

        return None, "none"

    def _decode(self, row: sqlite3.Row) -> dict[str, Any]:
        """Convert *row* to a dict, parsing the two JSON payload columns."""
        data = dict(row)
        data["request_payload"] = json.loads(data["request_payload"])
        data["result_payload"] = json.loads(data["result_payload"])
        return data

    def insert_or_update(
        self,
        *,
        existing_id: int | None,
        outlook_id: str | None,
        conversation_id: str | None,
        fingerprint: str,
        result_hash: str,
        request_payload: dict[str, Any],
        result_payload: dict[str, Any],
        seen_count: int,
    ) -> None:
        """Insert a new row, or overwrite the row whose id is *existing_id*.

        When *existing_id* is ``None`` a new row is inserted.  Otherwise every
        data column of that row is replaced and ``updated_at`` is refreshed.
        Payloads are stored as JSON text with sorted keys.
        """
        # Serialize once; both branches store exactly the same values.
        values = (
            outlook_id,
            conversation_id,
            fingerprint,
            result_hash,
            json.dumps(request_payload, sort_keys=True),
            json.dumps(result_payload, sort_keys=True),
            seen_count,
        )
        with self._session() as conn:
            if existing_id is None:
                conn.execute(
                    """
                    INSERT INTO classification_dedupe
                        (outlook_id, conversation_id, fingerprint, result_hash,
                         request_payload, result_payload, seen_count)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    values,
                )
            else:
                conn.execute(
                    """
                    UPDATE classification_dedupe
                    SET outlook_id = ?,
                        conversation_id = ?,
                        fingerprint = ?,
                        result_hash = ?,
                        request_payload = ?,
                        result_payload = ?,
                        seen_count = ?,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (*values, existing_id),
                )