Compare commits
10 Commits
d3c1cad6f2
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| af2c70bec6 | |||
| 1d730a4c5d | |||
| 8a3d041e3f | |||
| d4008e3655 | |||
| d4714c4f27 | |||
| 12d22efc74 | |||
| a383129abb | |||
| f49871cc1c | |||
| 9d0733bd96 | |||
| 2aabad6f07 |
@@ -5,12 +5,13 @@ import (
|
|||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
. "git.gorlug.de/code/ersteller"
|
|
||||||
"git.gorlug.de/code/ersteller/authentication"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
. "git.gorlug.de/code/ersteller"
|
||||||
|
"git.gorlug.de/code/ersteller/authentication"
|
||||||
|
|
||||||
"github.com/gorilla/sessions"
|
"github.com/gorilla/sessions"
|
||||||
|
|
||||||
"golang.org/x/oauth2"
|
"golang.org/x/oauth2"
|
||||||
@@ -39,6 +40,7 @@ type GoogleAuth struct {
|
|||||||
type Database interface {
|
type Database interface {
|
||||||
GetUserIdByEmail(ctx context.Context, email string) (int, error)
|
GetUserIdByEmail(ctx context.Context, email string) (int, error)
|
||||||
CreateUser(ctx context.Context, email string) (int, error)
|
CreateUser(ctx context.Context, email string) (int, error)
|
||||||
|
SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type Environment struct {
|
type Environment struct {
|
||||||
@@ -48,13 +50,34 @@ type Environment struct {
|
|||||||
IsLocal bool
|
IsLocal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore) *GoogleAuth {
|
type GoogleAuthParams struct {
|
||||||
|
AdditionalScopes []string
|
||||||
|
}
|
||||||
|
|
||||||
|
type GoogleAuthOption func(p *GoogleAuthParams)
|
||||||
|
|
||||||
|
func WithAdditionalScopes(scopes []string) GoogleAuthOption {
|
||||||
|
return func(p *GoogleAuthParams) {
|
||||||
|
p.AdditionalScopes = scopes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGoogleAuth(db Database, server *http.ServeMux, environment Environment, sessionStore *sessions.CookieStore,
|
||||||
|
options ...GoogleAuthOption) *GoogleAuth {
|
||||||
|
|
||||||
|
params := GoogleAuthParams{}
|
||||||
|
for _, option := range options {
|
||||||
|
option(¶ms)
|
||||||
|
}
|
||||||
|
scopes := []string{"https://www.googleapis.com/auth/userinfo.email"}
|
||||||
|
scopes = append(scopes, params.AdditionalScopes...)
|
||||||
|
|
||||||
config := oauth2.Config{
|
config := oauth2.Config{
|
||||||
ClientID: environment.ClientId,
|
ClientID: environment.ClientId,
|
||||||
ClientSecret: environment.ClientSecret,
|
ClientSecret: environment.ClientSecret,
|
||||||
Endpoint: google.Endpoint,
|
Endpoint: google.Endpoint,
|
||||||
RedirectURL: environment.BaseUrl + GoogleLoginCallback,
|
RedirectURL: environment.BaseUrl + GoogleLoginCallback,
|
||||||
Scopes: []string{"https://www.googleapis.com/auth/userinfo.email"},
|
Scopes: scopes,
|
||||||
}
|
}
|
||||||
return &GoogleAuth{
|
return &GoogleAuth{
|
||||||
db: db,
|
db: db,
|
||||||
@@ -180,7 +203,10 @@ func (g *GoogleAuth) getUserDataFromGoogle(code string) (GoogleUserData, error)
|
|||||||
|
|
||||||
func (g *GoogleAuth) SaveCredentials(userId int, token *oauth2.Token) error {
|
func (g *GoogleAuth) SaveCredentials(userId int, token *oauth2.Token) error {
|
||||||
Debug("saving google credentials for user ", userId)
|
Debug("saving google credentials for user ", userId)
|
||||||
// For now, we'll just log this - in a real implementation you'd save to database
|
err := g.db.SaveCredentials(context.Background(), userId, token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
Debug("saved credentials for user", userId)
|
Debug("saved credentials for user", userId)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
+1
-1
@@ -22,7 +22,7 @@ func StructToMap(data interface{}) (JSONB, error) {
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func MapToStruct(data JSONB, result interface{}) error {
|
func MapToStruct(data JSONB, result any) error {
|
||||||
// Marshal map to JSON bytes
|
// Marshal map to JSON bytes
|
||||||
jsonBytes, err := json.Marshal(data)
|
jsonBytes, err := json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
+136
-1
@@ -33,11 +33,71 @@ type GeneralQueueJob struct {
|
|||||||
ResultPayload map[string]interface{}
|
ResultPayload map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (j GeneralQueueJob) LoadPayloadStruct(target any) error {
|
||||||
|
return MapToStruct(j.Payload, target)
|
||||||
|
}
|
||||||
|
|
||||||
type GeneralQueueHandlerResult struct {
|
type GeneralQueueHandlerResult struct {
|
||||||
ResultPayload map[string]interface{}
|
ResultPayload map[string]interface{}
|
||||||
FailurePayload map[string]interface{}
|
FailurePayload map[string]interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewSuccessGeneralQueueHandlerResult(result any) (GeneralQueueHandlerResult, error) {
|
||||||
|
resultMap, err := StructToMap(result)
|
||||||
|
if err != nil {
|
||||||
|
return returnStructToMapError(err), err
|
||||||
|
}
|
||||||
|
return GeneralQueueHandlerResult{
|
||||||
|
ResultPayload: resultMap,
|
||||||
|
FailurePayload: nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const structToMapErrorKey = "structToMapError"
|
||||||
|
|
||||||
|
func returnStructToMapError(err error) GeneralQueueHandlerResult {
|
||||||
|
return GeneralQueueHandlerResult{
|
||||||
|
FailurePayload: map[string]interface{}{
|
||||||
|
structToMapErrorKey: err.Error(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func FailureGeneralQueueHandlerResult(message string, err error) GeneralQueueHandlerResult {
|
||||||
|
return GeneralQueueHandlerResult{
|
||||||
|
FailurePayload: map[string]interface{}{
|
||||||
|
"message": message,
|
||||||
|
"error": err.Error(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFailureGeneralQueueHandlerResult(failure any) (GeneralQueueHandlerResult, error) {
|
||||||
|
failureMap, err := StructToMap(failure)
|
||||||
|
if err != nil {
|
||||||
|
return returnStructToMapError(err), err
|
||||||
|
}
|
||||||
|
return GeneralQueueHandlerResult{
|
||||||
|
ResultPayload: nil,
|
||||||
|
FailurePayload: failureMap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGeneralQueueHandlerResult(result any, failure any) (GeneralQueueHandlerResult, error) {
|
||||||
|
resultMap, err := StructToMap(result)
|
||||||
|
if err != nil {
|
||||||
|
return returnStructToMapError(err), err
|
||||||
|
}
|
||||||
|
failureMap, err := StructToMap(failure)
|
||||||
|
if err != nil {
|
||||||
|
return returnStructToMapError(err), err
|
||||||
|
}
|
||||||
|
return GeneralQueueHandlerResult{
|
||||||
|
ResultPayload: resultMap,
|
||||||
|
FailurePayload: failureMap,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
|
type GeneralQueueHandler func(ctx context.Context, job GeneralQueueJob) (GeneralQueueHandlerResult, error)
|
||||||
|
|
||||||
type GeneralQueueParams struct {
|
type GeneralQueueParams struct {
|
||||||
@@ -417,7 +477,7 @@ func WithWorkflowId(id string) EnqueueOption {
|
|||||||
|
|
||||||
// Enqueue adds a new job to the queue
|
// Enqueue adds a new job to the queue
|
||||||
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
|
func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) {
|
||||||
Debug("Enqueueing job to queue:", q.Name, "payload", payload)
|
Debug("Enqueueing job to queue:", q.Name)
|
||||||
params := EnqueueParams{}
|
params := EnqueueParams{}
|
||||||
for _, option := range options {
|
for _, option := range options {
|
||||||
option(¶ms)
|
option(¶ms)
|
||||||
@@ -515,6 +575,81 @@ func (q *GeneralQueue) IncrementTries(ctx context.Context, jobID int, currentTri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ResumeWorkflow resumes a specific workflow's execution in this queue by
|
||||||
|
// setting all InProgress jobs back to Pending, allowing them to be picked up again.
|
||||||
|
// After resetting, if the queue is marked running, it will be started.
|
||||||
|
func (q *GeneralQueue) ResumeWorkflow(ctx context.Context, workflowId string) error {
|
||||||
|
// Reset in-progress jobs back to pending for the specific workflow
|
||||||
|
inProgressCount, err := q.client.GeneralQueue.Update().
|
||||||
|
Where(
|
||||||
|
generalqueue.NameEQ(q.Name),
|
||||||
|
generalqueue.WorkflowIDEQ(workflowId),
|
||||||
|
generalqueue.StatusEQ(generalqueue.StatusInProgress),
|
||||||
|
).
|
||||||
|
SetStatus(generalqueue.StatusPending).
|
||||||
|
SetUpdatedAt(time.Now()).
|
||||||
|
Save(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to reset in_progress jobs to pending for queue '%s': %w", q.Name, err)
|
||||||
|
}
|
||||||
|
if inProgressCount > 0 {
|
||||||
|
Debug("Reset ", inProgressCount, " in_progress jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the queue again if it is configured as running
|
||||||
|
if isRunning, err := q.IsRunning(ctx); err != nil {
|
||||||
|
Error("Failed to check ", q.Name, " queue state:", err)
|
||||||
|
} else if isRunning {
|
||||||
|
Debug("restarting ", q.Name, " queue")
|
||||||
|
if err := q.Start(ctx); err != nil {
|
||||||
|
Error("Failed to restart ", q.Name, " queue:", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Debug(q.Name, " queue auto-started")
|
||||||
|
} else {
|
||||||
|
Debug(q.Name, " queue not running")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetryFailedWorkflow retries failed jobs for a specific workflow in this queue by
|
||||||
|
// setting all Failed jobs back to Pending and resetting their NumberOfTries to 0.
|
||||||
|
// After resetting, if the queue is marked running, it will be started.
|
||||||
|
func (q *GeneralQueue) RetryFailedWorkflow(ctx context.Context, workflowId string) error {
|
||||||
|
// Reset failed jobs back to pending and reset their try counter for the specific workflow
|
||||||
|
failedCount, err := q.client.GeneralQueue.Update().
|
||||||
|
Where(
|
||||||
|
generalqueue.NameEQ(q.Name),
|
||||||
|
generalqueue.WorkflowIDEQ(workflowId),
|
||||||
|
generalqueue.StatusEQ(generalqueue.StatusFailed),
|
||||||
|
).
|
||||||
|
SetStatus(generalqueue.StatusPending).
|
||||||
|
SetNumberOfTries(0).
|
||||||
|
SetUpdatedAt(time.Now()).
|
||||||
|
Save(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to reset failed jobs to pending for queue '%s': %w", q.Name, err)
|
||||||
|
}
|
||||||
|
if failedCount > 0 {
|
||||||
|
Debug("Reset ", failedCount, " failed jobs to pending for ", q.Name, " queue (workflow:", workflowId, ")")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the queue again if it is configured as running
|
||||||
|
if isRunning, err := q.IsRunning(ctx); err != nil {
|
||||||
|
Error("Failed to check ", q.Name, " queue state:", err)
|
||||||
|
} else if isRunning {
|
||||||
|
Debug("restarting ", q.Name, " queue")
|
||||||
|
if err := q.Start(ctx); err != nil {
|
||||||
|
Error("Failed to restart ", q.Name, " queue:", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Debug(q.Name, " queue auto-started")
|
||||||
|
} else {
|
||||||
|
Debug(q.Name, " queue not running")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (q *GeneralQueue) Resume(ctx context.Context) error {
|
func (q *GeneralQueue) Resume(ctx context.Context) error {
|
||||||
// Reset all in_progress jobs back to pending
|
// Reset all in_progress jobs back to pending
|
||||||
count, err := q.client.GeneralQueue.Update().
|
count, err := q.client.GeneralQueue.Update().
|
||||||
|
|||||||
@@ -2,10 +2,14 @@ package google
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.gorlug.de/code/ersteller"
|
"git.gorlug.de/code/ersteller"
|
||||||
google_http "git.gorlug.de/code/ersteller/authentication/google/http"
|
google_http "git.gorlug.de/code/ersteller/authentication/google/http"
|
||||||
"git.gorlug.de/code/ersteller/starter/ent"
|
"git.gorlug.de/code/ersteller/starter/ent"
|
||||||
|
"git.gorlug.de/code/ersteller/starter/ent/googleauth"
|
||||||
|
"git.gorlug.de/code/ersteller/starter/ent/schema"
|
||||||
"git.gorlug.de/code/ersteller/starter/ent/user"
|
"git.gorlug.de/code/ersteller/starter/ent/user"
|
||||||
|
"golang.org/x/oauth2"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Database struct {
|
type Database struct {
|
||||||
@@ -30,3 +34,19 @@ func (d *Database) CreateUser(ctx context.Context, email string) (int, error) {
|
|||||||
}
|
}
|
||||||
return newUser.ID, nil
|
return newUser.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Database) SaveCredentials(ctx context.Context, userId int, token *oauth2.Token) error {
|
||||||
|
existing, err := d.db.GoogleAuth.Query().Where(googleauth.HasUserWith(user.ID(userId))).Only(ctx)
|
||||||
|
if err == nil {
|
||||||
|
_, err = existing.Update().SetCredentials(schema.Credentials{Token: *token}).Save(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !ent.IsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
_, err = d.db.GoogleAuth.Create().
|
||||||
|
SetUserID(userId).
|
||||||
|
SetCredentials(schema.Credentials{Token: *token}).
|
||||||
|
Save(ctx)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
+10
@@ -3,6 +3,7 @@ package ersteller
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
mathRand "math/rand"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
"text/template"
|
"text/template"
|
||||||
@@ -35,3 +36,12 @@ func InlineTemplate(templateString string, data any) string {
|
|||||||
}
|
}
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RandomString(n int) string {
|
||||||
|
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||||
|
b := make([]byte, n)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = letters[int(mathRand.Int63()%int64(len(letters)))]
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
||||||
|
|||||||
+55
-3
@@ -2,6 +2,7 @@ package workflow
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -53,7 +54,7 @@ func (s *Step) AddNextStep(step *Step) {
|
|||||||
const workflowIdParam = "workflowId"
|
const workflowIdParam = "workflowId"
|
||||||
|
|
||||||
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
|
func (s *Step) Execute(ctx context.Context, payload map[string]any) error {
|
||||||
ersteller.Debug("Executing step ", s.Name, "payload", payload)
|
ersteller.Debug("Executing step ", s.Name)
|
||||||
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
|
_, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string)))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -70,7 +71,7 @@ func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queu
|
|||||||
}
|
}
|
||||||
if nextStep != nil {
|
if nextStep != nil {
|
||||||
result.ResultPayload[workflowIdParam] = workflowId
|
result.ResultPayload[workflowIdParam] = workflowId
|
||||||
ersteller.Debug("Moving to next step ", nextStep.Name, "payload", result.ResultPayload)
|
ersteller.Debug("Moving to next step ", nextStep.Name)
|
||||||
err = nextStep.Execute(context.Background(), result.ResultPayload)
|
err = nextStep.Execute(context.Background(), result.ResultPayload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
failurePayload := result.FailurePayload
|
failurePayload := result.FailurePayload
|
||||||
@@ -111,7 +112,58 @@ func (w *Workflow) Execute(ctx context.Context, payload map[string]any) error {
|
|||||||
|
|
||||||
func (w *Workflow) GenerateId() string {
|
func (w *Workflow) GenerateId() string {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
return w.Identifier + "_" + now.Format("20060102150405")
|
return w.Identifier + "_" + now.Format("20060102150405") + "_" + ersteller.RandomString(5)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resume restarts a specific workflow execution by its workflowId by
|
||||||
|
// setting any in-progress jobs of all steps back to pending, then starts
|
||||||
|
// their queues again if they are running.
|
||||||
|
func (w *Workflow) Resume(ctx context.Context, workflowId string) error {
|
||||||
|
var allErr error
|
||||||
|
for _, step := range w.AllSteps {
|
||||||
|
if step.Queue == nil {
|
||||||
|
// Safety: ensure queues are initialized
|
||||||
|
step.initQueue()
|
||||||
|
}
|
||||||
|
if err := step.Queue.ResumeWorkflow(ctx, workflowId); err != nil {
|
||||||
|
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResumeAll restarts all executions of this workflow across all steps.
|
||||||
|
// For each step it resets any in-progress jobs to pending, then starts the
|
||||||
|
// queue again if it is configured as running.
|
||||||
|
func (w *Workflow) ResumeAll(ctx context.Context) error {
|
||||||
|
var allErr error
|
||||||
|
for _, step := range w.AllSteps {
|
||||||
|
if step.Queue == nil {
|
||||||
|
// Safety: ensure queues are initialized
|
||||||
|
step.initQueue()
|
||||||
|
}
|
||||||
|
if err := step.Queue.Resume(ctx); err != nil {
|
||||||
|
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allErr
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetryFailed restarts only failed jobs of a specific workflow execution by its
|
||||||
|
// workflowId across all steps by resetting them to pending and their try count
|
||||||
|
// to 0, then starts their queues again if they are running.
|
||||||
|
func (w *Workflow) RetryFailed(ctx context.Context, workflowId string) error {
|
||||||
|
var allErr error
|
||||||
|
for _, step := range w.AllSteps {
|
||||||
|
if step.Queue == nil {
|
||||||
|
// Safety: ensure queues are initialized
|
||||||
|
step.initQueue()
|
||||||
|
}
|
||||||
|
if err := step.Queue.RetryFailedWorkflow(ctx, workflowId); err != nil {
|
||||||
|
allErr = errors.Join(allErr, fmt.Errorf("step %s: %w", step.Name, err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
|
func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) {
|
||||||
|
|||||||
Reference in New Issue
Block a user