Compare commits
10 Commits
d3c1cad6f2
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| af2c70bec6 | |||
| 1d730a4c5d | |||
| 8a3d041e3f | |||
| d4008e3655 | |||
| d4714c4f27 | |||
| 12d22efc74 | |||
| a383129abb | |||
| f49871cc1c | |||
| 9d0733bd96 | |||
| 2aabad6f07 |
@@ -5,12 +5,13 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
. "git.gorlug.de/code/ersteller"
|
||||
"git.gorlug.de/code/ersteller/authentication"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
. "git.gorlug.de/code/ersteller"
|
||||
"git.gorlug.de/code/ersteller/authentication"
|
||||
|
||||
"github.com/gorilla/sessions"
|
||||
|
||||
"golang.org/x/oauth2"
|
||||
@@ -39,6 +40,7 @@ type GoogleAuth struct {
|
||||
type Database interface {
|
||||
GetUserIdByEmail(ctx context.Context, email string) (int, error)
|
||||
CreateUser(ctx context.Context, email string) (int, error)
|
||||
SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error
|
||||
}
|
||||
|
||||
type Environment struct {
|
||||
@@ -48,13 +50,34 @@ type Environment struct {
|
||||
IsLocal bool
|
||||
}
|
||||
|
||||
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore) *GoogleAuth {
|
||||
type GoogleAuthParams struct {
|
||||
AdditionalScopes []string
|
||||
}
|
||||
|
||||
type GoogleAuthOption func(p *GoogleAuthParams)
|
||||
|
||||
func WithAdditionalScopes(scopes []string) GoogleAuthOption {
|
||||
return func(p *GoogleAuthParams) {
|
||||
p.AdditionalScopes = scopes
|
||||
}
|
||||
}
|
||||
|
||||
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore,
|
||||
options ...GoogleAuthOption) *GoogleAuth {
|
||||
|
||||
params := GoogleAuthParams{}
|
||||
for _, option := range options {
|
||||
option(¶ms)
|
||||
}
|
||||
scopes := []string{"https://www.googleapis.com/auth/userinfo.email"}
|
||||
scopes = append(scopes, params.AdditionalScopes...)
|
||||
|
||||
config := oauth2.Config{
|
||||
ClientID: environment.ClientId,
|
||||
ClientSecret: environment.ClientSecret,
|
||||
Endpoint: google.Endpoint,
|
||||
RedirectURL: environment.BaseUrl + GoogleLoginCallback,
|
||||
Scopes: []string{"https://www.googleapis.com/auth/userinfo.email"},
|
||||
Scopes: scopes,
|
||||
}
|
||||
return &GoogleAuth{
|
||||
db: db,
|
||||
@@ -180,7 +203,10 @@ func (g *GoogleAuth) getUserDataFromGoogle(code string) (GoogleUserData, error)
|
||||
|
||||
func (g *GoogleAuth) SaveCredentials(userId int, token *oauth2.Token) error {
|
||||
Debug("saving google credentials for user ", userId)
|
||||
// For now, we'll just log this - in a real implementation you'd save to database
|
||||
err := g.db.SaveCredentials(context.Background(), userId, token)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Debug("saved credentials for user", userId)
|
||||
return nil
|
||||
}
|
||||
|
||||
+1
-1
@@ -22,7 +22,7 @@ func StructToMap(data interface{}) (JSONB, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func MapToStruct(data JSONB, result interface{}) error {
|
||||
func MapToStruct(data JSONB, result any) error {
|
||||
// Marshal map to JSON bytes
|
||||
jsonBytes, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
|
||||
+136
-1
@@ -33,11 +33,71 @@ type GeneralQueueJob struct {
|
||||
ResultPayload map[string]interface{}
|
||||
}
|
||||
|
||||
func (j GeneralQueueJob) LoadPayloadStruct(target any) error {
|
||||
return MapToStruct(j.Payload, target)
|
||||
}
|
||||
|
||||
type GeneralQueueHandlerResult struct {
|
||||
ResultPayload map[string]interface{}
|
||||
FailurePayload map[string]interface{}
|
||||
}
|
||||
|
||||
func NewSuccessGeneralQueueHandlerResult(result any) (GeneralQueueHandlerResult, error) {
|
||||
resultMap, err := StructToMap(result)
|
||||
if err != nil {
|
||||
return returnStructToMapError(err), err
|
||||
}
|
||||
return GeneralQueueHandlerResult{
|
||||
ResultPayload: resultMap,
|
||||
FailurePayload: nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
const structToMapErrorKey = "structToMapError"
|
||||
|
||||
func returnStructToMapError(err error) GeneralQueueHandlerResult {
|
||||
return GeneralQueueHandlerResult{
|
||||
FailurePayload: map[string]interface{}{
|
||||
structToMapErrorKey: err.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func FailureGeneralQueueHandlerResult(message string, err error) GeneralQueueHandlerResult {
|
||||
return GeneralQueueHandlerResult{
|
||||
FailurePayload: map[string]interface{}{
|
||||
"message": message,
|
||||
"error": err.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func NewFailureGeneralQueueHandlerResult(failure any) (GeneralQueueHandlerResult, error) {
|
||||
failureMap, err := StructToMap(failure)
|
||||
if err != nil {
|
||||
return returnStructToMapError(err), err
|
||||
}
|
||||
return GeneralQueueHandlerResult{
|
||||
ResultPayload: nil,
|
||||
FailurePayload: failureMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewGeneralQueueHandlerResult(result any, failure any) (GeneralQueueHandlerResult, error) {
|
||||
resultMap, err := StructToMap(result)
|
||||
if err != nil {
|
||||
return returnStructToMapError(err), err
|
||||
}
|
||||
failureMap, err := StructToMap(failure)
|
||||
if err != nil {
|
||||
return returnStructToMapError(err), err
|
||||
}
|
||||
return GeneralQueueHandlerResult{
|
||||
ResultPayload: resultMap,
|
||||
FailurePayload: failureMap,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
|
||||
|
||||
type GeneralQueueParams struct {
|
||||
@@ -417,7 +477,7 @@ func WithWorkflowId(id string) EnqueueOption {
|
||||
|
||||
// Enqueue adds a new job to the queue
|
||||
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
|
||||
Debug("Enqueueing job to queue:", q.Name, "payload", payload)
|
||||
Debug("Enqueueing job to queue:", q.Name)
|
||||
params := EnqueueParams{}
|
||||
for _, option := range options {
|
||||
option(¶ms)
|
||||
@@ -515,6 +575,81 @@ func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTri
|
||||
return nil
|
||||
}
|
||||
|
||||
// ResumeWorkflow resumes a specific workflow's execution in this queue by
|
||||
// setting all InProgress jobs back to Pending, allowing them to be picked up again.
|
||||
// After resetting, if the queue is marked running, it will be started.
|
||||
func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error {
|
||||
// Reset in-progress jobs back to pending for the specific workflow
|
||||
inProgressCount, err := q.client.GeneralQueue.Update().
|
||||
Where(
|
||||
generalqueue.NameEQ(q.Name),
|
||||
generalqueue.WorkflowIDEQ(workflowId),
|
||||
generalqueue.StatusEQ(generalqueue.StatusInProgress),
|
||||
).
|
||||
SetStatus(generalqueue.StatusPending).
|
||||
SetUpdatedAt(time.Now()).
|
||||
Save(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err)
|
||||
}
|
||||
if inProgressCount > 0 {
|
||||
Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
|
||||
}
|
||||
|
||||
// Start the queue again if it is configured as running
|
||||
if isRunning, err := q.IsRunning(ctx); err != nil {
|
||||
Error("Failed to check ", q.Name, " queue state:", err)
|
||||
} else if isRunning {
|
||||
Debug("restarting ", q.Name, " queue")
|
||||
if err := q.Start(ctx); err != nil {
|
||||
Error("Failed to restart ", q.Name, " queue:", err)
|
||||
return err
|
||||
}
|
||||
Debug(q.Name, " queue auto-started")
|
||||
} else {
|
||||
Debug(q.Name, " queue not running")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RetryFailedWorkflow retries failed jobs for a specific workflow in this queue by
|
||||
// setting all Failed jobs back to Pending and resetting their NumberOfTries to 0.
|
||||
// After resetting, if the queue is marked running, it will be started.
|
||||
func (q *GeneralQueue) RetryFailedWorkflow(ctx context.Context, workflowId string) error {
|
||||
// Reset failed jobs back to pending and reset their try counter for the specific workflow
|
||||
failedCount, err := q.client.GeneralQueue.Update().
|
||||
Where(
|
||||
generalqueue.NameEQ(q.Name),
|
||||
generalqueue.WorkflowIDEQ(workflowId),
|
||||
generalqueue.StatusEQ(generalqueue.StatusFailed),
|
||||
).
|
||||
SetStatus(generalqueue.StatusPending).
|
||||
SetNumberOfTries(0).
|
||||
SetUpdatedAt(time.Now()).
|
||||
Save(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to reset failed jobs to pending for queue '%s': %w", q.Name, err)
|
||||
}
|
||||
if failedCount > 0 {
|
||||
Debug("Reset ", failedCount, " failed jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
|
||||
}
|
||||
|
||||
// Start the queue again if it is configured as running
|
||||
if isRunning, err := q.IsRunning(ctx); err != nil {
|
||||
Error("Failed to check ", q.Name, " queue state:", err)
|
||||
} else if isRunning {
|
||||
Debug("restarting ", q.Name, " queue")
|
||||
if err := q.Start(ctx); err != nil {
|
||||
Error("Failed to restart ", q.Name, " queue:", err)
|
||||
return err
|
||||
}
|
||||
Debug(q.Name, " queue auto-started")
|
||||
} else {
|
||||
Debug(q.Name, " queue not running")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *GeneralQueue) Resume(ctx context.Context) error {
|
||||
// Reset all in_progress jobs back to pending
|
||||
count, err := q.client.GeneralQueue.Update().
|
||||
|
||||
@@ -2,10 +2,14 @@ package google
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.gorlug.de/code/ersteller"
|
||||
google_http "git.gorlug.de/code/ersteller/authentication/google/http"
|
||||
"git.gorlug.de/code/ersteller/starter/ent"
|
||||
"git.gorlug.de/code/ersteller/starter/ent/googleauth"
|
||||
"git.gorlug.de/code/ersteller/starter/ent/schema"
|
||||
"git.gorlug.de/code/ersteller/starter/ent/user"
|
||||
"golang.org/x/oauth2"
|
||||
)
|
||||
|
||||
type Database struct {
|
||||
@@ -30,3 +34,19 @@ func (d *Database) CreateUser(ctx context.Context, email string) (int, error) {
|
||||
}
|
||||
return newUser.ID, nil
|
||||
}
|
||||
|
||||
func (d *Database) SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error {
|
||||
existing, err := d.db.GoogleAuth.Query().Where(googleauth.HasUserWith(user.ID(userId))).Only(ctx)
|
||||
if err == nil {
|
||||
_, err = existing.Update().SetCredentials(schema.Credentials{Token: *token}).Save(ctx)
|
||||
return err
|
||||
}
|
||||
if !ent.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
_, err = d.db.GoogleAuth.Create().
|
||||
SetUserID(userId).
|
||||
SetCredentials(schema.Credentials{Token: *token}).
|
||||
Save(ctx)
|
||||
return err
|
||||
}
|
||||
|
||||
+10
@@ -3,6 +3,7 @@ package ersteller
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
mathRand "math/rand"
|
||||
"regexp"
|
||||
"strings"
|
||||
"text/template"
|
||||
@@ -35,3 +36,12 @@ func InlineTemplate(templateString string, data any) string {
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func RandomString(n int) string {
|
||||
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
b := make([]byte, n)
|
||||
for i := range b {
|
||||
b[i] = letters[int(mathRand.Int63()%int64(len(letters)))]
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
+55
-3
@@ -2,6 +2,7 @@ package workflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -53,7 +54,7 @@ func (s *Step) AddNextStep(step *Step) {
|
||||
const workflowIdParam = "workflowId"
|
||||
|
||||
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
|
||||
ersteller.Debug("Executing step ", s.Name, "payload", payload)
|
||||
ersteller.Debug("Executing step ", s.Name)
|
||||
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -70,7 +71,7 @@ func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queu
|
||||
}
|
||||
if nextStep != nil {
|
||||
result.ResultPayload[workflowIdParam] = workflowId
|
||||
ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload)
|
||||
ersteller.Debug("Moving to next step ", nextStep.Name)
|
||||
err = nextStep.Execute(context.Background(), result.ResultPayload)
|
||||
if err != nil {
|
||||
failurePayload := result.FailurePayload
|
||||
@@ -111,7 +112,58 @@ func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
|
||||
|
||||
func (w *Workflow) GenerateId() string {
|
||||
now := time.Now()
|
||||
return w.Identifier + "_" + now.Format("20060102150405")
|
||||
return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5)
|
||||
}
|
||||
|
||||
// Resume restarts a specific workflow execution by its workflowId by
|
||||
// setting any in-progress jobs of all steps back to pending, then starts
|
||||
// their queues again if they are running.
|
||||
func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
|
||||
var allErr error
|
||||
for _, step := range w.AllSteps {
|
||||
if step.Queue == nil {
|
||||
// Safety: ensure queues are initialized
|
||||
step.initQueue()
|
||||
}
|
||||
if err := step.Queue.ResumeWorkflow(ctx, workflowId); err != nil {
|
||||
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||
}
|
||||
}
|
||||
return allErr
|
||||
}
|
||||
|
||||
// ResumeAll restarts all executions of this workflow across all steps.
|
||||
// For each step it resets any in-progress jobs to pending, then starts the
|
||||
// queue again if it is configured as running.
|
||||
func (w *Workflow) ResumeAll(ctx context.Context) error {
|
||||
var allErr error
|
||||
for _, step := range w.AllSteps {
|
||||
if step.Queue == nil {
|
||||
// Safety: ensure queues are initialized
|
||||
step.initQueue()
|
||||
}
|
||||
if err := step.Queue.Resume(ctx); err != nil {
|
||||
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||
}
|
||||
}
|
||||
return allErr
|
||||
}
|
||||
|
||||
// RetryFailed restarts only failed jobs of a specific workflow execution by its
|
||||
// workflowId across all steps by resetting them to pending and their try count
|
||||
// to 0, then starts their queues again if they are running.
|
||||
func (w *Workflow) RetryFailed(ctx context.Context, workflowId string) error {
|
||||
var allErr error
|
||||
for _, step := range w.AllSteps {
|
||||
if step.Queue == nil {
|
||||
// Safety: ensure queues are initialized
|
||||
step.initQueue()
|
||||
}
|
||||
if err := step.Queue.RetryFailedWorkflow(ctx, workflowId); err != nil {
|
||||
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||
}
|
||||
}
|
||||
return allErr
|
||||
}
|
||||
|
||||
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
|
||||
|
||||
Reference in New Issue
Block a user