Files
ersteller/workflow/workflow.go
T

129 lines
3.1 KiB
Go

package workflow
import (
"context"
"fmt"
"time"
"git.gorlug.de/code/ersteller"
"git.gorlug.de/code/ersteller/queue"
"git.gorlug.de/code/ersteller/schema/ent"
)
// StepHandler is the callback that performs the work of a single workflow step.
//
// It is invoked by Step.HandleQueue with the step being processed and the
// queue job that triggered it. It returns the handler result, the step to run
// next (nil when no follow-up step should be enqueued), and an error. A non-nil
// error makes HandleQueue return immediately without enqueuing a next step.
type StepHandler func(ctx context.Context, currentStep *Step,
job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error)
// StepParams is the configuration from which NewStep builds a Step.
type StepParams struct {
// Name labels the step in log output and is the key under which the step is
// stored in another step's NextSteps map.
Name string
// Identifier is passed to queue.NewGeneralQueue as the first argument when
// the step's queue is created.
Identifier string
// Description is free-form text; nothing in this file reads it.
Description string
// Client is the ent client handed to the step's queue.
Client *ent.Client
// Processors is forwarded to queue.NewGeneralQueue as its last argument.
// NOTE(review): presumably the number of concurrent workers — confirm
// against the queue package.
Processors int
// Handler is the callback that HandleQueue runs for every job of this step.
Handler StepHandler
// MaxRetries is passed to Queue.Enqueue for each job. NewStep replaces a
// value of 0 with the default of 3.
MaxRetries int
}
// Step is one stage of a workflow. It embeds the StepParams it was built from,
// so all configuration fields (Name, Handler, MaxRetries, ...) are accessible
// directly on the Step.
type Step struct {
StepParams
// Queue is the queue the step's jobs are enqueued on. NewStep creates it
// with the step's HandleQueue method as the job handler.
Queue *queue.GeneralQueue
// NextSteps holds the steps registered through AddNextStep, keyed by their
// Name.
NextSteps map[string]*Step
}
// NewStep builds a Step from params and creates its backing queue.
//
// A MaxRetries of 0 is replaced by the default of 3. The default is applied to
// the step's own copy of the parameters, so the caller's StepParams value is
// left untouched and can be reused for other steps.
//
// The queue is created from the step's Identifier, Client and Processors and
// uses the step's HandleQueue method as its job handler.
func NewStep(params *StepParams) *Step {
	const defaultMaxRetries = 3

	step := &Step{
		StepParams: *params,
		NextSteps:  make(map[string]*Step),
	}
	// Default on the copy, not on *params: the caller keeps ownership of its
	// configuration value.
	if step.MaxRetries == 0 {
		step.MaxRetries = defaultMaxRetries
	}
	step.Queue = queue.NewGeneralQueue(step.Identifier,
		step.Client, step.HandleQueue, step.Processors)
	return step
}
// AddNextStep registers step as a possible successor of s, keyed by its Name.
// Registering a second step with the same Name replaces the earlier entry.
//
// NextSteps is allocated on demand so that a Step that was not created through
// NewStep (and therefore still has a nil map) does not panic on the first
// registration.
func (s *Step) AddNextStep(step *Step) {
	if s.NextSteps == nil {
		s.NextSteps = make(map[string]*Step)
	}
	s.NextSteps[step.Name] = step
}
// workflowIdParam is the payload key under which the ID of the current
// workflow run is stored. Step.Execute and Step.HandleQueue read it from the
// payload, and Workflow.Execute writes it.
const workflowIdParam = "workflowId"
// Execute enqueues payload as a new job on the step's queue and then starts
// the queue.
//
// payload must contain the workflow run ID as a string under the
// workflowIdParam key; the ID is attached to the enqueued job through
// queue.WithWorkflowId. If the entry is missing or is not a string, Execute
// returns an error instead of panicking and nothing is enqueued.
//
// Errors from Queue.Enqueue and Queue.Start are returned unchanged.
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
	ersteller.Debug("Executing step '%s'", s.Name, "payload", payload)

	// Checked assertion: payloads come from callers and from other steps'
	// handlers, so a missing or mistyped ID must not crash the process.
	workflowID, ok := payload[workflowIdParam].(string)
	if !ok {
		return fmt.Errorf("executing step %q: payload has no string %q entry", s.Name, workflowIdParam)
	}

	if _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(workflowID)); err != nil {
		return err
	}
	return s.Queue.Start(ctx)
}
// HandleQueue is the queue handler of the step: it runs the step's Handler for
// job and, when the handler names a next step, enqueues that step with the
// handler's result payload.
//
// The workflow run ID is read from job.Payload (key workflowIdParam) and
// copied into the result payload before it is handed to the next step. If the
// job carries no string ID, an error is returned and the handler is not run.
//
// Return values:
//   - handler error: the handler's result and error are returned as-is;
//   - no next step: the handler's result and a nil error;
//   - next step enqueued: the handler's result (with the workflow ID added to
//     ResultPayload) and a nil error;
//   - next step failed to enqueue: a result holding the ResultPayload and the
//     FailurePayload extended by a "next_step_error" entry, plus the error.
func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) {
	workflowID, ok := job.Payload[workflowIdParam].(string)
	if !ok {
		return queue.GeneralQueueHandlerResult{},
			fmt.Errorf("handling job for step %q: payload has no string %q entry", s.Name, workflowIdParam)
	}

	result, nextStep, err := s.Handler(ctx, s, job)
	if err != nil {
		return result, err
	}
	if nextStep == nil {
		return result, nil
	}

	// A handler may legitimately return a result without a payload; writing
	// into a nil map would panic.
	result.ResultPayload = nonNilMap(result.ResultPayload)
	result.ResultPayload[workflowIdParam] = workflowID
	ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload)

	if err := nextStep.Execute(ctx, result.ResultPayload); err != nil {
		// After a successful handler the failure payload is typically unset,
		// so it has to be allocated before the error is recorded.
		failurePayload := nonNilMap(result.FailurePayload)
		failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error())
		return queue.GeneralQueueHandlerResult{
			ResultPayload:  result.ResultPayload,
			FailurePayload: failurePayload,
		}, err
	}
	return result, nil
}

