feat: rebuild work queue api with fastapi and postgres
Some checks failed
ci / build-test-push (push) Failing after 1m42s

This commit is contained in:
Steve W
2026-04-11 19:24:52 +00:00
parent 7420adb7aa
commit fbc88bb62b
33 changed files with 1707 additions and 1132 deletions

1
app/__init__.py Normal file
View File

@@ -0,0 +1 @@
# app package

11
app/config.py Normal file
View File

@@ -0,0 +1,11 @@
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration loaded from environment variables.

    With an empty env prefix and case_sensitive=False, the variables are
    read as DATABASE_URL and PORT (any case).
    """

    # Local-dev default; override with DATABASE_URL in deployment.
    database_url: str = "postgresql://postgres:password@localhost:5432/work_queue"
    # HTTP listen port for the API process.
    port: int = 8080

    model_config = SettingsConfigDict(env_prefix="", case_sensitive=False)


# Module-level singleton imported by the rest of the app.
settings = Settings()

131
app/db.py Normal file
View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import json
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import UUID, uuid4
import psycopg
from fastapi import HTTPException
from psycopg.rows import dict_row
from app.config import settings
# Idempotent schema file applied at startup by run_migrations().
MIGRATION_PATH = Path(__file__).parent / "migrations" / "001_initial.sql"

# Statuses from which no further transition is permitted.
TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

# Work-item state machine: current status -> statuses it may move to.
# Mirrors the CHECK constraints in migrations/001_initial.sql.
VALID_TRANSITIONS = {
    "queued": {"dispatched", "cancelled"},
    "dispatched": {"in_progress", "cancelled"},
    "in_progress": {"blocked", "completed", "failed"},
    "blocked": {"in_progress", "completed", "failed"},
    "completed": set(),
    "failed": set(),
    "cancelled": set(),
}
def utcnow() -> datetime:
    """Return the current UTC time, truncated to whole seconds."""
    now = datetime.now(timezone.utc)
    # Drop sub-second precision so stored timestamps round-trip cleanly.
    return now.replace(microsecond=0)
@contextmanager
def get_conn():
    """Yield a psycopg connection whose rows come back as dicts.

    Per psycopg 3 semantics, the ``with psycopg.connect(...)`` context
    manager commits on clean exit, rolls back on exception, and always
    closes the connection.
    """
    with psycopg.connect(settings.database_url, row_factory=dict_row) as conn:
        yield conn
def run_migrations() -> None:
    """Apply the initial schema migration (safe to re-run: IF NOT EXISTS DDL)."""
    with get_conn() as conn:
        sql = MIGRATION_PATH.read_text()
        conn.execute(sql)
        conn.commit()
def _parse_payload(value: Any) -> dict[str, Any] | None:
if value is None:
return None
if isinstance(value, dict):
return value
return json.loads(value)
def _normalize_work_row(row: dict[str, Any]) -> dict[str, Any]:
    """Decode the payload column and guarantee a dispatch_log key, in place."""
    row["payload"] = _parse_payload(row.get("payload"))
    if "dispatch_log" not in row:
        row["dispatch_log"] = []
    return row
def fetch_project_or_404(conn: psycopg.Connection, project_id: str | UUID) -> dict[str, Any]:
    """Load a project row by id, raising HTTP 404 when it does not exist."""
    cursor = conn.execute(
        "SELECT id, name, external_ref, created_at, updated_at FROM projects WHERE id = %s",
        (str(project_id),),
    )
    row = cursor.fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="project not found")
    return row
def fetch_work_or_404(conn: psycopg.Connection, work_id: str | UUID) -> dict[str, Any]:
    """Load a work-item row by id (payload decoded), raising HTTP 404 when absent."""
    query = """
        SELECT id, project_id, type, description, payload, priority, status, assigned_agent,
               created_by, created_at, updated_at, completed_at, outcome, notes
        FROM work_items WHERE id = %s
    """
    row = conn.execute(query, (str(work_id),)).fetchone()
    if row is None:
        raise HTTPException(status_code=404, detail="work item not found")
    return _normalize_work_row(row)
def fetch_dispatch_log(conn: psycopg.Connection, work_id: str | UUID) -> list[dict[str, Any]]:
    """Return every dispatch-log entry for a work item, oldest first."""
    query = """
        SELECT id, work_item_id, dispatched_at, agent, completed_at, outcome
        FROM dispatch_log WHERE work_item_id = %s ORDER BY dispatched_at ASC
    """
    return conn.execute(query, (str(work_id),)).fetchall()
def validate_transition(current: str, new: str, assigned_agent: str | None, outcome: str | None, notes: str | None) -> None:
    """Enforce the work-item state machine; raise HTTP 400 on any violation.

    Rules:
      * the move must appear in VALID_TRANSITIONS,
      * queued -> dispatched requires an assigned agent,
      * moving to blocked requires non-blank notes,
      * each terminal status requires its matching outcome value.
    """
    if new not in VALID_TRANSITIONS[current]:
        raise HTTPException(status_code=400, detail=f"invalid status transition: {current} -> {new}")
    if current == "queued" and new == "dispatched" and not assigned_agent:
        raise HTTPException(status_code=400, detail="assigned_agent is required for queued -> dispatched")
    if new == "blocked" and not (notes or "").strip():
        raise HTTPException(status_code=400, detail="notes are required when status is blocked")
    if new in TERMINAL_STATUSES:
        if not outcome:
            raise HTTPException(status_code=400, detail="outcome is required for terminal statuses")
        # Each terminal status is paired with exactly one legal outcome.
        required = {"completed": "success", "failed": "failed", "cancelled": "cancelled"}[new]
        if outcome != required:
            raise HTTPException(status_code=400, detail=f"{new} requires outcome={required}")
def create_dispatch_log(conn: psycopg.Connection, work_item_id: str, agent: str, dispatched_at: datetime) -> None:
    """Insert a new, still-open dispatch-log entry (completed_at left NULL)."""
    params = (str(uuid4()), work_item_id, dispatched_at, agent)
    conn.execute(
        "INSERT INTO dispatch_log (id, work_item_id, dispatched_at, agent) VALUES (%s, %s, %s, %s)",
        params,
    )
def complete_dispatch_log(conn: psycopg.Connection, work_item_id: str, completed_at: datetime, outcome: str) -> None:
    """Close the most recent open dispatch-log entry for a work item.

    Targets the newest entry (by dispatched_at) whose completed_at is
    still NULL. When no open entry exists — e.g. an item cancelled
    straight from 'queued' — the UPDATE matches nothing and this is a
    harmless no-op.
    """
    conn.execute(
        """
        UPDATE dispatch_log
        SET completed_at = %s, outcome = %s
        WHERE id = (
            SELECT id FROM dispatch_log
            WHERE work_item_id = %s AND completed_at IS NULL
            ORDER BY dispatched_at DESC
            LIMIT 1
        )
        """,
        (completed_at, outcome, work_item_id),
    )

23
app/main.py Normal file
View File

@@ -0,0 +1,23 @@
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.db import run_migrations
from app.routers.projects import router as projects_router
from app.routers.work import router as work_router


@asynccontextmanager
async def lifespan(_: FastAPI):
    """App lifespan: apply schema migrations once before serving requests."""
    run_migrations()
    yield


app = FastAPI(title="Work Queue API", lifespan=lifespan)
app.include_router(projects_router)
app.include_router(work_router)


@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe; returns a static OK marker."""
    return {"status": "ok"}

