Files
2026-04-08 17:14:32 +02:00

189 lines
5.0 KiB
Go

package workflow
import (
"context"
"errors"
"fmt"
"time"
"git.gorlug.de/code/ersteller"
"git.gorlug.de/code/ersteller/queue"
"git.gorlug.de/code/ersteller/schema/ent"
)
// StepHandler is the callback a step runs for each queue job. It receives the
// step the job belongs to and the job itself, and returns the handler result,
// an optional follow-up step (nil when nothing should run next), and an error.
// When a follow-up step is returned, HandleQueue executes it with the result's
// ResultPayload.
type StepHandler func(ctx context.Context, currentStep *Step,
job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error)
// StepParams holds the configuration used to build a Step via NewStep.
type StepParams struct {
// Name labels the step in log and error messages and is the key used by
// AddNextStep.
Name string
// Identifier names the step's queue; NewWorkflow prefixes it with the
// workflow identifier.
Identifier string
// Description is free-form text; nothing in this file reads it.
Description string
// Client is the ent client handed to the step's queue.
Client *ent.Client
// Processors is forwarded to queue.NewGeneralQueue (presumably the number
// of concurrent workers; confirm against the queue package).
Processors int
// Handler is invoked by HandleQueue for every job of this step.
Handler StepHandler
// MaxRetries is passed to the queue when a job is enqueued; NewStep
// replaces a value of 0 with 3.
MaxRetries int
}
// Step is a single stage of a workflow, backed by its own job queue.
type Step struct {
// StepParams is the configuration the step was created with.
StepParams
// Queue is the step's job queue. It is nil until initQueue has run, which
// NewWorkflow does for every step it is given.
Queue *queue.GeneralQueue
// NextSteps holds the steps registered through AddNextStep, keyed by their
// Name.
NextSteps map[string]*Step
}
// NewStep builds a Step from params. A MaxRetries of 0 is replaced by the
// default of 3. The step's queue is not created here; NewWorkflow does that
// for every step it receives.
//
// params is copied before the default is applied, so the caller's struct is
// left untouched and can safely be reused or inspected afterwards.
func NewStep(params *StepParams) *Step {
	cfg := *params
	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = 3
	}
	return &Step{
		StepParams: cfg,
		NextSteps:  make(map[string]*Step),
	}
}
// initQueue creates the step's queue and stores it in s.Queue. The queue is
// built from the step's Identifier, Client and Processors, dispatches jobs to
// s.HandleQueue, and is created with the auto-stop option set to true.
func (s *Step) initQueue() {
	autoStop := queue.WithAutoStop(true)
	s.Queue = queue.NewGeneralQueue(s.Identifier, s.Client, s.HandleQueue, s.Processors, autoStop)
}
// AddNextStep registers step as a follow-up of s, keyed by the follow-up's
// Name. Registering a second step with the same name replaces the first.
func (s *Step) AddNextStep(step *Step) {
	key := step.Name
	s.NextSteps[key] = step
}
// workflowIdParam is the payload key under which the ID of a workflow
// execution is stored and read back by the steps.
const workflowIdParam = "workflowId"
// Execute enqueues payload as a new job on the step's queue and then starts
// the queue.
//
// payload must carry the workflow execution ID as a string under the
// "workflowId" key; the ID is attached to the job through
// queue.WithWorkflowId. The job is enqueued with the step's MaxRetries.
//
// An error is returned, rather than the call panicking, when the step's queue
// has not been initialized or when the payload has no string workflow ID.
// Errors from enqueueing or starting the queue are returned unchanged.
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
	ersteller.Debug("Executing step ", s.Name)
	if s.Queue == nil {
		return fmt.Errorf("step %q: queue is not initialized", s.Name)
	}
	// Checked assertion: a missing or non-string ID must not crash the caller.
	workflowID, ok := payload[workflowIdParam].(string)
	if !ok {
		return fmt.Errorf("step %q: payload has no string %q entry", s.Name, workflowIdParam)
	}
	if _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(workflowID)); err != nil {
		return err
	}
	return s.Queue.Start(ctx)
}
// HandleQueue is the queue callback for this step. It runs the step's Handler
// for job and, when the handler names a follow-up step, hands the handler's
// ResultPayload (with the workflow ID copied over from the job) to that step.
//
// Returns the handler's result and error as-is when the handler fails or when
// there is no follow-up step. If starting the follow-up step fails, the error
// text is recorded under "next_step_error" in the failure payload and the
// error is returned.
//
// A job whose payload has no string workflow ID is rejected with an error
// before the handler runs. Nil result/failure payload maps are replaced by
// empty ones before being written to, so neither case panics.
func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) {
	workflowID, ok := job.Payload[workflowIdParam].(string)
	if !ok {
		return queue.GeneralQueueHandlerResult{}, fmt.Errorf("step %q: job payload has no string %q entry", s.Name, workflowIdParam)
	}

	result, nextStep, err := s.Handler(ctx, s, job)
	if err != nil {
		return result, err
	}
	if nextStep == nil {
		return result, nil
	}

	// A handler may legitimately return no result payload; the follow-up step
	// still needs the workflow ID.
	result.ResultPayload = nonNilMap(result.ResultPayload)
	result.ResultPayload[workflowIdParam] = workflowID

	ersteller.Debug("Moving to next step ", nextStep.Name)
	// NOTE(review): the hand-off uses a fresh background context instead of
	// ctx, presumably so it is not cut short by the current job's context.
	// Confirm this is intended.
	if err := nextStep.Execute(context.Background(), result.ResultPayload); err != nil {
		// On the success path the handler usually leaves FailurePayload unset.
		failurePayload := nonNilMap(result.FailurePayload)
		failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error())
		return queue.GeneralQueueHandlerResult{
			ResultPayload:  result.ResultPayload,
			FailurePayload: failurePayload,
		}, err
	}
	return result, nil
}

