From d4714c4f27ae3c583ec5a22022d6f25037f9f864 Mon Sep 17 00:00:00 2001 From: Achim Rohn Date: Wed, 8 Apr 2026 15:39:08 +0200 Subject: [PATCH] Add method to resume all workflow steps --- workflow/workflow.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/workflow/workflow.go b/workflow/workflow.go index 7052a3f..db39dab 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -132,6 +132,24 @@ func (w *Workflow) Resume(ctx context.Context, workflowId string) error { return allErr } +// ResumeAll restarts all executions of this workflow across all steps. +// For each step it resets any in-progress jobs to pending and failed jobs to +// pending with NumberOfTries reset to 0, then starts the queue again if it is +// configured as running. +func (w *Workflow) ResumeAll(ctx context.Context) error { + var allErr error + for _, step := range w.AllSteps { + if step.Queue == nil { + // Safety: ensure queues are initialized + step.initQueue() + } + if err := step.Queue.Resume(ctx); err != nil { + allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err)) + } + } + return allErr +} + func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) { go func() { ticker := time.NewTicker(d)