// Package workflow chains queue-backed steps into multi-step workflows.
package workflow

import (
	"context"
	"fmt"
	"time"

	"git.gorlug.de/code/ersteller"
	"git.gorlug.de/code/ersteller/queue"
	"git.gorlug.de/code/ersteller/schema/ent"
)

// StepHandler processes a single queue job on behalf of currentStep.
//
// It returns the handler result, the step that should run next (nil when no
// further step should be triggered) and an error.
type StepHandler func(ctx context.Context, currentStep *Step, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error)

// StepParams holds the configuration of a Step.
//
// Name is the key under which the step is stored in another step's NextSteps.
// Identifier, Client and Processors are passed to queue.NewGeneralQueue when
// the step's queue is created; NewWorkflow prefixes Identifier with the
// workflow identifier. Handler is invoked for every job of the step.
// MaxRetries is passed to the queue on enqueue; NewStep replaces a zero value
// with 3.
type StepParams struct {
	Name        string
	Identifier  string
	Description string
	Client      *ent.Client
	Processors  int
	Handler     StepHandler
	MaxRetries  int
}

// Step is one stage of a workflow. Queue stays nil until the step has been
// passed to NewWorkflow.
type Step struct {
	StepParams
	Queue     *queue.GeneralQueue
	NextSteps map[string]*Step
}

// NewStep builds a Step from params, defaulting MaxRetries to 3 when it is 0.
//
// The default is applied to a copy, so the caller's params are left untouched.
func NewStep(params *StepParams) *Step {
	p := *params
	if p.MaxRetries == 0 {
		p.MaxRetries = 3
	}
	return &Step{
		StepParams: p,
		NextSteps:  make(map[string]*Step),
	}
}

// initQueue creates the queue of the step using its current Identifier, with
// HandleQueue as the job handler.
func (s *Step) initQueue() {
	s.Queue = queue.NewGeneralQueue(s.Identifier, s.Client, s.HandleQueue, s.Processors, queue.WithAutoStop(true))
}

// AddNextStep registers step as a possible successor of s, keyed by its Name.
// A nil step is ignored.
func (s *Step) AddNextStep(step *Step) {
	if step == nil {
		return
	}
	// A Step created as a struct literal (not through NewStep) has a nil map.
	if s.NextSteps == nil {
		s.NextSteps = make(map[string]*Step)
	}
	s.NextSteps[step.Name] = step
}

// workflowIdParam is the payload key that carries the workflow run ID from
// step to step.
const workflowIdParam = "workflowId"

// ensureMap returns m, or a freshly allocated empty map of the same type when
// m is nil, so that the result can always be written to.
func ensureMap[M ~map[K]V, K comparable, V any](m M) M {
	if m == nil {
		return make(M)
	}
	return m
}

// Execute enqueues payload as a job on the step's queue and starts the queue.
//
// payload must contain a string under workflowIdParam. An error is returned
// when that entry is missing or not a string, when the step has no queue yet
// (it was never passed to NewWorkflow), or when enqueueing or starting the
// queue fails.
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
	ersteller.Debug("Executing step ", s.Name, "payload", payload)
	if s.Queue == nil {
		return fmt.Errorf("step %q has no queue: it was not registered with a workflow", s.Name)
	}
	workflowID, ok := payload[workflowIdParam].(string)
	if !ok {
		return fmt.Errorf("step %q: payload has no string %q entry", s.Name, workflowIdParam)
	}
	if _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(workflowID)); err != nil {
		return err
	}
	return s.Queue.Start(ctx)
}

// HandleQueue is the queue callback of the step. It runs the step's Handler
// for job and, when the handler names a next step, hands the result payload
// (with the workflow ID copied over from the job) to that step.
//
// If the next step cannot be executed, the error text is stored under
// "next_step_error" in the failure payload and the error is returned.
func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) {
	workflowID, ok := job.Payload[workflowIdParam].(string)
	if !ok {
		return queue.GeneralQueueHandlerResult{}, fmt.Errorf("step %q: job payload has no string %q entry", s.Name, workflowIdParam)
	}
	if s.Handler == nil {
		return queue.GeneralQueueHandlerResult{}, fmt.Errorf("step %q has no handler", s.Name)
	}

	result, nextStep, err := s.Handler(ctx, s, job)
	if err != nil {
		return result, err
	}
	if nextStep == nil {
		return result, nil
	}

	// Handlers may leave the payload maps nil; writing to a nil map panics.
	result.ResultPayload = ensureMap(result.ResultPayload)
	result.ResultPayload[workflowIdParam] = workflowID
	ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload)

	// NOTE(review): the next step is started with a fresh background context
	// instead of ctx, presumably so it is not bound to this job's lifetime;
	// confirm that this is intended.
	if err := nextStep.Execute(context.Background(), result.ResultPayload); err != nil {
		failurePayload := ensureMap(result.FailurePayload)
		failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error())
		return queue.GeneralQueueHandlerResult{
			ResultPayload:  result.ResultPayload,
			FailurePayload: failurePayload,
		}, err
	}
	return result, nil
}

// Workflow groups a set of steps under a common identifier and names the step
// that is executed first.
type Workflow struct {
	Name       string
	Identifier string
	FirstStep  *Step
	AllSteps   []*Step
}

// NewWorkflow creates a workflow and prepares every step in allSteps: each
// step's Identifier is prefixed with identifier and "_", and its queue is
// created. Nil entries in allSteps are skipped.
//
// Only steps listed in allSteps get a queue; firstStep is not prepared unless
// it is also part of allSteps.
func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow {
	for _, step := range allSteps {
		if step == nil {
			continue
		}
		step.Identifier = identifier + "_" + step.Identifier
		step.initQueue()
	}
	return &Workflow{
		Name:       name,
		FirstStep:  firstStep,
		Identifier: identifier,
		AllSteps:   allSteps,
	}
}

// Execute starts a new run of the workflow: it stores a freshly generated
// workflow ID in payload under workflowIdParam and executes the first step.
//
// A nil payload is replaced by an empty map; a non-nil payload is modified in
// place. An error is returned when the workflow has no first step or when the
// first step fails to execute.
func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
	if w.FirstStep == nil {
		return fmt.Errorf("workflow %q has no first step", w.Name)
	}
	if payload == nil {
		payload = make(map[string]any)
	}
	payload[workflowIdParam] = w.GenerateId()
	return w.FirstStep.Execute(ctx, payload)
}

// GenerateId returns the workflow identifier followed by "_" and the current
// local time formatted as YYYYMMDDhhmmss.
//
// The timestamp has one-second resolution, so two IDs generated within the
// same second are identical.
func (w *Workflow) GenerateId() string {
	now := time.Now()
	return w.Identifier + "_" + now.Format("20060102150405")
}

// NewCronTrigger starts a goroutine that executes workflow every d until ctx
// is cancelled. Execution errors are logged and do not stop the trigger.
//
// Nothing is started (and an error is logged) when workflow is nil or d is
// not positive, because time.NewTicker panics for non-positive durations.
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
	if workflow == nil {
		ersteller.Error("Cron trigger not started: workflow is nil")
		return
	}
	if d <= 0 {
		ersteller.Error("Cron trigger for workflow '", workflow.Name, "' not started: interval must be positive, got ", d.String())
		return
	}
	go func() {
		ticker := time.NewTicker(d)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				if err := workflow.Execute(ctx, make(map[string]any)); err != nil {
					ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())
				}
			}
		}
	}()
}