View File

@@ -0,0 +1,51 @@
-- Initial schema for the work-queue service. Every statement uses
-- IF NOT EXISTS so the migration can be re-applied at each startup.

-- NOTE(review): pgcrypto is enabled but nothing below calls
-- gen_random_uuid() — ids are generated application-side; confirm the
-- extension is actually needed.
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE IF NOT EXISTS projects (
    id UUID PRIMARY KEY,
    name TEXT NOT NULL,
    external_ref TEXT,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL
);

-- Work items; the CHECK constraints mirror the state machine enforced
-- application-side in app.db.validate_transition.
CREATE TABLE IF NOT EXISTS work_items (
    id UUID PRIMARY KEY,
    -- Deleting a project orphans its items instead of deleting them.
    project_id UUID REFERENCES projects(id) ON DELETE SET NULL,
    type TEXT NOT NULL,
    description TEXT NOT NULL,
    payload JSONB,
    priority INTEGER NOT NULL DEFAULT 3 CHECK (priority BETWEEN 1 AND 5),
    status TEXT NOT NULL CHECK (status IN ('queued','dispatched','in_progress','blocked','failed','completed','cancelled')),
    assigned_agent TEXT,
    created_by TEXT,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    completed_at TIMESTAMPTZ,
    outcome TEXT CHECK (outcome IN ('success','failed','cancelled') OR outcome IS NULL),
    notes TEXT,
    -- Terminal rows must carry completion metadata; live rows must not.
    CHECK (
        (status IN ('completed','failed','cancelled') AND completed_at IS NOT NULL AND outcome IS NOT NULL)
        OR
        (status NOT IN ('completed','failed','cancelled') AND completed_at IS NULL AND outcome IS NULL)
    )
);

-- Partial unique index: at most one in_progress item per agent
-- (violations surface as HTTP 409 in the API layer).
CREATE UNIQUE INDEX IF NOT EXISTS idx_one_in_progress_per_agent
    ON work_items(assigned_agent)
    WHERE status = 'in_progress' AND assigned_agent IS NOT NULL;

CREATE INDEX IF NOT EXISTS idx_work_items_status ON work_items(status);
CREATE INDEX IF NOT EXISTS idx_work_items_agent_status ON work_items(assigned_agent, status);
CREATE INDEX IF NOT EXISTS idx_work_items_project_id ON work_items(project_id);
CREATE INDEX IF NOT EXISTS idx_work_items_created_at ON work_items(created_at);

-- Per-work-item record of dispatch attempts and their results.
CREATE TABLE IF NOT EXISTS dispatch_log (
    id UUID PRIMARY KEY,
    work_item_id UUID NOT NULL REFERENCES work_items(id) ON DELETE CASCADE,
    dispatched_at TIMESTAMPTZ NOT NULL,
    agent TEXT NOT NULL,
    completed_at TIMESTAMPTZ,
    outcome TEXT CHECK (outcome IN ('success','failed','cancelled') OR outcome IS NULL)
);

CREATE INDEX IF NOT EXISTS idx_dispatch_log_work_item_id ON dispatch_log(work_item_id);

76
app/models.py Normal file
View File

@@ -0,0 +1,76 @@
from datetime import datetime
from typing import Any, Literal
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
# Closed sets of lifecycle states and terminal outcomes; kept in sync with
# the CHECK constraints in migrations/001_initial.sql.
Status = Literal["queued", "dispatched", "in_progress", "blocked", "failed", "completed", "cancelled"]
Outcome = Literal["success", "failed", "cancelled"]
class ProjectCreate(BaseModel):
    """Request body for creating a project."""

    name: str
    external_ref: str | None = None
class ProjectUpdate(BaseModel):
    """Partial update for a project; omitted fields are left unchanged."""

    name: str | None = None
    external_ref: str | None = None
class Project(BaseModel):
    """Project representation returned by the API."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    name: str
    external_ref: str | None = None
    created_at: datetime
    updated_at: datetime
class WorkCreate(BaseModel):
    """Request body for enqueueing a work item (items always start 'queued')."""

    project_id: UUID | None = None
    type: str
    description: str
    payload: dict[str, Any] | None = None
    # Constrained to 1-5; lower values sort first in the default listing order.
    priority: int = Field(default=3, ge=1, le=5)
    assigned_agent: str | None = None
    created_by: str | None = None
class WorkUpdate(BaseModel):
    """Partial update for a work item; status moves are validated server-side."""

    status: Status | None = None
    outcome: Outcome | None = None
    notes: str | None = None
    assigned_agent: str | None = None
class DispatchLog(BaseModel):
    """One dispatch event; completed_at/outcome remain None while still open."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    work_item_id: UUID
    dispatched_at: datetime
    agent: str
    completed_at: datetime | None = None
    outcome: Outcome | None = None
