Files
email-classifier/app/sync.py

84 lines
3.1 KiB
Python

from __future__ import annotations
import hashlib
import json
import os
import re
from app.dedupe_store import DedupeStore
from app.models import ClassificationResult, ClassifyRequest, DedupeResult
def normalize_subject(subject: str) -> str:
    """Return a canonical form of an email subject for fingerprinting.

    The subject is trimmed, lower-cased, stripped of every leading
    reply/forward marker (``re:``, ``fw:``, ``fwd:``), and has internal
    whitespace runs collapsed to a single space.

    All stacked markers are removed, not just the first one, so that
    "Re: Re: Fwd: Hello" and "Re: Hello" both normalize to "hello" and the
    function is idempotent.

    Args:
        subject: Raw subject line.

    Returns:
        The normalized subject string.
    """
    value = subject.strip().lower()
    # The outer ``+`` consumes a whole chain of prefixes ("re: fwd: re: ...")
    # in one pass; a single-prefix pattern would leave the rest behind.
    value = re.sub(r"^(?:(?:re|fw|fwd)\s*:\s*)+", "", value)
    value = re.sub(r"\s+", " ", value)
    return value
def build_fingerprint(request: ClassifyRequest) -> str:
    """Compute a content fingerprint for a classification request.

    The fingerprint is the SHA-256 hex digest of four newline-separated
    fields, in this order: lower-cased sender address, normalized subject,
    whitespace-collapsed body preview, and the first 2000 characters of the
    whitespace-collapsed body.

    Args:
        request: Request whose ``email_data.subject``, ``email_data.body``,
            ``bodyPreview`` and ``from_address`` are read. A falsy
            ``bodyPreview`` or ``from_address`` is treated as an empty string.

    Returns:
        A 64-character hexadecimal digest.
    """

    def _collapse(text: str) -> str:
        # split()/join leaves no leading or trailing whitespace behind.
        return " ".join(text.split()).lower()

    email = request.email_data
    subject_key = normalize_subject(email.subject)
    body_key = _collapse(email.body)
    preview_key = _collapse(request.bodyPreview or "")
    sender_key = (request.from_address or "").strip().lower()

    # Only a bounded prefix of the body takes part in the hash.
    seed = "\n".join((sender_key, subject_key, preview_key, body_key[:2000]))
    return hashlib.sha256(seed.encode()).hexdigest()
def build_result_hash(result: ClassificationResult) -> str:
payload = result.model_dump(exclude={"dedupe"}, exclude_none=True)
return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult:
    """Record a classification in the dedupe store and report its status.

    A ``DedupeStore`` is opened at the path given by the
    ``EMAIL_CLASSIFIER_DB_PATH`` environment variable (default
    ``.data/email_classifier.db``). The store is asked for an existing record
    matching the request's id, conversation id, or content fingerprint.

    * No match: a record is written with ``seen_count=1`` and the returned
      status is ``"new"`` with ``matched_on="none"``.
    * Match: the record is rewritten with an incremented ``seen_count``. The
      status is ``"duplicate"`` when the stored result hash equals the new
      one, otherwise ``"updated"``. Missing ids on the request fall back to
      the ids stored on the matched record.

    Args:
        request: The incoming classification request.
        result: The classification produced for that request.

    Returns:
        A ``DedupeResult`` describing how this message relates to what the
        store already held.
    """
    db_path = os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db")
    store = DedupeStore(db_path)

    fingerprint = build_fingerprint(request)
    existing, matched_on = store.find_existing(
        outlook_id=request.id,
        conversation_id=request.conversationId,
        fingerprint=fingerprint,
    )
    result_hash = build_result_hash(result)

    if existing:
        # NOTE(review): `existing` is used as a dict-like record here
        # (.get / ["id"]) -- confirm against DedupeStore.find_existing.
        stored_hash = existing.get("result_hash")
        seen_count = int(existing.get("seen_count", 1)) + 1
        status = "updated" if stored_hash != result_hash else "duplicate"
        record_id = existing["id"]
        # Prefer ids from the request; fall back to the stored ones.
        outlook_id = request.id or existing.get("outlook_id")
        conversation_id = request.conversationId or existing.get("conversation_id")
    else:
        seen_count = 1
        status = "new"
        record_id = None
        # A first sighting always reports "none", whatever the store returned.
        matched_on = "none"
        outlook_id = request.id
        conversation_id = request.conversationId

    store.insert_or_update(
        existing_id=record_id,
        outlook_id=outlook_id,
        conversation_id=conversation_id,
        fingerprint=fingerprint,
        result_hash=result_hash,
        # The API key is excluded from the persisted request payload.
        request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
        result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
        seen_count=seen_count,
    )

    return DedupeResult(
        status=status,
        seen_count=seen_count,
        matched_on=matched_on,
        message_id=outlook_id,
        conversation_id=conversation_id,
        fingerprint=fingerprint,
    )