Add frequency_minutes field to schedule recurring jobs. Jobs with frequency > 0 run repeatedly at specified intervals, automatically rescheduling after each execution. One-time jobs (frequency = 0) remain unchanged. Status transitions from pending to active for recurring jobs.
115 lines
3.0 KiB
Go
115 lines
3.0 KiB
Go
package db
|
|
|
|
import (
|
|
"database/sql"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
type Job struct {
|
|
ID int
|
|
Status string
|
|
CreatedAt time.Time
|
|
WebhookURL string
|
|
URL string
|
|
Selector string
|
|
ExtractedContent string
|
|
RawHTML string
|
|
FrequencyMinutes int
|
|
NextRunAt time.Time
|
|
}
|
|
|
|
func Init(dbPath string) (*sql.DB, error) {
|
|
db, err := sql.Open("sqlite3", dbPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
schema := `
|
|
CREATE TABLE IF NOT EXISTS jobs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
status TEXT NOT NULL DEFAULT 'pending',
|
|
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
|
webhook_url TEXT,
|
|
url TEXT,
|
|
selector TEXT,
|
|
extracted_content TEXT,
|
|
raw_html TEXT,
|
|
frequency_minutes INTEGER DEFAULT 0,
|
|
next_run_at DATETIME
|
|
);`
|
|
|
|
_, err = db.Exec(schema)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return db, nil
|
|
}
|
|
|
|
func CreateJob(db *sql.DB, webhookURL, url, selector string, frequencyMinutes int) (int64, error) {
|
|
nextRunAt := time.Now()
|
|
result, err := db.Exec("INSERT INTO jobs (status, webhook_url, url, selector, frequency_minutes, next_run_at) VALUES ('pending', ?, ?, ?, ?, ?)",
|
|
webhookURL, url, selector, frequencyMinutes, nextRunAt)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.LastInsertId()
|
|
}
|
|
|
|
func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
|
now := time.Now()
|
|
rows, err := db.Query(`SELECT id, status, created_at, webhook_url, url, selector, extracted_content, raw_html, frequency_minutes, next_run_at
|
|
FROM jobs
|
|
WHERE (status = 'pending' OR status = 'active') AND next_run_at <= ?`, now)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
var jobs []Job
|
|
for rows.Next() {
|
|
var job Job
|
|
var webhookURL, url, selector, extractedContent, rawHTML sql.NullString
|
|
var nextRunAt sql.NullTime
|
|
if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &webhookURL, &url, &selector, &extractedContent, &rawHTML, &job.FrequencyMinutes, &nextRunAt); err != nil {
|
|
return nil, err
|
|
}
|
|
job.WebhookURL = webhookURL.String
|
|
job.URL = url.String
|
|
job.Selector = selector.String
|
|
job.ExtractedContent = extractedContent.String
|
|
job.RawHTML = rawHTML.String
|
|
if nextRunAt.Valid {
|
|
job.NextRunAt = nextRunAt.Time
|
|
}
|
|
jobs = append(jobs, job)
|
|
}
|
|
return jobs, nil
|
|
}
|
|
|
|
func UpdateJobContent(db *sql.DB, id int, content string) error {
|
|
_, err := db.Exec("UPDATE jobs SET extracted_content = ? WHERE id = ?", content, id)
|
|
return err
|
|
}
|
|
|
|
func UpdateJobHTML(db *sql.DB, id int, html string) error {
|
|
_, err := db.Exec("UPDATE jobs SET raw_html = ? WHERE id = ?", html, id)
|
|
return err
|
|
}
|
|
|
|
func MarkJobDone(db *sql.DB, id int) error {
|
|
_, err := db.Exec("UPDATE jobs SET status = 'done' WHERE id = ?", id)
|
|
return err
|
|
}
|
|
|
|
func UpdateNextRun(db *sql.DB, id int, frequencyMinutes int) error {
|
|
if frequencyMinutes > 0 {
|
|
nextRunAt := time.Now().Add(time.Duration(frequencyMinutes) * time.Minute)
|
|
_, err := db.Exec("UPDATE jobs SET status = 'active', next_run_at = ? WHERE id = ?", nextRunAt, id)
|
|
return err
|
|
}
|
|
return MarkJobDone(db, id)
|
|
}
|