Files
ersteller/queue/general_queue.go
T
2025-12-20 12:38:11 +01:00

446 lines
12 KiB
Go

package queue
import (
"context"
"fmt"
"log"
"sync"
"time"
. "git.gorlug.de/code/ersteller"
"git.gorlug.de/code/ersteller/schema/ent"
"git.gorlug.de/code/ersteller/schema/ent/generalqueue"
"git.gorlug.de/code/ersteller/schema/ent/generalqueuestate"
)
// GeneralQueueStatus mirrors the job status values persisted in the
// generalqueue table.
type GeneralQueueStatus string

const (
	GeneralQueueStatusPending    GeneralQueueStatus = "pending"     // waiting to be claimed by a worker
	GeneralQueueStatusInProgress GeneralQueueStatus = "in_progress" // claimed and currently being processed
	GeneralQueueStatusCompleted  GeneralQueueStatus = "completed"   // handler finished without error
	GeneralQueueStatusFailed     GeneralQueueStatus = "failed"      // handler failed and retries are exhausted
)
// GeneralQueueJob is the handler-facing snapshot of a queued job row,
// populated from the ent entity when the job is claimed.
type GeneralQueueJob struct {
	ID             int                    // database ID of the job row
	CreatedAt      time.Time              // when the job was enqueued
	NumberOfTries  int                    // attempts made before the current one
	Payload        map[string]interface{} // job input as stored at enqueue time
	Status         GeneralQueueStatus     // status at claim time
	FailurePayload map[string]interface{} // failure details from a previous attempt, if any
	ResultPayload  map[string]interface{} // result of a previous successful run, if any
}
// GeneralQueueHandlerResult carries the optional payloads a handler returns.
type GeneralQueueHandlerResult struct {
	ResultPayload  map[string]interface{} // persisted on the job when the handler succeeds
	FailurePayload map[string]interface{} // persisted on the job when retries are exhausted
}

