package queue import ( "context" "fmt" "log" "sync" atomicpkg "sync/atomic" "time" . "git.gorlug.de/code/ersteller" "git.gorlug.de/code/ersteller/schema/ent" "git.gorlug.de/code/ersteller/schema/ent/generalqueue" "git.gorlug.de/code/ersteller/schema/ent/generalqueuestate" ) type GeneralQueueStatus string const ( GeneralQueueStatusPending GeneralQueueStatus = "pending" GeneralQueueStatusInProgress GeneralQueueStatus = "in_progress" GeneralQueueStatusCompleted GeneralQueueStatus = "completed" GeneralQueueStatusFailed GeneralQueueStatus = "failed" ) type GeneralQueueJob struct { ID int CreatedAt time.Time NumberOfTries int Payload map[string]interface{} Status GeneralQueueStatus FailurePayload map[string]interface{} ResultPayload map[string]interface{} } func (j GeneralQueueJob) LoadPayloadStruct(target any) error { return MapToStruct(j.Payload, target) } type GeneralQueueHandlerResult struct { ResultPayload map[string]interface{} FailurePayload map[string]interface{} } func NewSuccessGeneralQueueHandlerResult(result any) (GeneralQueueHandlerResult, error) { resultMap, err := StructToMap(result) if err != nil { return returnStructToMapError(err), err } return GeneralQueueHandlerResult{ ResultPayload: resultMap, FailurePayload: nil, }, nil } const structToMapErrorKey = "structToMapError" func returnStructToMapError(err error) GeneralQueueHandlerResult { return GeneralQueueHandlerResult{ FailurePayload: map[string]interface{}{ structToMapErrorKey: err.Error(), }, } } func FailureGeneralQueueHandlerResult(message string, err error) GeneralQueueHandlerResult { return GeneralQueueHandlerResult{ FailurePayload: map[string]interface{}{ "message": message, "error": err.Error(), }, } } func NewFailureGeneralQueueHandlerResult(failure any) (GeneralQueueHandlerResult, error) { failureMap, err := StructToMap(failure) if err != nil { return returnStructToMapError(err), err } return GeneralQueueHandlerResult{ ResultPayload: nil, FailurePayload: failureMap, }, nil } func NewGeneralQueueHandlerResult(result any, failure any) (GeneralQueueHandlerResult, error) { resultMap, err := StructToMap(result) if err != nil { return returnStructToMapError(err), err } failureMap, err := StructToMap(failure) if err != nil { return returnStructToMapError(err), err } return GeneralQueueHandlerResult{ ResultPayload: resultMap, FailurePayload: failureMap, }, nil } type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error) type GeneralQueueParams struct { AutoStop bool } type GeneraleQueueOptions func(p *GeneralQueueParams) func WithAutoStop(autoStop bool) GeneraleQueueOptions { return func(q *GeneralQueueParams) { q.AutoStop = autoStop } } type GeneralQueue struct { Name string client *ent.Client running bool handler GeneralQueueHandler workerCount int pollInterval time.Duration cancel context.CancelFunc startCtx context.Context wg sync.WaitGroup inFlight int64 GeneralQueueParams } // NewGeneralQueue creates a new general queue instance func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int, options ...GeneraleQueueOptions) *GeneralQueue { params := GeneralQueueParams{ AutoStop: true, } for _, option := range options { option(¶ms) } if processors <= 0 { processors = 1 } return &GeneralQueue{ Name: name, client: client, running: false, handler: handler, workerCount: processors, pollInterval: 5 * time.Second, GeneralQueueParams: params, } } // SetRunning persists the running state of the queue to the database func (q *GeneralQueue) SetRunning(ctx context.Context, running bool) error { now := time.Now() // Try to find existing state existing, err := q.client.GeneralQueueState.Query(). Where(generalqueuestate.NameEQ(q.Name)). First(ctx) if err != nil && !ent.IsNotFound(err) { return fmt.Errorf("failed to query queue state: %w", err) } if existing != nil { // Update existing err = q.client.GeneralQueueState.UpdateOne(existing). SetRunning(running). SetUpdatedAt(now). Exec(ctx) if err != nil { return fmt.Errorf("failed to update queue running state: %w", err) } } else { // Create new _, err = q.client.GeneralQueueState.Create(). SetName(q.Name). SetRunning(running). SetUpdatedAt(now). Save(ctx) if err != nil { return fmt.Errorf("failed to create queue running state: %w", err) } } return nil } // IsRunning returns the persisted running state of the queue from the database func (q *GeneralQueue) IsRunning(ctx context.Context) (bool, error) { state, err := q.client.GeneralQueueState.Query(). Where(generalqueuestate.NameEQ(q.Name)). First(ctx) if err != nil { if ent.IsNotFound(err) { return false, nil // No state record means not running } return false, fmt.Errorf("failed to get queue running state: %w", err) } return state.Running, nil } // Start begins processing jobs from the queue in a background goroutine func (q *GeneralQueue) Start(ctx context.Context) error { if q.running { return nil } q.running = true if err := q.SetRunning(ctx, true); err != nil { Error("Failed to persist queue running state (start):", err) return err } // create a cancellable context used by workers q.startCtx, q.cancel = context.WithCancel(ctx) // start workers workers := q.workerCount if workers <= 0 { workers = 1 } for i := 0; i < workers; i++ { q.wg.Add(1) go q.workerLoop(q.startCtx, q.handler) } if q.GeneralQueueParams.AutoStop { q.runAutoStop() } LogDebug("Queue '%s' started with %d processors", q.Name, workers) return nil } func (q *GeneralQueue) runAutoStop() { // Auto-stop persistence: when all workers finish (e.g., no more jobs), // mark queue as not running in DB and in memory, mirroring previous behavior. go func(name string) { time.Sleep(q.pollInterval) q.wg.Wait() q.running = false if err := q.SetRunning(context.Background(), false); err != nil { LogError("Failed to persist queue running state (auto-stop):", err) } LogDebug("Queue '%s' completed. Auto-stopping.", name) }(q.Name) } // Stop stops the queue from processing new jobs func (q *GeneralQueue) Stop(ctx context.Context) error { if !q.running { return nil } if q.cancel != nil { q.cancel() } q.wg.Wait() q.running = false if err := q.SetRunning(ctx, false); err != nil { Error("Failed to persist queue running state (stop):", err) return err } LogDebug("Queue '%s' stopped", q.Name) return nil } // loop processes jobs continuously until stopped func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) { Debug(q.Name, "worker loop started") defer q.wg.Done() ticker := time.NewTicker(q.pollInterval) defer ticker.Stop() for { select { case <-ctx.Done(): Debug(q.Name, "worker loop context done") return case <-ticker.C: // Process at most one job per tick per worker processed, err := q.processNext(ctx, handler) if err != nil { LogError("Queue '%s' processing error:", q.Name, err) } if q.AutoStop { if q.autoStopCheckForWork(ctx, err, processed) { return } } } } } func (q *GeneralQueue) autoStopCheckForWork(ctx context.Context, err error, processed bool) bool { // If no job was processed, initiate auto-stop (match previous behavior) if err == nil && !processed { // Only stop if there is truly no work left: no pending or in-progress jobs hasWork, hwErr := q.hasActiveOrPending(ctx) if hwErr != nil { LogError("Queue '%s' work-check error:", q.Name, hwErr) } Debug("Queue", q.Name, "hasWork:", hasWork) if hwErr == nil && !hasWork && atomicpkg.LoadInt64(&q.inFlight) == 0 { if q.cancel != nil { q.cancel() } return true } // Otherwise, continue ticking while others are working or new jobs may arrive } return false } // processNext processes the next pending job and returns true if a job was processed func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) { job, err := q.claimNextPendingJob(ctx) if err != nil { Error(q.Name, "Failed to claim next pending job:", err) return false, err } if job == nil { Debug(q.Name, "no pending jobs") return false, nil // No pending jobs } Debug(q.Name, "found next pending job:", job.ID, job.Name) // Mark this worker as processing a job atomicpkg.AddInt64(&q.inFlight, 1) defer atomicpkg.AddInt64(&q.inFlight, -1) // Create job struct for handler queueJob := GeneralQueueJob{ ID: job.ID, CreatedAt: job.CreatedAt, NumberOfTries: job.NumberOfTries, Payload: job.Payload, Status: GeneralQueueStatus(job.Status), FailurePayload: job.FailurePayload, } // Create a context with 15-minute timeout for job processing jobCtx, cancel := context.WithTimeout(ctx, 15*time.Minute) defer cancel() // Process the job with timeout type handlerResult struct { err error resultPayload any errPayload any } resultChan := make(chan handlerResult, 1) go func() { //handlerErr, handlerResultPayload, handlerErrPayload := handler(jobCtx, queueJob) result, err := handler(jobCtx, queueJob) resultChan <- handlerResult{err: err, resultPayload: result.ResultPayload, errPayload: result.FailurePayload} }() var handlerErr error var resultPayload any var errPayload any select { case result := <-resultChan: handlerErr = result.err resultPayload = result.resultPayload errPayload = result.errPayload case <-jobCtx.Done(): // Timeout occurred handlerErr = fmt.Errorf("job processing timeout after 15 minutes") resultPayload = nil errPayload = nil } // Process the job err = handlerErr if err != nil { // Increment tries if incrementErr := q.IncrementTries(ctx, job.ID, job.NumberOfTries); incrementErr != nil { log.Printf("Failed to increment tries for job %d: %v", job.ID, incrementErr) } // Check if we should retry or fail if job.NumberOfTries+1 >= job.MaxRetries { errPayloadMap, mapErr := StructToMap(errPayload) if mapErr != nil { Error("failed to convert error payload to map:", mapErr) errPayloadMap = map[string]interface{}{} } // Max retries reached, mark as failed failurePayload := map[string]interface{}{ "error": err.Error(), "errorPayload": errPayloadMap, } if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusFailed, err.Error(), failurePayload, nil); updateErr != nil { log.Printf("Failed to mark job %d as failed: %v", job.ID, updateErr) } } else { // Retry - mark as pending again if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusPending, err.Error(), nil, nil); updateErr != nil { log.Printf("Failed to mark job %d as pending for retry: %v", job.ID, updateErr) } } return true, err } // Job succeeded, mark as completed resultPayloadMap, mapErr := StructToMap(resultPayload) if mapErr != nil { Error("failed to convert result payload to map:", mapErr) resultPayloadMap = map[string]interface{}{} } err = q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusCompleted, "", nil, resultPayloadMap) if err != nil { log.Printf("Failed to mark job %d as completed: %v", job.ID, err) } return true, nil } // hasActiveOrPending checks if there are any jobs still pending or currently in progress for this queue func (q *GeneralQueue) hasActiveOrPending(ctx context.Context) (bool, error) { count, err := q.client.GeneralQueue.Query(). Where( generalqueue.NameEQ(q.Name), generalqueue.StatusIn(generalqueue.StatusPending, generalqueue.StatusInProgress), ). Count(ctx) if err != nil { return false, fmt.Errorf("failed to check active/pending jobs: %w", err) } return count > 0, nil } // claimNextPendingJob atomically claims the next pending job by setting it to in_progress. // It avoids races when multiple workers attempt to pick the same job. func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) { // Try a few times in case of races with other workers for attempts := 0; attempts < 3; attempts++ { job, err := q.GetNextPendingJob(ctx) if err != nil { return nil, err } if job == nil { return nil, nil } // Attempt to move to in_progress only if it's still pending updated, err := q.client.GeneralQueue.UpdateOneID(job.ID). Where(generalqueue.StatusEQ(generalqueue.StatusPending)). SetStatus(generalqueue.StatusInProgress). SetUpdatedAt(time.Now()). Save(ctx) if err != nil { if ent.IsNotFound(err) { // Lost the race; retry continue } return nil, fmt.Errorf("failed to claim pending job: %w", err) } return updated, nil } return nil, nil } type EnqueueParams struct { WorkflowId string } type EnqueueOption func(p *EnqueueParams) func WithWorkflowId(id string) EnqueueOption { return func(q *EnqueueParams) { q.WorkflowId = id } } // Enqueue adds a new job to the queue func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) { Debug("Enqueueing job to queue:", q.Name, "payload", payload) params := EnqueueParams{} for _, option := range options { option(¶ms) } now := time.Now() payloadMap, err := StructToMap(payload) if err != nil { Error("failed to convert payload to map:", err) return nil, fmt.Errorf("failed to convert payload to map: %w", err) } create := q.client.GeneralQueue.Create(). SetName(q.Name). SetPayload(payloadMap). SetStatus(generalqueue.StatusPending). SetNumberOfTries(0). SetMaxRetries(maxRetries). SetCreatedAt(now). SetUpdatedAt(now). SetUserID(userId). SetWorkflowID(params.WorkflowId) job, err := create.Save(ctx) if err != nil { return nil, fmt.Errorf("failed to enqueue job: %w", err) } return job, nil } // GetNextPendingJob retrieves the next pending job from the queue func (q *GeneralQueue) GetNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) { job, err := q.client.GeneralQueue.Query(). Where( generalqueue.NameEQ(q.Name), generalqueue.StatusEQ(generalqueue.StatusPending), ). Order(ent.Asc(generalqueue.FieldCreatedAt)). First(ctx) if err != nil { if ent.IsNotFound(err) { return nil, nil // No pending jobs } return nil, fmt.Errorf("failed to get next pending job: %w", err) } return job, nil } // UpdateJobStatus updates the status of a job func (q *GeneralQueue) UpdateJobStatus(ctx context.Context, jobID int, status generalqueue.Status, errorMessage string, failurePayload map[string]interface{}, resultPayload map[string]interface{}) error { update := q.client.GeneralQueue.UpdateOneID(jobID). SetStatus(status). SetUpdatedAt(time.Now()) if status == generalqueue.StatusCompleted || status == generalqueue.StatusFailed { update = update.SetProcessedAt(time.Now()) } if errorMessage != "" { update = update.SetErrorMessage(errorMessage) } if failurePayload != nil { update = update.SetFailurePayload(failurePayload) } if resultPayload != nil { update = update.SetResultPayload(resultPayload) } _, err := update.Save(ctx) if err != nil { return fmt.Errorf("failed to update job status: %w", err) } return nil } // IncrementTries increments the number of tries for a job func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTries int) error { _, err := q.client.GeneralQueue.UpdateOneID(jobID). SetNumberOfTries(currentTries + 1). SetUpdatedAt(time.Now()). Save(ctx) if err != nil { return fmt.Errorf("failed to increment tries: %w", err) } return nil } // ResumeWorkflow resumes a specific workflow's execution in this queue by // setting all InProgress jobs back to Pending, allowing them to be picked up again. // After resetting, if the queue is marked running, it will be started. func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error { // Reset in-progress jobs back to pending for the specific workflow inProgressCount, err := q.client.GeneralQueue.Update(). Where( generalqueue.NameEQ(q.Name), generalqueue.WorkflowIDEQ(workflowId), generalqueue.StatusEQ(generalqueue.StatusInProgress), ). SetStatus(generalqueue.StatusPending). SetUpdatedAt(time.Now()). Save(ctx) if err != nil { return fmt.Errorf("failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err) } if inProgressCount > 0 { Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")") } // Start the queue again if it is configured as running if isRunning, err := q.IsRunning(ctx); err != nil { Error("Failed to check ", q.Name, " queue state:", err) } else if isRunning { Debug("restarting ", q.Name, " queue") if err := q.Start(ctx); err != nil { Error("Failed to restart ", q.Name, " queue:", err) return err } Debug(q.Name, " queue auto-started") } else { Debug(q.Name, " queue not running") } return nil } // RetryFailedWorkflow retries failed jobs for a specific workflow in this queue by // setting all Failed jobs back to Pending and resetting their NumberOfTries to 0. // After resetting, if the queue is marked running, it will be started. func (q *GeneralQueue) RetryFailedWorkflow(ctx context.Context, workflowId string) error { // Reset failed jobs back to pending and reset their try counter for the specific workflow failedCount, err := q.client.GeneralQueue.Update(). Where( generalqueue.NameEQ(q.Name), generalqueue.WorkflowIDEQ(workflowId), generalqueue.StatusEQ(generalqueue.StatusFailed), ). SetStatus(generalqueue.StatusPending). SetNumberOfTries(0). SetUpdatedAt(time.Now()). Save(ctx) if err != nil { return fmt.Errorf("failed to reset failed jobs to pending for queue '%s': %w", q.Name, err) } if failedCount > 0 { Debug("Reset ", failedCount, " failed jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")") } // Start the queue again if it is configured as running if isRunning, err := q.IsRunning(ctx); err != nil { Error("Failed to check ", q.Name, " queue state:", err) } else if isRunning { Debug("restarting ", q.Name, " queue") if err := q.Start(ctx); err != nil { Error("Failed to restart ", q.Name, " queue:", err) return err } Debug(q.Name, " queue auto-started") } else { Debug(q.Name, " queue not running") } return nil } func (q *GeneralQueue) Resume(ctx context.Context) error { // Reset all in_progress jobs back to pending count, err := q.client.GeneralQueue.Update(). Where( generalqueue.NameEQ(q.Name), generalqueue.StatusEQ(generalqueue.StatusInProgress), ). SetStatus(generalqueue.StatusPending). SetUpdatedAt(time.Now()). Save(ctx) if err != nil { customErr := fmt.Errorf("Failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err) return customErr } if count > 0 { Debug("Reset ", count, " in_progress jobs to pending for ", q.Name, " queue") } // Check if queue should auto-start if isRunning, err := q.IsRunning(ctx); err != nil { Error("Failed to check ", q.Name, " queue state:", err) } else if isRunning { Debug("restarting ", q.Name, " queue") err := q.Start(ctx) if err != nil { Error("Failed to restart ", q.Name, " queue:", err) return err } else { Debug(q.Name, " queue auto-started") } } else { Debug(q.Name, " queue not running") } return nil }