diff --git a/cmd/alpenqueue/main.go b/cmd/alpenqueue/main.go index 1df2e1c..70a8068 100644 --- a/cmd/alpenqueue/main.go +++ b/cmd/alpenqueue/main.go @@ -30,15 +30,16 @@ func main() { } var req struct { - WebhookURL string `json:"webhook_url"` - URL string `json:"url"` - Selector string `json:"selector"` + WebhookURL string `json:"webhook_url"` + URL string `json:"url"` + Selector string `json:"selector"` + FrequencyMinutes int `json:"frequency_minutes"` } body, _ := io.ReadAll(r.Body) json.Unmarshal(body, &req) - id, err := db.CreateJob(database, req.WebhookURL, req.URL, req.Selector) + id, err := db.CreateJob(database, req.WebhookURL, req.URL, req.Selector, req.FrequencyMinutes) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return diff --git a/pkg/db/db.go b/pkg/db/db.go index 4f23254..3d38964 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -16,6 +16,8 @@ type Job struct { Selector string ExtractedContent string RawHTML string + FrequencyMinutes int + NextRunAt time.Time } func Init(dbPath string) (*sql.DB, error) { @@ -33,7 +35,9 @@ func Init(dbPath string) (*sql.DB, error) { url TEXT, selector TEXT, extracted_content TEXT, - raw_html TEXT + raw_html TEXT, + frequency_minutes INTEGER DEFAULT 0, + next_run_at DATETIME );` _, err = db.Exec(schema) @@ -44,8 +48,10 @@ func Init(dbPath string) (*sql.DB, error) { return db, nil } -func CreateJob(db *sql.DB, webhookURL, url, selector string) (int64, error) { - result, err := db.Exec("INSERT INTO jobs (status, webhook_url, url, selector) VALUES ('pending', ?, ?, ?)", webhookURL, url, selector) +func CreateJob(db *sql.DB, webhookURL, url, selector string, frequencyMinutes int) (int64, error) { + nextRunAt := time.Now() + result, err := db.Exec("INSERT INTO jobs (status, webhook_url, url, selector, frequency_minutes, next_run_at) VALUES ('pending', ?, ?, ?, ?, ?)", + webhookURL, url, selector, frequencyMinutes, nextRunAt) if err != nil { return 0, err } @@ -53,7 +59,10 @@ func CreateJob(db *sql.DB, webhookURL, url, selector string) (int64, error) { } func GetPendingJobs(db *sql.DB) ([]Job, error) { - rows, err := db.Query("SELECT id, status, created_at, webhook_url, url, selector, extracted_content, raw_html FROM jobs WHERE status = 'pending'") + now := time.Now() + rows, err := db.Query(`SELECT id, status, created_at, webhook_url, url, selector, extracted_content, raw_html, frequency_minutes, next_run_at + FROM jobs + WHERE (status = 'pending' OR status = 'active') AND next_run_at <= ?`, now) if err != nil { return nil, err } @@ -63,7 +72,8 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) { for rows.Next() { var job Job var webhookURL, url, selector, extractedContent, rawHTML sql.NullString - if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &webhookURL, &url, &selector, &extractedContent, &rawHTML); err != nil { + var nextRunAt sql.NullTime + if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &webhookURL, &url, &selector, &extractedContent, &rawHTML, &job.FrequencyMinutes, &nextRunAt); err != nil { return nil, err } job.WebhookURL = webhookURL.String @@ -71,6 +81,9 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) { job.Selector = selector.String job.ExtractedContent = extractedContent.String job.RawHTML = rawHTML.String + if nextRunAt.Valid { + job.NextRunAt = nextRunAt.Time + } jobs = append(jobs, job) } return jobs, nil @@ -90,3 +103,12 @@ func MarkJobDone(db *sql.DB, id int) error { _, err := db.Exec("UPDATE jobs SET status = 'done' WHERE id = ?", id) return err } + +func UpdateNextRun(db *sql.DB, id int, frequencyMinutes int) error { + if frequencyMinutes > 0 { + nextRunAt := time.Now().Add(time.Duration(frequencyMinutes) * time.Minute) + _, err := db.Exec("UPDATE jobs SET status = 'active', next_run_at = ? WHERE id = ?", nextRunAt, id) + return err + } + return MarkJobDone(db, id) +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 9ee1d74..8ceab5d 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -163,10 +163,14 @@ func Start(database *sql.DB) { } } - if err := db.MarkJobDone(database, job.ID); err != nil { - log.Printf("Error marking job %d done: %v", job.ID, err) + if err := db.UpdateNextRun(database, job.ID, job.FrequencyMinutes); err != nil { + log.Printf("Error updating job %d: %v", job.ID, err) } else { - log.Printf("Job %d completed", job.ID) + if job.FrequencyMinutes > 0 { + log.Printf("Job %d completed, next run in %d minutes", job.ID, job.FrequencyMinutes) + } else { + log.Printf("Job %d completed", job.ID) + } } }