Work Queue API — Specification (Python Rewrite)
Overview
A lightweight internal API that tracks the full lifecycle of work items across TheLab agents. Marcus A. (main dispatcher) submits work; agents poll for their queue and update status; Marcus monitors for exceptions.
Tech Stack
- Language: Python 3.12+
- Package manager: uv
- Web framework: FastAPI (or Flask if preferred)
- Database: PostgreSQL
- Docs: MkDocs
- Container: Docker, pushed to git.danhenry.dev/thelab/work-queue-api
Database Schema (PostgreSQL)
Table: projects
| Column | Type | Notes |
|---|---|---|
| id | UUID PRIMARY KEY | |
| name | TEXT NOT NULL | Human-readable project name |
| external_ref | TEXT | Todoist project ID, GitHub repo, etc. (optional) |
| created_at | TIMESTAMPTZ | ISO8601 |
| updated_at | TIMESTAMPTZ | ISO8601 |
Table: work_items
| Column | Type | Notes |
|---|---|---|
| id | UUID PRIMARY KEY | |
| project_id | UUID FK | References projects.id (optional) |
| type | TEXT NOT NULL | e.g. code_review, bug_fix, infra_setup, gitea_issue |
| description | TEXT NOT NULL | Human-readable summary |
| payload | JSONB | Type-specific fields |
| priority | INTEGER | 1-5, 1=highest. Default 3 |
| status | TEXT NOT NULL | See Status Lifecycle below |
| assigned_agent | TEXT | e.g. steve-w. NULL until dispatched |
| created_by | TEXT | e.g. marcus-a, gitea-watcher, bms-ticket-workflow |
| created_at | TIMESTAMPTZ | ISO8601 |
| updated_at | TIMESTAMPTZ | ISO8601 |
| completed_at | TIMESTAMPTZ | ISO8601, set when status → completed/failed/cancelled |
| outcome | TEXT | success, failed, cancelled, NULL |
| notes | TEXT | Agent-added notes, URLs, context |
Status Lifecycle
queued → dispatched → in_progress → completed
                                  ↘ blocked
                                  ↘ failed
       ↘ cancelled (from queued or dispatched only)
- queued: New work, waiting for Marcus to dispatch
- dispatched: Marcus has assigned it to an agent (the agent has not yet picked it up)
- in_progress: Agent acknowledged and is working on it
- blocked: Agent hit a holding condition (waiting on external input, dependencies, etc.)
- failed: Agent attempted but hit an unrecoverable error
- completed: Agent finished successfully; Marcus reviews before marking truly done
- cancelled: Marcus killed it before work started
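The lifecycle above can be enforced with a small transition table. The following is a minimal sketch, not the project's implementation: the names `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical, and the `blocked → in_progress` edge is an assumption, since the spec does not say how a blocked item resumes.

```python
# Allowed status transitions, read off the lifecycle diagram.
# Assumption: a blocked item may resume to in_progress; the spec does not state this.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "queued": {"dispatched", "cancelled"},
    "dispatched": {"in_progress", "cancelled"},
    "in_progress": {"completed", "blocked", "failed"},
    "blocked": {"in_progress"},  # assumption, not in the spec
    "completed": set(),
    "failed": set(),
    "cancelled": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if a work item may move from `current` to `new`."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

A PATCH handler could call `can_transition` before writing the new status and reject the request otherwise.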
Table: dispatch_log
| Column | Type | Notes |
|---|---|---|
| id | UUID PRIMARY KEY | |
| work_item_id | UUID FK | References work_items.id |
| dispatched_at | TIMESTAMPTZ | ISO8601 |
| agent | TEXT | Which agent it was dispatched to |
| completed_at | TIMESTAMPTZ | ISO8601, when status reached terminal state |
| outcome | TEXT | success, failed, cancelled |
Constraints
- One in_progress work item per agent at any time (enforced via DB constraint or application logic)
- completed_at and outcome can only be set when status is terminal (completed, failed, cancelled)
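Both constraints can be checked in application code, and the first one also maps onto a PostgreSQL partial unique index. The sketch below is illustrative only: the function names and the index name `one_in_progress_per_agent` are hypothetical, and the DDL string is shown for reference, not executed.

```python
from datetime import datetime
from typing import Optional

TERMINAL_STATUSES = {"completed", "failed", "cancelled"}

# Possible DB-level enforcement (hypothetical index name): a partial unique
# index allows at most one in_progress row per assigned_agent.
SINGLE_IN_PROGRESS_INDEX = """
CREATE UNIQUE INDEX one_in_progress_per_agent
    ON work_items (assigned_agent)
    WHERE status = 'in_progress';
"""


def check_terminal_fields(
    status: str, completed_at: Optional[datetime], outcome: Optional[str]
) -> None:
    """Raise ValueError if completed_at or outcome is set on a non-terminal item."""
    if status not in TERMINAL_STATUSES and (
        completed_at is not None or outcome is not None
    ):
        raise ValueError(
            f"completed_at/outcome cannot be set while status is {status!r}"
        )


def check_single_in_progress(agent: str, busy_agents: set[str]) -> None:
    """Raise ValueError if `agent` already has an in_progress work item."""
    if agent in busy_agents:
        raise ValueError(f"{agent} already has an in_progress work item")
```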
API Endpoints
Projects
POST /projects — create project
GET /projects — list all
GET /projects/:id — get one
PATCH /projects/:id — update name or external_ref
Work Items
POST /work — create item (status=queued on create)
GET /work — list with filters: ?status=, ?agent=, ?project_id=, ?since=
Sort: priority ASC, created_at ASC
GET /work/:id — single item with dispatch history
PATCH /work/:id — update status, outcome, notes, assigned_agent
DELETE /work/:id — cancel (status=cancelled). Returns 204.
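To make the GET /work filter and sort semantics concrete, here is an in-memory sketch. It assumes each item is a dict keyed by the work_items column names and that `since` is compared against updated_at (the spec does not say which timestamp applies). A real service would express the same logic as SQL WHERE and ORDER BY clauses; `list_work` is a hypothetical name.

```python
from datetime import datetime
from typing import Optional


def list_work(
    items: list[dict],
    status: Optional[str] = None,
    agent: Optional[str] = None,
    project_id: Optional[str] = None,
    since: Optional[datetime] = None,
) -> list[dict]:
    """Filter work items, then sort by priority ASC, created_at ASC."""

    def matches(item: dict) -> bool:
        return (
            (status is None or item["status"] == status)
            and (agent is None or item["assigned_agent"] == agent)
            and (project_id is None or item["project_id"] == project_id)
            # Assumption: `since` filters on updated_at.
            and (since is None or item["updated_at"] >= since)
        )

    return sorted(
        (item for item in items if matches(item)),
        key=lambda item: (item["priority"], item["created_at"]),
    )
```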
Monitoring (for Marcus heartbeat)
GET /work?status=in_progress — what's being worked on right now
GET /work?status=blocked — items that need intervention
GET /work?status=failed — items that need review
GET /work?status=completed&since=<ts> — completed since last check
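The four heartbeat queries can be generated from one helper. This is a sketch under assumptions: `monitoring_urls` is a hypothetical name, the base URL is supplied by the caller, and `since` is passed as an ISO8601 string, which `urlencode` percent-encodes.

```python
from urllib.parse import urlencode


def monitoring_urls(base_url: str, since: str) -> dict[str, str]:
    """Build the heartbeat query URLs listed in the Monitoring section."""
    queries = {
        "in_progress": {"status": "in_progress"},
        "blocked": {"status": "blocked"},
        "failed": {"status": "failed"},
        "completed": {"status": "completed", "since": since},
    }
    return {
        name: f"{base_url}/work?{urlencode(params)}"
        for name, params in queries.items()
    }
```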
Dispatch Flow
- Marcus POSTs /work → status=queued
- Marcus PATCHes /work/:id with status=dispatched, assigned_agent=steve-w
- (optional) Immediately PATCH to status=in_progress
- Steve polls GET /work?agent=steve-w&status=dispatched, PATCHes to in_progress, works, PATCHes completed
Stale Task Detection
Marcus's heartbeat checks in_progress items. If any item has updated_at older than 30 minutes, Marcus marks it blocked and alerts Daniel.
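The stale check reduces to a timestamp comparison. A minimal sketch, assuming items are dicts with timezone-aware updated_at values; `find_stale` and `STALE_AFTER` are hypothetical names, and marking the item blocked and alerting Daniel are left to the caller.

```python
from datetime import datetime, timedelta

STALE_AFTER = timedelta(minutes=30)  # threshold from the spec


def find_stale(
    items: list[dict], now: datetime, stale_after: timedelta = STALE_AFTER
) -> list[dict]:
    """Return in_progress items whose updated_at is older than `stale_after`."""
    return [
        item
        for item in items
        if item["status"] == "in_progress"
        and now - item["updated_at"] > stale_after
    ]
```

Passing `now` explicitly keeps the function deterministic and easy to test.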
Project Structure
work-queue-api/
├── SPEC.md
├── Dockerfile
├── docker-compose.yml          ← must include postgres container
├── .github/
│   └── workflows/
│       └── ci.yml              ← build + push to git.danhenry.dev/thelab/work-queue-api:latest
├── pyproject.toml / uv project files
├── mkdocs.yml                  ← MkDocs configuration
├── docs/
│   └── index.md                ← Docker Compose example + usage docs
├── app/
│   ├── __init__.py
│   ├── main.py                 ← FastAPI app entry
│   ├── config.py               ← Settings (DATABASE_URL, PORT, etc.)
│   ├── models.py               ← Pydantic models
│   ├── db.py                   ← PostgreSQL connection
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── projects.py
│   │   └── work.py
│   └── migrations/
│       └── 001_initial.sql
└── tests/
    └── test_api.py
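As an illustration of what app/config.py might hold, the sketch below reads DATABASE_URL and PORT from the environment using only the standard library. The `Settings` and `load_settings` names are hypothetical, the 8080 default is taken from the Docker Compose example, and the actual project may prefer a Pydantic-based settings class instead.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    database_url: str
    port: int = 8080  # default taken from the Docker Compose example


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables; DATABASE_URL is required."""
    try:
        database_url = env["DATABASE_URL"]
    except KeyError:
        raise RuntimeError("DATABASE_URL must be set") from None
    return Settings(database_url=database_url, port=int(env.get("PORT", "8080")))
```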
CI/CD
GitHub Actions (ci.yml):
- Build Docker image on push to main
- Push to git.danhenry.dev/thelab/work-queue-api:latest
- Tag with short SHA
Docker registry: git.danhenry.dev
Secrets available in thelab org: DOCKER_REGISTRY, DOCKER_USERNAME, DOCKER_PASSWORD
Docker Compose Example (must be in docs AND in docker-compose.yml)
version: '3.8'
services:
  api:
    image: git.danhenry.dev/thelab/work-queue-api:latest
    ports:
      - "8080:8080"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/work_queue
      - PORT=8080
    depends_on:
      db:
        condition: service_healthy
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "wget", "-qO-", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 3
  db:
    image: postgres:16-alpine
    environment:
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=work_queue
    volumes:
      - ./data/postgres:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
Docs (MkDocs)
mkdocs.yml with:
- site_name: Work Queue API
- repo_url: https://git.danhenry.dev/thelab/work-queue-api
- Nav structure: Getting Started, API Reference, Docker Compose
docs/index.md must include:
- Overview
- Docker Compose full example (the postgres version)
- Quick start
- API endpoint reference
- Status lifecycle diagram
Skills to build after API is done
For Marcus (main dispatcher):
- work-queue skill: thin wrapper around HTTP calls
  - work_add(type, description, payload, agent, project_id?) → POST /work
  - work_dispatch(work_item_id, agent) → PATCH status=dispatched+in_progress
  - work_update(work_item_id, status, outcome?, notes?) → PATCH
  - work_list(status?, agent?, project_id?) → GET /work
  - work_stale_check() → poll in_progress, timeout stale items
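A thin wrapper of this kind could be built on the standard library alone. The sketch below covers only work_update and work_list; the other functions would follow the same pattern. `BASE_URL`, the `_json_request` helper, and the JSON body field names are assumptions, and the functions return prepared `urllib.request.Request` objects so they can be inspected before being sent with `urlopen`.

```python
import json
from typing import Optional
from urllib.parse import urlencode
from urllib.request import Request

BASE_URL = "http://localhost:8080"  # assumption: matches the Compose example port


def _json_request(method: str, path: str, body: Optional[dict] = None) -> Request:
    """Prepare a JSON request; send it with urllib.request.urlopen(req)."""
    data = json.dumps(body).encode() if body is not None else None
    return Request(
        f"{BASE_URL}{path}",
        data=data,
        method=method,
        headers={"Content-Type": "application/json"},
    )


def work_update(
    work_item_id: str,
    status: str,
    outcome: Optional[str] = None,
    notes: Optional[str] = None,
) -> Request:
    """PATCH /work/:id with the fields that were provided."""
    body = {"status": status, "outcome": outcome, "notes": notes}
    body = {key: value for key, value in body.items() if value is not None}
    return _json_request("PATCH", f"/work/{work_item_id}", body)


def work_list(
    status: Optional[str] = None,
    agent: Optional[str] = None,
    project_id: Optional[str] = None,
) -> Request:
    """GET /work with optional filters as query parameters."""
    params = {"status": status, "agent": agent, "project_id": project_id}
    params = {key: value for key, value in params.items() if value is not None}
    query = f"?{urlencode(params)}" if params else ""
    return _json_request("GET", f"/work{query}")
```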
For Steve's agent:
- Polling skill: every N minutes, GET /work?agent=steve-w&status=dispatched, pick up items
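One polling cycle for an agent might look like the sketch below. It takes the HTTP calls as injected callables (hypothetical `fetch_dispatched` and `update_status`) so the logic stays testable, and it picks up only the first item per cycle, on the assumption that this is how the one-in_progress-item-per-agent constraint is respected.

```python
from typing import Callable, Optional


def poll_once(
    fetch_dispatched: Callable[[], list[dict]],
    update_status: Callable[[str, str], None],
) -> Optional[dict]:
    """Pick up the highest-priority dispatched item, if any.

    `fetch_dispatched` stands in for GET /work?agent=...&status=dispatched,
    which the API returns sorted by priority ASC, created_at ASC.
    `update_status` stands in for PATCH /work/:id.
    """
    items = fetch_dispatched()
    if not items:
        return None
    item = items[0]  # first item is the most urgent given the API's sort order
    update_status(item["id"], "in_progress")
    return item
```

A scheduler would call `poll_once` every N minutes and hand the returned item to the agent's work routine.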