From 12d22efc74971a176c6a0ec825ee5534d98fd6dc Mon Sep 17 00:00:00 2001 From: Achim Rohn Date: Wed, 8 Apr 2026 15:38:07 +0200 Subject: [PATCH] Resume workflow by id --- queue/general_queue.go | 56 ++++++++++++++++++++++++++++++++++++++++++ workflow/workflow.go | 18 ++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/queue/general_queue.go b/queue/general_queue.go index c83b089..b1c6795 100644 --- a/queue/general_queue.go +++ b/queue/general_queue.go @@ -575,6 +575,62 @@ func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTri return nil } +// ResumeWorkflow resets jobs for a specific workflow in this queue so they can be processed again. +// - All jobs with status InProgress are set back to Pending. +// - All jobs with status Failed are set back to Pending and their NumberOfTries is reset to 0. +// After resetting, if the queue is marked running, it will be started. +func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error { + // Reset in-progress jobs back to pending for the specific workflow + inProgressCount, err := q.client.GeneralQueue.Update(). + Where( + generalqueue.NameEQ(q.Name), + generalqueue.WorkflowIDEQ(workflowId), + generalqueue.StatusEQ(generalqueue.StatusInProgress), + ). + SetStatus(generalqueue.StatusPending). + SetUpdatedAt(time.Now()). + Save(ctx) + if err != nil { + return fmt.Errorf("failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err) + } + if inProgressCount > 0 { + Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")") + } + + // Reset failed jobs back to pending and reset their try counter for the specific workflow + failedCount, err := q.client.GeneralQueue.Update(). + Where( + generalqueue.NameEQ(q.Name), + generalqueue.WorkflowIDEQ(workflowId), + generalqueue.StatusEQ(generalqueue.StatusFailed), + ). + SetStatus(generalqueue.StatusPending). + SetNumberOfTries(0). + SetUpdatedAt(time.Now()). + Save(ctx) + if err != nil { + return fmt.Errorf("failed to reset failed jobs to pending for queue '%s': %w", q.Name, err) + } + if failedCount > 0 { + Debug("Reset ", failedCount, " failed jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")") + } + + // Start the queue again if it is configured as running + if isRunning, err := q.IsRunning(ctx); err != nil { + Error("Failed to check ", q.Name, " queue state:", err) + } else if isRunning { + Debug("restarting ", q.Name, " queue") + if err := q.Start(ctx); err != nil { + Error("Failed to restart ", q.Name, " queue:", err) + return err + } + Debug(q.Name, " queue auto-started") + } else { + Debug(q.Name, " queue not running") + } + return nil +} + func (q *GeneralQueue) Resume(ctx context.Context) error { // Reset all in_progress jobs back to pending count, err := q.client.GeneralQueue.Update(). diff --git a/workflow/workflow.go b/workflow/workflow.go index 9de266e..7052a3f 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -2,6 +2,7 @@ package workflow import ( "context" + "errors" "fmt" "time" @@ -114,6 +115,23 @@ func (w *Workflow) GenerateId() string { return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5) } +// Resume restarts a specific workflow execution by its workflowId. +// It resets failed and in-progress jobs of all steps back to pending and +// starts their queues again if they are running. +func (w *Workflow) Resume(ctx context.Context, workflowId string) error { + var allErr error + for _, step := range w.AllSteps { + if step.Queue == nil { + // Safety: ensure queues are initialized + step.initQueue() + } + if err := step.Queue.ResumeWorkflow(ctx, workflowId); err != nil { + allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err)) + } + } + return allErr +} + func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) { go func() { ticker := time.NewTicker(d)