Add recurring job scheduling with frequency
Add frequency_minutes field to schedule recurring jobs. Jobs with frequency > 0 run repeatedly at specified intervals, automatically rescheduling after each execution. One-time jobs (frequency = 0) remain unchanged. Status transitions from pending to active for recurring jobs.
This commit is contained in:
parent
985d340855
commit
c395a57b38
@ -33,12 +33,13 @@ func main() {
|
|||||||
WebhookURL string `json:"webhook_url"`
|
WebhookURL string `json:"webhook_url"`
|
||||||
URL string `json:"url"`
|
URL string `json:"url"`
|
||||||
Selector string `json:"selector"`
|
Selector string `json:"selector"`
|
||||||
|
FrequencyMinutes int `json:"frequency_minutes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
body, _ := io.ReadAll(r.Body)
|
body, _ := io.ReadAll(r.Body)
|
||||||
json.Unmarshal(body, &req)
|
json.Unmarshal(body, &req)
|
||||||
|
|
||||||
id, err := db.CreateJob(database, req.WebhookURL, req.URL, req.Selector)
|
id, err := db.CreateJob(database, req.WebhookURL, req.URL, req.Selector, req.FrequencyMinutes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|||||||
32
pkg/db/db.go
32
pkg/db/db.go
@ -16,6 +16,8 @@ type Job struct {
|
|||||||
Selector string
|
Selector string
|
||||||
ExtractedContent string
|
ExtractedContent string
|
||||||
RawHTML string
|
RawHTML string
|
||||||
|
FrequencyMinutes int
|
||||||
|
NextRunAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func Init(dbPath string) (*sql.DB, error) {
|
func Init(dbPath string) (*sql.DB, error) {
|
||||||
@ -33,7 +35,9 @@ func Init(dbPath string) (*sql.DB, error) {
|
|||||||
url TEXT,
|
url TEXT,
|
||||||
selector TEXT,
|
selector TEXT,
|
||||||
extracted_content TEXT,
|
extracted_content TEXT,
|
||||||
raw_html TEXT
|
raw_html TEXT,
|
||||||
|
frequency_minutes INTEGER DEFAULT 0,
|
||||||
|
next_run_at DATETIME
|
||||||
);`
|
);`
|
||||||
|
|
||||||
_, err = db.Exec(schema)
|
_, err = db.Exec(schema)
|
||||||
@ -44,8 +48,10 @@ func Init(dbPath string) (*sql.DB, error) {
|
|||||||
return db, nil
|
return db, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateJob(db *sql.DB, webhookURL, url, selector string) (int64, error) {
|
func CreateJob(db *sql.DB, webhookURL, url, selector string, frequencyMinutes int) (int64, error) {
|
||||||
result, err := db.Exec("INSERT INTO jobs (status, webhook_url, url, selector) VALUES ('pending', ?, ?, ?)", webhookURL, url, selector)
|
nextRunAt := time.Now()
|
||||||
|
result, err := db.Exec("INSERT INTO jobs (status, webhook_url, url, selector, frequency_minutes, next_run_at) VALUES ('pending', ?, ?, ?, ?, ?)",
|
||||||
|
webhookURL, url, selector, frequencyMinutes, nextRunAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@ -53,7 +59,10 @@ func CreateJob(db *sql.DB, webhookURL, url, selector string) (int64, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
||||||
rows, err := db.Query("SELECT id, status, created_at, webhook_url, url, selector, extracted_content, raw_html FROM jobs WHERE status = 'pending'")
|
now := time.Now()
|
||||||
|
rows, err := db.Query(`SELECT id, status, created_at, webhook_url, url, selector, extracted_content, raw_html, frequency_minutes, next_run_at
|
||||||
|
FROM jobs
|
||||||
|
WHERE (status = 'pending' OR status = 'active') AND next_run_at <= ?`, now)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -63,7 +72,8 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
|||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var job Job
|
var job Job
|
||||||
var webhookURL, url, selector, extractedContent, rawHTML sql.NullString
|
var webhookURL, url, selector, extractedContent, rawHTML sql.NullString
|
||||||
if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &webhookURL, &url, &selector, &extractedContent, &rawHTML); err != nil {
|
var nextRunAt sql.NullTime
|
||||||
|
if err := rows.Scan(&job.ID, &job.Status, &job.CreatedAt, &webhookURL, &url, &selector, &extractedContent, &rawHTML, &job.FrequencyMinutes, &nextRunAt); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
job.WebhookURL = webhookURL.String
|
job.WebhookURL = webhookURL.String
|
||||||
@ -71,6 +81,9 @@ func GetPendingJobs(db *sql.DB) ([]Job, error) {
|
|||||||
job.Selector = selector.String
|
job.Selector = selector.String
|
||||||
job.ExtractedContent = extractedContent.String
|
job.ExtractedContent = extractedContent.String
|
||||||
job.RawHTML = rawHTML.String
|
job.RawHTML = rawHTML.String
|
||||||
|
if nextRunAt.Valid {
|
||||||
|
job.NextRunAt = nextRunAt.Time
|
||||||
|
}
|
||||||
jobs = append(jobs, job)
|
jobs = append(jobs, job)
|
||||||
}
|
}
|
||||||
return jobs, nil
|
return jobs, nil
|
||||||
@ -90,3 +103,12 @@ func MarkJobDone(db *sql.DB, id int) error {
|
|||||||
_, err := db.Exec("UPDATE jobs SET status = 'done' WHERE id = ?", id)
|
_, err := db.Exec("UPDATE jobs SET status = 'done' WHERE id = ?", id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UpdateNextRun(db *sql.DB, id int, frequencyMinutes int) error {
|
||||||
|
if frequencyMinutes > 0 {
|
||||||
|
nextRunAt := time.Now().Add(time.Duration(frequencyMinutes) * time.Minute)
|
||||||
|
_, err := db.Exec("UPDATE jobs SET status = 'active', next_run_at = ? WHERE id = ?", nextRunAt, id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return MarkJobDone(db, id)
|
||||||
|
}
|
||||||
|
|||||||
@ -163,12 +163,16 @@ func Start(database *sql.DB) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := db.MarkJobDone(database, job.ID); err != nil {
|
if err := db.UpdateNextRun(database, job.ID, job.FrequencyMinutes); err != nil {
|
||||||
log.Printf("Error marking job %d done: %v", job.ID, err)
|
log.Printf("Error updating job %d: %v", job.ID, err)
|
||||||
|
} else {
|
||||||
|
if job.FrequencyMinutes > 0 {
|
||||||
|
log.Printf("Job %d completed, next run in %d minutes", job.ID, job.FrequencyMinutes)
|
||||||
} else {
|
} else {
|
||||||
log.Printf("Job %d completed", job.ID)
|
log.Printf("Job %d completed", job.ID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user