Only cancel if there really are no more work left
This commit is contained in:
+31
-3
@@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
atomicpkg "sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "git.gorlug.de/code/ersteller"
|
. "git.gorlug.de/code/ersteller"
|
||||||
@@ -49,6 +50,7 @@ type GeneralQueue struct {
|
|||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
startCtx context.Context
|
startCtx context.Context
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
|
inFlight int64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewGeneralQueue creates a new general queue instance
|
// NewGeneralQueue creates a new general queue instance
|
||||||
@@ -190,10 +192,18 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl
|
|||||||
}
|
}
|
||||||
// If no job was processed, initiate auto-stop (match previous behavior)
|
// If no job was processed, initiate auto-stop (match previous behavior)
|
||||||
if err == nil && !processed {
|
if err == nil && !processed {
|
||||||
if q.cancel != nil {
|
// Only stop if there is truly no work left: no pending or in-progress jobs
|
||||||
q.cancel()
|
hasWork, hwErr := q.hasActiveOrPending(ctx)
|
||||||
|
if hwErr != nil {
|
||||||
|
LogError("Queue '%s' work-check error:", q.Name, hwErr)
|
||||||
}
|
}
|
||||||
return
|
if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 {
|
||||||
|
if q.cancel != nil {
|
||||||
|
q.cancel()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Otherwise, continue ticking while others are working or new jobs may arrive
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -209,6 +219,10 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand
|
|||||||
return false, nil // No pending jobs
|
return false, nil // No pending jobs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Mark this worker as processing a job
|
||||||
|
atomicpkg.AddInt64(&q.inFlight, 1)
|
||||||
|
defer atomicpkg.AddInt64(&q.inFlight, -1)
|
||||||
|
|
||||||
// Create job struct for handler
|
// Create job struct for handler
|
||||||
queueJob := GeneralQueueJob{
|
queueJob := GeneralQueueJob{
|
||||||
ID: job.ID,
|
ID: job.ID,
|
||||||
@@ -301,6 +315,20 @@ func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHand
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hasActiveOrPending checks if there are any jobs still pending or currently in progress for this queue
|
||||||
|
func (q *GeneralQueue) hasActiveOrPending(ctx context.Context) (bool, error) {
|
||||||
|
count, err := q.client.GeneralQueue.Query().
|
||||||
|
Where(
|
||||||
|
generalqueue.NameEQ(q.Name),
|
||||||
|
generalqueue.StatusIn(generalqueue.StatusPending, generalqueue.StatusInProgress),
|
||||||
|
).
|
||||||
|
Count(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to check active/pending jobs: %w", err)
|
||||||
|
}
|
||||||
|
return count > 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
// claimNextPendingJob atomically claims the next pending job by setting it to in_progress.
|
// claimNextPendingJob atomically claims the next pending job by setting it to in_progress.
|
||||||
// It avoids races when multiple workers attempt to pick the same job.
|
// It avoids races when multiple workers attempt to pick the same job.
|
||||||
func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
|
func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user