class WorkItem(BaseModel):
    """Full work-item representation returned by the API."""

    model_config = ConfigDict(from_attributes=True)

    id: UUID
    project_id: UUID | None = None
    type: str
    description: str
    payload: dict[str, Any] | None = None
    priority: int
    status: Status
    assigned_agent: str | None = None
    created_by: str | None = None
    created_at: datetime
    updated_at: datetime
    completed_at: datetime | None = None
    outcome: Outcome | None = None
    notes: str | None = None
    # Mutable default is safe here: pydantic copies field defaults per instance.
    dispatch_log: list[DispatchLog] = []

1
app/routers/__init__.py Normal file
View File

@@ -0,0 +1 @@
# routers package

63
app/routers/projects.py Normal file
View File

@@ -0,0 +1,63 @@
from uuid import uuid4
from fastapi import APIRouter, HTTPException
from app.db import fetch_project_or_404, get_conn, utcnow
from app.models import Project, ProjectCreate, ProjectUpdate
router = APIRouter(prefix="/projects", tags=["projects"])


@router.post("", response_model=Project, status_code=201)
def create_project(payload: ProjectCreate) -> Project:
    """Create a project; raises HTTP 400 when the name is blank."""
    name = payload.name.strip()
    if not name:
        raise HTTPException(status_code=400, detail="name is required")
    now = utcnow()
    project = {
        "id": str(uuid4()),
        "name": name,
        "external_ref": payload.external_ref,
        "created_at": now,
        "updated_at": now,
    }
    with get_conn() as conn:
        conn.execute(
            "INSERT INTO projects (id, name, external_ref, created_at, updated_at) VALUES (%s, %s, %s, %s, %s)",
            (project["id"], project["name"], project["external_ref"], project["created_at"], project["updated_at"]),
        )
        conn.commit()
    return Project.model_validate(project)
@router.get("", response_model=list[Project])
def list_projects() -> list[Project]:
    """List every project, oldest first."""
    with get_conn() as conn:
        cursor = conn.execute(
            "SELECT id, name, external_ref, created_at, updated_at FROM projects ORDER BY created_at ASC"
        )
        rows = cursor.fetchall()
    return [Project.model_validate(row) for row in rows]
@router.get("/{project_id}", response_model=Project)
def get_project(project_id: str) -> Project:
    """Fetch one project by id; raises HTTP 404 when it does not exist."""
    with get_conn() as conn:
        row = fetch_project_or_404(conn, project_id)
    return Project.model_validate(row)
@router.patch("/{project_id}", response_model=Project)
def update_project(project_id: str, payload: ProjectUpdate) -> Project:
    """Partially update a project.

    Only fields present in the request body are changed. ``external_ref``
    may be explicitly set to null to clear it. Raises HTTP 404 for an
    unknown project and HTTP 400 for a blank name.
    """
    with get_conn() as conn:
        current = fetch_project_or_404(conn, project_id)
        if payload.name is not None:
            name = payload.name.strip()
            if not name:
                raise HTTPException(status_code=400, detail="name cannot be empty")
            current["name"] = name
        # Fix: use model_fields_set to distinguish "field omitted" from
        # "field explicitly null" — the previous `is not None` check made
        # {"external_ref": null} a silent no-op, so clients could never
        # clear the field.
        if "external_ref" in payload.model_fields_set:
            current["external_ref"] = payload.external_ref
        current["updated_at"] = utcnow()
        conn.execute(
            "UPDATE projects SET name = %s, external_ref = %s, updated_at = %s WHERE id = %s",
            (current["name"], current["external_ref"], current["updated_at"], project_id),
        )
        conn.commit()
    return Project.model_validate(current)

