From 9a3fb06c7a3bedc294ad0e2ffab00ac85da8e854 Mon Sep 17 00:00:00 2001 From: Achim Rohn Date: Sat, 20 Dec 2025 13:01:05 +0100 Subject: [PATCH] Only cancel if there really are no more work left --- queue/general_queue.go | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/queue/general_queue.go b/queue/general_queue.go index a5122a6..df42c09 100644 --- a/queue/general_queue.go +++ b/queue/general_queue.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + atomicpkg "sync/atomic" "time" . "git.gorlug.de/code/ersteller" @@ -49,6 +50,7 @@ type GeneralQueue struct { cancel context.CancelFunc startCtx context.Context wg sync.WaitGroup + inFlight int64 } // NewGeneralQueue creates a new general queue instance @@ -190,10 +192,18 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl } // If no job was processed, initiate auto-stop (match previous behavior) if err == nil && !processed { - if q.cancel != nil { - q.cancel() + // Only stop if there is truly no work left: no pending or in-progress jobs + hasWork, hwErr := q.hasActiveOrPending(ctx) + if hwErr != nil { + LogError("Queue '%s' work-check error:", q.Name, hwErr) } - return + if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 { + if q.cancel != nil { + q.cancel() + } + return + } + // Otherwise, continue ticking while others are working or new jobs may arrive } } } @@ -209,6 +219,10 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand return false, nil // No pending jobs } + // Mark this worker as processing a job + atomicpkg.AddInt64(&q.inFlight, 1) + defer atomicpkg.AddInt64(&q.inFlight, -1) + // Create job struct for handler queueJob := GeneralQueueJob{ ID: job.ID, @@ -301,6 +315,20 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand return true, nil } +// hasActiveOrPending checks if there are any jobs still pending or currently in progress for this queue +func (q *GeneralQueue) hasActiveOrPending(ctx context.Context) (bool, error) { + count, err := q.client.GeneralQueue.Query(). + Where( + generalqueue.NameEQ(q.Name), + generalqueue.StatusIn(generalqueue.StatusPending, generalqueue.StatusInProgress), + ). + Count(ctx) + if err != nil { + return false, fmt.Errorf("failed to check active/pending jobs: %w", err) + } + return count > 0, nil +} + // claimNextPendingJob atomically claims the next pending job by setting it to in_progress. // It avoids races when multiple workers attempt to pick the same job. func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {