Add SQLite persistence and worker
Add jobs table with ID, status, and created_at fields. POST /jobs endpoint creates pending jobs in SQLite. Worker polls every 5s for pending jobs, processes them with 2s delay, and marks as done.
This commit is contained in:
parent
c45b61ae0c
commit
40d194beb1
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
*.db
|
||||||
@ -1,12 +1,42 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"alpenqueue/pkg/db"
|
||||||
|
"alpenqueue/pkg/worker"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
database, err := db.Init("./alpenqueue.db")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer database.Close()
|
||||||
|
|
||||||
|
worker.Start(database)
|
||||||
|
|
||||||
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Write([]byte("AlpenQueue running!"))
|
w.Write([]byte("AlpenQueue running!"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
http.HandleFunc("/jobs", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.Method != http.MethodPost {
|
||||||
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
id, err := db.CreateJob(database)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusCreated)
|
||||||
|
fmt.Fprintf(w, "Job %d created\n", id)
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Println("Server starting on :8080")
|
||||||
http.ListenAndServe(":8080", nil)
|
http.ListenAndServe(":8080", nil)
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -1,3 +1,5 @@
|
|||||||
module alpenqueue
|
module alpenqueue
|
||||||
|
|
||||||
go 1.25.4
|
go 1.25.4
|
||||||
|
|
||||||
|
require github.com/mattn/go-sqlite3 v1.14.32 // indirect
|
||||||
|
|||||||
2
go.sum
Normal file
2
go.sum
Normal file
@ -0,0 +1,2 @@
|
|||||||
|
github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs=
|
||||||
|
github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||||
66
pkg/db/db.go
Normal file
66
pkg/db/db.go
Normal file
@ -0,0 +1,66 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
ID int
|
||||||
|
Status string
|
||||||
|
CreatedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func Init(dbPath string) (*sql.DB, error) {
|
||||||
|
db, err := sql.Open("sqlite3", dbPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
schema := `
|
||||||
|
CREATE TABLE IF NOT EXISTS jobs (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
status TEXT NOT NULL DEFAULT 'pending',
|
||||||
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
|
||||||
|
);`
|
||||||
|
|
||||||
|
_, err = db.Exec(schema)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return db, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateJob(db *sql.DB) (int64, error) {
|
||||||
|
result, err := db.Exec("INSERT INTO jobs (status) VALUES ('pending')")
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return result.LastInsertId()
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
||||||
|
rows, err := db.Query("SELECT id, status, created_at FROM jobs WHERE status = 'pending'")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
var jobs []Job
|
||||||
|
for rows.Next() {
|
||||||
|
var job Job
|
||||||
|
if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
jobs = append(jobs, job)
|
||||||
|
}
|
||||||
|
return jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func MarkJobDone(db *sql.DB, id int) error {
|
||||||
|
_, err := db.Exec("UPDATE jobs SET status = 'done' WHERE id = ?", id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
34
pkg/worker/worker.go
Normal file
34
pkg/worker/worker.go
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
package worker
|
||||||
|
|
||||||
|
import (
|
||||||
|
"alpenqueue/pkg/db"
|
||||||
|
"database/sql"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Start(database *sql.DB) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
jobs, err := db.GetPendingJobs(database)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error fetching jobs: %v", err)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, job := range jobs {
|
||||||
|
log.Printf("Processing job %d", job.ID)
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
if err := db.MarkJobDone(database, job.ID); err != nil {
|
||||||
|
log.Printf("Error marking job %d done: %v", job.ID, err)
|
||||||
|
} else {
|
||||||
|
log.Printf("Job %d completed", job.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user