# Files: email-classifier/app/classifier.py
# 144 lines, 5.0 KiB, Python

from __future__ import annotations
import json
from typing import Any
from app.config import get_request_settings
from app.llm_adapters import build_adapter, coerce_json_text
from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData
from app.sync import apply_dedupe, build_fingerprint
# Category labels accepted from the model; _normalize_result replaces anything
# outside this set with "uncategorized".
VALID_CATEGORIES = {
    # Mail that expects something from the reader.
    "action_required",
    "question",
    # Informational mail.
    "fyi",
    "newsletter",
    "promotional",
    # Machine-generated mail.
    "automated",
    "alert",
    # Fallback label.
    "uncategorized",
}

# Priority labels accepted from the model; _normalize_result replaces anything
# outside this set with "low".
VALID_PRIORITIES = {"low", "medium", "high"}
async def classify_email(request: ClassifyRequest) -> ClassificationResult:
    """Classify one email with the configured LLM adapter, retrying bad replies.

    The email body is cleaned first, then the adapter is asked up to
    ``settings.max_retries`` times. A reply is accepted only when it decodes
    to a JSON object that ``_normalize_result`` accepts and that does not
    claim ``needs_action`` without a ``task_description``. If no attempt
    yields an acceptable reply, a fixed "uncategorized" fallback result is
    returned instead.

    Args:
        request: The classification request; its provider/model/base_url/
            api_key/temperature fields are passed to ``get_request_settings``.

    Returns:
        The accepted (or fallback) result, with ``dedupe`` set from
        ``apply_dedupe``.

    Exceptions raised by ``adapter.classify`` itself are not caught here.
    """
    clean_email = _clean_email(request)
    settings = get_request_settings(
        provider=request.provider,
        model=request.model,
        base_url=request.base_url,
        api_key=request.api_key,
        temperature=request.temperature,
    )
    adapter = build_adapter(settings)

    result: ClassificationResult | None = None
    for _ in range(settings.max_retries):
        raw_response = await adapter.classify(clean_email.email_data)
        try:
            payload = json.loads(coerce_json_text(raw_response))
            # Valid JSON can still be a list/string/number; calling .get() on
            # it would raise AttributeError, so reject it as a bad reply.
            if not isinstance(payload, dict):
                raise TypeError("classifier response must be a JSON object")
            candidate = _normalize_result(payload, clean_email)
        except (json.JSONDecodeError, ValueError, TypeError):
            continue
        # Only keep a candidate that is internally consistent, so an invalid
        # one can never leak out once the attempts are exhausted.
        if candidate.needs_action and not candidate.task_description:
            continue
        result = candidate
        break

    if result is None:
        result = ClassificationResult(
            needs_action=False,
            category="uncategorized",
            priority="low",
            task_description=None,
            reasoning="System failed to classify after multiple attempts.",
            confidence=0.0,
            details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)),
        )
    result.dedupe = apply_dedupe(clean_email, result)
    return result
def _clean_email(request: ClassifyRequest) -> ClassifyRequest:
    """Return a copy of ``request`` whose email body has been cleaned.

    The body is passed through ``extract_latest_message``,
    ``clean_email_html`` and ``remove_disclaimer``, in that order. The
    subject is carried over unchanged, and ``request`` itself is not
    modified: a copy is produced with ``model_copy``.
    """
    # Helpers are imported at call time rather than at module import.
    from app.helpers.clean_email_html import clean_email_html
    from app.helpers.extract_latest_message import extract_latest_message
    from app.helpers.remove_disclaimer import remove_disclaimer

    subject = request.email_data.subject
    latest_message = extract_latest_message(request.email_data.body)
    html_cleaned = clean_email_html(latest_message)
    body = remove_disclaimer(html_cleaned)

    cleaned_data = EmailData(subject=subject, body=body)
    return request.model_copy(update={"email_data": cleaned_data})
def _normalize_result(data: dict[str, Any], request: ClassifyRequest) -> ClassificationResult:
needs_action = bool(data.get("needs_action", False))
category = str(data.get("category", "uncategorized") or "uncategorized").lower()
if category not in VALID_CATEGORIES:
category = "uncategorized"
priority = str(data.get("priority", "low") or "low").lower()
if priority not in VALID_PRIORITIES:
priority = "low"
task_description = data.get("task_description")
if task_description is not None:
task_description = str(task_description).strip() or None
if needs_action and not task_description:
raise ValueError("task_description required when needs_action is true")
reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided."
confidence_raw = data.get("confidence", 0.0)
confidence = max(0.0, min(1.0, float(confidence_raw)))
details_payload = data.get("details") or {}
details = ClassificationDetails(
summary=_clean_text(details_payload.get("summary")),
suggested_title=_clean_text(details_payload.get("suggested_title")),
suggested_notes=_clean_text(details_payload.get("suggested_notes")),
deadline=_clean_text(details_payload.get("deadline")),
people=_string_list(details_payload.get("people")),
organizations=_string_list(details_payload.get("organizations")),
attachments_referenced=_string_list(details_payload.get("attachments_referenced")),
next_steps=_string_list(details_payload.get("next_steps")),
key_points=_string_list(details_payload.get("key_points")),
source_signals=_string_list(details_payload.get("source_signals")),
dedupe_key=build_fingerprint(request),
)
if needs_action and not details.suggested_title:
details.suggested_title = task_description
if not details.summary:
details.summary = reasoning
return ClassificationResult(
needs_action=needs_action,
category=category,
priority=priority,
task_description=task_description,
reasoning=reasoning,
confidence=confidence,
details=details,
)
def _clean_text(value: Any) -> str | None:
if value is None:
return None
text = str(value).strip()
return text or None
def _string_list(value: Any) -> list[str]:
if not value:
return []
if isinstance(value, list):
items = value
else:
items = [value]
output = []
for item in items:
text = str(item).strip()
if text and text not in output:
output.append(text)
return output