Add webhook callback support

Add webhook_url column to jobs table. POST /jobs endpoint accepts JSON payload with optional webhook_url. After job completion, worker POSTs to webhook with status and duration.
This commit is contained in:
Soldier 2025-11-16 08:01:53 +00:00
parent 40d194beb1
commit 018d699e31
3 changed files with 44 additions and 9 deletions

View File

@ -3,7 +3,9 @@ package main
import ( import (
"alpenqueue/pkg/db" "alpenqueue/pkg/db"
"alpenqueue/pkg/worker" "alpenqueue/pkg/worker"
"encoding/json"
"fmt" "fmt"
"io"
"log" "log"
"net/http" "net/http"
) )
@ -27,7 +29,14 @@ func main() {
return return
} }
id, err := db.CreateJob(database) var req struct {
WebhookURL string `json:"webhook_url"`
}
body, _ := io.ReadAll(r.Body)
json.Unmarshal(body, &req)
id, err := db.CreateJob(database, req.WebhookURL)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return

View File

@ -11,6 +11,7 @@ type Job struct {
ID int ID int
Status string Status string
CreatedAt time.Time CreatedAt time.Time
WebhookURL string
} }
func Init(dbPath string) (*sql.DB, error) { func Init(dbPath string) (*sql.DB, error) {
@ -23,7 +24,8 @@ func Init(dbPath string) (*sql.DB, error) {
CREATE TABLE IF NOT EXISTS jobs ( CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT, id INTEGER PRIMARY KEY AUTOINCREMENT,
status TEXT NOT NULL DEFAULT 'pending', status TEXT NOT NULL DEFAULT 'pending',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
webhook_url TEXT
);` );`
_, err = db.Exec(schema) _, err = db.Exec(schema)
@ -34,8 +36,8 @@ func Init(dbPath string) (*sql.DB, error) {
return db, nil return db, nil
} }
func CreateJob(db *sql.DB) (int64, error) { func CreateJob(db *sql.DB, webhookURL string) (int64, error) {
result, err := db.Exec("INSERT INTO jobs (status) VALUES ('pending')") result, err := db.Exec("INSERT INTO jobs (status, webhook_url) VALUES ('pending', ?)", webhookURL)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -43,7 +45,7 @@ func CreateJob(db *sql.DB) (int64, error) {
} }
func GetPendingJobs(db *sql.DB) ([]Job, error) { func GetPendingJobs(db *sql.DB) ([]Job, error) {
rows, err := db.Query("SELECT id, status, created_at FROM jobs WHERE status = 'pending'") rows, err := db.Query("SELECT id, status, created_at, webhook_url FROM jobs WHERE status = 'pending'")
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -52,7 +54,7 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) {
var jobs []Job var jobs []Job
for rows.Next() { for rows.Next() {
var job Job var job Job
if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt); err != nil { if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &job.WebhookURL); err != nil {
return nil, err return nil, err
} }
jobs = append(jobs, job) jobs = append(jobs, job)

View File

@ -2,8 +2,12 @@ package worker
import ( import (
"alpenqueue/pkg/db" "alpenqueue/pkg/db"
"bytes"
"database/sql" "database/sql"
"encoding/json"
"fmt"
"log" "log"
"net/http"
"time" "time"
) )
@ -19,8 +23,28 @@ func Start(database *sql.DB) {
for _, job := range jobs { for _, job := range jobs {
log.Printf("Processing job %d", job.ID) log.Printf("Processing job %d", job.ID)
start := time.Now()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
duration := time.Since(start)
if job.WebhookURL != "" {
payload := map[string]string{
"status": "ok",
"took": fmt.Sprintf("%.1fs", duration.Seconds()),
}
jsonData, _ := json.Marshal(payload)
resp, err := http.Post(job.WebhookURL, "application/json", bytes.NewBuffer(jsonData))
if err != nil {
log.Printf("Error posting webhook for job %d: %v", job.ID, err)
} else {
resp.Body.Close()
log.Printf("Webhook posted for job %d", job.ID)
}
}
if err := db.MarkJobDone(database, job.ID); err != nil { if err := db.MarkJobDone(database, job.ID); err != nil {
log.Printf("Error marking job %d done: %v", job.ID, err) log.Printf("Error marking job %d done: %v", job.ID, err)
} else { } else {