Add configurable LLM provider adapters for email classification
All checks were successful
Build and Publish Docker Image / build-and-push (push) Successful in 5m3s

This commit is contained in:
Steve W
2026-04-09 17:36:46 +00:00
parent 9ed41a3dff
commit 7c9d851a9a
9 changed files with 426 additions and 149 deletions

View File

@@ -0,0 +1,88 @@
# email-classifier
FastAPI service that classifies emails using a configurable LLM backend.
## What changed
The classifier no longer hardcodes a single Ollama + OpenAI-compatible endpoint.
It now supports:
- OpenAI-compatible APIs
- Anthropic-compatible APIs
- per-request overrides for provider, model, endpoint, and temperature
- global defaults through environment variables
This makes it suitable for local Ollama, hosted OpenAI-compatible services, and MiniMax's recommended Anthropic-compatible API.
## Environment configuration
Defaults are loaded from environment variables:
```bash
export LLM_PROVIDER=openai
export LLM_BASE_URL=http://ollama.internal.henryhosted.com:9292/v1
export LLM_API_KEY=none
export LLM_MODEL=qwen2.5-7b-instruct.q4_k_m
export LLM_TEMPERATURE=0.1
export LLM_TIMEOUT_SECONDS=60
export LLM_MAX_RETRIES=3
```
### MiniMax example
MiniMax recommends Anthropic-compatible integration.
```bash
export LLM_PROVIDER=anthropic
export LLM_BASE_URL=https://api.minimax.io/v1
export LLM_API_KEY=your_minimax_key
export LLM_MODEL=MiniMax-M2.7
```
## API
### POST /classify
Request body:
```json
{
"email_data": {
"subject": "Can you review this by Friday?",
"body": "Hi Daniel, please review the attached budget proposal."
},
"provider": "anthropic",
"base_url": "https://api.minimax.io/v1",
"model": "MiniMax-M2.7",
"temperature": 0.1
}
```
All override fields are optional. If omitted, the service uses the global env config.
Response shape:
```json
{
"needs_action": true,
"category": "question",
"priority": "high",
"task_description": "Review the budget proposal and respond by Friday",
"reasoning": "Direct request with a deadline requires follow-up",
"confidence": 0.91
}
```
## Architecture
- `app/config.py`: global and per-request LLM settings
- `app/llm_adapters.py`: provider adapters
- `app/classifier.py`: classification orchestration, retries, normalization
- `app/prompts.py`: system prompt
- `app/routers/classify_email.py`: thin API route
## Notes
- OpenAI-compatible providers use the OpenAI SDK.
- Anthropic-compatible providers use the Anthropic SDK.
- Per-request `api_key` is supported, but excluded from response serialization.
- The service normalizes malformed model output and falls back safely after retry exhaustion.

91
app/classifier.py Normal file
View File

@@ -0,0 +1,91 @@
from __future__ import annotations
import json
from typing import Any
from app.config import get_request_settings
from app.llm_adapters import build_adapter, coerce_json_text
from app.models import ClassificationResult, ClassifyRequest, EmailData
# Categories the classifier is allowed to emit; anything outside this set is
# coerced to "uncategorized" by _normalize_result.
VALID_CATEGORIES = {
    "action_required",
    "question",
    "fyi",
    "newsletter",
    "promotional",
    "automated",
    "alert",
    "uncategorized",
}
# Allowed priority levels; unknown values are coerced to "low".
VALID_PRIORITIES = {"high", "medium", "low"}
async def classify_email(request: ClassifyRequest) -> ClassificationResult:
    """Classify an email, retrying on malformed model output.

    Cleans the email body, resolves per-request LLM settings (falling back to
    the environment-configured defaults), and asks the configured provider
    adapter to classify.  A response is retried when it is not valid JSON or
    when it claims ``needs_action`` without a ``task_description`` —
    ``_normalize_result`` raises ``ValueError`` for that case, so both failure
    modes land in the same ``except`` clause.  After ``settings.max_retries``
    failed attempts a conservative, non-actionable fallback is returned.
    """
    clean_email = _clean_email(request.email_data)
    settings = get_request_settings(
        provider=request.provider,
        model=request.model,
        base_url=request.base_url,
        api_key=request.api_key,
        temperature=request.temperature,
    )
    adapter = build_adapter(settings)
    for _ in range(settings.max_retries):
        raw_response = await adapter.classify(clean_email)
        try:
            payload = json.loads(coerce_json_text(raw_response))
            # NOTE: the previous explicit "needs_action without description"
            # re-check here was unreachable — _normalize_result already raises
            # ValueError for that case, which the except below retries.
            return _normalize_result(payload)
        except (json.JSONDecodeError, ValueError, TypeError):
            # Malformed output; request a fresh completion.
            continue
    # Retries exhausted: return a safe default rather than raising.
    return ClassificationResult(
        needs_action=False,
        category="uncategorized",
        priority="low",
        task_description=None,
        reasoning="System failed to classify after multiple attempts.",
        confidence=0.0,
    )
