From 6b0bdade20e76ed2db47916759c8a1cff30abbae Mon Sep 17 00:00:00 2001 From: Achim Rohn Date: Sat, 20 Dec 2025 12:38:11 +0100 Subject: [PATCH] Support multiple workers in GeneralQueue --- queue/general_queue.go | 123 ++++++++++++++++++++++++++++------------- 1 file changed, 86 insertions(+), 37 deletions(-) diff --git a/queue/general_queue.go b/queue/general_queue.go index d99d5c3..a5122a6 100644 --- a/queue/general_queue.go +++ b/queue/general_queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "sync" "time" . "git.gorlug.de/code/ersteller" @@ -39,21 +40,29 @@ type GeneralQueueHandlerResult struct { type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error) type GeneralQueue struct { - Name string - client *ent.Client - running bool - stopChan chan bool - handler GeneralQueueHandler + Name string + client *ent.Client + running bool + handler GeneralQueueHandler + workerCount int + pollInterval time.Duration + cancel context.CancelFunc + startCtx context.Context + wg sync.WaitGroup } // NewGeneralQueue creates a new general queue instance -func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler) *GeneralQueue { +func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int) *GeneralQueue { + if processors <= 0 { + processors = 1 + } return &GeneralQueue{ - Name: name, - client: client, - running: false, - stopChan: make(chan bool, 1), - handler: handler, + Name: name, + client: client, + running: false, + handler: handler, + workerCount: processors, + pollInterval: 5 * time.Second, } } @@ -120,8 +129,28 @@ func (q *GeneralQueue) Start(ctx context.Context) error { Error("Failed to persist queue running state (start):", err) return err } - go q.loop(ctx, q.handler) - LogDebug("Queue '%s' started", q.Name) + // create a cancellable context used by workers + q.startCtx, q.cancel = context.WithCancel(ctx) + // start workers + workers := q.workerCount + if workers <= 0 { + workers = 1 + } + for i := 0; i < workers; i++ { + q.wg.Add(1) + go q.workerLoop(q.startCtx, q.handler) + } + // Auto-stop persistence: when all workers finish (e.g., no more jobs), + // mark queue as not running in DB and in memory, mirroring previous behavior. + go func(name string) { + q.wg.Wait() + q.running = false + if err := q.SetRunning(context.Background(), false); err != nil { + LogError("Failed to persist queue running state (auto-stop):", err) + } + LogDebug("Queue '%s' completed. Auto-stopping.", name) + }(q.Name) + LogDebug("Queue '%s' started with %d processors", q.Name, workers) return nil } @@ -130,7 +159,10 @@ func (q *GeneralQueue) Stop(ctx context.Context) error { if !q.running { return nil } - q.stopChan <- true + if q.cancel != nil { + q.cancel() + } + q.wg.Wait() q.running = false if err := q.SetRunning(ctx, false); err != nil { Error("Failed to persist queue running state (stop):", err) @@ -141,33 +173,26 @@ func (q *GeneralQueue) Stop(ctx context.Context) error { } // loop processes jobs continuously until stopped -func (q *GeneralQueue) loop(ctx context.Context, handler GeneralQueueHandler) { - ticker := time.NewTicker(5 * time.Second) +func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) { + defer q.wg.Done() + ticker := time.NewTicker(q.pollInterval) defer ticker.Stop() for { select { - case <-q.stopChan: - return case <-ctx.Done(): - q.running = false - if err := q.SetRunning(ctx, false); err != nil { - Error("Failed to persist queue running state (context done):", err) - } return case <-ticker.C: - // Process at most one job per tick + // Process at most one job per tick per worker processed, err := q.processNext(ctx, handler) if err != nil { LogError("Queue '%s' processing error:", q.Name, err) } - if !processed { - // No more jobs, auto-stop - q.running = false - if err := q.SetRunning(ctx, false); err != nil { - LogError("Failed to persist queue running state (auto-stop):", err) + // If no job was processed, initiate auto-stop (match previous behavior) + if err == nil && !processed { + if q.cancel != nil { + q.cancel() } - LogDebug("Queue '%s' completed. Auto-stopping.", q.Name) return } } @@ -176,7 +201,7 @@ func (q *GeneralQueue) loop(ctx context.Context, handler GeneralQueueHandler) { // processNext processes the next pending job and returns true if a job was processed func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) { - job, err := q.GetNextPendingJob(ctx) + job, err := q.claimNextPendingJob(ctx) if err != nil { return false, err } @@ -184,13 +209,6 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand return false, nil // No pending jobs } - // Mark job as in progress - err = q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusInProgress, "", nil, nil) - if err != nil { - log.Printf("Failed to mark job %d as in progress: %v", job.ID, err) - return true, err - } - // Create job struct for handler queueJob := GeneralQueueJob{ ID: job.ID, @@ -283,6 +301,37 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand return true, nil } +// claimNextPendingJob atomically claims the next pending job by setting it to in_progress. +// It avoids races when multiple workers attempt to pick the same job. +func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) { + // Try a few times in case of races with other workers + for attempts := 0; attempts < 3; attempts++ { + job, err := q.GetNextPendingJob(ctx) + if err != nil { + return nil, err + } + if job == nil { + return nil, nil + } + + // Attempt to move to in_progress only if it's still pending + updated, err := q.client.GeneralQueue.UpdateOneID(job.ID). + Where(generalqueue.StatusEQ(generalqueue.StatusPending)). + SetStatus(generalqueue.StatusInProgress). + SetUpdatedAt(time.Now()). + Save(ctx) + if err != nil { + if ent.IsNotFound(err) { + // Lost the race; retry + continue + } + return nil, fmt.Errorf("failed to claim pending job: %w", err) + } + return updated, nil + } + return nil, nil +} + // Enqueue adds a new job to the queue func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int) (*ent.GeneralQueue, error) { now := time.Now()