From 5bab8de04002b2c6c862073fe0b25acf5dfbf775 Mon Sep 17 00:00:00 2001 From: Achim Rohn Date: Sun, 5 Apr 2026 11:27:38 +0200 Subject: [PATCH] Use a new context for the next step, because the current one of the job is already stopped --- logger.go | 7 ++-- queue/general_queue.go | 86 ++++++++++++++++++++++++++++---------- starter/.gitignore | 2 +- starter/env/environment.go | 5 ++- workflow/workflow.go | 10 +++-- 5 files changed, 80 insertions(+), 30 deletions(-) diff --git a/logger.go b/logger.go index 48db58b..914d8fb 100644 --- a/logger.go +++ b/logger.go @@ -2,6 +2,7 @@ package ersteller import ( "fmt" + "log" "strings" ) @@ -16,7 +17,7 @@ func LogDebug(message string, a ...any) { } func PrintLogDebug(message string, a ...any) { - println(fmt.Sprintf(message, a...)) + log.Println(fmt.Sprintf(message, a...)) } func Debug(a ...any) { @@ -29,7 +30,7 @@ func Debug(a ...any) { func PrintDebug(a ...any) { stringValue := joinStrings(a) - println(stringValue) + log.Println(stringValue) } func joinStrings(a []any) string { @@ -51,7 +52,7 @@ func LogError(message string, a ...any) { } func PrintLogError(message string, a ...any) { - println(fmt.Sprintf("Error: %v", fmt.Sprintf(message, a...))) + log.Println(fmt.Sprintf("Error: %v", fmt.Sprintf(message, a...))) } func Error(a ...any) { diff --git a/queue/general_queue.go b/queue/general_queue.go index 5d03d24..6aa6098 100644 --- a/queue/general_queue.go +++ b/queue/general_queue.go @@ -40,6 +40,18 @@ type GeneralQueueHandlerResult struct { type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error) +type GeneralQueueParams struct { + AutoStop bool +} + +type GeneraleQueueOptions func(p *GeneralQueueParams) + +func WithAutoStop(autoStop bool) GeneraleQueueOptions { + return func(q *GeneralQueueParams) { + q.AutoStop = autoStop + } +} + type GeneralQueue struct { Name string client *ent.Client @@ -51,20 +63,29 @@ type GeneralQueue struct { startCtx context.Context wg sync.WaitGroup inFlight int64 + GeneralQueueParams } // NewGeneralQueue creates a new general queue instance -func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int) *GeneralQueue { +func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int, + options ...GeneraleQueueOptions) *GeneralQueue { + params := GeneralQueueParams{ + AutoStop: true, + } + for _, option := range options { + option(¶ms) + } if processors <= 0 { processors = 1 } return &GeneralQueue{ - Name: name, - client: client, - running: false, - handler: handler, - workerCount: processors, - pollInterval: 5 * time.Second, + Name: name, + client: client, + running: false, + handler: handler, + workerCount: processors, + pollInterval: 5 * time.Second, + GeneralQueueParams: params, } } @@ -142,9 +163,18 @@ func (q *GeneralQueue) Start(ctx context.Context) error { q.wg.Add(1) go q.workerLoop(q.startCtx, q.handler) } + if q.GeneralQueueParams.AutoStop { + q.runAutoStop() + } + LogDebug("Queue '%s' started with %d processors", q.Name, workers) + return nil +} + +func (q *GeneralQueue) runAutoStop() { // Auto-stop persistence: when all workers finish (e.g., no more jobs), // mark queue as not running in DB and in memory, mirroring previous behavior. go func(name string) { + time.Sleep(q.pollInterval) q.wg.Wait() q.running = false if err := q.SetRunning(context.Background(), false); err != nil { @@ -152,8 +182,6 @@ func (q *GeneralQueue) Start(ctx context.Context) error { } LogDebug("Queue '%s' completed. Auto-stopping.", name) }(q.Name) - LogDebug("Queue '%s' started with %d processors", q.Name, workers) - return nil } // Stop stops the queue from processing new jobs @@ -176,6 +204,7 @@ func (q *GeneralQueue) Stop(ctx context.Context) error { // loop processes jobs continuously until stopped func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) { + Debug(q.Name, "worker loop started") defer q.wg.Done() ticker := time.NewTicker(q.pollInterval) defer ticker.Stop() @@ -183,6 +212,7 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl for { select { case <-ctx.Done(): + Debug(q.Name, "worker loop context done") return case <-ticker.C: // Process at most one job per tick per worker @@ -190,34 +220,47 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl if err != nil { LogError("Queue '%s' processing error:", q.Name, err) } - // If no job was processed, initiate auto-stop (match previous behavior) - if err == nil && !processed { - // Only stop if there is truly no work left: no pending or in-progress jobs - hasWork, hwErr := q.hasActiveOrPending(ctx) - if hwErr != nil { - LogError("Queue '%s' work-check error:", q.Name, hwErr) - } - if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 { - if q.cancel != nil { - q.cancel() - } + if q.AutoStop { + if q.autoStopCheckForWork(ctx, err, processed) { return } - // Otherwise, continue ticking while others are working or new jobs may arrive } } } } +func (q *GeneralQueue) autoStopCheckForWork(ctx context.Context, err error, processed bool) bool { + // If no job was processed, initiate auto-stop (match previous behavior) + if err == nil && !processed { + // Only stop if there is truly no work left: no pending or in-progress jobs + hasWork, hwErr := q.hasActiveOrPending(ctx) + if hwErr != nil { + LogError("Queue '%s' work-check error:", q.Name, hwErr) + } + Debug("Queue", q.Name, "hasWork:", hasWork) + if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 { + if q.cancel != nil { + q.cancel() + } + return true + } + // Otherwise, continue ticking while others are working or new jobs may arrive + } + return false +} + // processNext processes the next pending job and returns true if a job was processed func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) { job, err := q.claimNextPendingJob(ctx) if err != nil { + Error(q.Name, "Failed to claim next pending job:", err) return false, err } if job == nil { + Debug(q.Name, "no pending jobs") return false, nil // No pending jobs } + Debug(q.Name, "found next pending job:", job.ID, job.Name) // Mark this worker as processing a job atomicpkg.AddInt64(&q.inFlight, 1) @@ -374,6 +417,7 @@ func WithWorkflowId(id string) EnqueueOption { // Enqueue adds a new job to the queue func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) { + Debug("Enqueueing job to queue:", q.Name, "payload", payload) params := EnqueueParams{} for _, option := range options { option(¶ms) diff --git a/starter/.gitignore b/starter/.gitignore index bd05a09..47173af 100644 --- a/starter/.gitignore +++ b/starter/.gitignore @@ -1,4 +1,4 @@ .env .idea/ tmp/ -sqlite3.db +sqlite3.db* diff --git a/starter/env/environment.go b/starter/env/environment.go index 7c0abb6..cf8ad0b 100644 --- a/starter/env/environment.go +++ b/starter/env/environment.go @@ -2,10 +2,11 @@ package env import ( "fmt" - . "git.gorlug.de/code/ersteller" "os" "path/filepath" + . "git.gorlug.de/code/ersteller" + "github.com/joho/godotenv" ) @@ -133,7 +134,7 @@ func GenerateEnvFile(rootPath string, overwrite bool) error { // Define default values and comments for specific keys defaults := map[string]string{ - EnvKeyDatabaseURL: "\"sqlite3.db?_fk=1\"", + EnvKeyDatabaseURL: "\"sqlite3.db?_fk=1&_journal_mode=WAL\"", EnvKeyBaseURL: "\"http://localhost:8090\"", EnvKeyIsLocal: "true", EnvKeyIsDev: "true", diff --git a/workflow/workflow.go b/workflow/workflow.go index 9906c88..e33bc71 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -38,7 +38,8 @@ func NewStep(params *StepParams) *Step { NextSteps: make(map[string]*Step), } step.Queue = queue.NewGeneralQueue(params.Identifier, - params.Client, step.HandleQueue, params.Processors) + params.Client, step.HandleQueue, params.Processors, + queue.WithAutoStop(true)) return &step } @@ -49,7 +50,7 @@ func (s *Step) AddNextStep(step *Step) { const workflowIdParam = "workflowId" func (s *Step) Execute(ctx context.Context, payload map[string]any) error { - ersteller.Debug("Executing step '%s'", s.Name, "payload", payload) + ersteller.Debug("Executing step ", s.Name, "payload", payload) _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string))) if err != nil { return err @@ -67,7 +68,7 @@ func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queu if nextStep != nil { result.ResultPayload[workflowIdParam] = workflowId ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload) - err = nextStep.Execute(ctx, result.ResultPayload) + err = nextStep.Execute(context.Background(), result.ResultPayload) if err != nil { failurePayload := result.FailurePayload failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error()) @@ -88,6 +89,9 @@ type Workflow struct { } func NewWorkflow(name string, identifier string, firstStep *Step, allSteps []*Step) *Workflow { + for _, step := range allSteps { + step.Identifier = identifier + "_" + step.Name + } return &Workflow{ Name: name, FirstStep: firstStep,