Use Outlook ids for classifier dedupe precedence

This commit is contained in:
Steve W
2026-04-09 18:26:37 +00:00
parent 1b2c7db924
commit c6ee735949
4 changed files with 127 additions and 79 deletions

View File

@@ -23,7 +23,8 @@ class DedupeStore:
"""
CREATE TABLE IF NOT EXISTS classification_dedupe (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subject_key TEXT NOT NULL,
outlook_id TEXT,
conversation_id TEXT,
fingerprint TEXT NOT NULL,
result_hash TEXT NOT NULL,
request_payload TEXT NOT NULL,
@@ -34,32 +35,46 @@ class DedupeStore:
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_subject_key ON classification_dedupe(subject_key)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)")
# NOTE(review): this span is taken from a rendered commit diff, so two versions
# of find_existing appear together without +/- markers. The signature directly
# below (subject_key + fingerprint) is presumably the pre-commit one and the
# signature after it (outlook_id, conversation_id, fingerprint) the post-commit
# one -- confirm against the checked-in file.
def find_existing(self, *, subject_key: str, fingerprint: str) -> dict[str, Any] | None:
# Look up the most recent stored row that matches the request.
# Returns a pair: (decoded row or None, label of the key that matched).
# The label is one of "id", "conversation", "fingerprint" or "none".
def find_existing(self, *, outlook_id: str | None, conversation_id: str | None, fingerprint: str) -> tuple[dict[str, Any] | None, str]:
with self._connect() as conn:
# Precedence 1: exact outlook_id match, newest row (highest id) first.
# Skipped when outlook_id is None or empty.
if outlook_id:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1",
(outlook_id,),
).fetchone()
if row:
return self._decode(row), "id"
# Precedence 2: newest row with the same conversation_id.
# NOTE(review): presumably a conversation id is shared by every message in a
# thread; if so, a reply with different content resolves to an earlier
# message's row here before the fingerprint is ever consulted -- confirm
# that this is the intended dedupe behaviour.
if conversation_id:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1",
(conversation_id,),
).fetchone()
if row:
return self._decode(row), "conversation"
# Precedence 3: content fingerprint, newest row first.
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1",
(fingerprint,),
).fetchone()
# NOTE(review): the subject_key fallback and the inline JSON decoding that
# follow read as the removed pre-commit body (the newer signature has no
# subject_key parameter) -- TODO confirm they are gone from the real file.
if row is None:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE subject_key = ? ORDER BY id DESC LIMIT 1",
(subject_key,),
).fetchone()
if not row:
return None
data = dict(row)
data["request_payload"] = json.loads(data["request_payload"])
data["result_payload"] = json.loads(data["result_payload"])
return data
# Tail of the newer version: report a fingerprint hit, otherwise signal that
# nothing matched with (None, "none").
if row:
return self._decode(row), "fingerprint"
return None, "none"
# Convert a stored row into a plain dict and JSON-decode its two payload columns.
# Returns the dict with "request_payload" and "result_payload" replaced by the
# objects parsed from their JSON text; all other columns are passed through.
# Raises whatever json.loads raises if a stored payload is not valid JSON.
# NOTE(review): dict(row) assumes the connection yields mapping-like rows
# (e.g. row_factory = sqlite3.Row) -- confirm in _connect.
def _decode(self, row: sqlite3.Row) -> dict[str, Any]:
data = dict(row)
data["request_payload"] = json.loads(data["request_payload"])
data["result_payload"] = json.loads(data["result_payload"])
return data
def insert_or_update(
self,
*,
existing_id: int | None,
subject_key: str,
outlook_id: str | None,
conversation_id: str | None,
fingerprint: str,
result_hash: str,
request_payload: dict[str, Any],
@@ -70,11 +85,12 @@ class DedupeStore:
if existing_id is None:
conn.execute(
"""
INSERT INTO classification_dedupe (subject_key, fingerprint, result_hash, request_payload, result_payload, seen_count)
VALUES (?, ?, ?, ?, ?, ?)
INSERT INTO classification_dedupe (outlook_id, conversation_id, fingerprint, result_hash, request_payload, result_payload, seen_count)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
subject_key,
outlook_id,
conversation_id,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),
@@ -86,12 +102,13 @@ class DedupeStore:
conn.execute(
"""
UPDATE classification_dedupe
SET subject_key = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?,
SET outlook_id = ?, conversation_id = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?,
seen_count = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(
subject_key,
outlook_id,
conversation_id,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),

View File

@@ -17,8 +17,17 @@ class ClassifyRequest(BaseModel):
base_url: str | None = None
api_key: str | None = Field(default=None, exclude=True)
temperature: float | None = None
id: str | None = None
internetMessageId: str | None = None
conversationId: str | None = None
bodyPreview: str | None = None
receivedDateTime: str | None = None
sentDateTime: str | None = None
hasAttachments: bool | None = None
importance: str | None = None
isRead: bool | None = None
from_address: str | None = None
received_at: str | None = None
class ClassificationDetails(BaseModel):
@@ -38,8 +47,9 @@ class ClassificationDetails(BaseModel):
# Outcome of a dedupe lookup that is reported back alongside a classification.
# NOTE(review): this span comes from a rendered diff; the first matched_on line
# and the subject_key field presumably belong to the pre-commit model, while the
# second matched_on line plus message_id / conversation_id are the replacement.
class DedupeResult(BaseModel):
# "new", "duplicate" or "updated".
status: Literal["new", "duplicate", "updated"]
# Number of times the entry has been seen; defaults to 1.
seen_count: int = 1
matched_on: Literal["none", "subject", "fingerprint"] = "none"
subject_key: str
# Which lookup key produced the match; "none" when nothing matched.
matched_on: Literal["none", "id", "conversation", "fingerprint"] = "none"
# Optional identifiers echoed back to the caller; both default to None.
message_id: str | None = None
conversation_id: str | None = None
# Required content fingerprint string.
fingerprint: str

View File

@@ -16,16 +16,12 @@ def normalize_subject(subject: str) -> str:
return value
# Build a SHA-256 hex digest from the lower-cased, stripped sender address and
# the normalized subject, joined by a newline. A missing from_address is
# treated as an empty string.
# NOTE(review): in this diff view the function appears to be the helper removed
# by the commit (subject-based matching is dropped) -- confirm before relying on it.
def build_subject_key(request: ClassifyRequest) -> str:
subject = normalize_subject(request.email_data.subject)
sender = (request.from_address or "").strip().lower()
return hashlib.sha256(f"{sender}\n{subject}".encode()).hexdigest()
# Build a SHA-256 hex digest that identifies the content of a request.
# The body and the preview are whitespace-collapsed and lower-cased; only the
# first 2000 characters of the body contribute to the hash.
def build_fingerprint(request: ClassifyRequest) -> str:
subject = normalize_subject(request.email_data.subject)
body = " ".join(request.email_data.body.split()).strip().lower()
# NOTE(review): two seed assignments are visible because this is a rendered
# diff; the first one (raw from_address, no preview) is presumably the removed
# pre-commit line and is overwritten by the second in any case.
seed = f"{request.from_address or ''}\n{subject}\n{body[:2000]}"
preview = " ".join((request.bodyPreview or "").split()).strip().lower()
sender = (request.from_address or "").strip().lower()
# The newer seed adds the preview and normalizes the sender's case, so its
# hashes differ from those of the earlier seed format.
# NOTE(review): rows stored with the earlier format will presumably no longer
# match by fingerprint -- confirm whether a backfill is needed.
seed = f"{sender}\n{subject}\n{preview}\n{body[:2000]}"
return hashlib.sha256(seed.encode()).hexdigest()
@@ -36,30 +32,41 @@ def build_result_hash(result: ClassificationResult) -> str:
def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult:
store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db"))
subject_key = build_subject_key(request)
fingerprint = build_fingerprint(request)
existing, matched_on = store.find_existing(
outlook_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
)
result_hash = build_result_hash(result)
existing = store.find_existing(subject_key=subject_key, fingerprint=fingerprint)
if not existing:
store.insert_or_update(
existing_id=None,
subject_key=subject_key,
outlook_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
result_hash=result_hash,
request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
seen_count=1,
)
return DedupeResult(status="new", seen_count=1, matched_on="none", subject_key=subject_key, fingerprint=fingerprint)
return DedupeResult(
status="new",
seen_count=1,
matched_on="none",
message_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
)
matched_on = "fingerprint" if existing.get("fingerprint") == fingerprint else "subject"
previous_hash = existing.get("result_hash")
seen_count = int(existing.get("seen_count", 1)) + 1
status = "duplicate" if previous_hash == result_hash else "updated"
store.insert_or_update(
existing_id=existing["id"],
subject_key=subject_key,
outlook_id=request.id or existing.get("outlook_id"),
conversation_id=request.conversationId or existing.get("conversation_id"),
fingerprint=fingerprint,
result_hash=result_hash,
request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
@@ -70,6 +77,7 @@ def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> Dedu
status=status,
seen_count=seen_count,
matched_on=matched_on,
subject_key=subject_key,
message_id=request.id or existing.get("outlook_id"),
conversation_id=request.conversationId or existing.get("conversation_id"),
fingerprint=fingerprint,
)