def _clean_email(email: EmailData) -> EmailData:
    """Return a copy of *email* whose body has quoted history, HTML markup,
    and trailing disclaimers removed (applied in that order)."""
    # Imported lazily so the helpers are only loaded when classification runs.
    from app.helpers.clean_email_html import clean_email_html
    from app.helpers.extract_latest_message import extract_latest_message
    from app.helpers.remove_disclaimer import remove_disclaimer

    body = extract_latest_message(email.body)
    body = clean_email_html(body)
    body = remove_disclaimer(body)
    return EmailData(subject=email.subject, body=body)
def _normalize_result(data: dict[str, Any]) -> ClassificationResult:
    """Coerce a raw model payload into a validated ClassificationResult.

    Unknown or falsy categories/priorities fall back to safe defaults,
    confidence is clamped to [0, 1], and ValueError is raised when
    needs_action is true but no task_description was supplied (the caller
    treats that as a retryable failure).
    """
    needs_action = bool(data.get("needs_action", False))

    category = str(data.get("category") or "uncategorized").lower()
    if category not in VALID_CATEGORIES:
        category = "uncategorized"

    priority = str(data.get("priority") or "low").lower()
    if priority not in VALID_PRIORITIES:
        priority = "low"

    task_description = data.get("task_description")
    if task_description is not None:
        # An all-whitespace description counts as missing.
        task_description = str(task_description).strip() or None
    if needs_action and task_description is None:
        raise ValueError("task_description required when needs_action is true")

    reasoning = str(data.get("reasoning") or "").strip() or "No reasoning provided."
    confidence = min(1.0, max(0.0, float(data.get("confidence", 0.0))))

    return ClassificationResult(
        needs_action=needs_action,
        category=category,
        priority=priority,
        task_description=task_description,
        reasoning=reasoning,
        confidence=confidence,
    )

46
app/config.py Normal file
View File

@@ -0,0 +1,46 @@
from __future__ import annotations
import os
from functools import lru_cache
from typing import Literal
from pydantic import BaseModel, Field
Provider = Literal["openai", "anthropic"]
class LLMSettings(BaseModel):
    """Resolved LLM connection settings for one classification call.

    Field defaults are read from the environment once, when this module is
    imported; per-request overrides are layered on via get_request_settings().
    """

    # "openai" or "anthropic" — selects the adapter in app.llm_adapters.
    provider: Provider = Field(default=os.getenv("LLM_PROVIDER", "openai"))
    # Local OpenAI-compatible servers (e.g. Ollama) accept any placeholder key.
    api_key: str = Field(default=os.getenv("LLM_API_KEY", "none"))
    model: str = Field(default=os.getenv("LLM_MODEL", "qwen2.5-7b-instruct.q4_k_m"))
    base_url: str = Field(default=os.getenv("LLM_BASE_URL", "http://ollama.internal.henryhosted.com:9292/v1"))
    # NOTE(review): these float()/int() conversions run at import time and will
    # raise on malformed env values — confirm that fail-fast is intended.
    temperature: float = Field(default=float(os.getenv("LLM_TEMPERATURE", "0.1")))
    timeout_seconds: float = Field(default=float(os.getenv("LLM_TIMEOUT_SECONDS", "60")))
    max_retries: int = Field(default=int(os.getenv("LLM_MAX_RETRIES", "3")))
@lru_cache(maxsize=1)
def get_settings() -> LLMSettings:
    """Return the process-wide default settings (built once, then cached)."""
    return LLMSettings()