// nonNilMap returns m unchanged when it is already allocated and a fresh empty
// map of the same type otherwise, so the caller can safely write to the result.
func nonNilMap[M ~map[K]V, K comparable, V any](m M) M {
	if m == nil {
		return make(M)
	}
	return m
}
// Workflow is a named chain of steps that is started at FirstStep.
type Workflow struct {
// Name labels the workflow in log output.
Name string
// Identifier is the prefix of every run ID produced by GenerateId.
Identifier string
// FirstStep is the step that Execute enqueues to start a run.
FirstStep *Step
// AllSteps is stored as passed to NewWorkflow; nothing in this file reads it.
AllSteps []*Step
}
// NewWorkflow assembles a Workflow from its name, identifier, entry step and
// the list of all its steps. The arguments are stored as given; no validation
// or copying takes place.
func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow {
	w := new(Workflow)
	w.Name = name
	w.Identifier = identifier
	w.FirstStep = firstStep
	w.AllSteps = allSteps
	return w
}
// Execute starts a new run of the workflow: it stores a freshly generated run
// ID in payload under the workflowIdParam key and enqueues the first step with
// that payload.
//
// payload may be nil, in which case a new map is used. A non-nil payload is
// modified in place (the run ID is written into the caller's map).
//
// An error is returned when the workflow has no first step or when the first
// step cannot be enqueued.
func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
	if w.FirstStep == nil {
		return fmt.Errorf("executing workflow %q: no first step configured", w.Name)
	}
	// Writing to a nil map panics; callers without input data pass nil.
	if payload == nil {
		payload = make(map[string]any)
	}
	payload[workflowIdParam] = w.GenerateId()
	return w.FirstStep.Execute(ctx, payload)
}
// GenerateId returns a run ID of the form "<Identifier>_<yyyyMMddHHmmss>",
// built from the workflow's Identifier and the current local time.
//
// NOTE(review): the timestamp has one-second resolution, so two IDs generated
// for the same workflow within the same second are identical.
func (w *Workflow) GenerateId() string {
	const timestampLayout = "20060102150405"

	stamp := time.Now().Format(timestampLayout)
	return w.Identifier + "_" + stamp
}
// NewCronTrigger starts a background goroutine that begins a new run of
// workflow every d, until ctx is cancelled.
//
// Despite the name, the schedule is a fixed interval rather than a cron
// expression. The first run starts after d has elapsed, not immediately.
// Each run goes through workflow.Execute with a fresh, empty payload, so the
// run ID is generated and stored exactly as for manually started runs.
//
// Failures to start a run are logged and do not stop the trigger. A nil
// workflow or a non-positive d is logged and no goroutine is started, because
// time.NewTicker panics for d <= 0 and the panic would otherwise be raised
// inside the goroutine and take down the whole process.
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
	if workflow == nil {
		ersteller.Error("Cannot start cron trigger: workflow is nil")
		return
	}
	if d <= 0 {
		ersteller.Error("Cannot start cron trigger for workflow '", workflow.Name, "': non-positive interval ", d.String())
		return
	}

	go func() {
		ticker := time.NewTicker(d)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// A new map per tick: Execute writes the run ID into it.
				if err := workflow.Execute(ctx, make(map[string]any)); err != nil {
					ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())
				}
			}
		}
	}()
}