// nonNilMap returns m unchanged when it is non-nil and a new empty map of the
// same type otherwise, so the caller can always write to the result.
func nonNilMap[M ~map[K]V, K comparable, V any](m M) M {
	if m == nil {
		return make(M)
	}
	return m
}
// Workflow groups a set of steps under a common name and identifier.
type Workflow struct {
// Name labels the workflow in log messages.
Name string
// Identifier prefixes the step identifiers (see NewWorkflow) and the
// execution IDs produced by GenerateId.
Identifier string
// FirstStep is the step that Execute enqueues a new execution on.
FirstStep *Step
// AllSteps lists the steps whose queues are initialized by NewWorkflow and
// visited by Resume, ResumeAll and RetryFailed.
AllSteps []*Step
}
// NewWorkflow assembles a workflow from its steps. Every step in allSteps gets
// its Identifier rewritten to "<identifier>_<step identifier>" and its queue
// created. The steps are modified in place.
//
// firstStep is stored as the entry point but is only initialized if it is
// also part of allSteps.
func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow {
	prefix := identifier + "_"
	for i := range allSteps {
		current := allSteps[i]
		current.Identifier = prefix + current.Identifier
		current.initQueue()
	}

	wf := new(Workflow)
	wf.Name = name
	wf.Identifier = identifier
	wf.FirstStep = firstStep
	wf.AllSteps = allSteps
	return wf
}
// Execute starts a new execution of the workflow: it generates a fresh
// execution ID, stores it in payload under the "workflowId" key, and enqueues
// the payload on the first step.
//
// The ID is written into the caller's map, so the caller can read it back
// after the call. A nil payload is accepted and treated as empty. An error is
// returned when the workflow has no first step; errors from the first step
// are returned unchanged.
func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
	if w.FirstStep == nil {
		return fmt.Errorf("workflow %q: no first step configured", w.Name)
	}
	if payload == nil {
		// Writing to a nil map panics; start from an empty payload instead.
		payload = make(map[string]any, 1)
	}
	payload[workflowIdParam] = w.GenerateId()
	return w.FirstStep.Execute(ctx, payload)
}
// GenerateId builds an ID for a new workflow execution in the form
// "<workflow identifier>_<yyyyMMddHHmmss>_<suffix>", where the timestamp is
// the current time from time.Now and the suffix comes from
// ersteller.RandomString(5).
func (w *Workflow) GenerateId() string {
	stamp := time.Now().Format("20060102150405")
	suffix := ersteller.RandomString(5)
	return w.Identifier + "_" + stamp + "_" + suffix
}
// Resume asks the queue of every step to resume the workflow execution
// identified by workflowId. Per the queue's contract this sets in-progress
// jobs of that execution back to pending and starts the queue again if it is
// running. That logic lives in queue.ResumeWorkflow, not here.
//
// A failing step does not stop the loop: all steps are visited and the
// per-step errors are joined, each prefixed with the step name. Returns nil
// when every step succeeds.
func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
	var combined error
	for i := range w.AllSteps {
		current := w.AllSteps[i]
		// Safety net: make sure the step has a queue before using it.
		if current.Queue == nil {
			current.initQueue()
		}
		resumeErr := current.Queue.ResumeWorkflow(ctx, workflowId)
		if resumeErr == nil {
			continue
		}
		combined = errors.Join(combined, fmt.Errorf("step %s: %w", current.Name, resumeErr))
	}
	return combined
}
// ResumeAll asks the queue of every step to resume, covering all executions
// of this workflow. Per the queue's contract each queue sets its in-progress
// jobs back to pending and starts again if it is configured as running. That
// logic lives in queue.Resume, not here.
//
// All steps are visited even when some fail; the per-step errors are joined,
// each prefixed with the step name. Returns nil when every step succeeds.
func (w *Workflow) ResumeAll(ctx context.Context) error {
	var combined error
	for idx := 0; idx < len(w.AllSteps); idx++ {
		st := w.AllSteps[idx]
		// Safety net: make sure the step has a queue before using it.
		if st.Queue == nil {
			st.initQueue()
		}
		stepErr := st.Queue.Resume(ctx)
		if stepErr != nil {
			wrapped := fmt.Errorf("step %s: %w", st.Name, stepErr)
			combined = errors.Join(combined, wrapped)
		}
	}
	return combined
}
// RetryFailed asks the queue of every step to retry the failed jobs of the
// workflow execution identified by workflowId. Per the queue's contract the
// failed jobs are reset to pending with a try count of 0 and running queues
// are started again. That logic lives in queue.RetryFailedWorkflow, not here.
//
// All steps are visited even when some fail; the per-step errors are joined,
// each prefixed with the step name. Returns nil when every step succeeds.
func (w *Workflow) RetryFailed(ctx context.Context, workflowId string) error {
	var combined error
	for _, st := range w.AllSteps {
		// Safety net: make sure the step has a queue before using it.
		if st.Queue == nil {
			st.initQueue()
		}
		retryErr := st.Queue.RetryFailedWorkflow(ctx, workflowId)
		if retryErr == nil {
			continue
		}
		wrapped := fmt.Errorf("step %s: %w", st.Name, retryErr)
		combined = errors.Join(combined, wrapped)
	}
	return combined
}
// NewCronTrigger starts a background goroutine that launches a new execution
// of workflow every d, until ctx is cancelled. Each tick goes through
// workflow.Execute with an empty payload, so the execution ID is generated
// and stored the same way as for manually started executions. A failed start
// is logged and does not stop the trigger.
//
// d must be positive. time.NewTicker panics on a non-positive interval, and
// inside the goroutine that would crash the whole process, so such a call is
// logged and no trigger is started.
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
	if d <= 0 {
		ersteller.Error("Cron trigger for workflow '", workflow.Name, "' not started: interval must be positive")
		return
	}
	go func() {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := workflow.Execute(ctx, map[string]any{}); err != nil {
					ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())
				}
			}
		}
	}()
}