188
app/routers/work.py Normal file
View File

@@ -0,0 +1,188 @@
from __future__ import annotations
import json
from uuid import uuid4
from fastapi import APIRouter, HTTPException, Response
from psycopg.errors import UniqueViolation
from app.db import (
TERMINAL_STATUSES,
complete_dispatch_log,
create_dispatch_log,
fetch_dispatch_log,
fetch_project_or_404,
fetch_work_or_404,
get_conn,
utcnow,
validate_transition,
)
from app.models import DispatchLog, WorkCreate, WorkItem, WorkUpdate
router = APIRouter(prefix="/work", tags=["work"])


@router.post("", response_model=WorkItem, status_code=201)
def create_work(payload: WorkCreate) -> WorkItem:
    """Enqueue a new work item (status 'queued'); 404 for an unknown project."""
    work_type = payload.type.strip()
    description = payload.description.strip()
    if not work_type or not description:
        raise HTTPException(status_code=400, detail="type and description are required")
    now = utcnow()
    item = {
        "id": str(uuid4()),
        "project_id": str(payload.project_id) if payload.project_id else None,
        "type": work_type,
        "description": description,
        "payload": payload.payload,
        "priority": payload.priority,
        "status": "queued",
        "assigned_agent": payload.assigned_agent,
        "created_by": payload.created_by,
        "created_at": now,
        "updated_at": now,
        "completed_at": None,
        "outcome": None,
        "notes": None,
        "dispatch_log": [],
    }
    payload_json = json.dumps(item["payload"]) if item["payload"] is not None else None
    with get_conn() as conn:
        if item["project_id"]:
            # Reject creation against a project id that does not exist.
            fetch_project_or_404(conn, item["project_id"])
        conn.execute(
            """
            INSERT INTO work_items (
                id, project_id, type, description, payload, priority, status,
                assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes
            ) VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """,
            (
                item["id"],
                item["project_id"],
                item["type"],
                item["description"],
                payload_json,
                item["priority"],
                item["status"],
                item["assigned_agent"],
                item["created_by"],
                item["created_at"],
                item["updated_at"],
                None,
                None,
                None,
            ),
        )
        conn.commit()
    return WorkItem.model_validate(item)
@router.get("", response_model=list[WorkItem])
def list_work(status: str | None = None, agent: str | None = None, project_id: str | None = None, since: str | None = None) -> list[WorkItem]:
    """List work items, optionally filtered by status, agent, project or time.

    With `since`, results come back in creation order; otherwise by
    priority then creation time. Listings carry an empty dispatch_log.
    """
    # (value, SQL fragment) pairs; a filter is applied only when truthy.
    filters = [
        (status, "status = %s"),
        (agent, "assigned_agent = %s"),
        (project_id, "project_id = %s"),
        (since, "created_at > %s::timestamptz"),
    ]
    clauses = [clause for value, clause in filters if value]
    params = tuple(value for value, _ in filters if value)
    query = """
        SELECT id, project_id, type, description, payload, priority, status, assigned_agent,
               created_by, created_at, updated_at, completed_at, outcome, notes
        FROM work_items
    """
    if clauses:
        query += " WHERE " + " AND ".join(clauses)
    if since:
        query += " ORDER BY created_at ASC"
    else:
        query += " ORDER BY priority ASC, created_at ASC"
    with get_conn() as conn:
        rows = conn.execute(query, params).fetchall()
    return [WorkItem.model_validate({**row, "dispatch_log": []}) for row in rows]
@router.get("/{work_id}", response_model=WorkItem)
def get_work(work_id: str) -> WorkItem:
    """Fetch one work item with its full dispatch history; 404 when missing."""
    with get_conn() as conn:
        item = fetch_work_or_404(conn, work_id)
        item["dispatch_log"] = fetch_dispatch_log(conn, work_id)
    return WorkItem.model_validate(item)
