Compare commits

...

10 Commits

Author SHA1 Message Date
Achim Rohn af2c70bec6 Remove payload debug 2026-04-08 23:31:41 +02:00
Achim Rohn 1d730a4c5d Remove payload debug 2026-04-08 17:14:32 +02:00
Achim Rohn 8a3d041e3f Do not log out the payload 2026-04-08 17:03:38 +02:00
Achim Rohn d4008e3655 Add methods to retry failed workflows and jobs across all steps and queues 2026-04-08 15:41:22 +02:00
Achim Rohn d4714c4f27 Add method to resume all workflow steps 2026-04-08 15:39:08 +02:00
Achim Rohn 12d22efc74 Resume workflow by id 2026-04-08 15:38:07 +02:00
Achim Rohn a383129abb Add utility methods for handling queue handler results and payload mapping 2026-04-08 14:50:45 +02:00
Achim Rohn f49871cc1c Add random string to the workflow id 2026-04-06 18:34:31 +02:00
Achim Rohn 9d0733bd96 Add additional scopes option 2026-04-06 16:45:10 +02:00
Achim Rohn 2aabad6f07 Save credentials for google in the database 2026-04-06 16:10:43 +02:00
6 changed files with 253 additions and 10 deletions
+31 -5
View File
@@ -5,12 +5,13 @@ import (
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
. "git.gorlug.de/code/ersteller"
"git.gorlug.de/code/ersteller/authentication"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"time" "time"
. "git.gorlug.de/code/ersteller"
"git.gorlug.de/code/ersteller/authentication"
"github.com/gorilla/sessions" "github.com/gorilla/sessions"
"golang.org/x/oauth2" "golang.org/x/oauth2"
@@ -39,6 +40,7 @@ type GoogleAuth struct {
type Database interface { type Database interface {
GetUserIdByEmail(ctx context.Context, email string) (int, error) GetUserIdByEmail(ctx context.Context, email string) (int, error)
CreateUser(ctx context.Context, email string) (int, error) CreateUser(ctx context.Context, email string) (int, error)
SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error
} }
type Environment struct { type Environment struct {
@@ -48,13 +50,34 @@ type Environment struct {
IsLocal bool IsLocal bool
} }
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore) *GoogleAuth { type GoogleAuthParams struct {
AdditionalScopes []string
}
type GoogleAuthOption func(p *GoogleAuthParams)
func WithAdditionalScopes(scopes []string) GoogleAuthOption {
return func(p *GoogleAuthParams) {
p.AdditionalScopes = scopes
}
}
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore,
options ...GoogleAuthOption) *GoogleAuth {
params := GoogleAuthParams{}
for _, option := range options {
option(&params)
}
scopes := []string{"https://www.googleapis.com/auth/userinfo.email"}
scopes = append(scopes, params.AdditionalScopes...)
config := oauth2.Config{ config := oauth2.Config{
ClientID: environment.ClientId, ClientID: environment.ClientId,
ClientSecret: environment.ClientSecret, ClientSecret: environment.ClientSecret,
Endpoint: google.Endpoint, Endpoint: google.Endpoint,
RedirectURL: environment.BaseUrl + GoogleLoginCallback, RedirectURL: environment.BaseUrl + GoogleLoginCallback,
Scopes: []string{"https://www.googleapis.com/auth/userinfo.email"}, Scopes: scopes,
} }
return &GoogleAuth{ return &GoogleAuth{
db: db, db: db,
@@ -180,7 +203,10 @@ func (g *GoogleAuth) getUserDataFromGoogle(code string) (GoogleUserData, error)
func (g *GoogleAuth) SaveCredentials(userId int, token *oauth2.Token) error { func (g *GoogleAuth) SaveCredentials(userId int, token *oauth2.Token) error {
Debug("saving google credentials for user ", userId) Debug("saving google credentials for user ", userId)
// For now, we'll just log this - in a real implementation you'd save to database err := g.db.SaveCredentials(context.Background(), userId, token)
if err != nil {
return err
}
Debug("saved credentials for user", userId) Debug("saved credentials for user", userId)
return nil return nil
} }
+1 -1
View File
@@ -22,7 +22,7 @@ func StructToMap(data interface{}) (JSONB, error) {
return result, nil return result, nil
} }
func MapToStruct(data JSONB, result interface{}) error { func MapToStruct(data JSONB, result any) error {
// Marshal map to JSON bytes // Marshal map to JSON bytes
jsonBytes, err := json.Marshal(data) jsonBytes, err := json.Marshal(data)
if err != nil { if err != nil {
+136 -1
View File
@@ -33,11 +33,71 @@ type GeneralQueueJob struct {
ResultPayload map[string]interface{} ResultPayload map[string]interface{}
} }
func (j GeneralQueueJob) LoadPayloadStruct(target any) error {
return MapToStruct(j.Payload, target)
}
type GeneralQueueHandlerResult struct { type GeneralQueueHandlerResult struct {
ResultPayload map[string]interface{} ResultPayload map[string]interface{}
FailurePayload map[string]interface{} FailurePayload map[string]interface{}
} }
func NewSuccessGeneralQueueHandlerResult(result any) (GeneralQueueHandlerResult, error) {
resultMap, err := StructToMap(result)
if err != nil {
return returnStructToMapError(err), err
}
return GeneralQueueHandlerResult{
ResultPayload: resultMap,
FailurePayload: nil,
}, nil
}
const structToMapErrorKey = "structToMapError"
func returnStructToMapError(err error) GeneralQueueHandlerResult {
return GeneralQueueHandlerResult{
FailurePayload: map[string]interface{}{
structToMapErrorKey: err.Error(),
},
}
}
func FailureGeneralQueueHandlerResult(message string, err error) GeneralQueueHandlerResult {
return GeneralQueueHandlerResult{
FailurePayload: map[string]interface{}{
"message": message,
"error": err.Error(),
},
}
}
func NewFailureGeneralQueueHandlerResult(failure any) (GeneralQueueHandlerResult, error) {
failureMap, err := StructToMap(failure)
if err != nil {
return returnStructToMapError(err), err
}
return GeneralQueueHandlerResult{
ResultPayload: nil,
FailurePayload: failureMap,
}, nil
}
func NewGeneralQueueHandlerResult(result any, failure any) (GeneralQueueHandlerResult, error) {
resultMap, err := StructToMap(result)
if err != nil {
return returnStructToMapError(err), err
}
failureMap, err := StructToMap(failure)
if err != nil {
return returnStructToMapError(err), err
}
return GeneralQueueHandlerResult{
ResultPayload: resultMap,
FailurePayload: failureMap,
}, nil
}
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error) type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
type GeneralQueueParams struct { type GeneralQueueParams struct {
@@ -417,7 +477,7 @@ func WithWorkflowId(id string) EnqueueOption {
// Enqueue adds a new job to the queue // Enqueue adds a new job to the queue
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) { func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
Debug("Enqueueing job to queue:", q.Name, "payload", payload) Debug("Enqueueing job to queue:", q.Name)
params := EnqueueParams{} params := EnqueueParams{}
for _, option := range options { for _, option := range options {
option(&params) option(&params)
@@ -515,6 +575,81 @@ func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTri
return nil return nil
} }
// ResumeWorkflow resumes a specific workflow's execution in this queue by
// setting all InProgress jobs back to Pending, allowing them to be picked up again.
// After resetting, if the queue is marked running, it will be started.
func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error {
// Reset in-progress jobs back to pending for the specific workflow
inProgressCount, err := q.client.GeneralQueue.Update().
Where(
generalqueue.NameEQ(q.Name),
generalqueue.WorkflowIDEQ(workflowId),
generalqueue.StatusEQ(generalqueue.StatusInProgress),
).
SetStatus(generalqueue.StatusPending).
SetUpdatedAt(time.Now()).
Save(ctx)
if err != nil {
return fmt.Errorf("failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err)
}
if inProgressCount > 0 {
Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
}
// Start the queue again if it is configured as running
if isRunning, err := q.IsRunning(ctx); err != nil {
Error("Failed to check ", q.Name, " queue state:", err)
} else if isRunning {
Debug("restarting ", q.Name, " queue")
if err := q.Start(ctx); err != nil {
Error("Failed to restart ", q.Name, " queue:", err)
return err
}
Debug(q.Name, " queue auto-started")
} else {
Debug(q.Name, " queue not running")
}
return nil
}
// RetryFailedWorkflow retries failed jobs for a specific workflow in this queue by
// setting all Failed jobs back to Pending and resetting their NumberOfTries to 0.
// After resetting, if the queue is marked running, it will be started.
func (q *GeneralQueue) RetryFailedWorkflow(ctx context.Context, workflowId string) error {
// Reset failed jobs back to pending and reset their try counter for the specific workflow
failedCount, err := q.client.GeneralQueue.Update().
Where(
generalqueue.NameEQ(q.Name),
generalqueue.WorkflowIDEQ(workflowId),
generalqueue.StatusEQ(generalqueue.StatusFailed),
).
SetStatus(generalqueue.StatusPending).
SetNumberOfTries(0).
SetUpdatedAt(time.Now()).
Save(ctx)
if err != nil {
return fmt.Errorf("failed to reset failed jobs to pending for queue '%s': %w", q.Name, err)
}
if failedCount > 0 {
Debug("Reset ", failedCount, " failed jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
}
// Start the queue again if it is configured as running
if isRunning, err := q.IsRunning(ctx); err != nil {
Error("Failed to check ", q.Name, " queue state:", err)
} else if isRunning {
Debug("restarting ", q.Name, " queue")
if err := q.Start(ctx); err != nil {
Error("Failed to restart ", q.Name, " queue:", err)
return err
}
Debug(q.Name, " queue auto-started")
} else {
Debug(q.Name, " queue not running")
}
return nil
}
func (q *GeneralQueue) Resume(ctx context.Context) error { func (q *GeneralQueue) Resume(ctx context.Context) error {
// Reset all in_progress jobs back to pending // Reset all in_progress jobs back to pending
count, err := q.client.GeneralQueue.Update(). count, err := q.client.GeneralQueue.Update().
+20
View File
@@ -2,10 +2,14 @@ package google
import ( import (
"context" "context"
"git.gorlug.de/code/ersteller" "git.gorlug.de/code/ersteller"
google_http "git.gorlug.de/code/ersteller/authentication/google/http" google_http "git.gorlug.de/code/ersteller/authentication/google/http"
"git.gorlug.de/code/ersteller/starter/ent" "git.gorlug.de/code/ersteller/starter/ent"
"git.gorlug.de/code/ersteller/starter/ent/googleauth"
"git.gorlug.de/code/ersteller/starter/ent/schema"
"git.gorlug.de/code/ersteller/starter/ent/user" "git.gorlug.de/code/ersteller/starter/ent/user"
"golang.org/x/oauth2"
) )
type Database struct { type Database struct {
@@ -30,3 +34,19 @@ func (d *Database) CreateUser(ctx context.Context, email string) (int, error) {
} }
return newUser.ID, nil return newUser.ID, nil
} }
func (d *Database) SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error {
existing, err := d.db.GoogleAuth.Query().Where(googleauth.HasUserWith(user.ID(userId))).Only(ctx)
if err == nil {
_, err = existing.Update().SetCredentials(schema.Credentials{Token: *token}).Save(ctx)
return err
}
if !ent.IsNotFound(err) {
return err
}
_, err = d.db.GoogleAuth.Create().
SetUserID(userId).
SetCredentials(schema.Credentials{Token: *token}).
Save(ctx)
return err
}
+10
View File
@@ -3,6 +3,7 @@ package ersteller
import ( import (
"bytes" "bytes"
"fmt" "fmt"
mathRand "math/rand"
"regexp" "regexp"
"strings" "strings"
"text/template" "text/template"
@@ -35,3 +36,12 @@ func InlineTemplate(templateString string, data any) string {
} }
return buf.String() return buf.String()
} }
func RandomString(n int) string {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, n)
for i := range b {
b[i] = letters[int(mathRand.Int63()%int64(len(letters)))]
}
return string(b)
}
+55 -3
View File
@@ -2,6 +2,7 @@ package workflow
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time" "time"
@@ -53,7 +54,7 @@ func (s *Step) AddNextStep(step *Step) {
const workflowIdParam = "workflowId" const workflowIdParam = "workflowId"
func (s *Step) Execute(ctx context.Context, payload map[string]any) error { func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
ersteller.Debug("Executing step ", s.Name, "payload", payload) ersteller.Debug("Executing step ", s.Name)
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string))) _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
if err != nil { if err != nil {
return err return err
@@ -70,7 +71,7 @@ func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queu
} }
if nextStep != nil { if nextStep != nil {
result.ResultPayload[workflowIdParam] = workflowId result.ResultPayload[workflowIdParam] = workflowId
ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload) ersteller.Debug("Moving to next step ", nextStep.Name)
err = nextStep.Execute(context.Background(), result.ResultPayload) err = nextStep.Execute(context.Background(), result.ResultPayload)
if err != nil { if err != nil {
failurePayload := result.FailurePayload failurePayload := result.FailurePayload
@@ -111,7 +112,58 @@ func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
func (w *Workflow) GenerateId() string { func (w *Workflow) GenerateId() string {
now := time.Now() now := time.Now()
return w.Identifier + "_" + now.Format("20060102150405") return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5)
}
// Resume restarts a specific workflow execution by its workflowId by
// setting any in-progress jobs of all steps back to pending, then starts
// their queues again if they are running.
func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
var allErr error
for _, step := range w.AllSteps {
if step.Queue == nil {
// Safety: ensure queues are initialized
step.initQueue()
}
if err := step.Queue.ResumeWorkflow(ctx, workflowId); err != nil {
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
}
}
return allErr
}
// ResumeAll restarts all executions of this workflow across all steps.
// For each step it resets any in-progress jobs to pending, then starts the
// queue again if it is configured as running.
func (w *Workflow) ResumeAll(ctx context.Context) error {
var allErr error
for _, step := range w.AllSteps {
if step.Queue == nil {
// Safety: ensure queues are initialized
step.initQueue()
}
if err := step.Queue.Resume(ctx); err != nil {
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
}
}
return allErr
}
// RetryFailed restarts only failed jobs of a specific workflow execution by its
// workflowId across all steps by resetting them to pending and their try count
// to 0, then starts their queues again if they are running.
func (w *Workflow) RetryFailed(ctx context.Context, workflowId string) error {
var allErr error
for _, step := range w.AllSteps {
if step.Queue == nil {
// Safety: ensure queues are initialized
step.initQueue()
}
if err := step.Queue.RetryFailedWorkflow(ctx, workflowId); err != nil {
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
}
}
return allErr
} }
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) { func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {