From 87efe766a2fbda3905bc4dc202c1aefe7206f1cb Mon Sep 17 00:00:00 2001 From: Steve W Date: Sat, 11 Apr 2026 18:38:52 +0000 Subject: [PATCH] feat: scaffold work queue API --- cmd/work-queue-api/main.go | 38 ++ go.mod | 9 + go.sum | 6 + internal/api/handlers_projects.go | 132 +++++++ internal/api/handlers_work.go | 473 +++++++++++++++++++++++++ internal/api/middleware.go | 34 ++ internal/api/server.go | 48 +++ internal/db/migrations/001_initial.sql | 49 +++ internal/db/sqlite.go | 32 ++ internal/model/models.go | 38 ++ 10 files changed, 859 insertions(+) create mode 100644 cmd/work-queue-api/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/api/handlers_projects.go create mode 100644 internal/api/handlers_work.go create mode 100644 internal/api/middleware.go create mode 100644 internal/api/server.go create mode 100644 internal/db/migrations/001_initial.sql create mode 100644 internal/db/sqlite.go create mode 100644 internal/model/models.go diff --git a/cmd/work-queue-api/main.go b/cmd/work-queue-api/main.go new file mode 100644 index 0000000..ac554bc --- /dev/null +++ b/cmd/work-queue-api/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "log" + "net/http" + "os" + + "work-queue-api/internal/api" + "work-queue-api/internal/db" +) + +func main() { + port := os.Getenv("PORT") + if port == "" { + port = "8080" + } + + databaseURL := os.Getenv("DATABASE_URL") + if databaseURL == "" { + databaseURL = "./work_queue.db" + } + + sqliteDB, err := db.Open(databaseURL) + if err != nil { + log.Fatalf("open db: %v", err) + } + defer sqliteDB.Close() + + server, err := api.NewServer(sqliteDB) + if err != nil { + log.Fatalf("build server: %v", err) + } + + log.Printf("work queue api listening on :%s", port) + if err := http.ListenAndServe(":"+port, server.Router()); err != nil { + log.Fatalf("serve: %v", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0d3d140 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module work-queue-api + +go 1.24.0 + +require ( + github.com/go-chi/chi/v5 v5.2.3 + github.com/google/uuid v1.6.0 + github.com/mattn/go-sqlite3 v1.14.32 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..df4eb1f --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= +github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= diff --git a/internal/api/handlers_projects.go b/internal/api/handlers_projects.go new file mode 100644 index 0000000..74783b0 --- /dev/null +++ b/internal/api/handlers_projects.go @@ -0,0 +1,132 @@ +package api + +import ( + "database/sql" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + + "work-queue-api/internal/model" +) + +type createProjectRequest struct { + Name string `json:"name"` + ExternalRef *string `json:"external_ref"` +} + +type patchProjectRequest struct { + Name *string `json:"name"` + ExternalRef *string `json:"external_ref"` +} + +func (s *Server) handleCreateProject(w http.ResponseWriter, r *http.Request) { + var req createProjectRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + if req.Name == "" { + writeError(w, http.StatusBadRequest, "name is required") + return + } + + now := nowUTC() + project := model.Project{ + ID: uuid.NewString(), + Name: req.Name, + ExternalRef: req.ExternalRef, + CreatedAt: now, + UpdatedAt: now, + } + + _, err := s.db.Exec(`INSERT INTO projects (id, name, external_ref, created_at, updated_at) VALUES (?, ?, ?, ?, ?)`, + project.ID, project.Name, project.ExternalRef, project.CreatedAt.Format(timeLayout), project.UpdatedAt.Format(timeLayout), + ) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusCreated, project) +} + +func (s *Server) handleListProjects(w http.ResponseWriter, r *http.Request) { + rows, err := s.db.Query(`SELECT id, name, external_ref, created_at, updated_at FROM projects ORDER BY created_at ASC`) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + var projects []model.Project + for rows.Next() { + project, err := scanProject(rows) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + projects = append(projects, project) + } + + writeJSON(w, http.StatusOK, projects) +} + +func (s *Server) handleGetProject(w http.ResponseWriter, r *http.Request) { + project, err := s.fetchProject(chi.URLParam(r, "id")) + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "project not found") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusOK, project) +} + +func (s *Server) handlePatchProject(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + project, err := s.fetchProject(id) + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "project not found") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + var req patchProjectRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + + if req.Name != nil { + if *req.Name == "" { + writeError(w, http.StatusBadRequest, "name cannot be empty") + return + } + project.Name = *req.Name + } + if req.ExternalRef != nil { + project.ExternalRef = req.ExternalRef + } + project.UpdatedAt = nowUTC() + + _, err = s.db.Exec(`UPDATE projects SET name = ?, external_ref = ?, updated_at = ? WHERE id = ?`, + project.Name, project.ExternalRef, project.UpdatedAt.Format(timeLayout), project.ID, + ) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusOK, project) +} + +func (s *Server) fetchProject(id string) (model.Project, error) { + row := s.db.QueryRow(`SELECT id, name, external_ref, created_at, updated_at FROM projects WHERE id = ?`, id) + return scanProject(row) +} diff --git a/internal/api/handlers_work.go b/internal/api/handlers_work.go new file mode 100644 index 0000000..54c3f21 --- /dev/null +++ b/internal/api/handlers_work.go @@ -0,0 +1,473 @@ +package api + +import ( + "database/sql" + "encoding/json" + "errors" + "fmt" + "net/http" + "strings" + "time" + + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + + "work-queue-api/internal/model" +) + +const timeLayout = time.RFC3339 + +var terminalStatuses = map[string]bool{"completed": true, "failed": true, "cancelled": true} + +var validTransitions = map[string]map[string]bool{ + "queued": {"dispatched": true, "cancelled": true}, + "dispatched": {"in_progress": true, "cancelled": true}, + "in_progress": {"blocked": true, "completed": true, "failed": true}, + "blocked": {"in_progress": true, "failed": true, "completed": true}, + "completed": {}, + "failed": {}, + "cancelled": {}, +} + +type createWorkRequest struct { + ProjectID *string `json:"project_id"` + Type string `json:"type"` + Description string `json:"description"` + Payload json.RawMessage `json:"payload"` + Priority *int `json:"priority"` + AssignedAgent *string `json:"assigned_agent"` + CreatedBy *string `json:"created_by"` +} + +type patchWorkRequest struct { + Status *string `json:"status"` + Outcome *string `json:"outcome"` + Notes *string `json:"notes"` + AssignedAgent *string `json:"assigned_agent"` +} + +func (s *Server) handleCreateWork(w http.ResponseWriter, r *http.Request) { + var req createWorkRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + if req.Type == "" || req.Description == "" { + writeError(w, http.StatusBadRequest, "type and description are required") + return + } + priority := 3 + if req.Priority != nil { + priority = *req.Priority + } + if priority < 1 || priority > 5 { + writeError(w, http.StatusBadRequest, "priority must be between 1 and 5") + return + } + if req.ProjectID != nil { + if _, err := s.fetchProject(*req.ProjectID); err == sql.ErrNoRows { + writeError(w, http.StatusBadRequest, "project_id does not exist") + return + } else if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + } + + now := nowUTC() + item := model.WorkItem{ + ID: uuid.NewString(), + ProjectID: req.ProjectID, + Type: req.Type, + Description: req.Description, + Priority: priority, + Status: "queued", + AssignedAgent: req.AssignedAgent, + CreatedBy: req.CreatedBy, + CreatedAt: now, + UpdatedAt: now, + } + payloadText := nullablePayload(req.Payload) + if len(req.Payload) > 0 { + _ = json.Unmarshal(req.Payload, &item.Payload) + } + + _, err := s.db.Exec(`INSERT INTO work_items (id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, NULL, NULL)`, + item.ID, item.ProjectID, item.Type, item.Description, payloadText, item.Priority, item.Status, item.AssignedAgent, item.CreatedBy, + item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout), + ) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusCreated, item) +} + +func (s *Server) handleListWork(w http.ResponseWriter, r *http.Request) { + query := `SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items` + var where []string + var args []any + + if status := r.URL.Query().Get("status"); status != "" { + where = append(where, "status = ?") + args = append(args, status) + } + if agent := r.URL.Query().Get("agent"); agent != "" { + where = append(where, "assigned_agent = ?") + args = append(args, agent) + } + if projectID := r.URL.Query().Get("project_id"); projectID != "" { + where = append(where, "project_id = ?") + args = append(args, projectID) + } + if since := r.URL.Query().Get("since"); since != "" { + if _, err := time.Parse(timeLayout, since); err != nil { + writeError(w, http.StatusBadRequest, "since must be ISO8601/RFC3339") + return + } + where = append(where, "created_at > ?") + args = append(args, since) + } + if len(where) > 0 { + query += " WHERE " + strings.Join(where, " AND ") + } + if r.URL.Query().Get("since") != "" { + query += " ORDER BY created_at ASC" + } else { + query += " ORDER BY priority ASC, created_at ASC" + } + + rows, err := s.db.Query(query, args...) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + defer rows.Close() + + var items []model.WorkItem + for rows.Next() { + item, err := scanWorkItem(rows) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + items = append(items, item) + } + + writeJSON(w, http.StatusOK, items) +} + +func (s *Server) handleGetWork(w http.ResponseWriter, r *http.Request) { + item, err := s.fetchWorkItem(chi.URLParam(r, "id")) + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "work item not found") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + logs, err := s.fetchDispatchLogs(item.ID) + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + item.DispatchLog = logs + + writeJSON(w, http.StatusOK, item) +} + +func (s *Server) handlePatchWork(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + item, err := s.fetchWorkItem(id) + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "work item not found") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + var req patchWorkRequest + if err := decodeJSON(r, &req); err != nil { + writeError(w, http.StatusBadRequest, "invalid json body") + return + } + + if req.AssignedAgent != nil { + item.AssignedAgent = req.AssignedAgent + } + if req.Notes != nil { + item.Notes = req.Notes + } + + if req.Status != nil { + if err := applyStatusTransition(&item, *req.Status, req.Outcome, req.Notes); err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + } + item.UpdatedAt = nowUTC() + + if err := s.persistPatchedWork(item, req.Status); err != nil { + if isUniqueConstraint(err) { + writeError(w, http.StatusConflict, "agent already has an in_progress work item") + return + } + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + writeJSON(w, http.StatusOK, item) +} + +func (s *Server) handleDeleteWork(w http.ResponseWriter, r *http.Request) { + item, err := s.fetchWorkItem(chi.URLParam(r, "id")) + if err == sql.ErrNoRows { + writeError(w, http.StatusNotFound, "work item not found") + return + } + if err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + if item.Status != "queued" && item.Status != "dispatched" { + writeError(w, http.StatusBadRequest, "only queued or dispatched items can be cancelled") + return + } + outcome := "cancelled" + if err := applyStatusTransition(&item, "cancelled", &outcome, nil); err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + item.UpdatedAt = nowUTC() + + if err := s.persistPatchedWork(item, ptr("cancelled")); err != nil { + writeError(w, http.StatusInternalServerError, err.Error()) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func applyStatusTransition(item *model.WorkItem, nextStatus string, outcome *string, notes *string) error { + nextStatus = strings.TrimSpace(nextStatus) + if !validTransitions[item.Status][nextStatus] { + return fmt.Errorf("invalid status transition: %s -> %s", item.Status, nextStatus) + } + if item.Status == "queued" && nextStatus == "dispatched" && item.AssignedAgent == nil { + return errors.New("assigned_agent is required for queued -> dispatched") + } + if nextStatus == "blocked" && (notes == nil || strings.TrimSpace(*notes) == "") { + return errors.New("notes are required when status is blocked") + } + if (nextStatus == "completed" || nextStatus == "failed" || nextStatus == "cancelled") && outcome == nil { + return errors.New("outcome is required for terminal statuses") + } + if nextStatus == "completed" && *outcome != "success" { + return errors.New("completed requires outcome=success") + } + if nextStatus == "failed" && *outcome != "failed" { + return errors.New("failed requires outcome=failed") + } + if nextStatus == "cancelled" && *outcome != "cancelled" { + return errors.New("cancelled requires outcome=cancelled") + } + + item.Status = nextStatus + if terminalStatuses[nextStatus] { + now := nowUTC() + item.CompletedAt = &now + item.Outcome = outcome + } else { + item.CompletedAt = nil + item.Outcome = nil + } + return nil +} + +func (s *Server) persistPatchedWork(item model.WorkItem, requestedStatus *string) error { + tx, err := s.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + _, err = tx.Exec(`UPDATE work_items + SET project_id = ?, type = ?, description = ?, payload = ?, priority = ?, status = ?, assigned_agent = ?, created_by = ?, created_at = ?, updated_at = ?, completed_at = ?, outcome = ?, notes = ? + WHERE id = ?`, + item.ProjectID, item.Type, item.Description, marshalPayload(item.Payload), item.Priority, item.Status, item.AssignedAgent, item.CreatedBy, + item.CreatedAt.Format(timeLayout), item.UpdatedAt.Format(timeLayout), formatNullTime(item.CompletedAt), item.Outcome, item.Notes, item.ID, + ) + if err != nil { + return err + } + + if requestedStatus != nil && *requestedStatus == "dispatched" { + _, err = tx.Exec(`INSERT INTO dispatch_log (id, work_item_id, dispatched_at, agent, completed_at, outcome) VALUES (?, ?, ?, ?, NULL, NULL)`, + uuid.NewString(), item.ID, item.UpdatedAt.Format(timeLayout), deref(item.AssignedAgent), + ) + if err != nil { + return err + } + } + if requestedStatus != nil && terminalStatuses[*requestedStatus] { + _, err = tx.Exec(`UPDATE dispatch_log SET completed_at = ?, outcome = ? WHERE work_item_id = ? AND completed_at IS NULL`, + formatNullTime(item.CompletedAt), item.Outcome, item.ID, + ) + if err != nil { + return err + } + } + + return tx.Commit() +} + +func (s *Server) fetchWorkItem(id string) (model.WorkItem, error) { + row := s.db.QueryRow(`SELECT id, project_id, type, description, payload, priority, status, assigned_agent, created_by, created_at, updated_at, completed_at, outcome, notes FROM work_items WHERE id = ?`, id) + return scanWorkItem(row) +} + +func (s *Server) fetchDispatchLogs(workItemID string) ([]model.DispatchLog, error) { + rows, err := s.db.Query(`SELECT id, work_item_id, dispatched_at, agent, completed_at, outcome FROM dispatch_log WHERE work_item_id = ? ORDER BY dispatched_at ASC`, workItemID) + if err != nil { + return nil, err + } + defer rows.Close() + + var logs []model.DispatchLog + for rows.Next() { + var log model.DispatchLog + var dispatchedAt, completedAt sql.NullString + var outcome sql.NullString + if err := rows.Scan(&log.ID, &log.WorkItemID, &dispatchedAt, &log.Agent, &completedAt, &outcome); err != nil { + return nil, err + } + log.DispatchedAt, err = time.Parse(timeLayout, dispatchedAt.String) + if err != nil { + return nil, err + } + if completedAt.Valid { + parsed, err := time.Parse(timeLayout, completedAt.String) + if err != nil { + return nil, err + } + log.CompletedAt = &parsed + } + if outcome.Valid { + log.Outcome = &outcome.String + } + logs = append(logs, log) + } + return logs, rows.Err() +} + +func scanProject(scanner interface{ Scan(dest ...any) error }) (model.Project, error) { + var project model.Project + var externalRef sql.NullString + var createdAt, updatedAt string + if err := scanner.Scan(&project.ID, &project.Name, &externalRef, &createdAt, &updatedAt); err != nil { + return project, err + } + if externalRef.Valid { + project.ExternalRef = &externalRef.String + } + var err error + project.CreatedAt, err = time.Parse(timeLayout, createdAt) + if err != nil { + return project, err + } + project.UpdatedAt, err = time.Parse(timeLayout, updatedAt) + return project, err +} + +func scanWorkItem(scanner interface{ Scan(dest ...any) error }) (model.WorkItem, error) { + var item model.WorkItem + var projectID, payload, assignedAgent, createdBy, completedAt, outcome, notes sql.NullString + var createdAt, updatedAt string + if err := scanner.Scan(&item.ID, &projectID, &item.Type, &item.Description, &payload, &item.Priority, &item.Status, &assignedAgent, &createdBy, &createdAt, &updatedAt, &completedAt, &outcome, ¬es); err != nil { + return item, err + } + if projectID.Valid { + item.ProjectID = &projectID.String + } + if payload.Valid && payload.String != "" { + var decoded any + if err := json.Unmarshal([]byte(payload.String), &decoded); err == nil { + item.Payload = decoded + } + } + if assignedAgent.Valid { + item.AssignedAgent = &assignedAgent.String + } + if createdBy.Valid { + item.CreatedBy = &createdBy.String + } + if notes.Valid { + item.Notes = ¬es.String + } + var err error + item.CreatedAt, err = time.Parse(timeLayout, createdAt) + if err != nil { + return item, err + } + item.UpdatedAt, err = time.Parse(timeLayout, updatedAt) + if err != nil { + return item, err + } + if completedAt.Valid { + parsed, err := time.Parse(timeLayout, completedAt.String) + if err != nil { + return item, err + } + item.CompletedAt = &parsed + } + if outcome.Valid { + item.Outcome = &outcome.String + } + return item, nil +} + +func nullablePayload(payload json.RawMessage) any { + if len(payload) == 0 { + return nil + } + return string(payload) +} + +func marshalPayload(payload any) any { + if payload == nil { + return nil + } + bytes, err := json.Marshal(payload) + if err != nil { + return nil + } + return string(bytes) +} + +func formatNullTime(t *time.Time) any { + if t == nil { + return nil + } + return t.Format(timeLayout) +} + +func ptr(s string) *string { return &s } +func deref(s *string) string { + if s == nil { + return "" + } + return *s +} + +func isUniqueConstraint(err error) bool { + if err == nil { + return false + } + return strings.Contains(strings.ToLower(err.Error()), "unique constraint failed") +} diff --git a/internal/api/middleware.go b/internal/api/middleware.go new file mode 100644 index 0000000..a47618b --- /dev/null +++ b/internal/api/middleware.go @@ -0,0 +1,34 @@ +package api + +import ( + "encoding/json" + "net/http" + "time" +) + +func jsonMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + next.ServeHTTP(w, r) + }) +} + +func decodeJSON(r *http.Request, dst any) error { + defer r.Body.Close() + dec := json.NewDecoder(r.Body) + dec.DisallowUnknownFields() + return dec.Decode(dst) +} + +func writeJSON(w http.ResponseWriter, status int, value any) { + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(value) +} + +func writeError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +func nowUTC() time.Time { + return time.Now().UTC().Truncate(time.Second) +} diff --git a/internal/api/server.go b/internal/api/server.go new file mode 100644 index 0000000..b4915b5 --- /dev/null +++ b/internal/api/server.go @@ -0,0 +1,48 @@ +package api + +import ( + "database/sql" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" +) + +type Server struct { + db *sql.DB +} + +func NewServer(db *sql.DB) (*Server, error) { + return &Server{db: db}, nil +} + +func (s *Server) Router() http.Handler { + r := chi.NewRouter() + r.Use(middleware.RequestID) + r.Use(middleware.RealIP) + r.Use(middleware.Recoverer) + r.Use(jsonMiddleware) + + r.Get("/health", s.handleHealth) + + r.Route("/projects", func(r chi.Router) { + r.Post("/", s.handleCreateProject) + r.Get("/", s.handleListProjects) + r.Get("/{id}", s.handleGetProject) + r.Patch("/{id}", s.handlePatchProject) + }) + + r.Route("/work", func(r chi.Router) { + r.Post("/", s.handleCreateWork) + r.Get("/", s.handleListWork) + r.Get("/{id}", s.handleGetWork) + r.Patch("/{id}", s.handlePatchWork) + r.Delete("/{id}", s.handleDeleteWork) + }) + + return r +} + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) +} diff --git a/internal/db/migrations/001_initial.sql b/internal/db/migrations/001_initial.sql new file mode 100644 index 0000000..bc94675 --- /dev/null +++ b/internal/db/migrations/001_initial.sql @@ -0,0 +1,49 @@ +PRAGMA foreign_keys = ON; + +CREATE TABLE IF NOT EXISTS projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + external_ref TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL +); + +CREATE TABLE IF NOT EXISTS work_items ( + id TEXT PRIMARY KEY, + project_id TEXT, + type TEXT NOT NULL, + description TEXT NOT NULL, + payload TEXT, + priority INTEGER NOT NULL DEFAULT 3 CHECK(priority BETWEEN 1 AND 5), + status TEXT NOT NULL CHECK(status IN ('queued','dispatched','in_progress','blocked','failed','completed','cancelled')), + assigned_agent TEXT, + created_by TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + completed_at TEXT, + outcome TEXT CHECK(outcome IN ('success','failed','cancelled') OR outcome IS NULL), + notes TEXT, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE SET NULL, + CHECK ((status IN ('completed','failed','cancelled') AND completed_at IS NOT NULL AND outcome IS NOT NULL) OR (status NOT IN ('completed','failed','cancelled') AND completed_at IS NULL AND outcome IS NULL)) +); + +CREATE UNIQUE INDEX IF NOT EXISTS idx_work_items_one_in_progress_per_agent +ON work_items(assigned_agent) +WHERE status = 'in_progress' AND assigned_agent IS NOT NULL; + +CREATE INDEX IF NOT EXISTS idx_work_items_status ON work_items(status); +CREATE INDEX IF NOT EXISTS idx_work_items_agent_status ON work_items(assigned_agent, status); +CREATE INDEX IF NOT EXISTS idx_work_items_project_id ON work_items(project_id); +CREATE INDEX IF NOT EXISTS idx_work_items_created_at ON work_items(created_at); + +CREATE TABLE IF NOT EXISTS dispatch_log ( + id TEXT PRIMARY KEY, + work_item_id TEXT NOT NULL, + dispatched_at TEXT NOT NULL, + agent TEXT NOT NULL, + completed_at TEXT, + outcome TEXT CHECK(outcome IN ('success','failed','cancelled') OR outcome IS NULL), + FOREIGN KEY(work_item_id) REFERENCES work_items(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_dispatch_log_work_item_id ON dispatch_log(work_item_id); diff --git a/internal/db/sqlite.go b/internal/db/sqlite.go new file mode 100644 index 0000000..98d4d6b --- /dev/null +++ b/internal/db/sqlite.go @@ -0,0 +1,32 @@ +package db + +import ( + "database/sql" + _ "embed" + "fmt" + "os" + "path/filepath" + + _ "github.com/mattn/go-sqlite3" +) + +//go:embed migrations/001_initial.sql +var initialMigration string + +func Open(databaseURL string) (*sql.DB, error) { + if err := os.MkdirAll(filepath.Dir(databaseURL), 0o755); err != nil && filepath.Dir(databaseURL) != "." { + return nil, fmt.Errorf("mkdir db dir: %w", err) + } + + db, err := sql.Open("sqlite3", databaseURL+"?_foreign_keys=on&_busy_timeout=5000") + if err != nil { + return nil, err + } + + if _, err := db.Exec(initialMigration); err != nil { + db.Close() + return nil, fmt.Errorf("apply migration: %w", err) + } + + return db, nil +} diff --git a/internal/model/models.go b/internal/model/models.go new file mode 100644 index 0000000..d9e3d0a --- /dev/null +++ b/internal/model/models.go @@ -0,0 +1,38 @@ +package model + +import "time" + +type Project struct { + ID string `json:"id"` + Name string `json:"name"` + ExternalRef *string `json:"external_ref,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +type DispatchLog struct { + ID string `json:"id"` + WorkItemID string `json:"work_item_id"` + DispatchedAt time.Time `json:"dispatched_at"` + Agent string `json:"agent"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Outcome *string `json:"outcome,omitempty"` +} + +type WorkItem struct { + ID string `json:"id"` + ProjectID *string `json:"project_id,omitempty"` + Type string `json:"type"` + Description string `json:"description"` + Payload any `json:"payload,omitempty"` + Priority int `json:"priority"` + Status string `json:"status"` + AssignedAgent *string `json:"assigned_agent,omitempty"` + CreatedBy *string `json:"created_by,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Outcome *string `json:"outcome,omitempty"` + Notes *string `json:"notes,omitempty"` + DispatchLog []DispatchLog `json:"dispatch_log,omitempty"` +}