def get_request_settings(
    provider: str | None = None,
    model: str | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
) -> LLMSettings:
    """Build settings for a single request: global defaults plus overrides.

    Only non-None arguments replace the corresponding global default.  The
    result is constructed as a fresh LLMSettings so pydantic re-validates
    the merged values.
    """
    overrides = {
        "provider": provider,
        "model": model,
        "base_url": base_url,
        "api_key": api_key,
        "temperature": temperature,
    }
    merged = get_settings().model_dump()
    merged.update({name: value for name, value in overrides.items() if value is not None})
    return LLMSettings(**merged)

View File

@@ -1,83 +1,25 @@
from openai import AsyncOpenAI
from __future__ import annotations
from app.config import get_request_settings
from app.llm_adapters import build_adapter
from app.models import EmailData
openai_client = AsyncOpenAI(
base_url="http://ollama.internal.henryhosted.com:9292/v1",
api_key="none"
)
model = "qwen2.5-7b-instruct.q4_k_m"
system_prompt = """You are an email classification assistant. Your job is to analyze emails and determine if they need the user's attention and action. The user works in the I.T. department of the Grand Portage tribal government.
CLASSIFICATION RULES:
1. NEEDS ATTENTION (create todo) if the email:
- Asks a direct question that requires a response
- Contains scheduling questions like "Does [day/time] work?", "Are you available?", "When can we meet?"
- Requests the user to do something (review, approve, provide info, attend meeting)
- Contains a deadline or time-sensitive request
- Is from a colleague/client discussing active work
- Reports an issue or problem that needs addressing
- Proposes specific dates/times and needs confirmation
- Is an automated alert from a system relevant to I.T.
2. DOES NOT NEED ATTENTION (skip) if the email:
- Is a newsletter, marketing email, or webinar invitation
- Is from a person and is an FYI/informational with no action required
- Is promotional content or sales outreach
- Contains unsubscribe links or bulk sender indicators
- Is a simple acknowledgment ("got it", "thanks", "sounds good") with no questions
3. SPECIAL CASES:
- Even if an email says "working on that" or similar, if it ALSO contains a question or proposal that needs response, mark as needs_action=true
- "Does [X] work?" or "When can you...?" ALWAYS needs a response, regardless of other content
- RE: threads can still need action if they contain unanswered questions
OUTPUT FORMAT:
You must respond with valid JSON only, no other text:
{
"needs_action": true or false,
"category": "action_required" | "question" | "fyi" | "newsletter" | "promotional" | "automated",
"priority": "high" | "medium" | "low",
"task_description": "Brief description of what to do (only if needs_action is true)",
"reasoning": "One sentence explaining your decision",
"confidence": "A number from 0 to 1 indicating how confident you are"
}
EXAMPLES:
Email: "Subject: Q4 Budget Review\nHi Daniel, can you review the attached budget proposal and let me know your thoughts by Friday?"
Output: {"needs_action": true, "category": "question", "priority": "high", "task_description": "Review Q4 budget proposal and respond by Friday", "reasoning": "Direct request with deadline", "confidence": 0.91}
Email: "Subject: RE: Meeting\nWorking on that. Does Tuesday or Wednesday work for you?"
Output: {"needs_action": true, "category": "question", "priority": "medium", "task_description": "Respond with availability for Tuesday or Wednesday", "reasoning": "Scheduling question requires response", "confidence": 0.85}
Email: "Subject: RE: Issue\nThanks, I'll look into it and get back to you."
Output: {"needs_action": false, "category": "fyi", "priority": "low", "task_description": null, "reasoning": "Status update with no questions or action needed", "confidence": 0.77}
Email: "Subject: Join us for our exclusive webinar on cloud security\nRegister now for our upcoming webinar series..."
Output: {"needs_action": false, "category": "promotional", "priority": "low", "task_description": null, "reasoning": "Marketing webinar invitation", "confidence": 0.81}
Email: "Subject: Your order has shipped\nYour order #12345 has been dispatched and will arrive in 3-5 days."
Output: {"needs_action": false, "category": "automated", "priority": "low", "task_description": null, "reasoning": "Automated shipping notification", "confidence": 0.72}
Email: "Subject: Disk at 95 percent on hvs-internal-01\nYThe hard disk on server hvs-internal-01 is at a critical level."
Output: {"needs_action": true, "category": "alert", "priority": "medium", "task_description": null, "reasoning": "Internal I.T. system alert", "confidence": 0.91}
Now classify the following email:"""
async def send_classify_request(
    email: EmailData,
    *,
    provider: str | None = None,
    model: str | None = None,
    base_url: str | None = None,
    api_key: str | None = None,
    temperature: float | None = None,
) -> str:
    """Send *email* to the configured LLM and return the raw response text.

    Keyword arguments override the environment-configured defaults for this
    single request; None means "use the global default".  The response is
    the model's raw string output — parsing/normalization happens upstream
    in app.classifier.
    """
    settings = get_request_settings(
        provider=provider,
        model=model,
        base_url=base_url,
        api_key=api_key,
        temperature=temperature,
    )
    adapter = build_adapter(settings)
    return await adapter.classify(email)

