from __future__ import annotations import json from typing import Any from app.config import get_request_settings from app.llm_adapters import build_adapter, coerce_json_text from app.models import ClassificationDetails, ClassificationResult, ClassifyRequest, EmailData from app.sync import build_fingerprint, sync_todoist VALID_CATEGORIES = { "action_required", "question", "fyi", "newsletter", "promotional", "automated", "alert", "uncategorized", } VALID_PRIORITIES = {"high", "medium", "low"} async def classify_email(request: ClassifyRequest) -> ClassificationResult: clean_email = _clean_email(request) settings = get_request_settings( provider=request.provider, model=request.model, base_url=request.base_url, api_key=request.api_key, temperature=request.temperature, ) adapter = build_adapter(settings) attempts = 0 result: ClassificationResult | None = None while attempts < settings.max_retries: raw_response = await adapter.classify(clean_email.email_data) try: payload = json.loads(coerce_json_text(raw_response)) result = _normalize_result(payload, clean_email) if result.needs_action and not result.task_description: attempts += 1 continue break except (json.JSONDecodeError, ValueError, TypeError): attempts += 1 if result is None: result = ClassificationResult( needs_action=False, category="uncategorized", priority="low", task_description=None, reasoning="System failed to classify after multiple attempts.", confidence=0.0, details=ClassificationDetails(dedupe_key=build_fingerprint(clean_email)), ) result.todoist = await sync_todoist(clean_email, result) return result def _clean_email(request: ClassifyRequest) -> ClassifyRequest: from app.helpers.clean_email_html import clean_email_html from app.helpers.extract_latest_message import extract_latest_message from app.helpers.remove_disclaimer import remove_disclaimer return request.model_copy( update={ "email_data": EmailData( subject=request.email_data.subject, body=remove_disclaimer(clean_email_html(extract_latest_message(request.email_data.body))), ) } ) def _normalize_result(data: dict[str, Any], request: ClassifyRequest) -> ClassificationResult: needs_action = bool(data.get("needs_action", False)) category = str(data.get("category", "uncategorized") or "uncategorized").lower() if category not in VALID_CATEGORIES: category = "uncategorized" priority = str(data.get("priority", "low") or "low").lower() if priority not in VALID_PRIORITIES: priority = "low" task_description = data.get("task_description") if task_description is not None: task_description = str(task_description).strip() or None if needs_action and not task_description: raise ValueError("task_description required when needs_action is true") reasoning = str(data.get("reasoning", "") or "").strip() or "No reasoning provided." confidence_raw = data.get("confidence", 0.0) confidence = max(0.0, min(1.0, float(confidence_raw))) details_payload = data.get("details") or {} details = ClassificationDetails( summary=_clean_text(details_payload.get("summary")), suggested_title=_clean_text(details_payload.get("suggested_title")), suggested_notes=_clean_text(details_payload.get("suggested_notes")), deadline=_clean_text(details_payload.get("deadline")), people=_string_list(details_payload.get("people")), organizations=_string_list(details_payload.get("organizations")), attachments_referenced=_string_list(details_payload.get("attachments_referenced")), next_steps=_string_list(details_payload.get("next_steps")), key_points=_string_list(details_payload.get("key_points")), source_signals=_string_list(details_payload.get("source_signals")), dedupe_key=build_fingerprint(request), ) if needs_action and not details.suggested_title: details.suggested_title = task_description if not details.summary: details.summary = reasoning return ClassificationResult( needs_action=needs_action, category=category, priority=priority, task_description=task_description, reasoning=reasoning, confidence=confidence, details=details, ) def _clean_text(value: Any) -> str | None: if value is None: return None text = str(value).strip() return text or None def _string_list(value: Any) -> list[str]: if not value: return [] if isinstance(value, list): items = value else: items = [value] output = [] for item in items: text = str(item).strip() if text and text not in output: output.append(text) return output