feat: scaffold work queue API
This commit is contained in:
38
cmd/work-queue-api/main.go
Normal file
38
cmd/work-queue-api/main.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"work-queue-api/internal/api"
|
||||
"work-queue-api/internal/db"
|
||||
)
|
||||
|
||||
func main() {
|
||||
port := os.Getenv("PORT")
|
||||
if port == "" {
|
||||
port = "8080"
|
||||
}
|
||||
|
||||
databaseURL := os.Getenv("DATABASE_URL")
|
||||
if databaseURL == "" {
|
||||
databaseURL = "./work_queue.db"
|
||||
}
|
||||
|
||||
sqliteDB, err := db.Open(databaseURL)
|
||||
if err != nil {
|
||||
log.Fatalf("open db: %v", err)
|
||||
}
|
||||
defer sqliteDB.Close()
|
||||
|
||||
server, err := api.NewServer(sqliteDB)
|
||||
if err != nil {
|
||||
log.Fatalf("build server: %v", err)
|
||||
}
|
||||
|
||||
log.Printf("work queue api listening on :%s", port)
|
||||
if err := http.ListenAndServe(":"+port, server.Router()); err != nil {
|
||||
log.Fatalf("serve: %v", err)
|
||||
}
|
||||
}
|
||||
9
go.mod
Normal file
9
go.mod
Normal file
@@ -0,0 +1,9 @@
|
||||
module work-queue-api
|
||||
|
||||
go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/go-chi/chi/v5 v5.2.3
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/mattn/go-sqlite3 v1.14.32
|
||||
)
|
||||
6
go.sum
Normal file
6
go.sum
Normal file
@@ -0,0 +1,6 @@
|
||||
github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE=
|
||||
github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
|
||||
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
132
internal/api/handlers_projects.go
Normal file
132
internal/api/handlers_projects.go
Normal file
@@ -0,0 +1,132 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"work-queue-api/internal/model"
|
||||
)
|
||||
|
||||
type createProjectRequest struct {
|
||||
Name string `json:"name"`
|
||||
ExternalRef *string `json:"external_ref"`
|
||||
}
|
||||
|
||||
type patchProjectRequest struct {
|
||||
Name *string `json:"name"`
|
||||
ExternalRef *string `json:"external_ref"`
|
||||
}
|
||||
|
||||
func (s *Server) handleCreateProject(w http.ResponseWriter, r *http.Request) {
|
||||
var req createProjectRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
if req.Name == "" {
|
||||
writeError(w, http.StatusBadRequest, "name is required")
|
||||
return
|
||||
}
|
||||
|
||||
now := nowUTC()
|
||||
project := model.Project{
|
||||
ID: uuid.NewString(),
|
||||
Name: req.Name,
|
||||
ExternalRef: req.ExternalRef,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
_, err := s.db.Exec(`INSERT INTO projects (id, name, external_ref, created_at, updated_at) VALUES (?, ?, ?, ?, ?)`,
|
||||
project.ID, project.Name, project.ExternalRef, project.CreatedAt.Format(timeLayout), project.UpdatedAt.Format(timeLayout),
|
||||
)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, project)
|
||||
}
|
||||
|
||||
func (s *Server) handleListProjects(w http.ResponseWriter, r *http.Request) {
|
||||
rows, err := s.db.Query(`SELECT id, name, external_ref, created_at, updated_at FROM projects ORDER BY created_at ASC`)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var projects []model.Project
|
||||
for rows.Next() {
|
||||
project, err := scanProject(rows)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
projects = append(projects, project)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, projects)
|
||||
}
|
||||
|
||||
func (s *Server) handleGetProject(w http.ResponseWriter, r *http.Request) {
|
||||
project, err := s.fetchProject(chi.URLParam(r, "id"))
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "project not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
writeJSON(w, http.StatusOK, project)
|
||||
}
|
||||
|
||||
func (s *Server) handlePatchProject(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
project, err := s.fetchProject(id)
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "project not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var req patchProjectRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.Name != nil {
|
||||
if *req.Name == "" {
|
||||
writeError(w, http.StatusBadRequest, "name cannot be empty")
|
||||
return
|
||||
}
|
||||
project.Name = *req.Name
|
||||
}
|
||||
if req.ExternalRef != nil {
|
||||
project.ExternalRef = req.ExternalRef
|
||||
}
|
||||
project.UpdatedAt = nowUTC()
|
||||
|
||||
_, err = s.db.Exec(`UPDATE projects SET name = ?, external_ref = ?, updated_at = ? WHERE id = ?`,
|
||||
project.Name, project.ExternalRef, project.UpdatedAt.Format(timeLayout), project.ID,
|
||||
)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, project)
|
||||
}
|
||||
|
||||
func (s *Server) fetchProject(id string) (model.Project, error) {
|
||||
row := s.db.QueryRow(`SELECT id, name, external_ref, created_at, updated_at FROM projects WHERE id = ?`, id)
|
||||
return scanProject(row)
|
||||
}
|
||||
473
internal/api/handlers_work.go
Normal file
473
internal/api/handlers_work.go
Normal file
@@ -0,0 +1,473 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"work-queue-api/internal/model"
|
||||
)
|
||||
|
||||
const timeLayout = time.RFC3339
|
||||
|
||||
var terminalStatuses = map[string]bool{"completed": true, "failed": true, "cancelled": true}
|
||||
|
||||
var validTransitions = map[string]map[string]bool{
|
||||
"queued": {"dispatched": true, "cancelled": true},
|
||||
"dispatched": {"in_progress": true, "cancelled": true},
|
||||
"in_progress": {"blocked": true, "completed": true, "failed": true},
|
||||
"blocked": {"in_progress": true, "failed": true, "completed": true},
|
||||
"completed": {},
|
||||
"failed": {},
|
||||
"cancelled": {},
|
||||
}
|
||||
|
||||
type createWorkRequest struct {
|
||||
ProjectID *string `json:"project_id"`
|
||||
Type string `json:"type"`
|
||||
Description string `json:"description"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
Priority *int `json:"priority"`
|
||||
AssignedAgent *string `json:"assigned_agent"`
|
||||
CreatedBy *string `json:"created_by"`
|
||||
}
|
||||
|
||||
type patchWorkRequest struct {
|
||||
Status *string `json:"status"`
|
||||
Outcome *string `json:"outcome"`
|
||||
Notes *string `json:"notes"`
|
||||
AssignedAgent *string `json:"assigned_agent"`
|
||||
}
|
||||
|
||||
func (s *Server) handleCreateWork(w http.ResponseWriter, r *http.Request) {
|
||||
var req createWorkRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
if req.Type == "" || req.Description == "" {
|
||||
writeError(w, http.StatusBadRequest, "type and description are required")
|
||||
return
|
||||
}
|
||||
priority := 3
|
||||
if req.Priority != nil {
|
||||
priority = *req.Priority
|
||||
}
|
||||
if priority < 1 || priority > 5 {
|
||||
writeError(w, http.StatusBadRequest, "priority must be between 1 and 5")
|
||||
return
|
||||
}
|
||||
if req.ProjectID != nil {
|
||||
if _, err := s.fetchProject(*req.ProjectID); err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusBadRequest, "project_id does not exist")
|
||||
return
|
||||
} else if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
now := nowUTC()
|
||||
item := model.WorkItem{
|
||||
ID: uuid.NewString(),
|
||||
ProjectID: req.ProjectID,
|
||||
Type: req.Type,
|
||||
Description: req.Description,
|
||||
Priority: priority,
|
||||
Status: "queued",
|
||||
AssignedAgent: req.AssignedAgent,
|
||||
CreatedBy: req.CreatedBy,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
payloadText := nullablePayload(req.Payload)
|
||||
if len(req.Payload) > 0 {
|
||||
_ = json.Unmarshal(req.Payload, &item.Payload)
|
||||
}
|
||||
|
||||
_, err := s.db.Exec(`INSERT INTO work_items (id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)`,
|
||||
item.ID, item.ProjectID, item.Type, item.Description, payloadText, item.Priority, item.Status, item.AssignedAgent, item.CreatedBy,
|
||||
item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout),
|
||||
)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, item)
|
||||
}
|
||||
|
||||
func (s *Server) handleListWork(w http.ResponseWriter, r *http.Request) {
|
||||
query := `SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items`
|
||||
var where []string
|
||||
var args []any
|
||||
|
||||
if status := r.URL.Query().Get("status"); status != "" {
|
||||
where = append(where, "status = ?")
|
||||
args = append(args, status)
|
||||
}
|
||||
if agent := r.URL.Query().Get("agent"); agent != "" {
|
||||
where = append(where, "assigned_agent = ?")
|
||||
args = append(args, agent)
|
||||
}
|
||||
if projectID := r.URL.Query().Get("project_id"); projectID != "" {
|
||||
where = append(where, "project_id = ?")
|
||||
args = append(args, projectID)
|
||||
}
|
||||
if since := r.URL.Query().Get("since"); since != "" {
|
||||
if _, err := time.Parse(timeLayout, since); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "since must be ISO8601/RFC3339")
|
||||
return
|
||||
}
|
||||
where = append(where, "created_at > ?")
|
||||
args = append(args, since)
|
||||
}
|
||||
if len(where) > 0 {
|
||||
query += " WHERE " + strings.Join(where, " AND ")
|
||||
}
|
||||
if r.URL.Query().Get("since") != "" {
|
||||
query += " ORDER BY created_at ASC"
|
||||
} else {
|
||||
query += " ORDER BY priority ASC, created_at ASC"
|
||||
}
|
||||
|
||||
rows, err := s.db.Query(query, args...)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var items []model.WorkItem
|
||||
for rows.Next() {
|
||||
item, err := scanWorkItem(rows)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, items)
|
||||
}
|
||||
|
||||
func (s *Server) handleGetWork(w http.ResponseWriter, r *http.Request) {
|
||||
item, err := s.fetchWorkItem(chi.URLParam(r, "id"))
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "work item not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logs, err := s.fetchDispatchLogs(item.ID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
item.DispatchLog = logs
|
||||
|
||||
writeJSON(w, http.StatusOK, item)
|
||||
}
|
||||
|
||||
func (s *Server) handlePatchWork(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
item, err := s.fetchWorkItem(id)
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "work item not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var req patchWorkRequest
|
||||
if err := decodeJSON(r, &req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid json body")
|
||||
return
|
||||
}
|
||||
|
||||
if req.AssignedAgent != nil {
|
||||
item.AssignedAgent = req.AssignedAgent
|
||||
}
|
||||
if req.Notes != nil {
|
||||
item.Notes = req.Notes
|
||||
}
|
||||
|
||||
if req.Status != nil {
|
||||
if err := applyStatusTransition(&item, *req.Status, req.Outcome, req.Notes); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
item.UpdatedAt = nowUTC()
|
||||
|
||||
if err := s.persistPatchedWork(item, req.Status); err != nil {
|
||||
if isUniqueConstraint(err) {
|
||||
writeError(w, http.StatusConflict, "agent already has an in_progress work item")
|
||||
return
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, item)
|
||||
}
|
||||
|
||||
func (s *Server) handleDeleteWork(w http.ResponseWriter, r *http.Request) {
|
||||
item, err := s.fetchWorkItem(chi.URLParam(r, "id"))
|
||||
if err == sql.ErrNoRows {
|
||||
writeError(w, http.StatusNotFound, "work item not found")
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
if item.Status != "queued" && item.Status != "dispatched" {
|
||||
writeError(w, http.StatusBadRequest, "only queued or dispatched items can be cancelled")
|
||||
return
|
||||
}
|
||||
outcome := "cancelled"
|
||||
if err := applyStatusTransition(&item, "cancelled", &outcome, nil); err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
item.UpdatedAt = nowUTC()
|
||||
|
||||
if err := s.persistPatchedWork(item, ptr("cancelled")); err != nil {
|
||||
writeError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func applyStatusTransition(item *model.WorkItem, nextStatus string, outcome *string, notes *string) error {
|
||||
nextStatus = strings.TrimSpace(nextStatus)
|
||||
if !validTransitions[item.Status][nextStatus] {
|
||||
return fmt.Errorf("invalid status transition: %s -> %s", item.Status, nextStatus)
|
||||
}
|
||||
if item.Status == "queued" && nextStatus == "dispatched" && item.AssignedAgent == nil {
|
||||
return errors.New("assigned_agent is required for queued -> dispatched")
|
||||
}
|
||||
if nextStatus == "blocked" && (notes == nil || strings.TrimSpace(*notes) == "") {
|
||||
return errors.New("notes are required when status is blocked")
|
||||
}
|
||||
if (nextStatus == "completed" || nextStatus == "failed" || nextStatus == "cancelled") && outcome == nil {
|
||||
return errors.New("outcome is required for terminal statuses")
|
||||
}
|
||||
if nextStatus == "completed" && *outcome != "success" {
|
||||
return errors.New("completed requires outcome=success")
|
||||
}
|
||||
if nextStatus == "failed" && *outcome != "failed" {
|
||||
return errors.New("failed requires outcome=failed")
|
||||
}
|
||||
if nextStatus == "cancelled" && *outcome != "cancelled" {
|
||||
return errors.New("cancelled requires outcome=cancelled")
|
||||
}
|
||||
|
||||
item.Status = nextStatus
|
||||
if terminalStatuses[nextStatus] {
|
||||
now := nowUTC()
|
||||
item.CompletedAt = &now
|
||||
item.Outcome = outcome
|
||||
} else {
|
||||
item.CompletedAt = nil
|
||||
item.Outcome = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) persistPatchedWork(item model.WorkItem, requestedStatus *string) error {
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
_, err = tx.Exec(`UPDATE work_items
|
||||
SET project_id = ?, type = ?, description = ?, payload = ?, priority = ?, status = ?, assigned_agent = ?, created_by = ?, created_at = ?, updated_at = ?, completed_at = ?, outcome = ?, notes = ?
|
||||
WHERE id = ?`,
|
||||
item.ProjectID, item.Type, item.Description, marshalPayload(item.Payload), item.Priority, item.Status, item.AssignedAgent, item.CreatedBy,
|
||||
item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout), formatNullTime(item.CompletedAt), item.Outcome, item.Notes, item.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if requestedStatus != nil && *requestedStatus == "dispatched" {
|
||||
_, err = tx.Exec(`INSERT INTO dispatch_log (id, work_item_id, dispatched_at, agent, completed_at, outcome) VALUES (?, ?, ?, ?, NULL, NULL)`,
|
||||
uuid.NewString(), item.ID, item.UpdatedAt.Format(timeLayout), deref(item.AssignedAgent),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if requestedStatus != nil && terminalStatuses[*requestedStatus] {
|
||||
_, err = tx.Exec(`UPDATE dispatch_log SET completed_at = ?, outcome = ? WHERE work_item_id = ? AND completed_at IS NULL`,
|
||||
formatNullTime(item.CompletedAt), item.Outcome, item.ID,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *Server) fetchWorkItem(id string) (model.WorkItem, error) {
|
||||
row := s.db.QueryRow(`SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items WHERE id = ?`, id)
|
||||
return scanWorkItem(row)
|
||||
}
|
||||
|
||||
func (s *Server) fetchDispatchLogs(workItemID string) ([]model.DispatchLog, error) {
|
||||
rows, err := s.db.Query(`SELECT id, work_item_id, dispatched_at, agent, completed_at, outcome FROM dispatch_log WHERE work_item_id = ? ORDER BY dispatched_at ASC`, workItemID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var logs []model.DispatchLog
|
||||
for rows.Next() {
|
||||
var log model.DispatchLog
|
||||
var dispatchedAt, completedAt sql.NullString
|
||||
var outcome sql.NullString
|
||||
if err := rows.Scan(&log.ID, &log.WorkItemID, &dispatchedAt, &log.Agent, &completedAt, &outcome); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.DispatchedAt, err = time.Parse(timeLayout, dispatchedAt.String)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if completedAt.Valid {
|
||||
parsed, err := time.Parse(timeLayout, completedAt.String)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.CompletedAt = &parsed
|
||||
}
|
||||
if outcome.Valid {
|
||||
log.Outcome = &outcome.String
|
||||
}
|
||||
logs = append(logs, log)
|
||||
}
|
||||
return logs, rows.Err()
|
||||
}
|
||||
|
||||
func scanProject(scanner interface{ Scan(dest ...any) error }) (model.Project, error) {
|
||||
var project model.Project
|
||||
var externalRef sql.NullString
|
||||
var createdAt, updatedAt string
|
||||
if err := scanner.Scan(&project.ID, &project.Name, &externalRef, &createdAt, &updatedAt); err != nil {
|
||||
return project, err
|
||||
}
|
||||
if externalRef.Valid {
|
||||
project.ExternalRef = &externalRef.String
|
||||
}
|
||||
var err error
|
||||
project.CreatedAt, err = time.Parse(timeLayout, createdAt)
|
||||
if err != nil {
|
||||
return project, err
|
||||
}
|
||||
project.UpdatedAt, err = time.Parse(timeLayout, updatedAt)
|
||||
return project, err
|
||||
}
|
||||
|
||||
func scanWorkItem(scanner interface{ Scan(dest ...any) error }) (model.WorkItem, error) {
|
||||
var item model.WorkItem
|
||||
var projectID, payload, assignedAgent, createdBy, completedAt, outcome, notes sql.NullString
|
||||
var createdAt, updatedAt string
|
||||
if err := scanner.Scan(&item.ID, &projectID, &item.Type, &item.Description, &payload, &item.Priority, &item.Status, &assignedAgent, &createdBy, &createdAt, &updatedAt, &completedAt, &outcome, ¬es); err != nil {
|
||||
return item, err
|
||||
}
|
||||
if projectID.Valid {
|
||||
item.ProjectID = &projectID.String
|
||||
}
|
||||
if payload.Valid && payload.String != "" {
|
||||
var decoded any
|
||||
if err := json.Unmarshal([]byte(payload.String), &decoded); err == nil {
|
||||
item.Payload = decoded
|
||||
}
|
||||
}
|
||||
if assignedAgent.Valid {
|
||||
item.AssignedAgent = &assignedAgent.String
|
||||
}
|
||||
if createdBy.Valid {
|
||||
item.CreatedBy = &createdBy.String
|
||||
}
|
||||
if notes.Valid {
|
||||
item.Notes = ¬es.String
|
||||
}
|
||||
var err error
|
||||
item.CreatedAt, err = time.Parse(timeLayout, createdAt)
|
||||
if err != nil {
|
||||
return item, err
|
||||
}
|
||||
item.UpdatedAt, err = time.Parse(timeLayout, updatedAt)
|
||||
if err != nil {
|
||||
return item, err
|
||||
}
|
||||
if completedAt.Valid {
|
||||
parsed, err := time.Parse(timeLayout, completedAt.String)
|
||||
if err != nil {
|
||||
return item, err
|
||||
}
|
||||
item.CompletedAt = &parsed
|
||||
}
|
||||
if outcome.Valid {
|
||||
item.Outcome = &outcome.String
|
||||
}
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func nullablePayload(payload json.RawMessage) any {
|
||||
if len(payload) == 0 {
|
||||
return nil
|
||||
}
|
||||
return string(payload)
|
||||
}
|
||||
|
||||
func marshalPayload(payload any) any {
|
||||
if payload == nil {
|
||||
return nil
|
||||
}
|
||||
bytes, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return string(bytes)
|
||||
}
|
||||
|
||||
func formatNullTime(t *time.Time) any {
|
||||
if t == nil {
|
||||
return nil
|
||||
}
|
||||
return t.Format(timeLayout)
|
||||
}
|
||||
|
||||
func ptr(s string) *string { return &s }
|
||||
func deref(s *string) string {
|
||||
if s == nil {
|
||||
return ""
|
||||
}
|
||||
return *s
|
||||
}
|
||||
|
||||
func isUniqueConstraint(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(strings.ToLower(err.Error()), "unique constraint failed")
|
||||
}
|
||||
34
internal/api/middleware.go
Normal file
34
internal/api/middleware.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
func jsonMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
|
||||
func decodeJSON(r *http.Request, dst any) error {
|
||||
defer r.Body.Close()
|
||||
dec := json.NewDecoder(r.Body)
|
||||
dec.DisallowUnknownFields()
|
||||
return dec.Decode(dst)
|
||||
}
|
||||
|
||||
func writeJSON(w http.ResponseWriter, status int, value any) {
|
||||
w.WriteHeader(status)
|
||||
_ = json.NewEncoder(w).Encode(value)
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, status int, msg string) {
|
||||
writeJSON(w, status, map[string]string{"error": msg})
|
||||
}
|
||||
|
||||
func nowUTC() time.Time {
|
||||
return time.Now().UTC().Truncate(time.Second)
|
||||
}
|
||||
48
internal/api/server.go
Normal file
48
internal/api/server.go
Normal file
@@ -0,0 +1,48 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/go-chi/chi/v5/middleware"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func NewServer(db *sql.DB) (*Server, error) {
|
||||
return &Server{db: db}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Router() http.Handler {
|
||||
r := chi.NewRouter()
|
||||
r.Use(middleware.RequestID)
|
||||
r.Use(middleware.RealIP)
|
||||
r.Use(middleware.Recoverer)
|
||||
r.Use(jsonMiddleware)
|
||||
|
||||
r.Get("/health", s.handleHealth)
|
||||
|
||||
r.Route("/projects", func(r chi.Router) {
|
||||
r.Post("/", s.handleCreateProject)
|
||||
r.Get("/", s.handleListProjects)
|
||||
r.Get("/{id}", s.handleGetProject)
|
||||
r.Patch("/{id}", s.handlePatchProject)
|
||||
})
|
||||
|
||||
r.Route("/work", func(r chi.Router) {
|
||||
r.Post("/", s.handleCreateWork)
|
||||
r.Get("/", s.handleListWork)
|
||||
r.Get("/{id}", s.handleGetWork)
|
||||
r.Patch("/{id}", s.handlePatchWork)
|
||||
r.Delete("/{id}", s.handleDeleteWork)
|
||||
})
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
49
internal/db/migrations/001_initial.sql
Normal file
49
internal/db/migrations/001_initial.sql
Normal file
@@ -0,0 +1,49 @@
|
||||
PRAGMA foreign_keys = ON;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS projects (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
external_ref TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS work_items (
|
||||
id TEXT PRIMARY KEY,
|
||||
project_id TEXT,
|
||||
type TEXT NOT NULL,
|
||||
description TEXT NOT NULL,
|
||||
payload TEXT,
|
||||
priority INTEGER NOT NULL DEFAULT 3 CHECK(priority BETWEEN 1 AND 5),
|
||||
status TEXT NOT NULL CHECK(status IN ('queued','dispatched','in_progress','blocked','failed','completed','cancelled')),
|
||||
assigned_agent TEXT,
|
||||
created_by TEXT,
|
||||
created_at TEXT NOT NULL,
|
||||
updated_at TEXT NOT NULL,
|
||||
completed_at TEXT,
|
||||
outcome TEXT CHECK(outcome IN ('success','failed','cancelled') OR outcome IS NULL),
|
||||
notes TEXT,
|
||||
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL,
|
||||
CHECK ((status IN ('completed','failed','cancelled') AND completed_at IS NOT NULL AND outcome IS NOT NULL) OR (status NOT IN ('completed','failed','cancelled') AND completed_at IS NULL AND outcome IS NULL))
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS idx_work_items_one_in_progress_per_agent
|
||||
ON work_items(assigned_agent)
|
||||
WHERE status = 'in_progress' AND assigned_agent IS NOT NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_work_items_status ON work_items(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_work_items_agent_status ON work_items(assigned_agent, status);
|
||||
CREATE INDEX IF NOT EXISTS idx_work_items_project_id ON work_items(project_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_work_items_created_at ON work_items(created_at);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS dispatch_log (
|
||||
id TEXT PRIMARY KEY,
|
||||
work_item_id TEXT NOT NULL,
|
||||
dispatched_at TEXT NOT NULL,
|
||||
agent TEXT NOT NULL,
|
||||
completed_at TEXT,
|
||||
outcome TEXT CHECK(outcome IN ('success','failed','cancelled') OR outcome IS NULL),
|
||||
FOREIGN KEY(work_item_id) REFERENCES work_items(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_dispatch_log_work_item_id ON dispatch_log(work_item_id);
|
||||
32
internal/db/sqlite.go
Normal file
32
internal/db/sqlite.go
Normal file
@@ -0,0 +1,32 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
//go:embed migrations/001_initial.sql
|
||||
var initialMigration string
|
||||
|
||||
func Open(databaseURL string) (*sql.DB, error) {
|
||||
if err := os.MkdirAll(filepath.Dir(databaseURL), 0o755); err != nil && filepath.Dir(databaseURL) != "." {
|
||||
return nil, fmt.Errorf("mkdir db dir: %w", err)
|
||||
}
|
||||
|
||||
db, err := sql.Open("sqlite3", databaseURL+"?_foreign_keys=on&_busy_timeout=5000")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, err := db.Exec(initialMigration); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("apply migration: %w", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
38
internal/model/models.go
Normal file
38
internal/model/models.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
type Project struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
ExternalRef *string `json:"external_ref,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
}
|
||||
|
||||
type DispatchLog struct {
|
||||
ID string `json:"id"`
|
||||
WorkItemID string `json:"work_item_id"`
|
||||
DispatchedAt time.Time `json:"dispatched_at"`
|
||||
Agent string `json:"agent"`
|
||||
CompletedAt *time.Time `json:"completed_at,omitempty"`
|
||||
Outcome *string `json:"outcome,omitempty"`
|
||||
}
|
||||
|
||||
type WorkItem struct {
|
||||
ID string `json:"id"`
|
||||
ProjectID *string `json:"project_id,omitempty"`
|
||||
Type string `json:"type"`
|
||||
Description string `json:"description"`
|
||||
Payload any `json:"payload,omitempty"`
|
||||
Priority int `json:"priority"`
|
||||
Status string `json:"status"`
|
||||
AssignedAgent *string `json:"assigned_agent,omitempty"`
|
||||
CreatedBy *string `json:"created_by,omitempty"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
CompletedAt *time.Time `json:"completed_at,omitempty"`
|
||||
Outcome *string `json:"outcome,omitempty"`
|
||||
Notes *string `json:"notes,omitempty"`
|
||||
DispatchLog []DispatchLog `json:"dispatch_log,omitempty"`
|
||||
}
|
||||
Reference in New Issue
Block a user