92
app/llm_adapters.py Normal file
View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import json
from typing import Protocol
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI
from app.config import LLMSettings
from app.models import EmailData
from app.prompts import SYSTEM_PROMPT
class LLMAdapter(Protocol):
    """Structural interface every provider adapter implements: take an
    email, return the model's raw text response."""

    async def classify(self, email: EmailData) -> str: ...
class OpenAICompatibleAdapter:
    """Adapter for OpenAI-compatible chat-completion endpoints (incl. Ollama)."""

    def __init__(self, settings: LLMSettings):
        self.settings = settings
        # max_retries=0: retry orchestration lives in app.classifier, not the SDK.
        self.client = AsyncOpenAI(
            base_url=settings.base_url,
            api_key=settings.api_key,
            timeout=settings.timeout_seconds,
            max_retries=0,
        )

    async def classify(self, email: EmailData) -> str:
        """Return the raw completion text for *email* (expected to be JSON)."""
        user_message = f"Subject: {email.subject}\nBody: {email.body}"
        completion = await self.client.chat.completions.create(
            model=self.settings.model,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": user_message},
            ],
            temperature=self.settings.temperature,
            response_format={"type": "json_object"},
        )
        return completion.choices[0].message.content or ""
class AnthropicCompatibleAdapter:
    """Adapter for Anthropic-compatible messages endpoints (e.g. MiniMax)."""

    def __init__(self, settings: LLMSettings):
        self.settings = settings
        # max_retries=0: retry orchestration lives in app.classifier, not the SDK.
        self.client = AsyncAnthropic(
            base_url=settings.base_url,
            api_key=settings.api_key,
            timeout=settings.timeout_seconds,
            max_retries=0,
        )

    async def classify(self, email: EmailData) -> str:
        """Return the response's text blocks joined with newlines."""
        message = await self.client.messages.create(
            model=self.settings.model,
            max_tokens=500,
            temperature=self.settings.temperature,
            system=SYSTEM_PROMPT,
            messages=[
                {"role": "user", "content": f"Subject: {email.subject}\nBody: {email.body}"},
            ],
        )
        # Blocks without a truthy "text" attribute (e.g. non-text content) are skipped.
        texts = (getattr(block, "text", None) for block in message.content)
        return "\n".join(text for text in texts if text)
def build_adapter(settings: LLMSettings) -> LLMAdapter:
    """Instantiate the adapter matching settings.provider.

    Unknown providers fall back to the OpenAI-compatible adapter, matching
    the original default behavior.
    """
    if settings.provider != "anthropic":
        return OpenAICompatibleAdapter(settings)
    return AnthropicCompatibleAdapter(settings)
def coerce_json_text(raw: str) -> str:
    """Extract a parseable JSON document from raw model output.

    Handles three common failure modes: surrounding whitespace, markdown
    code fences (with or without a "json" language tag), and extra prose
    around a JSON object.  Empty input is returned unchanged; otherwise the
    returned string is guaranteed to parse, and json.JSONDecodeError is
    raised when nothing parseable can be recovered.
    """
    text = raw.strip()
    if not text:
        return text

    if text.startswith("```"):
        # Strip a complete ```...``` fence, then any stray "json" tag line.
        lines = text.splitlines()
        has_closing_fence = (
            len(lines) >= 3
            and lines[0].startswith("```")
            and lines[-1].startswith("```")
        )
        if has_closing_fence:
            text = "\n".join(lines[1:-1]).strip()
        if text.lower().startswith("json\n"):
            text = text[len("json\n"):].strip()

    # Prefer the outermost {...} span; validate before returning it.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end >= start:
        candidate = text[start:end + 1]
        json.loads(candidate)
        return candidate

    # No braces at all: the whole text must itself be valid JSON.
    json.loads(text)
    return text