@router.patch("/{work_id}", response_model=WorkItem)
def update_work(work_id: str, payload: WorkUpdate) -> WorkItem:
    """Partially update a work item, driving its status state machine.

    Ordering matters: assigned_agent/notes from the request are merged into
    the current row *before* the transition check, so one PATCH can both
    assign an agent and dispatch. Entering a terminal status stamps
    completed_at/outcome; a non-terminal move clears them (matching the
    table CHECK constraint). queued -> dispatched opens a dispatch_log
    entry; a terminal move closes the latest open one. A unique-index
    violation (one in_progress item per agent) is surfaced as HTTP 409.
    """
    with get_conn() as conn:
        current = fetch_work_or_404(conn, work_id)
        if payload.assigned_agent is not None:
            current["assigned_agent"] = payload.assigned_agent
        if payload.notes is not None:
            current["notes"] = payload.notes
        requested_status = payload.status
        if requested_status is not None:
            # Validate against the merged agent but the request's outcome/notes.
            validate_transition(current["status"], requested_status, current.get("assigned_agent"), payload.outcome, payload.notes)
            current["status"] = requested_status
            if requested_status in TERMINAL_STATUSES:
                current["completed_at"] = utcnow()
                current["outcome"] = payload.outcome
            else:
                current["completed_at"] = None
                current["outcome"] = None
        current["updated_at"] = utcnow()
        try:
            conn.execute(
                """
                UPDATE work_items
                SET project_id = %s, type = %s, description = %s, payload = %s::jsonb, priority = %s,
                    status = %s, assigned_agent = %s, created_by = %s, created_at = %s, updated_at = %s,
                    completed_at = %s, outcome = %s, notes = %s
                WHERE id = %s
                """,
                (
                    current["project_id"],
                    current["type"],
                    current["description"],
                    json.dumps(current["payload"]) if current["payload"] is not None else None,
                    current["priority"],
                    current["status"],
                    current["assigned_agent"],
                    current["created_by"],
                    current["created_at"],
                    current["updated_at"],
                    current["completed_at"],
                    current["outcome"],
                    current["notes"],
                    work_id,
                ),
            )
            if requested_status == "dispatched":
                # Record who the item was handed to and when.
                create_dispatch_log(conn, work_id, current["assigned_agent"], current["updated_at"])
            if requested_status in TERMINAL_STATUSES:
                # Close the open dispatch entry (no-op when none exists).
                complete_dispatch_log(conn, work_id, current["completed_at"], current["outcome"])
            conn.commit()
        except UniqueViolation:
            # Raised by idx_one_in_progress_per_agent; roll back the whole PATCH.
            conn.rollback()
            raise HTTPException(status_code=409, detail="agent already has an in_progress work item") from None
        current["dispatch_log"] = fetch_dispatch_log(conn, work_id)
    return WorkItem.model_validate(current)
@router.delete("/{work_id}", status_code=204)
def delete_work(work_id: str) -> Response:
    """Soft-delete a work item by cancelling it.

    Only 'queued' or 'dispatched' items may be cancelled; anything further
    along gets HTTP 400. Also closes any open dispatch-log entry.
    """
    with get_conn() as conn:
        item = fetch_work_or_404(conn, work_id)
        if item["status"] not in {"queued", "dispatched"}:
            raise HTTPException(status_code=400, detail="only queued or dispatched items can be cancelled")
        item.update(
            status="cancelled",
            updated_at=utcnow(),
            completed_at=utcnow(),
            outcome="cancelled",
        )
        conn.execute(
            "UPDATE work_items SET status = %s, updated_at = %s, completed_at = %s, outcome = %s WHERE id = %s",
            (item["status"], item["updated_at"], item["completed_at"], item["outcome"], work_id),
        )
        complete_dispatch_log(conn, work_id, item["completed_at"], item["outcome"])
        conn.commit()
    return Response(status_code=204)