First workflow implementation
This commit is contained in:
@@ -0,0 +1,120 @@
|
||||
package workflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.gorlug.de/code/ersteller"
|
||||
"git.gorlug.de/code/ersteller/queue"
|
||||
"git.gorlug.de/code/ersteller/schema/ent"
|
||||
)
|
||||
|
||||
type StepHandler interface {
|
||||
Handle(ctx context.Context, currentStep *Step,
|
||||
job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error)
|
||||
}
|
||||
|
||||
type StepParams struct {
|
||||
Name string
|
||||
Identifier string
|
||||
Description string
|
||||
Client *ent.Client
|
||||
Processors int
|
||||
Handler StepHandler
|
||||
MaxRetries int
|
||||
}
|
||||
|
||||
type Step struct {
|
||||
StepParams
|
||||
Queue *queue.GeneralQueue
|
||||
NextSteps map[string]*Step
|
||||
}
|
||||
|
||||
func NewStep(params *StepParams) *Step {
|
||||
if params.MaxRetries == 0 {
|
||||
params.MaxRetries = 3
|
||||
}
|
||||
step := Step{
|
||||
StepParams: *params,
|
||||
NextSteps: make(map[string]*Step),
|
||||
}
|
||||
step.Queue = queue.NewGeneralQueue(params.Identifier,
|
||||
params.Client, step.HandleQueue, params.Processors)
|
||||
return &step
|
||||
}
|
||||
|
||||
func (s *Step) AddNextStep(step *Step) {
|
||||
s.NextSteps[step.Name] = step
|
||||
}
|
||||
|
||||
const workflowIdParam = "workflowId"
|
||||
|
||||
func (s *Step) Execute(ctx context.Context, payload map[string]interface{}) error {
|
||||
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = s.Queue.Start(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) {
|
||||
result, nextStep, err := s.Handler.Handle(ctx, s, job)
|
||||
if err != nil {
|
||||
return result, err
|
||||
}
|
||||
if nextStep != nil {
|
||||
err = nextStep.Execute(ctx, result.ResultPayload)
|
||||
if err != nil {
|
||||
failurePayload := result.FailurePayload
|
||||
failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error())
|
||||
return queue.GeneralQueueHandlerResult{
|
||||
ResultPayload: result.ResultPayload,
|
||||
FailurePayload: failurePayload,
|
||||
}, err
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type Workflow struct {
|
||||
Name string
|
||||
Identifier string
|
||||
FirstStep *Step
|
||||
}
|
||||
|
||||
func NewWorkflow(name string, identifier string, firstStep *Step) *Workflow {
|
||||
go func() {}()
|
||||
return &Workflow{
|
||||
Name: name,
|
||||
FirstStep: firstStep,
|
||||
Identifier: identifier,
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Workflow) generateId() string {
|
||||
now := time.Now()
|
||||
return w.Identifier + "_" + now.Format("20060102150405")
|
||||
}
|
||||
|
||||
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(d)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
err := workflow.FirstStep.Execute(ctx, map[string]interface{}{
|
||||
"workflowId": workflow.generateId(),
|
||||
})
|
||||
if err != nil {
|
||||
ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
Reference in New Issue
Block a user