diff --git a/starter/example/workflow.go b/starter/example/workflow.go index 17efdb8..16c7840 100644 --- a/starter/example/workflow.go +++ b/starter/example/workflow.go @@ -8,7 +8,7 @@ import ( . "git.gorlug.de/code/ersteller/workflow" ) -func CreateExampleWorkflow(client *ent.Client) { +func CreateExampleWorkflow(client *ent.Client) *Workflow { lastStep := NewStep(&StepParams{ Name: "Last step", Identifier: "last_step", @@ -38,7 +38,8 @@ func CreateExampleWorkflow(client *ent.Client) { }, lastStep, nil }, }) - NewWorkflow("Example", "example", firstStep) + w := NewWorkflow("Example", "example", firstStep, []*Step{firstStep, lastStep}) + return w } //type name struct { diff --git a/starter/main.go b/starter/main.go index df40ad2..714caee 100644 --- a/starter/main.go +++ b/starter/main.go @@ -24,32 +24,29 @@ func main() { if err != nil { log.Fatalf("failed opening connection to sqlite: %v", err) } - log.Println("client", client) defer client.Close() - err = client.Schema.Create(context.Background()) - if err != nil { - log.Fatalf("failed creating schema resources: %v", err) - } entClient, err := erstellerEnt.Open("sqlite3", environment.DatabaseUrl) if err != nil { log.Fatalf("failed opening connection to sqlite: %v", err) } defer entClient.Close() - err = entClient.Schema.Create(context.Background()) - if err != nil { - log.Fatalf("failed creating schema resources: %v", err) - } - example.CreateExampleWorkflow(entClient) ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() + err = entClient.Schema.Create(ctx) + if err != nil { + log.Fatalf("failed creating schema resources: %v", err) + } + exampleWorkflow := example.CreateExampleWorkflow(entClient) + Debug("example workflow created:", exampleWorkflow) + if err := client.Schema.Create(ctx); err != nil { log.Fatalf("failed creating schema resources: %v", err) } Debug("starting white label app on port 8090") - handler := routes.CreateApi(environment, client) + handler := routes.CreateApi(environment, client, entClient, exampleWorkflow) log.Fatal(http.ListenAndServe(":8090", handler)) } diff --git a/starter/routes/routing.go b/starter/routes/routing.go index 88a8137..13a09b2 100644 --- a/starter/routes/routing.go +++ b/starter/routes/routing.go @@ -6,6 +6,7 @@ import ( . "git.gorlug.de/code/ersteller" "git.gorlug.de/code/ersteller/authentication" google_http "git.gorlug.de/code/ersteller/authentication/google/http" + erstellerEnt "git.gorlug.de/code/ersteller/schema/ent" "git.gorlug.de/code/ersteller/starter/about" "git.gorlug.de/code/ersteller/starter/contact" "git.gorlug.de/code/ersteller/starter/ent" @@ -15,12 +16,14 @@ import ( "git.gorlug.de/code/ersteller/starter/login" "git.gorlug.de/code/ersteller/starter/page" "git.gorlug.de/code/ersteller/starter/todos" + "git.gorlug.de/code/ersteller/starter/workflow_executions" + workflowPkg "git.gorlug.de/code/ersteller/workflow" "github.com/gorilla/sessions" . "maragu.dev/gomponents" ) -func CreateApi(environment env.Environment, db *ent.Client) http.Handler { +func CreateApi(environment env.Environment, db *ent.Client, workflowDb *erstellerEnt.Client, exampleWorkflow *workflowPkg.Workflow) http.Handler { server := NewHtmxServer() HtmxRouteDebugTrace = true @@ -73,8 +76,17 @@ func CreateApi(environment env.Environment, db *ent.Client) http.Handler { De: todos.TodosPathDe, }) + // Workflow Executions navigation item + workflowExecutionsActivePath := NewActivePath(map[Language]string{ + En: "Workflows", + De: "Workflows", + }, LanguagePaths{ + En: workflow_executions.WorkflowExecutionsPath, + De: workflow_executions.WorkflowExecutionsPathDe, + }) + // Main navigation items - activePaths := []ActivePath{indexActivePath, aboutActivePath, contactActivePath, todosActivePath} + activePaths := []ActivePath{indexActivePath, aboutActivePath, contactActivePath, todosActivePath, workflowExecutionsActivePath} // Footer navigation items (placeholder - can be customized) footerPaths := []ActivePath{} @@ -89,6 +101,9 @@ func CreateApi(environment env.Environment, db *ent.Client) http.Handler { // Create Todos page _ = todos.NewPage(createPageFunc, server, &todosActivePath, db) + // Create Workflow Executions page + _ = workflow_executions.NewPage(createPageFunc, server, &workflowExecutionsActivePath, workflowDb, exampleWorkflow) + // Create Login page loginPaths := LanguagePaths{ En: login.LoginPath, diff --git a/starter/workflow_executions/workflow_executions.go b/starter/workflow_executions/workflow_executions.go new file mode 100644 index 0000000..9f2d919 --- /dev/null +++ b/starter/workflow_executions/workflow_executions.go @@ -0,0 +1,184 @@ +package workflow_executions + +import ( + "context" + "fmt" + "sort" + "strings" + + . "git.gorlug.de/code/ersteller" + "git.gorlug.de/code/ersteller/schema/ent" + "git.gorlug.de/code/ersteller/schema/ent/generalqueue" + "git.gorlug.de/code/ersteller/workflow" + + . "maragu.dev/gomponents" + . "maragu.dev/gomponents/html" +) + +const WorkflowExecutionsPath = "/workflow-executions" +const WorkflowExecutionsPathDe = "/workflow-ausfuehrungen" + +var texts *Texts + +type Texts struct { + PageTitle I18nText + PageDescription I18nText + HeroTitle I18nText + TriggerPlaceholder I18nText + TriggerButton I18nText +} + +type Page struct { + createPage CreateHtmxPageFunc + db *ent.Client + wf *workflow.Workflow + + ViewRoute HtmxRoute + TriggerRoute HtmxRoute +} + +func NewPage(createPage CreateHtmxPageFunc, server HtmxServer, path *ActivePath, db *ent.Client, wf *workflow.Workflow) *Page { + if texts == nil { + createTexts() + } + p := &Page{createPage: createPage, db: db, wf: wf} + p.ViewRoute = NewHtmxGetRoute(p.View, LanguagePaths{En: WorkflowExecutionsPath, De: WorkflowExecutionsPathDe}).SetActivePath(path) + p.ViewRoute.Add(server) + + p.TriggerRoute = NewHtmxPostRoute(p.Trigger, LanguagePaths{En: WorkflowExecutionsPath + "/trigger", De: WorkflowExecutionsPathDe + "/trigger"}) + p.TriggerRoute.Add(server) + return p +} + +func createTexts() { + texts = &Texts{ + PageTitle: NewI18nText(map[Language]string{En: "Workflow Executions", De: "Workflow-Ausführungen"}), + PageDescription: NewI18nText(map[Language]string{En: "Monitor workflow executions", De: "Überwache Workflow-Ausführungen"}), + HeroTitle: NewI18nText(map[Language]string{En: "Workflow Executions", De: "Workflow-Ausführungen"}), + TriggerPlaceholder: NewI18nText(map[Language]string{En: "Input for workflow", De: "Input für Workflow"}), + TriggerButton: NewI18nText(map[Language]string{En: "Trigger Workflow", De: "Workflow starten"}), + } +} + +func (p *Page) getMetaData() PageWebsiteMetaData { + return PageWebsiteMetaData{ + Title: texts.PageTitle, + Lang: En, + Description: texts.PageDescription, + } +} + +func (p *Page) View(c HtmxContext) { + language := c.GetLanguage() + + // Query all jobs ordered by created_at desc + jobs, _ := p.db.GeneralQueue.Query(). + Order(ent.Desc(generalqueue.FieldCreatedAt)). + All(context.Background()) + + // Group jobs by workflow_id + executions := make(map[string][]*ent.GeneralQueue) + var workflowIds []string + for _, j := range jobs { + if j.WorkflowID == "" { + continue + } + if _, ok := executions[j.WorkflowID]; !ok { + workflowIds = append(workflowIds, j.WorkflowID) + } + executions[j.WorkflowID] = append(executions[j.WorkflowID], j) + } + + content := Group{ + Div(Class("hero-section"), + H1(Class("hero-title"), Text(texts.HeroTitle.FromLang(language))), + ), + Div(Class("content-section"), + p.triggerForm(c), + Div(ID("executions-list"), p.executionsList(c, workflowIds, executions)), + ), + } + p.createPage(c, p.getMetaData(), content) +} + +func (p *Page) triggerForm(c HtmxContext) Node { + lang := c.GetLanguage() + return Form(Action(p.TriggerRoute.ToUrlFromContext(c, lang)), Method("post"), + Attr("hx-post", p.TriggerRoute.ToUrlFromContext(c, lang)), + Attr("hx-target", "#executions-list"), + Attr("hx-swap", "outerHTML"), + Div(Class("form-row"), + Input(Type("text"), Name("input"), Placeholder(texts.TriggerPlaceholder.FromLang(lang))), + Button(Type("submit"), Text(texts.TriggerButton.FromLang(lang))), + ), + ) +} + +func (p *Page) executionsList(c HtmxContext, workflowIds []string, executions map[string][]*ent.GeneralQueue) Node { + items := make([]Node, 0, len(workflowIds)) + for _, id := range workflowIds { + items = append(items, p.executionItem(id, executions[id])) + } + return Div(Class("executions-container"), Group(items)) +} + +func (p *Page) executionItem(workflowId string, jobs []*ent.GeneralQueue) Node { + // Sort jobs by created_at within execution + sort.Slice(jobs, func(i, j int) bool { + return jobs[i].CreatedAt.Before(jobs[j].CreatedAt) + }) + + jobNodes := make([]Node, 0, len(jobs)) + for _, j := range jobs { + jobNodes = append(jobNodes, Div(Class("job-step"), + H4(Text(fmt.Sprintf("Step: %s", j.Name))), + P(Text(fmt.Sprintf("Status: %s", j.Status))), + P(Text(fmt.Sprintf("Tries: %d/%d", j.NumberOfTries, j.MaxRetries))), + If(j.ErrorMessage != "", P(Class("error-message"), Text(fmt.Sprintf("Error: %s", j.ErrorMessage)))), + If(len(j.Payload) > 0, Details(Summary(Text("Payload")), Pre(Class("payload-pre"), Text(fmt.Sprintf("%+v", j.Payload))))), + If(len(j.ResultPayload) > 0, Details(Summary(Text("Result")), Pre(Class("result-pre"), Text(fmt.Sprintf("%+v", j.ResultPayload))))), + )) + } + + return Div(Class("execution-card"), + H3(Text(fmt.Sprintf("Execution: %s", workflowId))), + Div(Class("steps-list"), Group(jobNodes)), + Hr(), + ) +} + +func (p *Page) Trigger(c HtmxContext) { + input := strings.TrimSpace(c.GetFormValue("input")) + + payload := map[string]interface{}{ + "workflowId": p.wf.GenerateId(), + "input": input, + } + + err := p.wf.Execute(context.Background(), payload) + if err != nil { + // handle error? + } + + p.renderList(c) +} + +func (p *Page) renderList(c HtmxContext) { + jobs, _ := p.db.GeneralQueue.Query(). + Order(ent.Desc(generalqueue.FieldCreatedAt)). + All(context.Background()) + + executions := make(map[string][]*ent.GeneralQueue) + var workflowIds []string + for _, j := range jobs { + if j.WorkflowID == "" { + continue + } + if _, ok := executions[j.WorkflowID]; !ok { + workflowIds = append(workflowIds, j.WorkflowID) + } + executions[j.WorkflowID] = append(executions[j.WorkflowID], j) + } + + c.Render(Div(ID("executions-list"), p.executionsList(c, workflowIds, executions))) +} diff --git a/workflow/workflow.go b/workflow/workflow.go index 264b9e4..9906c88 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -48,7 +48,8 @@ func (s *Step) AddNextStep(step *Step) { const workflowIdParam = "workflowId" -func (s *Step) Execute(ctx context.Context, payload map[string]interface{}) error { +func (s *Step) Execute(ctx context.Context, payload map[string]any) error { + ersteller.Debug("Executing step '%s'", s.Name, "payload", payload) _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string))) if err != nil { return err @@ -58,11 +59,14 @@ func (s *Step) Execute(ctx context.Context, payload map[string]interface{}) erro } func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) { + workflowId := job.Payload[workflowIdParam].(string) result, nextStep, err := s.Handler(ctx, s, job) if err != nil { return result, err } if nextStep != nil { + result.ResultPayload[workflowIdParam] = workflowId + ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload) err = nextStep.Execute(ctx, result.ResultPayload) if err != nil { failurePayload := result.FailurePayload @@ -80,18 +84,24 @@ type Workflow struct { Name string Identifier string FirstStep *Step + AllSteps []*Step } -func NewWorkflow(name string, identifier string, firstStep *Step) *Workflow { - go func() {}() +func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow { return &Workflow{ Name: name, FirstStep: firstStep, Identifier: identifier, + AllSteps: allSteps, } } -func (w *Workflow) generateId() string { +func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error { + payload[workflowIdParam] = w.GenerateId() + return w.FirstStep.Execute(ctx, payload) +} + +func (w *Workflow) GenerateId() string { now := time.Now() return w.Identifier + "_" + now.Format("20060102150405") } @@ -107,7 +117,7 @@ func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) { return case <-ticker.C: err := workflow.FirstStep.Execute(ctx, map[string]interface{}{ - "workflowId": workflow.generateId(), + "workflowId": workflow.GenerateId(), }) if err != nil { ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error())