155 lines
3.9 KiB
Go
155 lines
3.9 KiB
Go
package workflow
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.gorlug.de/code/ersteller"
|
|
"git.gorlug.de/code/ersteller/queue"
|
|
"git.gorlug.de/code/ersteller/schema/ent"
|
|
)
|
|
|
|
type StepHandler func(ctx context.Context, currentStep *Step,
|
|
job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error)
|
|
|
|
type StepParams struct {
|
|
Name string
|
|
Identifier string
|
|
Description string
|
|
Client *ent.Client
|
|
Processors int
|
|
Handler StepHandler
|
|
MaxRetries int
|
|
}
|
|
|
|
type Step struct {
|
|
StepParams
|
|
Queue *queue.GeneralQueue
|
|
NextSteps map[string]*Step
|
|
}
|
|
|
|
func NewStep(params *StepParams) *Step {
|
|
if params.MaxRetries == 0 {
|
|
params.MaxRetries = 3
|
|
}
|
|
step := Step{
|
|
StepParams: *params,
|
|
NextSteps: make(map[string]*Step),
|
|
}
|
|
return &step
|
|
}
|
|
|
|
func (s *Step) initQueue() {
|
|
s.Queue = queue.NewGeneralQueue(s.Identifier,
|
|
s.Client, s.HandleQueue, s.Processors,
|
|
queue.WithAutoStop(true))
|
|
}
|
|
|
|
func (s *Step) AddNextStep(step *Step) {
|
|
s.NextSteps[step.Name] = step
|
|
}
|
|
|
|
const workflowIdParam = "workflowId"
|
|
|
|
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
|
|
ersteller.Debug("Executing step ", s.Name, "payload", payload)
|
|
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = s.Queue.Start(ctx)
|
|
return err
|
|
}
|
|
|
|
func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) {
|
|
workflowId := job.Payload[workflowIdParam].(string)
|
|
result, nextStep, err := s.Handler(ctx, s, job)
|
|
if err != nil {
|
|
return result, err
|
|
}
|
|
if nextStep != nil {
|
|
result.ResultPayload[workflowIdParam] = workflowId
|
|
ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload)
|
|
err = nextStep.Execute(context.Background(), result.ResultPayload)
|
|
if err != nil {
|
|
failurePayload := result.FailurePayload
|
|
failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error())
|
|
return queue.GeneralQueueHandlerResult{
|
|
ResultPayload: result.ResultPayload,
|
|
FailurePayload: failurePayload,
|
|
}, err
|
|
}
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
type Workflow struct {
|
|
Name string
|
|
Identifier string
|
|
FirstStep *Step
|
|
AllSteps []*Step
|
|
}
|
|
|
|
func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow {
|
|
for _, step := range allSteps {
|
|
step.Identifier = identifier + "_" + step.Identifier
|
|
step.initQueue()
|
|
}
|
|
return &Workflow{
|
|
Name: name,
|
|
FirstStep: firstStep,
|
|
Identifier: identifier,
|
|
AllSteps: allSteps,
|
|
}
|
|
}
|
|
|
|
func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
|
|
payload[workflowIdParam] = w.GenerateId()
|
|
return w.FirstStep.Execute(ctx, payload)
|
|
}
|
|
|
|
func (w *Workflow) GenerateId() string {
|
|
now := time.Now()
|
|
return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5)
|
|
}
|
|
|
|
// Resume restarts a specific workflow execution by its workflowId.
|
|
// It resets failed and in-progress jobs of all steps back to pending and
|
|
// starts their queues again if they are running.
|
|
func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
|
|
var allErr error
|
|
for _, step := range w.AllSteps {
|
|
if step.Queue == nil {
|
|
// Safety: ensure queues are initialized
|
|
step.initQueue()
|
|
}
|
|
if err := step.Queue.ResumeWorkflow(ctx, workflowId); err != nil {
|
|
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
|
}
|
|
}
|
|
return allErr
|
|
}
|
|
|
|
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
|
|
go func() {
|
|
ticker := time.NewTicker(d)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
err := workflow.FirstStep.Execute(ctx, map[string]interface{}{
|
|
"workflowId": workflow.GenerateId(),
|
|
})
|
|
if err != nil {
|
|
ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|