View File

@@ -1,5 +1,28 @@
from pydantic import BaseModel
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
class EmailData(BaseModel):
    """Raw email content submitted for classification."""

    subject: str
    body: str
class ClassifyRequest(BaseModel):
    """POST /classify body: the email plus optional per-request LLM overrides.

    Any override left as None falls back to the environment-configured
    global default (see app.config.get_request_settings).
    """

    email_data: EmailData
    provider: Literal["openai", "anthropic"] | None = None
    model: str | None = None
    base_url: str | None = None
    # exclude=True keeps the key out of serialized copies of this model.
    api_key: str | None = Field(default=None, exclude=True)
    temperature: float | None = None
class ClassificationResult(BaseModel):
    """Normalized classifier output returned by POST /classify."""

    needs_action: bool
    category: Literal["action_required", "question", "fyi", "newsletter", "promotional", "automated", "alert", "uncategorized"]
    priority: Literal["high", "medium", "low"]
    # Non-None only when needs_action is true (enforced in app.classifier).
    task_description: str | None = None
    reasoning: str
    # Clamped to [0, 1] during normalization in app.classifier.
    confidence: float

58
app/prompts.py Normal file
View File

@@ -0,0 +1,58 @@
# System prompt for the email-classification LLM call.  Instructs the model to
# emit a single JSON object matching app.models.ClassificationResult, with
# few-shot examples covering each category the classifier recognizes.
SYSTEM_PROMPT = """You are an email classification assistant. Your job is to analyze emails and determine if they need the user's attention and action. The user works in the I.T. department of the Grand Portage tribal government.
CLASSIFICATION RULES:
1. NEEDS ATTENTION (create todo) if the email:
- Asks a direct question that requires a response
- Contains scheduling questions like \"Does [day/time] work?\", \"Are you available?\", \"When can we meet?\"
- Requests the user to do something (review, approve, provide info, attend meeting)
- Contains a deadline or time-sensitive request
- Is from a colleague/client discussing active work
- Reports an issue or problem that needs addressing
- Proposes specific dates/times and needs confirmation
- Is an automated alert from a system relevant to I.T.
2. DOES NOT NEED ATTENTION (skip) if the email:
- Is a newsletter, marketing email, or webinar invitation
- Is from a person and is an FYI/informational with no action required
- Is promotional content or sales outreach
- Contains unsubscribe links or bulk sender indicators
- Is a simple acknowledgment (\"got it\", \"thanks\", \"sounds good\") with no questions
3. SPECIAL CASES:
- Even if an email says \"working on that\" or similar, if it ALSO contains a question or proposal that needs response, mark as needs_action=true
- \"Does [X] work?\" or \"When can you...?\" ALWAYS needs a response, regardless of other content
- RE: threads can still need action if they contain unanswered questions
OUTPUT FORMAT:
You must respond with valid JSON only, no other text:
{
\"needs_action\": true or false,
\"category\": \"action_required\" | \"question\" | \"fyi\" | \"newsletter\" | \"promotional\" | \"automated\" | \"alert\" | \"uncategorized\",
\"priority\": \"high\" | \"medium\" | \"low\",
\"task_description\": \"Brief description of what to do (only if needs_action is true)\",
\"reasoning\": \"One sentence explaining your decision\",
\"confidence\": A number from 0 to 1 indicating how confident you are
}
EXAMPLES:
Email: \"Subject: Q4 Budget Review\nHi Daniel, can you review the attached budget proposal and let me know your thoughts by Friday?\"
Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"high\", \"task_description\": \"Review Q4 budget proposal and respond by Friday\", \"reasoning\": \"Direct request with deadline\", \"confidence\": 0.91}
Email: \"Subject: RE: Meeting\nWorking on that. Does Tuesday or Wednesday work for you?\"
Output: {\"needs_action\": true, \"category\": \"question\", \"priority\": \"medium\", \"task_description\": \"Respond with availability for Tuesday or Wednesday\", \"reasoning\": \"Scheduling question requires response\", \"confidence\": 0.85}
Email: \"Subject: RE: Issue\nThanks, I'll look into it and get back to you.\"
Output: {\"needs_action\": false, \"category\": \"fyi\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Status update with no questions or action needed\", \"confidence\": 0.77}
Email: \"Subject: Join us for our exclusive webinar on cloud security\nRegister now for our upcoming webinar series...\"
Output: {\"needs_action\": false, \"category\": \"promotional\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Marketing webinar invitation\", \"confidence\": 0.81}
Email: \"Subject: Your order has shipped\nYour order #12345 has been dispatched and will arrive in 3-5 days.\"
Output: {\"needs_action\": false, \"category\": \"automated\", \"priority\": \"low\", \"task_description\": null, \"reasoning\": \"Automated shipping notification\", \"confidence\": 0.72}
Email: \"Subject: Disk at 95 percent on hvs-internal-01\nThe hard disk on server hvs-internal-01 is at a critical level.\"
Output: {\"needs_action\": true, \"category\": \"alert\", \"priority\": \"medium\", \"task_description\": \"Investigate critical disk usage alert on hvs-internal-01\", \"reasoning\": \"Internal I.T. system alert requires follow-up\", \"confidence\": 0.91}
Now classify the following email:"""

