Files
email-classifier/app/dedupe_store.py

103 lines
3.9 KiB
Python

from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any
class DedupeStore:
def __init__(self, db_path: str = ".data/email_classifier.db"):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _init_db(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS classification_dedupe (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_key TEXT NOT NULL,
fingerprint TEXT NOT NULL,
result_hash TEXT NOT NULL,
request_payload TEXT NOT NULL,
result_payload TEXT NOT NULL,
seen_count INTEGER NOT NULL DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)")
def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None:
with self._connect() as conn:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1",
(fingerprint,),
).fetchone()
if row is None:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1",
(subject_key,),
).fetchone()
if not row:
return None
data = dict(row)
data["request_payload"] = json.loads(data["request_payload"])
data["result_payload"] = json.loads(data["result_payload"])
return data
def insert_or_update(
self,
*,
existing_id: int | None,
subject_key: str,
fingerprint: str,
result_hash: str,
request_payload: dict[str, Any],
result_payload: dict[str, Any],
seen_count: int,
) -> None:
with self._connect() as conn:
if existing_id is None:
conn.execute(
"""
INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
subject_key,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),
json.dumps(result_payload, sort_keys=True),
seen_count,
),
)
else:
conn.execute(
"""
UPDATE classification_dedupe
SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?,
seen_count = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(
subject_key,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),
json.dumps(result_payload, sort_keys=True),
seen_count,
existing_id,
),
)