Files
work-queue-api/internal/api/handlers_work.go
2026-04-11 18:38:52 +00:00

474 lines
14 KiB
Go

package api
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/google/uuid"
"work-queue-api/internal/model"
)
// timeLayout is the layout this file uses to format and parse timestamps.
const timeLayout = time.RFC3339

var (
	// terminalStatuses is the set of statuses that have no outgoing transition.
	terminalStatuses = map[string]bool{
		"completed": true,
		"failed":    true,
		"cancelled": true,
	}

	// validTransitions maps a current status to the set of statuses it may
	// move to. A status mapped to an empty set cannot be left.
	validTransitions = map[string]map[string]bool{
		"queued": {
			"dispatched": true,
			"cancelled":  true,
		},
		"dispatched": {
			"in_progress": true,
			"cancelled":   true,
		},
		"in_progress": {
			"blocked":   true,
			"completed": true,
			"failed":    true,
		},
		"blocked": {
			"in_progress": true,
			"failed":      true,
			"completed":   true,
		},
		"completed": {},
		"failed":    {},
		"cancelled": {},
	}
)
// createWorkRequest is the JSON body decoded by handleCreateWork.
//
// Type and Description must be non-empty. The pointer fields are optional and
// are copied onto the new work item as-is; Priority falls back to 3 when nil.
type createWorkRequest struct {
// ProjectID, when set, is looked up with fetchProject before the insert.
ProjectID *string `json:"project_id"`
Type string `json:"type"`
Description string `json:"description"`
// Payload is kept as raw JSON; its text is what gets written to the payload column.
Payload json.RawMessage `json:"payload"`
// Priority must be within 1..5 once the default has been applied.
Priority *int `json:"priority"`
AssignedAgent *string `json:"assigned_agent"`
CreatedBy *string `json:"created_by"`
}
// patchWorkRequest is the JSON body decoded by handlePatchWork.
//
// Every field is a pointer: handlePatchWork only touches the work item for
// fields that are non-nil.
type patchWorkRequest struct {
// Status, when set, is validated and applied through applyStatusTransition.
Status *string `json:"status"`
// Outcome is only consulted as part of a status transition.
Outcome *string `json:"outcome"`
// Notes replaces the item's notes and is also passed to the transition check.
Notes *string `json:"notes"`
AssignedAgent *string `json:"assigned_agent"`
}
// handleCreateWork creates a work item from a createWorkRequest body.
//
// The new item always starts in status "queued". Priority defaults to 3 when
// omitted and must be within 1..5. When project_id is supplied it must be
// resolvable through fetchProject.
//
// Responses:
//   - 201 with the created item on success
//   - 400 when the body cannot be decoded, type or description is empty,
//     priority is out of range, project_id is unknown, or payload cannot be
//     decoded as JSON
//   - 500 when a database call fails
func (s *Server) handleCreateWork(w http.ResponseWriter, r *http.Request) {
	var req createWorkRequest
	if err := decodeJSON(r, &req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid json body")
		return
	}
	if req.Type == "" || req.Description == "" {
		writeError(w, http.StatusBadRequest, "type and description are required")
		return
	}

	priority := 3
	if req.Priority != nil {
		priority = *req.Priority
	}
	if priority < 1 || priority > 5 {
		writeError(w, http.StatusBadRequest, "priority must be between 1 and 5")
		return
	}

	if req.ProjectID != nil {
		// errors.Is (not ==) so the check still matches if fetchProject ever
		// wraps the sentinel with extra context.
		if _, err := s.fetchProject(*req.ProjectID); errors.Is(err, sql.ErrNoRows) {
			writeError(w, http.StatusBadRequest, "project_id does not exist")
			return
		} else if err != nil {
			writeError(w, http.StatusInternalServerError, err.Error())
			return
		}
	}

	now := nowUTC()
	item := model.WorkItem{
		ID:            uuid.NewString(),
		ProjectID:     req.ProjectID,
		Type:          req.Type,
		Description:   req.Description,
		Priority:      priority,
		Status:        "queued",
		AssignedAgent: req.AssignedAgent,
		CreatedBy:     req.CreatedBy,
		CreatedAt:     now,
		UpdatedAt:     now,
	}

	// The raw text is what gets stored; the decoded form is only used for the
	// response. Reject the request instead of storing a payload that the
	// response would then silently omit.
	payloadText := nullablePayload(req.Payload)
	if len(req.Payload) > 0 {
		if err := json.Unmarshal(req.Payload, &item.Payload); err != nil {
			writeError(w, http.StatusBadRequest, "payload must be valid json")
			return
		}
	}

	_, err := s.db.Exec(`INSERT INTO work_items (id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)`,
		item.ID, item.ProjectID, item.Type, item.Description, payloadText, item.Priority, item.Status, item.AssignedAgent, item.CreatedBy,
		item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout),
	)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusCreated, item)
}
// handleListWork lists work items, optionally filtered by query parameters.
//
// Supported parameters (all optional, combined with AND):
//   - status, agent, project_id: exact match on the corresponding column
//   - since: RFC3339 timestamp; only items created strictly after it are returned
//
// With since the result is ordered by created_at; otherwise by priority and
// then created_at.
//
// Responses:
//   - 200 with a JSON array (empty array when nothing matches)
//   - 400 when since is not a valid RFC3339 timestamp
//   - 500 when the query, a row scan, or row iteration fails
func (s *Server) handleListWork(w http.ResponseWriter, r *http.Request) {
	// Parse the query string once; r.URL.Query() re-parses it on every call.
	params := r.URL.Query()

	var where []string
	var args []any
	for _, f := range []struct{ param, clause string }{
		{"status", "status = ?"},
		{"agent", "assigned_agent = ?"},
		{"project_id", "project_id = ?"},
	} {
		if v := params.Get(f.param); v != "" {
			where = append(where, f.clause)
			args = append(args, v)
		}
	}

	since := params.Get("since")
	if since != "" {
		parsed, err := time.Parse(timeLayout, since)
		if err != nil {
			writeError(w, http.StatusBadRequest, "since must be ISO8601/RFC3339")
			return
		}
		where = append(where, "created_at > ?")
		// created_at is bound and compared as text, so an offset such as
		// "+02:00" would order incorrectly against stored values. Normalise
		// to UTC in the same layout the writers use.
		// NOTE(review): assumes nowUTC() yields UTC times, so stored values
		// end in "Z" — confirm.
		args = append(args, parsed.UTC().Format(timeLayout))
	}

	query := `SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items`
	if len(where) > 0 {
		query += " WHERE " + strings.Join(where, " AND ")
	}
	if since != "" {
		query += " ORDER BY created_at ASC"
	} else {
		query += " ORDER BY priority ASC, created_at ASC"
	}

	rows, err := s.db.Query(query, args...)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	defer rows.Close()

	// Start from an empty (non-nil) slice so that "no results" encodes as
	// [] rather than null.
	items := []model.WorkItem{}
	for rows.Next() {
		item, err := scanWorkItem(rows)
		if err != nil {
			writeError(w, http.StatusInternalServerError, err.Error())
			return
		}
		items = append(items, item)
	}
	// rows.Next() also returns false when iteration fails part-way; without
	// this check a truncated list would be returned as a success.
	if err := rows.Err(); err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, items)
}
// handleGetWork returns a single work item, identified by the "id" URL
// parameter, together with its dispatch log entries.
//
// Responses:
//   - 200 with the item (DispatchLog populated from fetchDispatchLogs)
//   - 404 when no work item has that id
//   - 500 when loading the item or its dispatch log fails
func (s *Server) handleGetWork(w http.ResponseWriter, r *http.Request) {
	item, err := s.fetchWorkItem(chi.URLParam(r, "id"))
	switch {
	// errors.Is (not ==) so the 404 still fires if the sentinel is ever wrapped.
	case errors.Is(err, sql.ErrNoRows):
		writeError(w, http.StatusNotFound, "work item not found")
		return
	case err != nil:
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}

	logs, err := s.fetchDispatchLogs(item.ID)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	item.DispatchLog = logs
	writeJSON(w, http.StatusOK, item)
}
// handlePatchWork applies a partial update to the work item identified by the
// "id" URL parameter.
//
// Only fields present in the patchWorkRequest body are applied. A status
// change is validated through applyStatusTransition. updated_at is refreshed
// on every successful call.
//
// Responses:
//   - 200 with the updated item
//   - 400 when the body cannot be decoded or the status transition is rejected
//   - 404 when no work item has that id
//   - 409 when persisting fails with a unique-constraint error
//   - 500 for any other database failure
func (s *Server) handlePatchWork(w http.ResponseWriter, r *http.Request) {
	id := chi.URLParam(r, "id")
	item, err := s.fetchWorkItem(id)
	// errors.Is (not ==) so the 404 still fires if the sentinel is ever wrapped.
	if errors.Is(err, sql.ErrNoRows) {
		writeError(w, http.StatusNotFound, "work item not found")
		return
	}
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}

	var req patchWorkRequest
	if err := decodeJSON(r, &req); err != nil {
		writeError(w, http.StatusBadRequest, "invalid json body")
		return
	}

	if req.AssignedAgent != nil {
		item.AssignedAgent = req.AssignedAgent
	}
	if req.Notes != nil {
		item.Notes = req.Notes
	}

	// appliedStatus stays nil when the request did not ask for a status change.
	var appliedStatus *string
	if req.Status != nil {
		if err := applyStatusTransition(&item, *req.Status, req.Outcome, req.Notes); err != nil {
			writeError(w, http.StatusBadRequest, err.Error())
			return
		}
		// applyStatusTransition trims the requested status before storing it
		// on the item. Hand the normalised value to persistPatchedWork;
		// passing the raw request value would let "dispatched " change the
		// status while skipping the dispatch_log bookkeeping.
		normalized := item.Status
		appliedStatus = &normalized
	}

	item.UpdatedAt = nowUTC()
	if err := s.persistPatchedWork(item, appliedStatus); err != nil {
		if isUniqueConstraint(err) {
			writeError(w, http.StatusConflict, "agent already has an in_progress work item")
			return
		}
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, item)
}
// handleDeleteWork cancels the work item identified by the "id" URL
// parameter. The row is not removed: the item is moved to status "cancelled"
// with outcome "cancelled".
//
// Responses:
//   - 204 on success
//   - 400 when the item is not queued or dispatched, or the transition is rejected
//   - 404 when no work item has that id
//   - 500 when a database call fails
func (s *Server) handleDeleteWork(w http.ResponseWriter, r *http.Request) {
	item, err := s.fetchWorkItem(chi.URLParam(r, "id"))
	switch {
	// errors.Is (not ==) so the 404 still fires if the sentinel is ever wrapped.
	case errors.Is(err, sql.ErrNoRows):
		writeError(w, http.StatusNotFound, "work item not found")
		return
	case err != nil:
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}

	// Checked up front so the caller gets a cancellation-specific message
	// rather than the generic transition error.
	if item.Status != "queued" && item.Status != "dispatched" {
		writeError(w, http.StatusBadRequest, "only queued or dispatched items can be cancelled")
		return
	}

	outcome := "cancelled"
	if err := applyStatusTransition(&item, "cancelled", &outcome, nil); err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	item.UpdatedAt = nowUTC()
	if err := s.persistPatchedWork(item, ptr("cancelled")); err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// applyStatusTransition validates a move of item to nextStatus and, when it
// is allowed, mutates item in place.
//
// nextStatus is trimmed of surrounding whitespace before use. The move must
// be listed in validTransitions, and additionally:
//   - queued -> dispatched requires a non-blank item.AssignedAgent
//   - blocked requires non-blank notes
//   - a terminal status requires outcome: "success" for completed, "failed"
//     for failed, "cancelled" for cancelled
//
// On success item.Status is updated. For terminal statuses CompletedAt is set
// to the current time and Outcome to outcome; for every other status both are
// cleared. On error item is left untouched.
func applyStatusTransition(item *model.WorkItem, nextStatus string, outcome *string, notes *string) error {
	nextStatus = strings.TrimSpace(nextStatus)
	if !validTransitions[item.Status][nextStatus] {
		return fmt.Errorf("invalid status transition: %s -> %s", item.Status, nextStatus)
	}

	// A blank agent is treated like a missing one, matching the notes check
	// below: dispatching to "" would record an empty agent in the dispatch log.
	if item.Status == "queued" && nextStatus == "dispatched" &&
		(item.AssignedAgent == nil || strings.TrimSpace(*item.AssignedAgent) == "") {
		return errors.New("assigned_agent is required for queued -> dispatched")
	}
	if nextStatus == "blocked" && (notes == nil || strings.TrimSpace(*notes) == "") {
		return errors.New("notes are required when status is blocked")
	}

	terminal := terminalStatuses[nextStatus]
	if terminal {
		if outcome == nil {
			return errors.New("outcome is required for terminal statuses")
		}
		// Each terminal status accepts exactly one outcome value.
		switch {
		case nextStatus == "completed" && *outcome != "success":
			return errors.New("completed requires outcome=success")
		case nextStatus == "failed" && *outcome != "failed":
			return errors.New("failed requires outcome=failed")
		case nextStatus == "cancelled" && *outcome != "cancelled":
			return errors.New("cancelled requires outcome=cancelled")
		}
	}

	item.Status = nextStatus
	if terminal {
		now := nowUTC()
		item.CompletedAt = &now
		item.Outcome = outcome
	} else {
		item.CompletedAt = nil
		item.Outcome = nil
	}
	return nil
}
// persistPatchedWork writes item back to work_items and keeps dispatch_log in
// step, all inside one transaction.
//
// requestedStatus is the status change being applied, or nil when the status
// is not changing:
//   - "dispatched" inserts a new dispatch_log row for item.AssignedAgent
//   - a terminal status stamps completed_at/outcome on the item's
//     dispatch_log rows that are still open
//
// The first failing statement's error is returned and the deferred Rollback
// discards the transaction.
func (s *Server) persistPatchedWork(item model.WorkItem, requestedStatus *string) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	// Runs on every exit path; the return value is deliberately not inspected.
	defer tx.Rollback()

	// Empty when no status change was requested; it matches neither branch below.
	status := ""
	if requestedStatus != nil {
		status = *requestedStatus
	}
	completedAt := formatNullTime(item.CompletedAt)

	if _, err := tx.Exec(`UPDATE work_items
SET project_id = ?, type = ?, description = ?, payload = ?, priority = ?, status = ?, assigned_agent = ?, created_by = ?, created_at = ?, updated_at = ?, completed_at = ?, outcome = ?, notes = ?
WHERE id = ?`,
		item.ProjectID, item.Type, item.Description, marshalPayload(item.Payload), item.Priority, item.Status, item.AssignedAgent, item.CreatedBy,
		item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout), completedAt, item.Outcome, item.Notes, item.ID,
	); err != nil {
		return err
	}

	if status == "dispatched" {
		if _, err := tx.Exec(`INSERT INTO dispatch_log (id, work_item_id, dispatched_at, agent, completed_at, outcome) VALUES (?, ?, ?, ?, NULL, NULL)`,
			uuid.NewString(), item.ID, item.UpdatedAt.Format(timeLayout), deref(item.AssignedAgent),
		); err != nil {
			return err
		}
	}

	if terminalStatuses[status] {
		if _, err := tx.Exec(`UPDATE dispatch_log SET completed_at = ?, outcome = ? WHERE work_item_id = ? AND completed_at IS NULL`,
			completedAt, item.Outcome, item.ID,
		); err != nil {
			return err
		}
	}

	return tx.Commit()
}
// fetchWorkItem loads the work item with the given id. The error comes
// straight from scanWorkItem.
func (s *Server) fetchWorkItem(id string) (model.WorkItem, error) {
	const selectByID = `SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items WHERE id = ?`
	return scanWorkItem(s.db.QueryRow(selectByID, id))
}
// fetchDispatchLogs returns the dispatch_log rows for workItemID, oldest
// dispatch first. The slice is nil when there are no rows.
//
// It fails on a query, scan, or row-iteration error, or when a stored
// timestamp does not parse with timeLayout.
func (s *Server) fetchDispatchLogs(workItemID string) ([]model.DispatchLog, error) {
	const selectLogs = `SELECT id, work_item_id, dispatched_at, agent, completed_at, outcome FROM dispatch_log WHERE work_item_id = ? ORDER BY dispatched_at ASC`

	rows, err := s.db.Query(selectLogs, workItemID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var logs []model.DispatchLog
	for rows.Next() {
		var (
			entry                         model.DispatchLog
			startedRaw, finishedRaw, result sql.NullString
		)
		if err := rows.Scan(&entry.ID, &entry.WorkItemID, &startedRaw, &entry.Agent, &finishedRaw, &result); err != nil {
			return nil, err
		}

		// A NULL dispatched_at scans as "" and is rejected by the parse below.
		started, err := time.Parse(timeLayout, startedRaw.String)
		if err != nil {
			return nil, err
		}
		entry.DispatchedAt = started

		if finishedRaw.Valid {
			finished, err := time.Parse(timeLayout, finishedRaw.String)
			if err != nil {
				return nil, err
			}
			entry.CompletedAt = &finished
		}
		if result.Valid {
			entry.Outcome = &result.String
		}
		logs = append(logs, entry)
	}
	return logs, rows.Err()
}
// scanProject reads one project from scanner, which must yield the columns
// id, name, external_ref, created_at, updated_at in that order (the column
// names are inferred from the destination fields).
//
// A NULL external_ref leaves ExternalRef nil. Both timestamps must parse with
// timeLayout; the first scan or parse error is returned.
func scanProject(scanner interface{ Scan(dest ...any) error }) (model.Project, error) {
	var (
		p                      model.Project
		ref                    sql.NullString
		createdRaw, updatedRaw string
	)
	if err := scanner.Scan(&p.ID, &p.Name, &ref, &createdRaw, &updatedRaw); err != nil {
		return p, err
	}
	if ref.Valid {
		p.ExternalRef = &ref.String
	}

	var err error
	if p.CreatedAt, err = time.Parse(timeLayout, createdRaw); err != nil {
		return p, err
	}
	p.UpdatedAt, err = time.Parse(timeLayout, updatedRaw)
	return p, err
}
// scanWorkItem reads one work item from scanner, which must yield the
// work_items columns in the order used by this file's SELECT statements.
//
// NULL columns leave the matching pointer field nil. A payload that is NULL,
// empty, or not decodable as JSON leaves Payload unset; a decode failure is
// not reported. created_at, updated_at and a non-NULL completed_at must parse
// with timeLayout; the first scan or parse error is returned.
func scanWorkItem(scanner interface{ Scan(dest ...any) error }) (model.WorkItem, error) {
	var (
		item                 model.WorkItem
		projectRaw           sql.NullString
		payloadRaw           sql.NullString
		agentRaw             sql.NullString
		creatorRaw           sql.NullString
		completedRaw         sql.NullString
		outcomeRaw           sql.NullString
		notesRaw             sql.NullString
		createdRaw, updatedRaw string
	)
	err := scanner.Scan(
		&item.ID, &projectRaw, &item.Type, &item.Description, &payloadRaw, &item.Priority, &item.Status,
		&agentRaw, &creatorRaw, &createdRaw, &updatedRaw, &completedRaw, &outcomeRaw, &notesRaw,
	)
	if err != nil {
		return item, err
	}

	// optional maps a nullable column to *string: nil for NULL.
	optional := func(ns sql.NullString) *string {
		if !ns.Valid {
			return nil
		}
		v := ns.String
		return &v
	}

	item.ProjectID = optional(projectRaw)
	if payloadRaw.Valid && payloadRaw.String != "" {
		var decoded any
		// Best effort: an undecodable payload is skipped, not treated as an error.
		if json.Unmarshal([]byte(payloadRaw.String), &decoded) == nil {
			item.Payload = decoded
		}
	}
	item.AssignedAgent = optional(agentRaw)
	item.CreatedBy = optional(creatorRaw)
	item.Notes = optional(notesRaw)

	if item.CreatedAt, err = time.Parse(timeLayout, createdRaw); err != nil {
		return item, err
	}
	if item.UpdatedAt, err = time.Parse(timeLayout, updatedRaw); err != nil {
		return item, err
	}
	if completedRaw.Valid {
		finished, err := time.Parse(timeLayout, completedRaw.String)
		if err != nil {
			return item, err
		}
		item.CompletedAt = &finished
	}
	item.Outcome = optional(outcomeRaw)
	return item, nil
}
// nullablePayload converts a raw JSON payload into a value to bind as a query
// argument: the payload text when present, or nil when the payload is empty.
func nullablePayload(payload json.RawMessage) any {
	if len(payload) > 0 {
		return string(payload)
	}
	return nil
}
// marshalPayload encodes a decoded payload as JSON text for use as a query
// argument. It returns nil when payload is nil or cannot be marshalled; the
// marshal error is not reported.
func marshalPayload(payload any) any {
	if payload == nil {
		return nil
	}
	if encoded, err := json.Marshal(payload); err == nil {
		return string(encoded)
	}
	return nil
}
// formatNullTime renders an optional timestamp for use as a query argument:
// the time formatted with timeLayout, or nil when t is nil.
func formatNullTime(t *time.Time) any {
	if t != nil {
		return t.Format(timeLayout)
	}
	return nil
}
// ptr returns a pointer to a fresh copy of s.
func ptr(s string) *string {
	v := s
	return &v
}
// deref returns the string s points to, or "" when s is nil.
func deref(s *string) string {
	if s != nil {
		return *s
	}
	return ""
}
// isUniqueConstraint reports whether err's message contains
// "unique constraint failed", ignoring case. A nil error yields false.
func isUniqueConstraint(err error) bool {
	return err != nil &&
		strings.Contains(strings.ToLower(err.Error()), "unique constraint failed")
}