144 lines
5.0 KiB
Python
144 lines
5.0 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any
|
|
|
|
from app.config import get_request_settings
|
|
from app.llm_adapters import build_adapter, coerce_json_text
|
|
from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData
|
|
from app.sync import apply_dedupe, build_fingerprint
|
|
|
|
VALID_CATEGORIES = {
|
|
"action_required",
|
|
"question",
|
|
"fyi",
|
|
"newsletter",
|
|
"promotional",
|
|
"automated",
|
|
"alert",
|
|
"uncategorized",
|
|
}
|
|
VALID_PRIORITIES = {"high", "medium", "low"}
|
|
|
|
|
|
async def classify_email(request: ClassifyRequest) -> ClassificationResult:
|
|
clean_email = _clean_email(request)
|
|
settings = get_request_settings(
|
|
provider=request.provider,
|
|
model=request.model,
|
|
base_url=request.base_url,
|
|
api_key=request.api_key,
|
|
temperature=request.temperature,
|
|
)
|
|
adapter = build_adapter(settings)
|
|
|
|
attempts = 0
|
|
result: ClassificationResult | None = None
|
|
while attempts < settings.max_retries:
|
|
raw_response = await adapter.classify(clean_email.email_data)
|
|
try:
|
|
payload = json.loads(coerce_json_text(raw_response))
|
|
result = _normalize_result(payload, clean_email)
|
|
if result.needs_action and not result.task_description:
|
|
attempts += 1
|
|
continue
|
|
break
|
|
except (json.JSONDecodeError, ValueError, TypeError):
|
|
attempts += 1
|
|
|
|
if result is None:
|
|
result = ClassificationResult(
|
|
needs_action=False,
|
|
category="uncategorized",
|
|
priority="low",
|
|
task_description=None,
|
|
reasoning="System failed to classify after multiple attempts.",
|
|
confidence=0.0,
|
|
details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)),
|
|
)
|
|
|
|
result.dedupe = apply_dedupe(clean_email, result)
|
|
return result
|
|
|
|
|
|
def _clean_email(request: ClassifyRequest) -> ClassifyRequest:
|
|
from app.helpers.clean_email_html import clean_email_html
|
|
from app.helpers.extract_latest_message import extract_latest_message
|
|
from app.helpers.remove_disclaimer import remove_disclaimer
|
|
|
|
return request.model_copy(
|
|
update={
|
|
"email_data": EmailData(
|
|
subject=request.email_data.subject,
|
|
body=remove_disclaimer(clean_email_html(extract_latest_message(request.email_data.body))),
|
|
)
|
|
}
|
|
)
|
|
|
|
|
|
def _normalize_result(data: dict[str, Any], request: ClassifyRequest) -> ClassificationResult:
|
|
needs_action = bool(data.get("needs_action", False))
|
|
category = str(data.get("category", "uncategorized") or "uncategorized").lower()
|
|
if category not in VALID_CATEGORIES:
|
|
category = "uncategorized"
|
|
priority = str(data.get("priority", "low") or "low").lower()
|
|
if priority not in VALID_PRIORITIES:
|
|
priority = "low"
|
|
task_description = data.get("task_description")
|
|
if task_description is not None:
|
|
task_description = str(task_description).strip() or None
|
|
if needs_action and not task_description:
|
|
raise ValueError("task_description required when needs_action is true")
|
|
reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided."
|
|
confidence_raw = data.get("confidence", 0.0)
|
|
confidence = max(0.0, min(1.0, float(confidence_raw)))
|
|
details_payload = data.get("details") or {}
|
|
details = ClassificationDetails(
|
|
summary=_clean_text(details_payload.get("summary")),
|
|
suggested_title=_clean_text(details_payload.get("suggested_title")),
|
|
suggested_notes=_clean_text(details_payload.get("suggested_notes")),
|
|
deadline=_clean_text(details_payload.get("deadline")),
|
|
people=_string_list(details_payload.get("people")),
|
|
organizations=_string_list(details_payload.get("organizations")),
|
|
attachments_referenced=_string_list(details_payload.get("attachments_referenced")),
|
|
next_steps=_string_list(details_payload.get("next_steps")),
|
|
key_points=_string_list(details_payload.get("key_points")),
|
|
source_signals=_string_list(details_payload.get("source_signals")),
|
|
dedupe_key=build_fingerprint(request),
|
|
)
|
|
if needs_action and not details.suggested_title:
|
|
details.suggested_title = task_description
|
|
if not details.summary:
|
|
details.summary = reasoning
|
|
return ClassificationResult(
|
|
needs_action=needs_action,
|
|
category=category,
|
|
priority=priority,
|
|
task_description=task_description,
|
|
reasoning=reasoning,
|
|
confidence=confidence,
|
|
details=details,
|
|
)
|
|
|
|
|
|
def _clean_text(value: Any) -> str | None:
|
|
if value is None:
|
|
return None
|
|
text = str(value).strip()
|
|
return text or None
|
|
|
|
|
|
def _string_list(value: Any) -> list[str]:
|
|
if not value:
|
|
return []
|
|
if isinstance(value, list):
|
|
items = value
|
|
else:
|
|
items = [value]
|
|
output = []
|
|
for item in items:
|
|
text = str(item).strip()
|
|
if text and text not in output:
|
|
output.append(text)
|
|
return output
|