120 lines
4.7 KiB
Python
120 lines
4.7 KiB
Python
from __future__ import annotations

import json
import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Any
|
|
|
|
|
|
class DedupeStore:
    """Persist classification results in SQLite so repeated requests can be de-duplicated.

    Rows live in the ``classification_dedupe`` table. Each row records a message's
    identifiers (``outlook_id``, ``conversation_id``, ``fingerprint``), a
    ``result_hash``, the JSON-encoded request and result payloads, a ``seen_count``,
    and ``created_at`` / ``updated_at`` timestamps.

    Every method opens its own connection and closes it before returning; no
    connection is held between calls. Because of that, passing ``":memory:"`` as
    *db_path* would give every call a separate, empty database — use a file path.
    """

    def __init__(self, db_path: str = ".data/email_classifier.db"):
        """Prepare the store at *db_path*, creating the parent directory and schema if missing.

        Args:
            db_path: Filesystem path of the SQLite database file.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection whose rows are ``sqlite3.Row`` (accessible by column name).

        The caller owns the returned connection and is responsible for closing it.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    def _init_db(self) -> None:
        """Create the ``classification_dedupe`` table and its lookup indexes if absent."""
        # closing() guarantees the connection is closed; the inner ``conn`` context
        # manager only commits (or rolls back on error) — it never closes it.
        with closing(self._connect()) as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS classification_dedupe (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    outlook_id TEXT,
                    conversation_id TEXT,
                    fingerprint TEXT NOT NULL,
                    result_hash TEXT NOT NULL,
                    request_payload TEXT NOT NULL,
                    result_payload TEXT NOT NULL,
                    seen_count INTEGER NOT NULL DEFAULT 1,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
            # One index per column that find_existing filters on.
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)")

    def find_existing(
        self,
        *,
        outlook_id: str | None,
        conversation_id: str | None,
        fingerprint: str,
    ) -> tuple[dict[str, Any] | None, str]:
        """Return the latest stored row matching a message, plus which key matched.

        Lookups run in priority order and the first hit wins:

        1. ``outlook_id``      -> match type ``"id"``
        2. ``conversation_id`` -> match type ``"conversation"``
        3. ``fingerprint``     -> match type ``"fingerprint"``

        ``None`` or empty-string ids are skipped; the fingerprint lookup always
        runs. When several rows share a key, the one with the highest ``id`` is
        returned (``AUTOINCREMENT`` ids only grow, so that is the newest insert).

        Returns:
            ``(row, match_type)`` where ``row`` is a dict as produced by
            ``_decode``, or ``(None, "none")`` when no lookup matches.
        """
        with closing(self._connect()) as conn, conn:
            if outlook_id:
                row = conn.execute(
                    "SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1",
                    (outlook_id,),
                ).fetchone()
                if row:
                    return self._decode(row), "id"

            if conversation_id:
                row = conn.execute(
                    "SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1",
                    (conversation_id,),
                ).fetchone()
                if row:
                    return self._decode(row), "conversation"

            row = conn.execute(
                "SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1",
                (fingerprint,),
            ).fetchone()
            if row:
                return self._decode(row), "fingerprint"

        return None, "none"

    def _decode(self, row: sqlite3.Row) -> dict[str, Any]:
        """Convert a table row into a plain dict with its JSON payload columns parsed.

        ``request_payload`` and ``result_payload`` are decoded with ``json.loads``;
        every other column is returned as stored.
        """
        data = dict(row)
        data["request_payload"] = json.loads(data["request_payload"])
        data["result_payload"] = json.loads(data["result_payload"])
        return data

    def insert_or_update(
        self,
        *,
        existing_id: int | None,
        outlook_id: str | None,
        conversation_id: str | None,
        fingerprint: str,
        result_hash: str,
        request_payload: dict[str, Any],
        result_payload: dict[str, Any],
        seen_count: int,
    ) -> None:
        """Insert a new dedupe row, or overwrite the row whose id is *existing_id*.

        When *existing_id* is ``None`` a new row is inserted. Otherwise every
        stored column of that row is replaced with the given values and
        ``updated_at`` is set to ``CURRENT_TIMESTAMP``. If no row has that id, the
        UPDATE matches nothing and nothing is written (no error is raised).

        *seen_count* is stored exactly as given; incrementing it is up to the
        caller. Payload dicts are stored as JSON with sorted keys, so equal dicts
        always serialize to identical text.

        Raises:
            TypeError: if a payload is not JSON-serializable (raised before the
                database is opened, so nothing is written).
        """
        # Serialize once, up front, for whichever statement runs below.
        request_json = json.dumps(request_payload, sort_keys=True)
        result_json = json.dumps(result_payload, sort_keys=True)

        with closing(self._connect()) as conn, conn:
            if existing_id is None:
                conn.execute(
                    """
                    INSERT INTO classification_dedupe (outlook_id, conversation_id, fingerprint, result_hash, request_payload, result_payload, seen_count)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        outlook_id,
                        conversation_id,
                        fingerprint,
                        result_hash,
                        request_json,
                        result_json,
                        seen_count,
                    ),
                )
            else:
                conn.execute(
                    """
                    UPDATE classification_dedupe
                    SET outlook_id = ?, conversation_id = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?,
                        seen_count = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                    """,
                    (
                        outlook_id,
                        conversation_id,
                        fingerprint,
                        result_hash,
                        request_json,
                        result_json,
                        seen_count,
                        existing_id,
                    ),
                )
|