From d4008e365503db283fd735553cca54d8a45eb03a Mon Sep 17 00:00:00 2001
From: Achim Rohn
Date: Wed, 8 Apr 2026 15:41:22 +0200
Subject: [PATCH] Add methods to retry failed workflows and jobs across all steps and queues

---
 queue/general_queue.go | 25 ++++++++++++++++++++++---
 workflow/workflow.go   | 28 ++++++++++++++++++++++------
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/queue/general_queue.go b/queue/general_queue.go
index b1c6795..be65414 100644
--- a/queue/general_queue.go
+++ b/queue/general_queue.go
@@ -575,9 +575,8 @@ func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTri
 	return nil
 }
 
-// ResumeWorkflow resets jobs for a specific workflow in this queue so they can be processed again.
-// - All jobs with status InProgress are set back to Pending.
-// - All jobs with status Failed are set back to Pending and their NumberOfTries is reset to 0.
+// ResumeWorkflow resumes a specific workflow's execution in this queue by
+// setting all InProgress jobs back to Pending, allowing them to be picked up again.
 // After resetting, if the queue is marked running, it will be started.
 func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error {
 	// Reset in-progress jobs back to pending for the specific workflow
@@ -597,6 +596,26 @@ func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) er
 		Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
 	}
 
+	// Start the queue again if it is configured as running
+	if isRunning, err := q.IsRunning(ctx); err != nil {
+		Error("Failed to check ", q.Name, " queue state:", err)
+	} else if isRunning {
+		Debug("restarting ", q.Name, " queue")
+		if err := q.Start(ctx); err != nil {
+			Error("Failed to restart ", q.Name, " queue:", err)
+			return err
+		}
+		Debug(q.Name, " queue auto-started")
+	} else {
+		Debug(q.Name, " queue not running")
+	}
+	return nil
+}
+
+// RetryFailedWorkflow retries failed jobs for a specific workflow in this queue by
+// setting all Failed jobs back to Pending and resetting their NumberOfTries to 0.
+// After resetting, if the queue is marked running, it will be started.
+func (q *GeneralQueue) RetryFailedWorkflow(ctx context.Context, workflowId string) error {
 	// Reset failed jobs back to pending and reset their try counter for the specific workflow
 	failedCount, err := q.client.GeneralQueue.Update().
 		Where(
diff --git a/workflow/workflow.go b/workflow/workflow.go
index db39dab..c377671 100644
--- a/workflow/workflow.go
+++ b/workflow/workflow.go
@@ -115,9 +115,9 @@ func (w *Workflow) GenerateId() string {
 	return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5)
 }
 
-// Resume restarts a specific workflow execution by its workflowId.
-// It resets failed and in-progress jobs of all steps back to pending and
-// starts their queues again if they are running.
+// Resume restarts a specific workflow execution by its workflowId by
+// setting any in-progress jobs of all steps back to pending, then starts
+// their queues again if they are running.
 func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
 	var allErr error
 	for _, step := range w.AllSteps {
@@ -133,9 +133,8 @@ func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
 }
 
 // ResumeAll restarts all executions of this workflow across all steps.
-// For each step it resets any in-progress jobs to pending and failed jobs to
-// pending with NumberOfTries reset to 0, then starts the queue again if it is
-// configured as running.
+// For each step it resets any in-progress jobs to pending, then starts the
+// queue again if it is configured as running.
 func (w *Workflow) ResumeAll(ctx context.Context) error {
 	var allErr error
 	for _, step := range w.AllSteps {
@@ -150,6 +149,23 @@ func (w *Workflow) ResumeAll(ctx context.Context) error {
 	return allErr
 }
 
+// RetryFailed restarts only failed jobs of a specific workflow execution by its
+// workflowId across all steps by resetting them to pending and their try count
+// to 0, then starts their queues again if they are running.
+func (w *Workflow) RetryFailed(ctx context.Context, workflowId string) error {
+	var allErr error
+	for _, step := range w.AllSteps {
+		if step.Queue == nil {
+			// Safety: ensure queues are initialized
+			step.initQueue()
+		}
+		if err := step.Queue.RetryFailedWorkflow(ctx, workflowId); err != nil {
+			allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
+		}
+	}
+	return allErr
+}
+
 func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
 	go func() {
 		ticker := time.NewTicker(d)