View File

@@ -1,75 +1,11 @@
from fastapi import APIRouter
from pydantic import BaseModel
from app.helpers.extract_latest_message import extract_latest_message
from app.helpers.clean_email_html import clean_email_html
from app.helpers.remove_disclaimer import remove_disclaimer
from app.helpers.send_classify_request import send_classify_request
from app.models import EmailData
import json
class ClassifyRequest(BaseModel):
email_data: EmailData
from app.classifier import classify_email
from app.models import ClassificationResult, ClassifyRequest
router = APIRouter()
@router.post("/classify")
async def classify_route(request: ClassifyRequest):
email = request.email_data
clean_email = email.copy()
clean_email.subject = email.subject
clean_email.body = extract_latest_message(clean_email.body)
clean_email.body = clean_email_html(clean_email.body)
clean_email.body = remove_disclaimer(clean_email.body)
max_retries = 3
attempts = 0
valid_response = False
response_data = {}
# return await send_classify_request(clean_email)
while attempts < max_retries:
# 1. Get the raw string response
raw_response = await send_classify_request(clean_email)
try:
if raw_response is None:
print("Error: Received no response from classifier.")
continue # or handle the error
# 2. Parse the string into a Python dict
data = json.loads(raw_response)
needs_action = data.get("needs_action")
task_description = data.get("task_description")
# 3. Check your "re-do" condition
# Logic: If it needs action but the description is missing/empty, we retry.
if needs_action is True and not task_description:
print(f"Attempt {attempts + 1}: Needs action but description is empty. Retrying...")
attempts += 1
continue
# If we reach here, the response is either (needs_action=False)
# OR (needs_action=True AND has a description).
response_data = data
valid_response = True
break
except json.JSONDecodeError:
print("Response was not valid JSON. Retrying...")
attempts += 1
if not valid_response:
print("Failed to get a valid classification after maximum retries. Sending fallback.")
# Create a safe, default response
response_data = {
"needs_action": False,
"category": "uncategorized",
"priority": "low",
"task_description": "",
"reasoning": "System failed to classify after multiple attempts.",
"confidence": 0.0
}
return response_data
@router.post("/classify", response_model=ClassificationResult)
async def classify_route(request: ClassifyRequest) -> ClassificationResult:
    """Classify an email; cleaning, retries, and normalization live in app.classifier."""
    return await classify_email(request)

View File

@@ -1,10 +1,11 @@
[project]
name = "email-classifier"
version = "0.1.0"
description = "Add your description here"
description = "Email classification API with configurable LLM providers and endpoints"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"anthropic>=0.57.1",
"beautifulsoup4>=4.14.3",
"fastapi>=0.128.0",
"openai>=2.16.0",