553 lines
15 KiB
Go
553 lines
15 KiB
Go
package queue
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
atomicpkg "sync/atomic"
|
|
"time"
|
|
|
|
. "git.gorlug.de/code/ersteller"
|
|
"git.gorlug.de/code/ersteller/schema/ent"
|
|
"git.gorlug.de/code/ersteller/schema/ent/generalqueue"
|
|
"git.gorlug.de/code/ersteller/schema/ent/generalqueuestate"
|
|
)
|
|
|
|
// GeneralQueueStatus represents the lifecycle state of a job in the queue.
type GeneralQueueStatus string

const (
	// GeneralQueueStatusPending marks a job waiting to be claimed by a worker.
	GeneralQueueStatusPending GeneralQueueStatus = "pending"
	// GeneralQueueStatusInProgress marks a job currently being processed.
	GeneralQueueStatusInProgress GeneralQueueStatus = "in_progress"
	// GeneralQueueStatusCompleted marks a job whose handler finished without error.
	GeneralQueueStatusCompleted GeneralQueueStatus = "completed"
	// GeneralQueueStatusFailed marks a job that exhausted its retries.
	GeneralQueueStatusFailed GeneralQueueStatus = "failed"
)
|
|
|
|
// GeneralQueueJob is the handler-facing view of a queued job row,
// populated from the ent GeneralQueue entity in processNext.
type GeneralQueueJob struct {
	ID             int                    // database ID of the job row
	CreatedAt      time.Time              // when the job was enqueued
	NumberOfTries  int                    // attempts made so far
	Payload        map[string]interface{} // job input as stored on enqueue
	Status         GeneralQueueStatus     // current lifecycle status
	FailurePayload map[string]interface{} // details from the last failure, if any
	ResultPayload  map[string]interface{} // result of a completed run, if any
}
|
|
|
|
// GeneralQueueHandlerResult carries the outcome of a handler invocation.
// ResultPayload is persisted on success; FailurePayload is persisted when
// the job has exhausted its retries.
type GeneralQueueHandlerResult struct {
	ResultPayload  map[string]interface{}
	FailurePayload map[string]interface{}
}
|
|
|
|
// GeneralQueueHandler processes a single claimed job. A non-nil error causes
// the job to be retried (or marked failed once MaxRetries is reached).
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
|
|
|
|
// GeneralQueueParams holds optional queue configuration applied via
// GeneraleQueueOptions functional options.
type GeneralQueueParams struct {
	AutoStop bool // when true, the queue shuts itself down once all work is drained
}
|
|
|
|
// GeneraleQueueOptions is a functional option for NewGeneralQueue.
// NOTE(review): the name looks like a typo for "GeneralQueueOptions", but it
// is exported and may have external callers — renaming would break them.
type GeneraleQueueOptions func(p *GeneralQueueParams)
|
|
|
|
func WithAutoStop(autoStop bool) GeneraleQueueOptions {
|
|
return func(q *GeneralQueueParams) {
|
|
q.AutoStop = autoStop
|
|
}
|
|
}
|
|
|
|
// GeneralQueue is a database-backed job queue. Jobs live in the generalqueue
// table and are claimed and processed by a pool of polling worker goroutines.
type GeneralQueue struct {
	Name         string              // queue identifier, also the DB key for rows and state
	client       *ent.Client         // ent client used for all persistence
	running      bool                // in-memory running flag; NOTE(review): read/written from Start/Stop and the auto-stop goroutine without synchronization — confirm callers serialize access
	handler      GeneralQueueHandler // callback invoked for each claimed job
	workerCount  int                 // number of concurrent worker goroutines
	pollInterval time.Duration       // how often each worker polls for work
	cancel       context.CancelFunc  // cancels startCtx to stop all workers
	startCtx     context.Context     // worker context; NOTE(review): storing a context in a struct is discouraged but used here for worker lifetime management
	wg           sync.WaitGroup      // tracks running worker goroutines
	inFlight     int64               // atomic counter of jobs currently being processed
	GeneralQueueParams               // embedded options (AutoStop)
}
|
|
|
|
// NewGeneralQueue creates a new general queue instance
|
|
func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int,
|
|
options ...GeneraleQueueOptions) *GeneralQueue {
|
|
params := GeneralQueueParams{
|
|
AutoStop: true,
|
|
}
|
|
for _, option := range options {
|
|
option(¶ms)
|
|
}
|
|
if processors <= 0 {
|
|
processors = 1
|
|
}
|
|
return &GeneralQueue{
|
|
Name: name,
|
|
client: client,
|
|
running: false,
|
|
handler: handler,
|
|
workerCount: processors,
|
|
pollInterval: 5 * time.Second,
|
|
GeneralQueueParams: params,
|
|
}
|
|
}
|
|
|
|
// SetRunning persists the running state of the queue to the database
|
|
func (q *GeneralQueue) SetRunning(ctx context.Context, running bool) error {
|
|
now := time.Now()
|
|
|
|
// Try to find existing state
|
|
existing, err := q.client.GeneralQueueState.Query().
|
|
Where(generalqueuestate.NameEQ(q.Name)).
|
|
First(ctx)
|
|
|
|
if err != nil && !ent.IsNotFound(err) {
|
|
return fmt.Errorf("failed to query queue state: %w", err)
|
|
}
|
|
|
|
if existing != nil {
|
|
// Update existing
|
|
err = q.client.GeneralQueueState.UpdateOne(existing).
|
|
SetRunning(running).
|
|
SetUpdatedAt(now).
|
|
Exec(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update queue running state: %w", err)
|
|
}
|
|
} else {
|
|
// Create new
|
|
_, err = q.client.GeneralQueueState.Create().
|
|
SetName(q.Name).
|
|
SetRunning(running).
|
|
SetUpdatedAt(now).
|
|
Save(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create queue running state: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IsRunning returns the persisted running state of the queue from the database
|
|
func (q *GeneralQueue) IsRunning(ctx context.Context) (bool, error) {
|
|
state, err := q.client.GeneralQueueState.Query().
|
|
Where(generalqueuestate.NameEQ(q.Name)).
|
|
First(ctx)
|
|
|
|
if err != nil {
|
|
if ent.IsNotFound(err) {
|
|
return false, nil // No state record means not running
|
|
}
|
|
return false, fmt.Errorf("failed to get queue running state: %w", err)
|
|
}
|
|
|
|
return state.Running, nil
|
|
}
|
|
|
|
// Start begins processing jobs from the queue in a background goroutine
|
|
func (q *GeneralQueue) Start(ctx context.Context) error {
|
|
if q.running {
|
|
return nil
|
|
}
|
|
q.running = true
|
|
if err := q.SetRunning(ctx, true); err != nil {
|
|
Error("Failed to persist queue running state (start):", err)
|
|
return err
|
|
}
|
|
// create a cancellable context used by workers
|
|
q.startCtx, q.cancel = context.WithCancel(ctx)
|
|
// start workers
|
|
workers := q.workerCount
|
|
if workers <= 0 {
|
|
workers = 1
|
|
}
|
|
for i := 0; i < workers; i++ {
|
|
q.wg.Add(1)
|
|
go q.workerLoop(q.startCtx, q.handler)
|
|
}
|
|
if q.GeneralQueueParams.AutoStop {
|
|
q.runAutoStop()
|
|
}
|
|
LogDebug("Queue '%s' started with %d processors", q.Name, workers)
|
|
return nil
|
|
}
|
|
|
|
// runAutoStop launches a watchdog goroutine that waits for every worker to
// exit and then persists the stopped state for this queue.
func (q *GeneralQueue) runAutoStop() {
	// Auto-stop persistence: when all workers finish (e.g., no more jobs),
	// mark queue as not running in DB and in memory, mirroring previous behavior.
	go func(name string) {
		// Sleep one poll interval first — presumably to let workers take at
		// least one tick before the watchdog starts waiting. TODO confirm:
		// wg.Add already happens in Start before this goroutine is launched.
		time.Sleep(q.pollInterval)
		q.wg.Wait()
		q.running = false
		// Use a fresh background context: the Start caller's context may be
		// cancelled by the time the workers drain.
		if err := q.SetRunning(context.Background(), false); err != nil {
			LogError("Failed to persist queue running state (auto-stop):", err)
		}
		LogDebug("Queue '%s' completed. Auto-stopping.", name)
	}(q.Name)
}
|
|
|
|
// Stop stops the queue from processing new jobs
|
|
func (q *GeneralQueue) Stop(ctx context.Context) error {
|
|
if !q.running {
|
|
return nil
|
|
}
|
|
if q.cancel != nil {
|
|
q.cancel()
|
|
}
|
|
q.wg.Wait()
|
|
q.running = false
|
|
if err := q.SetRunning(ctx, false); err != nil {
|
|
Error("Failed to persist queue running state (stop):", err)
|
|
return err
|
|
}
|
|
LogDebug("Queue '%s' stopped", q.Name)
|
|
return nil
|
|
}
|
|
|
|
// loop processes jobs continuously until stopped
|
|
func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) {
|
|
Debug(q.Name, "worker loop started")
|
|
defer q.wg.Done()
|
|
ticker := time.NewTicker(q.pollInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
Debug(q.Name, "worker loop context done")
|
|
return
|
|
case <-ticker.C:
|
|
// Process at most one job per tick per worker
|
|
processed, err := q.processNext(ctx, handler)
|
|
if err != nil {
|
|
LogError("Queue '%s' processing error:", q.Name, err)
|
|
}
|
|
if q.AutoStop {
|
|
if q.autoStopCheckForWork(ctx, err, processed) {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *GeneralQueue) autoStopCheckForWork(ctx context.Context, err error, processed bool) bool {
|
|
// If no job was processed, initiate auto-stop (match previous behavior)
|
|
if err == nil && !processed {
|
|
// Only stop if there is truly no work left: no pending or in-progress jobs
|
|
hasWork, hwErr := q.hasActiveOrPending(ctx)
|
|
if hwErr != nil {
|
|
LogError("Queue '%s' work-check error:", q.Name, hwErr)
|
|
}
|
|
Debug("Queue", q.Name, "hasWork:", hasWork)
|
|
if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 {
|
|
if q.cancel != nil {
|
|
q.cancel()
|
|
}
|
|
return true
|
|
}
|
|
// Otherwise, continue ticking while others are working or new jobs may arrive
|
|
}
|
|
return false
|
|
}
|
|
|
|
// processNext claims and processes the next pending job. It returns true if a
// job was claimed (regardless of whether its handler succeeded), along with
// the handler's error, so the caller can distinguish "no work" from failure.
func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) {
	job, err := q.claimNextPendingJob(ctx)
	if err != nil {
		Error(q.Name, "Failed to claim next pending job:", err)
		return false, err
	}
	if job == nil {
		Debug(q.Name, "no pending jobs")
		return false, nil // No pending jobs
	}
	Debug(q.Name, "found next pending job:", job.ID, job.Name)

	// Mark this worker as processing a job; the in-flight counter keeps the
	// auto-stop check from shutting the queue down mid-job.
	atomicpkg.AddInt64(&q.inFlight, 1)
	defer atomicpkg.AddInt64(&q.inFlight, -1)

	// Create the handler-facing job struct from the database row.
	queueJob := GeneralQueueJob{
		ID:             job.ID,
		CreatedAt:      job.CreatedAt,
		NumberOfTries:  job.NumberOfTries,
		Payload:        job.Payload,
		Status:         GeneralQueueStatus(job.Status),
		FailurePayload: job.FailurePayload,
	}

	// Create a context with 15-minute timeout for job processing.
	jobCtx, cancel := context.WithTimeout(ctx, 15*time.Minute)
	defer cancel()

	// Run the handler in its own goroutine so we can race it against the
	// timeout below.
	type handlerResult struct {
		err           error
		resultPayload any
		errPayload    any
	}
	// Buffered (size 1) so the handler goroutine can always deliver its
	// result and exit, even if we already gave up on the timeout branch.
	resultChan := make(chan handlerResult, 1)

	go func() {
		result, err := handler(jobCtx, queueJob)
		resultChan <- handlerResult{err: err, resultPayload: result.ResultPayload, errPayload: result.FailurePayload}
	}()

	var handlerErr error
	var resultPayload any
	var errPayload any

	select {
	case result := <-resultChan:
		handlerErr = result.err
		resultPayload = result.resultPayload
		errPayload = result.errPayload
	case <-jobCtx.Done():
		// Timeout (or parent cancellation) occurred. NOTE(review): the
		// handler goroutine may still be running at this point; jobCtx
		// cancellation is its only stop signal.
		handlerErr = fmt.Errorf("job processing timeout after 15 minutes")
		resultPayload = nil
		errPayload = nil
	}

	// Failure path: bump the try counter, then either re-queue or fail.
	err = handlerErr
	if err != nil {
		// Increment tries (best effort; a failure here is only logged).
		if incrementErr := q.IncrementTries(ctx, job.ID, job.NumberOfTries); incrementErr != nil {
			log.Printf("Failed to increment tries for job %d: %v", job.ID, incrementErr)
		}

		// Check if we should retry or fail for good.
		if job.NumberOfTries+1 >= job.MaxRetries {
			errPayloadMap, mapErr := StructToMap(errPayload)
			if mapErr != nil {
				Error("failed to convert error payload to map:", mapErr)
				errPayloadMap = map[string]interface{}{}
			}

			// Max retries reached, mark as failed and persist the error
			// details alongside the handler's failure payload.
			failurePayload := map[string]interface{}{
				"error":        err.Error(),
				"errorPayload": errPayloadMap,
			}
			if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusFailed, err.Error(), failurePayload, nil); updateErr != nil {
				log.Printf("Failed to mark job %d as failed: %v", job.ID, updateErr)
			}
		} else {
			// Retry - mark as pending again so another tick picks it up.
			if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusPending, err.Error(), nil, nil); updateErr != nil {
				log.Printf("Failed to mark job %d as pending for retry: %v", job.ID, updateErr)
			}
		}
		return true, err
	}

	// Job succeeded: convert the handler's result and mark the row completed.
	resultPayloadMap, mapErr := StructToMap(resultPayload)
	if mapErr != nil {
		Error("failed to convert result payload to map:", mapErr)
		resultPayloadMap = map[string]interface{}{}
	}

	err = q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusCompleted, "", nil, resultPayloadMap)
	if err != nil {
		// The job ran to completion; a status-update failure is only logged.
		log.Printf("Failed to mark job %d as completed: %v", job.ID, err)
	}

	return true, nil
}
|
|
|
|
// hasActiveOrPending checks if there are any jobs still pending or currently in progress for this queue
|
|
func (q *GeneralQueue) hasActiveOrPending(ctx context.Context) (bool, error) {
|
|
count, err := q.client.GeneralQueue.Query().
|
|
Where(
|
|
generalqueue.NameEQ(q.Name),
|
|
generalqueue.StatusIn(generalqueue.StatusPending, generalqueue.StatusInProgress),
|
|
).
|
|
Count(ctx)
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to check active/pending jobs: %w", err)
|
|
}
|
|
return count > 0, nil
|
|
}
|
|
|
|
// claimNextPendingJob atomically claims the next pending job by setting it to in_progress.
|
|
// It avoids races when multiple workers attempt to pick the same job.
|
|
func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
|
|
// Try a few times in case of races with other workers
|
|
for attempts := 0; attempts < 3; attempts++ {
|
|
job, err := q.GetNextPendingJob(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if job == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
// Attempt to move to in_progress only if it's still pending
|
|
updated, err := q.client.GeneralQueue.UpdateOneID(job.ID).
|
|
Where(generalqueue.StatusEQ(generalqueue.StatusPending)).
|
|
SetStatus(generalqueue.StatusInProgress).
|
|
SetUpdatedAt(time.Now()).
|
|
Save(ctx)
|
|
if err != nil {
|
|
if ent.IsNotFound(err) {
|
|
// Lost the race; retry
|
|
continue
|
|
}
|
|
return nil, fmt.Errorf("failed to claim pending job: %w", err)
|
|
}
|
|
return updated, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// EnqueueParams holds optional per-job metadata applied via EnqueueOption.
type EnqueueParams struct {
	WorkflowId string // workflow identifier stored on the job row
}
|
|
|
|
// EnqueueOption is a functional option for Enqueue.
type EnqueueOption func(p *EnqueueParams)
|
|
|
|
func WithWorkflowId(id string) EnqueueOption {
|
|
return func(q *EnqueueParams) {
|
|
q.WorkflowId = id
|
|
}
|
|
}
|
|
|
|
// Enqueue adds a new job to the queue
|
|
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
|
|
Debug("Enqueueing job to queue:", q.Name, "payload", payload)
|
|
params := EnqueueParams{}
|
|
for _, option := range options {
|
|
option(¶ms)
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
payloadMap, err := StructToMap(payload)
|
|
if err != nil {
|
|
Error("failed to convert payload to map:", err)
|
|
return nil, fmt.Errorf("failed to convert payload to map: %w", err)
|
|
}
|
|
|
|
create := q.client.GeneralQueue.Create().
|
|
SetName(q.Name).
|
|
SetPayload(payloadMap).
|
|
SetStatus(generalqueue.StatusPending).
|
|
SetNumberOfTries(0).
|
|
SetMaxRetries(maxRetries).
|
|
SetCreatedAt(now).
|
|
SetUpdatedAt(now).
|
|
SetUserID(userId).
|
|
SetWorkflowID(params.WorkflowId)
|
|
|
|
job, err := create.Save(ctx)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to enqueue job: %w", err)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// GetNextPendingJob retrieves the next pending job from the queue
|
|
func (q *GeneralQueue) GetNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
|
|
job, err := q.client.GeneralQueue.Query().
|
|
Where(
|
|
generalqueue.NameEQ(q.Name),
|
|
generalqueue.StatusEQ(generalqueue.StatusPending),
|
|
).
|
|
Order(ent.Asc(generalqueue.FieldCreatedAt)).
|
|
First(ctx)
|
|
|
|
if err != nil {
|
|
if ent.IsNotFound(err) {
|
|
return nil, nil // No pending jobs
|
|
}
|
|
return nil, fmt.Errorf("failed to get next pending job: %w", err)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// UpdateJobStatus updates the status of a job
|
|
func (q *GeneralQueue) UpdateJobStatus(ctx context.Context, jobID int, status generalqueue.Status, errorMessage string, failurePayload map[string]interface{}, resultPayload map[string]interface{}) error {
|
|
update := q.client.GeneralQueue.UpdateOneID(jobID).
|
|
SetStatus(status).
|
|
SetUpdatedAt(time.Now())
|
|
|
|
if status == generalqueue.StatusCompleted || status == generalqueue.StatusFailed {
|
|
update = update.SetProcessedAt(time.Now())
|
|
}
|
|
|
|
if errorMessage != "" {
|
|
update = update.SetErrorMessage(errorMessage)
|
|
}
|
|
|
|
if failurePayload != nil {
|
|
update = update.SetFailurePayload(failurePayload)
|
|
}
|
|
|
|
if resultPayload != nil {
|
|
update = update.SetResultPayload(resultPayload)
|
|
}
|
|
|
|
_, err := update.Save(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update job status: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// IncrementTries increments the number of tries for a job
|
|
func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTries int) error {
|
|
_, err := q.client.GeneralQueue.UpdateOneID(jobID).
|
|
SetNumberOfTries(currentTries + 1).
|
|
SetUpdatedAt(time.Now()).
|
|
Save(ctx)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to increment tries: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *GeneralQueue) Resume(ctx context.Context) error {
|
|
// Reset all in_progress jobs back to pending
|
|
count, err := q.client.GeneralQueue.Update().
|
|
Where(
|
|
generalqueue.NameEQ(q.Name),
|
|
generalqueue.StatusEQ(generalqueue.StatusInProgress),
|
|
).
|
|
SetStatus(generalqueue.StatusPending).
|
|
SetUpdatedAt(time.Now()).
|
|
Save(ctx)
|
|
if err != nil {
|
|
customErr := fmt.Errorf("Failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err)
|
|
return customErr
|
|
}
|
|
if count > 0 {
|
|
Debug("Reset ", count, " in_progress jobs to pending for ", q.Name, " queue")
|
|
}
|
|
|
|
// Check if queue should auto-start
|
|
if isRunning, err := q.IsRunning(ctx); err != nil {
|
|
Error("Failed to check ", q.Name, " queue state:", err)
|
|
} else if isRunning {
|
|
Debug("restarting ", q.Name, " queue")
|
|
err := q.Start(ctx)
|
|
if err != nil {
|
|
Error("Failed to restart ", q.Name, " queue:", err)
|
|
return err
|
|
} else {
|
|
Debug(q.Name, " queue auto-started")
|
|
}
|
|
} else {
|
|
Debug(q.Name, " queue not running")
|
|
}
|
|
return nil
|
|
}
|