// GeneralQueueHandler processes one claimed job. A non-nil error marks the
// attempt as failed and triggers the queue's retry/failure bookkeeping.
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
// GeneralQueue is a named, database-backed job queue. Jobs are persisted via
// ent and processed by a pool of polling worker goroutines.
type GeneralQueue struct {
	Name         string              // queue name; also the key for the persisted state row
	client       *ent.Client         // ent client used for all persistence
	running      bool                // in-memory running flag; NOTE(review): read/written from Start, Stop, and the auto-stop goroutine without a mutex — verify with -race
	handler      GeneralQueueHandler // callback invoked for each claimed job
	workerCount  int                 // number of worker goroutines Start spawns
	pollInterval time.Duration       // delay between polls for pending jobs
	cancel       context.CancelFunc  // cancels startCtx; stops all workers
	startCtx     context.Context     // context governing the worker pool lifetime
	wg           sync.WaitGroup      // tracks live workers; Stop and auto-stop wait on it
}
// NewGeneralQueue constructs a queue named name that uses client for
// persistence and invokes handler for each claimed job. processors controls
// how many concurrent workers Start will spawn; values below one are clamped
// to a single worker. The poll interval defaults to five seconds.
func NewGeneralQueue(name string, client *ent.Client, handler GeneralQueueHandler, processors int) *GeneralQueue {
	workers := processors
	if workers < 1 {
		workers = 1
	}
	q := &GeneralQueue{
		Name:         name,
		client:       client,
		handler:      handler,
		workerCount:  workers,
		pollInterval: 5 * time.Second,
	}
	return q
}
// SetRunning persists the running state of the queue to the database,
// creating the per-queue state row on first use and updating it afterwards.
func (q *GeneralQueue) SetRunning(ctx context.Context, running bool) error {
	now := time.Now()
	state, err := q.client.GeneralQueueState.Query().
		Where(generalqueuestate.NameEQ(q.Name)).
		First(ctx)
	switch {
	case err == nil:
		// A state row already exists: update it in place.
		if updErr := q.client.GeneralQueueState.UpdateOne(state).
			SetRunning(running).
			SetUpdatedAt(now).
			Exec(ctx); updErr != nil {
			return fmt.Errorf("failed to update queue running state: %w", updErr)
		}
		return nil
	case ent.IsNotFound(err):
		// First time this queue persists state: create the row.
		if _, crtErr := q.client.GeneralQueueState.Create().
			SetName(q.Name).
			SetRunning(running).
			SetUpdatedAt(now).
			Save(ctx); crtErr != nil {
			return fmt.Errorf("failed to create queue running state: %w", crtErr)
		}
		return nil
	default:
		return fmt.Errorf("failed to query queue state: %w", err)
	}
}
// IsRunning reports the persisted running flag for this queue. A missing
// state row is treated as "not running" rather than as an error.
func (q *GeneralQueue) IsRunning(ctx context.Context) (bool, error) {
	state, err := q.client.GeneralQueueState.Query().
		Where(generalqueuestate.NameEQ(q.Name)).
		First(ctx)
	switch {
	case ent.IsNotFound(err):
		return false, nil
	case err != nil:
		return false, fmt.Errorf("failed to get queue running state: %w", err)
	}
	return state.Running, nil
}
// Start begins processing jobs from the queue in background goroutines.
//
// It persists the running flag, spawns workerCount worker goroutines that
// poll for pending jobs, and installs a watcher that auto-stops the queue
// (in memory and in the database) once all workers have exited, e.g. when
// the queue has drained. Starting an already-running queue is a no-op.
//
// NOTE(review): q.running is touched from Start, Stop, and the auto-stop
// goroutine without synchronization; run with -race and consider a mutex.
func (q *GeneralQueue) Start(ctx context.Context) error {
	if q.running {
		return nil
	}
	// Persist BEFORE flipping the in-memory flag. The previous code set
	// q.running = true first, so a failed persist returned an error while
	// leaving the flag stuck at true — every later Start then returned
	// early without ever spawning workers.
	if err := q.SetRunning(ctx, true); err != nil {
		Error("Failed to persist queue running state (start):", err)
		return err
	}
	q.running = true
	// Cancellable context shared by all workers; Stop and auto-stop use it.
	q.startCtx, q.cancel = context.WithCancel(ctx)
	// Defensive clamp in case the struct was built without the constructor.
	workers := q.workerCount
	if workers <= 0 {
		workers = 1
	}
	for i := 0; i < workers; i++ {
		q.wg.Add(1)
		go q.workerLoop(q.startCtx, q.handler)
	}
	// Auto-stop persistence: when all workers finish (e.g., no more jobs),
	// mark queue as not running in DB and in memory, mirroring previous behavior.
	go func(name string) {
		q.wg.Wait()
		q.running = false
		// Background context: the caller's ctx may already be cancelled here.
		if err := q.SetRunning(context.Background(), false); err != nil {
			LogError("Failed to persist queue running state (auto-stop):", err)
		}
		LogDebug("Queue '%s' completed. Auto-stopping.", name)
	}(q.Name)
	LogDebug("Queue '%s' started with %d processors", q.Name, workers)
	return nil
}
// Stop cancels all workers, waits for them to exit, and persists the
// stopped state. Stopping a queue that is not running is a no-op.
func (q *GeneralQueue) Stop(ctx context.Context) error {
	if !q.running {
		return nil
	}
	if cancelWorkers := q.cancel; cancelWorkers != nil {
		cancelWorkers()
	}
	// Block until every worker goroutine has returned before flipping state.
	q.wg.Wait()
	q.running = false
	if err := q.SetRunning(ctx, false); err != nil {
		Error("Failed to persist queue running state (stop):", err)
		return err
	}
	LogDebug("Queue '%s' stopped", q.Name)
	return nil
}
// workerLoop polls for pending jobs on every tick until its context is
// cancelled. When a poll finds no work (and no error), it cancels the shared
// worker context so the whole pool auto-stops.
func (q *GeneralQueue) workerLoop(ctx context.Context, handler GeneralQueueHandler) {
	defer q.wg.Done()
	poll := time.NewTicker(q.pollInterval)
	defer poll.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-poll.C:
		}
		// At most one job per tick per worker.
		processed, err := q.processNext(ctx, handler)
		if err != nil {
			LogError("Queue '%s' processing error:", q.Name, err)
			continue
		}
		if processed {
			continue
		}
		// Queue drained: trigger auto-stop (matches previous behavior).
		if q.cancel != nil {
			q.cancel()
		}
		return
	}
}
// processNext claims and runs a single pending job. It returns true when a
// job was claimed (whether the handler succeeded or failed) and false when
// the queue had no pending work.
//
// Flow: claim -> run handler with a 15-minute timeout -> on error increment
// tries and either re-queue the job (pending) or give up (failed); on
// success store the result payload and mark the job completed.
func (q *GeneralQueue) processNext(ctx context.Context, handler GeneralQueueHandler) (bool, error) {
	job, err := q.claimNextPendingJob(ctx)
	if err != nil {
		return false, err
	}
	if job == nil {
		return false, nil // No pending jobs
	}
	// Snapshot the ent entity into the handler-facing job struct.
	queueJob := GeneralQueueJob{
		ID:             job.ID,
		CreatedAt:      job.CreatedAt,
		NumberOfTries:  job.NumberOfTries,
		Payload:        job.Payload,
		Status:         GeneralQueueStatus(job.Status),
		FailurePayload: job.FailurePayload,
	}
	// Create a context with 15-minute timeout for job processing.
	jobCtx, cancel := context.WithTimeout(ctx, 15*time.Minute)
	defer cancel()
	type handlerResult struct {
		err           error
		resultPayload any
		errPayload    any
	}
	// Buffered (cap 1) so the handler goroutine can always deliver its
	// result and exit even after the timeout branch below has won the
	// select — the goroutine is not left blocked forever.
	resultChan := make(chan handlerResult, 1)
	go func() {
		result, err := handler(jobCtx, queueJob)
		resultChan <- handlerResult{err: err, resultPayload: result.ResultPayload, errPayload: result.FailurePayload}
	}()
	var handlerErr error
	var resultPayload any
	var errPayload any
	select {
	case result := <-resultChan:
		handlerErr = result.err
		resultPayload = result.resultPayload
		errPayload = result.errPayload
	case <-jobCtx.Done():
		// Timeout occurred.
		// NOTE(review): jobCtx also closes when the parent ctx is cancelled
		// (queue shutdown), in which case this "timeout" message is
		// misleading and the job still burns a retry — confirm whether
		// cancellation should instead re-queue without incrementing tries.
		handlerErr = fmt.Errorf("job processing timeout after 15 minutes")
		resultPayload = nil
		errPayload = nil
	}
	err = handlerErr
	if err != nil {
		// Record the attempt regardless of whether we retry or fail.
		if incrementErr := q.IncrementTries(ctx, job.ID, job.NumberOfTries); incrementErr != nil {
			log.Printf("Failed to increment tries for job %d: %v", job.ID, incrementErr)
		}
		// Decide between retrying and failing permanently.
		if job.NumberOfTries+1 >= job.MaxRetries {
			errPayloadMap, mapErr := StructToMap(errPayload)
			if mapErr != nil {
				Error("failed to convert error payload to map:", mapErr)
				errPayloadMap = map[string]interface{}{}
			}
			// Max retries reached, mark as failed.
			failurePayload := map[string]interface{}{
				"error":        err.Error(),
				"errorPayload": errPayloadMap,
			}
			if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusFailed, err.Error(), failurePayload, nil); updateErr != nil {
				log.Printf("Failed to mark job %d as failed: %v", job.ID, updateErr)
			}
		} else {
			// Retry - mark as pending again so another tick can claim it.
			if updateErr := q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusPending, err.Error(), nil, nil); updateErr != nil {
				log.Printf("Failed to mark job %d as pending for retry: %v", job.ID, updateErr)
			}
		}
		return true, err
	}
	// Job succeeded, mark as completed with the converted result payload.
	resultPayloadMap, mapErr := StructToMap(resultPayload)
	if mapErr != nil {
		Error("failed to convert result payload to map:", mapErr)
		resultPayloadMap = map[string]interface{}{}
	}
	err = q.UpdateJobStatus(ctx, job.ID, generalqueue.StatusCompleted, "", nil, resultPayloadMap)
	if err != nil {
		log.Printf("Failed to mark job %d as completed: %v", job.ID, err)
	}
	return true, nil
}
// claimNextPendingJob atomically claims the next pending job by flipping it
// to in_progress, retrying a bounded number of times when another worker
// wins the race for the same row. Returns (nil, nil) when no job is
// available (or every attempt lost the race).
func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
	const maxAttempts = 3
	for try := 0; try < maxAttempts; try++ {
		candidate, err := q.GetNextPendingJob(ctx)
		if err != nil {
			return nil, err
		}
		if candidate == nil {
			return nil, nil
		}
		// Conditional update: succeeds only if the row is still pending.
		claimed, err := q.client.GeneralQueue.UpdateOneID(candidate.ID).
			Where(generalqueue.StatusEQ(generalqueue.StatusPending)).
			SetStatus(generalqueue.StatusInProgress).
			SetUpdatedAt(time.Now()).
			Save(ctx)
		if err == nil {
			return claimed, nil
		}
		if ent.IsNotFound(err) {
			// Lost the race to another worker; look for a different job.
			continue
		}
		return nil, fmt.Errorf("failed to claim pending job: %w", err)
	}
	return nil, nil
}
// Enqueue adds a new pending job carrying payload to this queue. The payload
// is converted to a map for storage; maxRetries bounds how often a failing
// job is re-attempted, and userId associates the job with a user.
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int) (*ent.GeneralQueue, error) {
	now := time.Now()
	data, err := StructToMap(payload)
	if err != nil {
		Error("failed to convert payload to map:", err)
		return nil, fmt.Errorf("failed to convert payload to map: %w", err)
	}
	job, err := q.client.GeneralQueue.Create().
		SetName(q.Name).
		SetPayload(data).
		SetStatus(generalqueue.StatusPending).
		SetNumberOfTries(0).
		SetMaxRetries(maxRetries).
		SetCreatedAt(now).
		SetUpdatedAt(now).
		SetUserID(userId).
		Save(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to enqueue job: %w", err)
	}
	return job, nil
}
// GetNextPendingJob returns the oldest pending job for this queue, or
// (nil, nil) when the queue currently has no pending work.
func (q *GeneralQueue) GetNextPendingJob(ctx context.Context) (*ent.GeneralQueue, error) {
	next, err := q.client.GeneralQueue.Query().
		Where(
			generalqueue.NameEQ(q.Name),
			generalqueue.StatusEQ(generalqueue.StatusPending),
		).
		Order(ent.Asc(generalqueue.FieldCreatedAt)).
		First(ctx)
	switch {
	case ent.IsNotFound(err):
		return nil, nil // empty queue is not an error
	case err != nil:
		return nil, fmt.Errorf("failed to get next pending job: %w", err)
	}
	return next, nil
}
// UpdateJobStatus transitions a job to status, optionally recording an error
// message and failure/result payloads. Terminal states (completed or failed)
// additionally stamp the processing-finished time.
func (q *GeneralQueue) UpdateJobStatus(ctx context.Context, jobID int, status generalqueue.Status, errorMessage string, failurePayload map[string]interface{}, resultPayload map[string]interface{}) error {
	builder := q.client.GeneralQueue.UpdateOneID(jobID).
		SetStatus(status).
		SetUpdatedAt(time.Now())
	switch status {
	case generalqueue.StatusCompleted, generalqueue.StatusFailed:
		builder = builder.SetProcessedAt(time.Now())
	}
	// Only set optional columns when the caller supplied them.
	if errorMessage != "" {
		builder = builder.SetErrorMessage(errorMessage)
	}
	if failurePayload != nil {
		builder = builder.SetFailurePayload(failurePayload)
	}
	if resultPayload != nil {
		builder = builder.SetResultPayload(resultPayload)
	}
	if _, err := builder.Save(ctx); err != nil {
		return fmt.Errorf("failed to update job status: %w", err)
	}
	return nil
}
// IncrementTries records one more attempt for a job by writing
// currentTries+1. The caller supplies the current value, so concurrent
// increments of the same job could lose an update.
func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTries int) error {
	nextTries := currentTries + 1
	if _, err := q.client.GeneralQueue.UpdateOneID(jobID).
		SetNumberOfTries(nextTries).
		SetUpdatedAt(time.Now()).
		Save(ctx); err != nil {
		return fmt.Errorf("failed to increment tries: %w", err)
	}
	return nil
}
// Resume restarts the queue when its persisted state says it should be
// running (e.g. after a process restart). A failed state lookup is logged
// and swallowed (best effort); only a failed Start is returned as an error.
func (q *GeneralQueue) Resume(ctx context.Context) error {
	isRunning, err := q.IsRunning(ctx)
	if err != nil {
		Error("Failed to check ", q.Name, " queue state:", err)
		return nil
	}
	if !isRunning {
		Debug(q.Name, " queue not running")
		return nil
	}
	Debug("restarting ", q.Name, " queue")
	if err := q.Start(ctx); err != nil {
		Error("Failed to restart ", q.Name, " queue:", err)
		return err
	}
	Debug(q.Name, " queue auto-started")
	return nil
}