5 Commits

Author SHA1 Message Date
17191fc489 Merge pull request 'feat: Add enriched classification output and Todoist dedupe sync' (#4) from feature/todoist-dedupe-enrichment into main
All checks were successful
Build and Publish Docker Image / build-and-push (push) Successful in 2m40s
Reviewed-on: #4
2026-04-09 19:01:52 +00:00
Steve W
8c49ce21e0 Accept native Outlook message shape in classifier request 2026-04-09 18:32:32 +00:00
Steve W
c6ee735949 Use Outlook ids for classifier dedupe precedence 2026-04-09 18:26:37 +00:00
Steve W
1b2c7db924 Refocus classifier on rich extraction and local dedupe only 2026-04-09 18:18:35 +00:00
Steve W
a1dcaf9a74 Add enriched classification output and Todoist dedupe sync 2026-04-09 18:14:11 +00:00
6 changed files with 489 additions and 112 deletions

146
README.md
View File

@@ -1,21 +1,10 @@
# email-classifier # email-classifier
FastAPI service that classifies email using a configurable LLM backend. FastAPI service that classifies email using a configurable LLM backend, returns richer structured extraction, and tracks duplicate classifications using Outlook-aware dedupe.
## What changed
The classifier no longer hardcodes a single Ollama + OpenAI-compatible endpoint.
It now supports:
- OpenAI-compatible APIs
- Anthropic-compatible APIs
- per-request overrides for provider, model, endpoint, and temperature
- global defaults through environment variables
This makes it suitable for local Ollama, hosted OpenAI-compatible services, and MiniMax's recommended Anthropic-compatible API.
## Environment configuration ## Environment configuration
Defaults are loaded from environment variables: LLM defaults:
```bash ```bash
export LLM_PROVIDER=openai export LLM_PROVIDER=openai
@@ -27,9 +16,7 @@ export LLM_TIMEOUT_SECONDS=60
export LLM_MAX_RETRIES=3 export LLM_MAX_RETRIES=3
``` ```
### MiniMax example MiniMax via Anthropic-compatible API:
MiniMax recommends Anthropic-compatible integration.
```bash ```bash
export LLM_PROVIDER=anthropic export LLM_PROVIDER=anthropic
@@ -38,51 +25,128 @@ export LLM_API_KEY=your_minimax_key
export LLM_MODEL=MiniMax-M2.7 export LLM_MODEL=MiniMax-M2.7
``` ```
## API Optional local dedupe store path:
### POST /classify ```bash
export EMAIL_CLASSIFIER_DB_PATH=.data/email_classifier.db
```
Request body: ## Input shape
The request model accepts either:
- simplified input via `email_data`
- or native Outlook-style fields directly
Full Outlook-shaped example:
```json
{
"id": "AAMk...",
"internetMessageId": "<...@...>",
"conversationId": "AAQk...",
"subject": "MB Printer",
"bodyPreview": "Good morning, ...",
"body": {
"contentType": "html",
"content": "<html>...(full HTML body)</html>"
},
"sender": {
"emailAddress": {
"name": "Bobbi Johnson",
"address": "bobbi.johnson@grandportage.com"
}
},
"from": {
"emailAddress": {
"name": "Bobbi Johnson",
"address": "bobbi.johnson@grandportage.com"
}
},
"toRecipients": [
{
"emailAddress": {
"name": "IT Helpdesk Mail",
"address": "helpdeskmail@grandportage.com"
}
}
],
"ccRecipients": [],
"bccRecipients": [],
"replyTo": [],
"receivedDateTime": "2026-02-19T15:27:35Z",
"sentDateTime": "2026-02-19T15:27:32Z",
"hasAttachments": false,
"importance": "normal",
"isRead": false,
"flag": { "flagStatus": "notFlagged" },
"provider": "anthropic",
"base_url": "https://api.minimax.io/anthropic",
"model": "MiniMax-M2.7"
}
```
Simplified request example:
```json ```json
{ {
"email_data": { "email_data": {
"subject": "Can you review this by Friday?", "subject": "MB Printer",
"body": "Hi Daniel, please review the attached budget proposal." "body": "<html>...</html>"
}, },
"provider": "anthropic", "id": "AAMk...",
"base_url": "https://api.minimax.io/anthropic", "conversationId": "AAQk..."
"model": "MiniMax-M2.7",
"temperature": 0.1
} }
``` ```
All override fields are optional. If omitted, the service uses the global env config. ## Response example
Response shape:
```json ```json
{ {
"needs_action": true, "needs_action": true,
"category": "question", "category": "question",
"priority": "high", "priority": "high",
"task_description": "Review the budget proposal and respond by Friday", "task_description": "Investigate MB Printer issue and reply",
"reasoning": "Direct request with a deadline requires follow-up", "reasoning": "The email appears to describe an issue requiring action.",
"confidence": 0.91 "confidence": 0.91,
"details": {
"summary": "Printer issue reported in the MB area.",
"suggested_title": "Handle MB Printer issue",
"suggested_notes": "Review the printer problem, identify urgency, and reply with next steps.",
"deadline": null,
"people": [],
"organizations": [],
"attachments_referenced": [],
"next_steps": ["Review issue", "Respond to sender"],
"key_points": ["Printer issue reported"],
"source_signals": ["request"],
"dedupe_key": "..."
},
"dedupe": {
"status": "new",
"seen_count": 1,
"matched_on": "none",
"message_id": "AAMk...",
"conversation_id": "AAQk...",
"fingerprint": "..."
}
} }
``` ```
## Architecture ## Dedupe precedence
- `app/config.py`: global and per-request LLM settings 1. `id` for exact Outlook message match
- `app/llm_adapters.py`: provider adapters 2. `conversationId` for thread grouping
- `app/classifier.py`: classification orchestration, retries, normalization 3. normalized subject + preview/body fingerprint fallback
- `app/prompts.py`: system prompt
- `app/routers/classify_email.py`: thin API route Statuses:
- `new`: no prior similar email seen
- `duplicate`: same dedupe target and same extracted result as before
- `updated`: matched prior email, but extracted result changed
This is intentionally heuristic for the fallback path.
## Notes ## Notes
- OpenAI-compatible providers use the OpenAI SDK. - No Todoist integration lives in this API.
- Anthropic-compatible providers use the Anthropic SDK. - Dedupe is local and intended to help downstream workflows avoid obvious duplicates.
- Per-request `api_key` is supported, but excluded from response serialization. - SQLite is used for lightweight local dedupe tracking.
- The service normalizes malformed model output and falls back safely after retry exhaustion.

View File

@@ -5,7 +5,8 @@ from typing import Any
from app.config import get_request_settings from app.config import get_request_settings
from app.llm_adapters import build_adapter, coerce_json_text from app.llm_adapters import build_adapter, coerce_json_text
from app.models import ClassificationResult, ClassifyRequest, EmailData from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData
from app.sync import apply_dedupe, build_fingerprint
VALID_CATEGORIES = { VALID_CATEGORIES = {
"action_required", "action_required",
@@ -21,7 +22,7 @@ VALID_PRIORITIES = {"high", "medium", "low"}
async def classify_email(request: ClassifyRequest) -> ClassificationResult: async def classify_email(request: ClassifyRequest) -> ClassificationResult:
clean_email = _clean_email(request.email_data) clean_email = _clean_email(request)
settings = get_request_settings( settings = get_request_settings(
provider=request.provider, provider=request.provider,
model=request.model, model=request.model,
@@ -32,40 +33,50 @@ async def classify_email(request: ClassifyRequest) -> ClassificationResult:
adapter = build_adapter(settings) adapter = build_adapter(settings)
attempts = 0 attempts = 0
result: ClassificationResult | None = None
while attempts < settings.max_retries: while attempts < settings.max_retries:
raw_response = await adapter.classify(clean_email) raw_response = await adapter.classify(clean_email.email_data)
try: try:
payload = json.loads(coerce_json_text(raw_response)) payload = json.loads(coerce_json_text(raw_response))
result = _normalize_result(payload) result = _normalize_result(payload, clean_email)
if result.needs_action and not result.task_description: if result.needs_action and not result.task_description:
attempts += 1 attempts += 1
continue continue
return result break
except (json.JSONDecodeError, ValueError, TypeError): except (json.JSONDecodeError, ValueError, TypeError):
attempts += 1 attempts += 1
return ClassificationResult( if result is None:
needs_action=False, result = ClassificationResult(
category="uncategorized", needs_action=False,
priority="low", category="uncategorized",
task_description=None, priority="low",
reasoning="System failed to classify after multiple attempts.", task_description=None,
confidence=0.0, reasoning="System failed to classify after multiple attempts.",
) confidence=0.0,
details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)),
)
result.dedupe = apply_dedupe(clean_email, result)
return result
def _clean_email(email: EmailData) -> EmailData: def _clean_email(request: ClassifyRequest) -> ClassifyRequest:
from app.helpers.clean_email_html import clean_email_html from app.helpers.clean_email_html import clean_email_html
from app.helpers.extract_latest_message import extract_latest_message from app.helpers.extract_latest_message import extract_latest_message
from app.helpers.remove_disclaimer import remove_disclaimer from app.helpers.remove_disclaimer import remove_disclaimer
return EmailData( return request.model_copy(
subject=email.subject, update={
body=remove_disclaimer(clean_email_html(extract_latest_message(email.body))), "email_data": EmailData(
subject=request.email_data.subject,
body=remove_disclaimer(clean_email_html(extract_latest_message(request.email_data.body))),
)
}
) )
def _normalize_result(data: dict[str, Any]) -> ClassificationResult: def _normalize_result(data: dict[str, Any], request: ClassifyRequest) -> ClassificationResult:
needs_action = bool(data.get("needs_action", False)) needs_action = bool(data.get("needs_action", False))
category = str(data.get("category", "uncategorized") or "uncategorized").lower() category = str(data.get("category", "uncategorized") or "uncategorized").lower()
if category not in VALID_CATEGORIES: if category not in VALID_CATEGORIES:
@@ -81,6 +92,24 @@ def _normalize_result(data: dict[str, Any]) -> ClassificationResult:
reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided." reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided."
confidence_raw = data.get("confidence", 0.0) confidence_raw = data.get("confidence", 0.0)
confidence = max(0.0, min(1.0, float(confidence_raw))) confidence = max(0.0, min(1.0, float(confidence_raw)))
details_payload = data.get("details") or {}
details = ClassificationDetails(
summary=_clean_text(details_payload.get("summary")),
suggested_title=_clean_text(details_payload.get("suggested_title")),
suggested_notes=_clean_text(details_payload.get("suggested_notes")),
deadline=_clean_text(details_payload.get("deadline")),
people=_string_list(details_payload.get("people")),
organizations=_string_list(details_payload.get("organizations")),
attachments_referenced=_string_list(details_payload.get("attachments_referenced")),
next_steps=_string_list(details_payload.get("next_steps")),
key_points=_string_list(details_payload.get("key_points")),
source_signals=_string_list(details_payload.get("source_signals")),
dedupe_key=build_fingerprint(request),
)
if needs_action and not details.suggested_title:
details.suggested_title = task_description
if not details.summary:
details.summary = reasoning
return ClassificationResult( return ClassificationResult(
needs_action=needs_action, needs_action=needs_action,
category=category, category=category,
@@ -88,4 +117,27 @@ def _normalize_result(data: dict[str, Any]) -> ClassificationResult:
task_description=task_description, task_description=task_description,
reasoning=reasoning, reasoning=reasoning,
confidence=confidence, confidence=confidence,
details=details,
) )
def _clean_text(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
def _string_list(value: Any) -> list[str]:
if not value:
return []
if isinstance(value, list):
items = value
else:
items = [value]
output = []
for item in items:
text = str(item).strip()
if text and text not in output:
output.append(text)
return output

119
app/dedupe_store.py Normal file
View File

@@ -0,0 +1,119 @@
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any
class DedupeStore:
def __init__(self, db_path: str = ".data/email_classifier.db"):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._init_db()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _init_db(self) -> None:
with self._connect() as conn:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS classification_dedupe (
id INTEGER PRIMARY KEY AUTOINCREMENT,
outlook_id TEXT,
conversation_id TEXT,
fingerprint TEXT NOT NULL,
result_hash TEXT NOT NULL,
request_payload TEXT NOT NULL,
result_payload TEXT NOT NULL,
seen_count INTEGER NOT NULL DEFAULT 1,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""
)
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_outlook_id ON classification_dedupe(outlook_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_conversation_id ON classification_dedupe(conversation_id)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_dedupe_fingerprint ON classification_dedupe(fingerprint)")
def find_existing(self, *, outlook_id: str | None, conversation_id: str | None, fingerprint: str) -> tuple[dict[str, Any] | None, str]:
with self._connect() as conn:
if outlook_id:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE outlook_id = ? ORDER BY id DESC LIMIT 1",
(outlook_id,),
).fetchone()
if row:
return self._decode(row), "id"
if conversation_id:
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE conversation_id = ? ORDER BY id DESC LIMIT 1",
(conversation_id,),
).fetchone()
if row:
return self._decode(row), "conversation"
row = conn.execute(
"SELECT * FROM classification_dedupe WHERE fingerprint = ? ORDER BY id DESC LIMIT 1",
(fingerprint,),
).fetchone()
if row:
return self._decode(row), "fingerprint"
return None, "none"
def _decode(self, row: sqlite3.Row) -> dict[str, Any]:
data = dict(row)
data["request_payload"] = json.loads(data["request_payload"])
data["result_payload"] = json.loads(data["result_payload"])
return data
def insert_or_update(
self,
*,
existing_id: int | None,
outlook_id: str | None,
conversation_id: str | None,
fingerprint: str,
result_hash: str,
request_payload: dict[str, Any],
result_payload: dict[str, Any],
seen_count: int,
) -> None:
with self._connect() as conn:
if existing_id is None:
conn.execute(
"""
INSERT INTO classification_dedupe (outlook_id, conversation_id, fingerprint, result_hash, request_payload, result_payload, seen_count)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
outlook_id,
conversation_id,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),
json.dumps(result_payload, sort_keys=True),
seen_count,
),
)
else:
conn.execute(
"""
UPDATE classification_dedupe
SET outlook_id = ?, conversation_id = ?, fingerprint = ?, result_hash = ?, request_payload = ?, result_payload = ?,
seen_count = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""",
(
outlook_id,
conversation_id,
fingerprint,
result_hash,
json.dumps(request_payload, sort_keys=True),
json.dumps(result_payload, sort_keys=True),
seen_count,
existing_id,
),
)

View File

@@ -2,7 +2,7 @@ from __future__ import annotations
from typing import Literal from typing import Literal
from pydantic import BaseModel, Field from pydantic import BaseModel, Field, model_validator
class EmailData(BaseModel): class EmailData(BaseModel):
@@ -10,14 +10,91 @@ class EmailData(BaseModel):
body: str body: str
class EmailAddress(BaseModel):
name: str | None = None
address: str | None = None
class Recipient(BaseModel):
emailAddress: EmailAddress | None = None
class EmailBody(BaseModel):
contentType: str | None = None
content: str | None = None
class Flag(BaseModel):
flagStatus: str | None = None
class ClassifyRequest(BaseModel): class ClassifyRequest(BaseModel):
email_data: EmailData email_data: EmailData | None = None
provider: Literal["openai", "anthropic"] | None = None provider: Literal["openai", "anthropic"] | None = None
model: str | None = None model: str | None = None
base_url: str | None = None base_url: str | None = None
api_key: str | None = Field(default=None, exclude=True) api_key: str | None = Field(default=None, exclude=True)
temperature: float | None = None temperature: float | None = None
id: str | None = None
internetMessageId: str | None = None
conversationId: str | None = None
subject: str | None = None
bodyPreview: str | None = None
body: EmailBody | None = None
sender: Recipient | None = None
from_: Recipient | None = Field(default=None, alias="from")
toRecipients: list[Recipient] = Field(default_factory=list)
ccRecipients: list[Recipient] = Field(default_factory=list)
bccRecipients: list[Recipient] = Field(default_factory=list)
replyTo: list[Recipient] = Field(default_factory=list)
receivedDateTime: str | None = None
sentDateTime: str | None = None
hasAttachments: bool | None = None
importance: str | None = None
isRead: bool | None = None
flag: Flag | None = None
from_address: str | None = None
@model_validator(mode="after")
def populate_email_data(self) -> "ClassifyRequest":
subject = self.email_data.subject if self.email_data else self.subject
body = self.email_data.body if self.email_data else (self.body.content if self.body and self.body.content else None)
if not subject or not body:
raise ValueError("Request must include either email_data or Outlook subject/body.content fields")
self.email_data = EmailData(subject=subject, body=body)
if not self.from_address:
self.from_address = (
(self.from_.emailAddress.address if self.from_ and self.from_.emailAddress else None)
or (self.sender.emailAddress.address if self.sender and self.sender.emailAddress else None)
)
return self
model_config = {"populate_by_name": True}
class ClassificationDetails(BaseModel):
summary: str | None = None
suggested_title: str | None = None
suggested_notes: str | None = None
deadline: str | None = None
people: list[str] = Field(default_factory=list)
organizations: list[str] = Field(default_factory=list)
attachments_referenced: list[str] = Field(default_factory=list)
next_steps: list[str] = Field(default_factory=list)
key_points: list[str] = Field(default_factory=list)
source_signals: list[str] = Field(default_factory=list)
dedupe_key: str | None = None
class DedupeResult(BaseModel):
status: Literal["new", "duplicate", "updated"]
seen_count: int = 1
matched_on: Literal["none", "id", "conversation", "fingerprint"] = "none"
message_id: str | None = None
conversation_id: str | None = None
fingerprint: str
class ClassificationResult(BaseModel): class ClassificationResult(BaseModel):
needs_action: bool needs_action: bool
@@ -26,3 +103,5 @@ class ClassificationResult(BaseModel):
task_description: str | None = None task_description: str | None = None
reasoning: str reasoning: str
confidence: float confidence: float
details: ClassificationDetails | None = None
dedupe: DedupeResult | None = None

View File

@@ -1,58 +1,38 @@
SYSTEM_PROMPT = """You are an email classification assistant. Your job is to analyze emails and determine if they need the user's attention and action. The user works in the I.T. department of the Grand Portage tribal government. SYSTEM_PROMPT = """You are an email classification assistant. Your job is to analyze emails and determine if they need the user's attention and action. The user works in the I.T. department of the Grand Portage tribal government.
Return valid JSON only.
CLASSIFICATION RULES: CLASSIFICATION RULES:
1. NEEDS ATTENTION if the email asks a direct question, requests action, contains a deadline, reports a relevant problem, proposes times needing confirmation, or is a relevant I.T. alert.
2. DOES NOT NEED ATTENTION if the email is marketing, newsletter, sales outreach, bulk/promotional, or simple acknowledgment with no response needed.
3. Scheduling questions and unresolved thread questions always need attention.
1. NEEDS ATTENTION (create todo) if the email: OUTPUT JSON SCHEMA:
- Asks a direct question that requires a response
- Contains scheduling questions like \"Does [day/time] work?\", \"Are you available?\", \"When can we meet?\"
- Requests the user to do something (review, approve, provide info, attend meeting)
- Contains a deadline or time-sensitive request
- Is from a colleague/client discussing active work
- Reports an issue or problem that needs addressing
- Proposes specific dates/times and needs confirmation
- Is an automated alert from a system relevant to I.T.
2. DOES NOT NEED ATTENTION (skip) if the email:
- Is a newsletter, marketing email, or webinar invitation
- Is from a person and is an FYI/informational with no action required
- Is promotional content or sales outreach
- Contains unsubscribe links or bulk sender indicators
- Is a simple acknowledgment (\"got it\", \"thanks\", \"sounds good\") with no questions
3. SPECIAL CASES:
- Even if an email says \"working on that\" or similar, if it ALSO contains a question or proposal that needs response, mark as needs_action=true
- \"Does [X] work?\" or \"When can you...?\" ALWAYS needs a response, regardless of other content
- RE: threads can still need action if they contain unanswered questions
OUTPUT FORMAT:
You must respond with valid JSON only, no other text:
{ {
\"needs_action\": true or false, "needs_action": true or false,
\"category\": \"action_required\" | \"question\" | \"fyi\" | \"newsletter\" | \"promotional\" | \"automated\" | \"alert\" | \"uncategorized\", "category": "action_required" | "question" | "fyi" | "newsletter" | "promotional" | "automated" | "alert" | "uncategorized",
\"priority\": \"high\" | \"medium\" | \"low\", "priority": "high" | "medium" | "low",
\"task_description\": \"Brief description of what to do (only if needs_action is true)\", "task_description": "short action-oriented description or null",
\"reasoning\": \"One sentence explaining your decision\", "reasoning": "one sentence",
\"confidence\": A number from 0 to 1 indicating how confident you are "confidence": 0.0 to 1.0,
"details": {
"summary": "brief human-readable summary",
"suggested_title": "good Todoist/task title",
"suggested_notes": "useful multiline notes for a human reviewing or creating a ticket",
"deadline": "deadline/date/time if present, else null",
"people": ["people involved or referenced"],
"organizations": ["organizations, departments, vendors, teams"],
"attachments_referenced": ["attachment names or referenced docs if mentioned"],
"next_steps": ["specific next actions"],
"key_points": ["important context bullets"],
"source_signals": ["question", "deadline", "request", "alert", "followup", "attachment", "scheduling"]
}
} }
EXAMPLES: Rules for details:
- Be concrete and extract as much useful context as possible.
Email: \"Subject: Q4 Budget Review\nHi Daniel, can you review the attached budget proposal and let me know your thoughts by Friday?\" - suggested_notes should help a human create a ticket later.
Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"high\", \"task_description\": \"Review Q4 budget proposal and respond by Friday\", \"reasoning\": \"Direct request with deadline\", \"confidence\": 0.91} - If a field is unknown, use null or empty list.
- Do not invent attachment names, people, or deadlines.
Email: \"Subject: RE: Meeting\nWorking on that. Does Tuesday or Wednesday work for you?\" - If needs_action is true, task_description and suggested_title should be useful and specific.
Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"medium\", \"task_description\": \"Respond with availability for Tuesday or Wednesday\", \"reasoning\": \"Scheduling question requires response\", \"confidence\": 0.85} """
Email: \"Subject: RE: Issue\nThanks, I'll look into it and get back to you.\"
Output: {\"needs_action\": false, \"category\": \"fyi\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Status update with no questions or action needed\", \"confidence\": 0.77}
Email: \"Subject: Join us for our exclusive webinar on cloud security\nRegister now for our upcoming webinar series...\"
Output: {\"needs_action\": false, \"category\": \"promotional\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Marketing webinar invitation\", \"confidence\": 0.81}
Email: \"Subject: Your order has shipped\nYour order #12345 has been dispatched and will arrive in 3-5 days.\"
Output: {\"needs_action\": false, \"category\": \"automated\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Automated shipping notification\", \"confidence\": 0.72}
Email: \"Subject: Disk at 95 percent on hvs-internal-01\nThe hard disk on server hvs-internal-01 is at a critical level.\"
Output: {\"needs_action\": true, \"category\": \"alert\", \"priority\": \"medium\", \"task_description\": \"Investigate critical disk usage alert on hvs-internal-01\", \"reasoning\": \"Internal I.T. system alert requires follow-up\", \"confidence\": 0.91}
Now classify the following email:"""

83
app/sync.py Normal file
View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import hashlib
import json
import os
import re
from app.dedupe_store import DedupeStore
from app.models import ClassificationResult, ClassifyRequest, DedupeResult
def normalize_subject(subject: str) -> str:
value = subject.strip().lower()
value = re.sub(r"^(re|fw|fwd)\s*:\s*", "", value)
value = re.sub(r"\s+", " ", value)
return value
def build_fingerprint(request: ClassifyRequest) -> str:
subject = normalize_subject(request.email_data.subject)
body = " ".join(request.email_data.body.split()).strip().lower()
preview = " ".join((request.bodyPreview or "").split()).strip().lower()
sender = (request.from_address or "").strip().lower()
seed = f"{sender}\n{subject}\n{preview}\n{body[:2000]}"
return hashlib.sha256(seed.encode()).hexdigest()
def build_result_hash(result: ClassificationResult) -> str:
payload = result.model_dump(exclude={"dedupe"}, exclude_none=True)
return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()
def apply_dedupe(request: ClassifyRequest, result: ClassificationResult) -> DedupeResult:
store = DedupeStore(os.getenv("EMAIL_CLASSIFIER_DB_PATH", ".data/email_classifier.db"))
fingerprint = build_fingerprint(request)
existing, matched_on = store.find_existing(
outlook_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
)
result_hash = build_result_hash(result)
if not existing:
store.insert_or_update(
existing_id=None,
outlook_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
result_hash=result_hash,
request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
seen_count=1,
)
return DedupeResult(
status="new",
seen_count=1,
matched_on="none",
message_id=request.id,
conversation_id=request.conversationId,
fingerprint=fingerprint,
)
previous_hash = existing.get("result_hash")
seen_count = int(existing.get("seen_count", 1)) + 1
status = "duplicate" if previous_hash == result_hash else "updated"
store.insert_or_update(
existing_id=existing["id"],
outlook_id=request.id or existing.get("outlook_id"),
conversation_id=request.conversationId or existing.get("conversation_id"),
fingerprint=fingerprint,
result_hash=result_hash,
request_payload=request.model_dump(exclude={"api_key"}, exclude_none=True),
result_payload=result.model_dump(exclude={"dedupe"}, exclude_none=True),
seen_count=seen_count,
)
return DedupeResult(
status=status,
seen_count=seen_count,
matched_on=matched_on,
message_id=request.id or existing.get("outlook_id"),
conversation_id=request.conversationId or existing.get("conversation_id"),
fingerprint=fingerprint,
)