From 018d699e31de8295aeae7bc89a00324b3df9be80 Mon Sep 17 00:00:00 2001 From: Soldier Date: Sun, 16 Nov 2025 08:01:53 +0000 Subject: [PATCH] Add webhook callback support Add webhook_url column to jobs table. POST /jobs endpoint accepts JSON payload with optional webhook_url. After job completion, worker POSTs to webhook with status and duration. --- cmd/alpenqueue/main.go | 11 ++++++++++- pkg/db/db.go | 18 ++++++++++-------- pkg/worker/worker.go | 24 ++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/cmd/alpenqueue/main.go b/cmd/alpenqueue/main.go index 5ec6e0c..322be7c 100644 --- a/cmd/alpenqueue/main.go +++ b/cmd/alpenqueue/main.go @@ -3,7 +3,9 @@ package main import ( "alpenqueue/pkg/db" "alpenqueue/pkg/worker" + "encoding/json" "fmt" + "io" "log" "net/http" ) @@ -27,7 +29,14 @@ func main() { return } - id, err := db.CreateJob(database) + var req struct { + WebhookURL string `json:"webhook_url"` + } + + body, _ := io.ReadAll(r.Body) + json.Unmarshal(body, &req) + + id, err := db.CreateJob(database, req.WebhookURL) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/pkg/db/db.go b/pkg/db/db.go index fd75fcd..5406b2d 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -8,9 +8,10 @@ import ( ) type Job struct { - ID int - Status string - CreatedAt time.Time + ID int + Status string + CreatedAt time.Time + WebhookURL string } func Init(dbPath string) (*sql.DB, error) { @@ -23,7 +24,8 @@ func Init(dbPath string) (*sql.DB, error) { CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, status TEXT NOT NULL DEFAULT 'pending', - created_at DATETIME DEFAULT CURRENT_TIMESTAMP + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + webhook_url TEXT );` _, err = db.Exec(schema) @@ -34,8 +36,8 @@ func Init(dbPath string) (*sql.DB, error) { return db, nil } -func CreateJob(db *sql.DB) (int64, error) { - result, err := db.Exec("INSERT INTO jobs (status) VALUES ('pending')") +func CreateJob(db *sql.DB, webhookURL string) (int64, error) { + result, err := db.Exec("INSERT INTO jobs (status, webhook_url) VALUES ('pending', ?)", webhookURL) if err != nil { return 0, err } @@ -43,7 +45,7 @@ func CreateJob(db *sql.DB) (int64, error) { } func GetPendingJobs(db *sql.DB) ([]Job, error) { - rows, err := db.Query("SELECT id, status, created_at FROM jobs WHERE status = 'pending'") + rows, err := db.Query("SELECT id, status, created_at, webhook_url FROM jobs WHERE status = 'pending'") if err != nil { return nil, err } @@ -52,7 +54,7 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) { var jobs []Job for rows.Next() { var job Job - if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt); err != nil { + if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &job.WebhookURL); err != nil { return nil, err } jobs = append(jobs, job) diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index fe7e525..74ac649 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -2,8 +2,12 @@ package worker import ( "alpenqueue/pkg/db" + "bytes" "database/sql" + "encoding/json" + "fmt" "log" + "net/http" "time" ) @@ -19,8 +23,28 @@ func Start(database *sql.DB) { for _, job := range jobs { log.Printf("Processing job %d", job.ID) + start := time.Now() + time.Sleep(2 * time.Second) + duration := time.Since(start) + + if job.WebhookURL != "" { + payload := map[string]string{ + "status": "ok", + "took": fmt.Sprintf("%.1fs", duration.Seconds()), + } + jsonData, _ := json.Marshal(payload) + + resp, err := http.Post(job.WebhookURL, "application/json", bytes.NewBuffer(jsonData)) + if err != nil { + log.Printf("Error posting webhook for job %d: %v", job.ID, err) + } else { + resp.Body.Close() + log.Printf("Webhook posted for job %d", job.ID) + } + } + if err := db.MarkJobDone(database, job.ID); err != nil { log.Printf("Error marking job %d done: %v", job.ID, err) } else {