Use a new context for the next step, because the current one of the job is already stopped
This commit is contained in:
+65
-21
@@ -40,6 +40,18 @@ type GeneralQueueHandlerResult struct {
|
||||
|
||||
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
|
||||
|
||||
type GeneralQueueParams struct {
|
||||
AutoStop bool
|
||||
}
|
||||
|
||||
type GeneraleQueueOptions func(p *GeneralQueueParams)
|
||||
|
||||
func WithAutoStop(autoStop bool) GeneraleQueueOptions {
|
||||
return func(q *GeneralQueueParams) {
|
||||
q.AutoStop = autoStop
|
||||
}
|
||||
}
|
||||
|
||||
type GeneralQueue struct {
|
||||
Name string
|
||||
client *ent.Client
|
||||
@@ -51,20 +63,29 @@ type GeneralQueue struct {
|
||||
startCtx context.Context
|
||||
wg sync.WaitGroup
|
||||
inFlight int64
|
||||
GeneralQueueParams
|
||||
}
|
||||
|
||||
// NewGeneralQueue creates a new general queue instance
|
||||
func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int) *GeneralQueue {
|
||||
func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int,
|
||||
options ...GeneraleQueueOptions) *GeneralQueue {
|
||||
params := GeneralQueueParams{
|
||||
AutoStop: true,
|
||||
}
|
||||
for _, option := range options {
|
||||
option(¶ms)
|
||||
}
|
||||
if processors <= 0 {
|
||||
processors = 1
|
||||
}
|
||||
return &GeneralQueue{
|
||||
Name: name,
|
||||
client: client,
|
||||
running: false,
|
||||
handler: handler,
|
||||
workerCount: processors,
|
||||
pollInterval: 5 * time.Second,
|
||||
Name: name,
|
||||
client: client,
|
||||
running: false,
|
||||
handler: handler,
|
||||
workerCount: processors,
|
||||
pollInterval: 5 * time.Second,
|
||||
GeneralQueueParams: params,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -142,9 +163,18 @@ func (q *GeneralQueue) Start(ctx context.Context) error {
|
||||
q.wg.Add(1)
|
||||
go q.workerLoop(q.startCtx, q.handler)
|
||||
}
|
||||
if q.GeneralQueueParams.AutoStop {
|
||||
q.runAutoStop()
|
||||
}
|
||||
LogDebug("Queue '%s' started with %d processors", q.Name, workers)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *GeneralQueue) runAutoStop() {
|
||||
// Auto-stop persistence: when all workers finish (e.g., no more jobs),
|
||||
// mark queue as not running in DB and in memory, mirroring previous behavior.
|
||||
go func(name string) {
|
||||
time.Sleep(q.pollInterval)
|
||||
q.wg.Wait()
|
||||
q.running = false
|
||||
if err := q.SetRunning(context.Background(), false); err != nil {
|
||||
@@ -152,8 +182,6 @@ func (q *GeneralQueue) Start(ctx context.Context) error {
|
||||
}
|
||||
LogDebug("Queue '%s' completed. Auto-stopping.", name)
|
||||
}(q.Name)
|
||||
LogDebug("Queue '%s' started with %d processors", q.Name, workers)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the queue from processing new jobs
|
||||
@@ -176,6 +204,7 @@ func (q *GeneralQueue) Stop(ctx context.Context) error {
|
||||
|
||||
// loop processes jobs continuously until stopped
|
||||
func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) {
|
||||
Debug(q.Name, "worker loop started")
|
||||
defer q.wg.Done()
|
||||
ticker := time.NewTicker(q.pollInterval)
|
||||
defer ticker.Stop()
|
||||
@@ -183,6 +212,7 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
Debug(q.Name, "worker loop context done")
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Process at most one job per tick per worker
|
||||
@@ -190,34 +220,47 @@ func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandl
|
||||
if err != nil {
|
||||
LogError("Queue '%s' processing error:", q.Name, err)
|
||||
}
|
||||
// If no job was processed, initiate auto-stop (match previous behavior)
|
||||
if err == nil && !processed {
|
||||
// Only stop if there is truly no work left: no pending or in-progress jobs
|
||||
hasWork, hwErr := q.hasActiveOrPending(ctx)
|
||||
if hwErr != nil {
|
||||
LogError("Queue '%s' work-check error:", q.Name, hwErr)
|
||||
}
|
||||
if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 {
|
||||
if q.cancel != nil {
|
||||
q.cancel()
|
||||
}
|
||||
if q.AutoStop {
|
||||
if q.autoStopCheckForWork(ctx, err, processed) {
|
||||
return
|
||||
}
|
||||
// Otherwise, continue ticking while others are working or new jobs may arrive
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (q *GeneralQueue) autoStopCheckForWork(ctx context.Context, err error, processed bool) bool {
|
||||
// If no job was processed, initiate auto-stop (match previous behavior)
|
||||
if err == nil && !processed {
|
||||
// Only stop if there is truly no work left: no pending or in-progress jobs
|
||||
hasWork, hwErr := q.hasActiveOrPending(ctx)
|
||||
if hwErr != nil {
|
||||
LogError("Queue '%s' work-check error:", q.Name, hwErr)
|
||||
}
|
||||
Debug("Queue", q.Name, "hasWork:", hasWork)
|
||||
if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 {
|
||||
if q.cancel != nil {
|
||||
q.cancel()
|
||||
}
|
||||
return true
|
||||
}
|
||||
// Otherwise, continue ticking while others are working or new jobs may arrive
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// processNext processes the next pending job and returns true if a job was processed
|
||||
func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) {
|
||||
job, err := q.claimNextPendingJob(ctx)
|
||||
if err != nil {
|
||||
Error(q.Name, "Failed to claim next pending job:", err)
|
||||
return false, err
|
||||
}
|
||||
if job == nil {
|
||||
Debug(q.Name, "no pending jobs")
|
||||
return false, nil // No pending jobs
|
||||
}
|
||||
Debug(q.Name, "found next pending job:", job.ID, job.Name)
|
||||
|
||||
// Mark this worker as processing a job
|
||||
atomicpkg.AddInt64(&q.inFlight, 1)
|
||||
@@ -374,6 +417,7 @@ func WithWorkflowId(id string) EnqueueOption {
|
||||
|
||||
// Enqueue adds a new job to the queue
|
||||
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
|
||||
Debug("Enqueueing job to queue:", q.Name, "payload", payload)
|
||||
params := EnqueueParams{}
|
||||
for _, option := range options {
|
||||
option(¶ms)
|
||||
|
||||
Reference in New Issue
Block a user