diff --git a/queue/general_queue.go b/queue/general_queue.go index 821ffd6..5d03d24 100644 --- a/queue/general_queue.go +++ b/queue/general_queue.go @@ -360,8 +360,25 @@ func (q *GeneralQueue) claimNextPendingJob(ctx context.Context) (*ent.GeneralQue return nil, nil } +type EnqueueParams struct { + WorkflowId string +} + +type EnqueueOption func(p *EnqueueParams) + +func WithWorkflowId(id string) EnqueueOption { + return func(q *EnqueueParams) { + q.WorkflowId = id + } +} + // Enqueue adds a new job to the queue -func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int) (*ent.GeneralQueue, error) { +func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, userId int, options ...EnqueueOption) (*ent.GeneralQueue, error) { + params := EnqueueParams{} + for _, option := range options { + option(¶ms) + } + now := time.Now() payloadMap, err := StructToMap(payload) @@ -378,7 +395,8 @@ func (q *GeneralQueue) Enqueue(ctx context.Context, payload any, maxRetries int, SetMaxRetries(maxRetries). SetCreatedAt(now). SetUpdatedAt(now). - SetUserID(userId) + SetUserID(userId). + SetWorkflowID(params.WorkflowId) job, err := create.Save(ctx) diff --git a/schema/ent/generalqueue.go b/schema/ent/generalqueue.go index 68ca908..70d2485 100644 --- a/schema/ent/generalqueue.go +++ b/schema/ent/generalqueue.go @@ -41,7 +41,9 @@ type GeneralQueue struct { // ProcessedAt holds the value of the "processed_at" field. ProcessedAt time.Time `json:"processed_at,omitempty"` // UserID holds the value of the "user_id" field. - UserID int `json:"user_id,omitempty"` + UserID int `json:"user_id,omitempty"` + // WorkflowID holds the value of the "workflow_id" field. + WorkflowID string `json:"workflow_id,omitempty"` selectValues sql.SelectValues } @@ -54,7 +56,7 @@ func (*GeneralQueue) scanValues(columns []string) ([]any, error) { values[i] = new([]byte) case generalqueue.FieldID, generalqueue.FieldNumberOfTries, generalqueue.FieldMaxRetries, generalqueue.FieldUserID: values[i] = new(sql.NullInt64) - case generalqueue.FieldName, generalqueue.FieldStatus, generalqueue.FieldErrorMessage: + case generalqueue.FieldName, generalqueue.FieldStatus, generalqueue.FieldErrorMessage, generalqueue.FieldWorkflowID: values[i] = new(sql.NullString) case generalqueue.FieldCreatedAt, generalqueue.FieldUpdatedAt, generalqueue.FieldProcessedAt: values[i] = new(sql.NullTime) @@ -157,6 +159,12 @@ func (_m *GeneralQueue) assignValues(columns []string, values []any) error { } else if value.Valid { _m.UserID = int(value.Int64) } + case generalqueue.FieldWorkflowID: + if value, ok := values[i].(*sql.NullString); !ok { + return fmt.Errorf("unexpected type %T for field workflow_id", values[i]) + } else if value.Valid { + _m.WorkflowID = value.String + } default: _m.selectValues.Set(columns[i], values[i]) } @@ -228,6 +236,9 @@ func (_m *GeneralQueue) String() string { builder.WriteString(", ") builder.WriteString("user_id=") builder.WriteString(fmt.Sprintf("%v", _m.UserID)) + builder.WriteString(", ") + builder.WriteString("workflow_id=") + builder.WriteString(_m.WorkflowID) builder.WriteByte(')') return builder.String() } diff --git a/schema/ent/generalqueue/generalqueue.go b/schema/ent/generalqueue/generalqueue.go index 3c57c35..cb338b9 100644 --- a/schema/ent/generalqueue/generalqueue.go +++ b/schema/ent/generalqueue/generalqueue.go @@ -37,6 +37,8 @@ const ( FieldProcessedAt = "processed_at" // FieldUserID holds the string denoting the user_id field in the database. FieldUserID = "user_id" + // FieldWorkflowID holds the string denoting the workflow_id field in the database. + FieldWorkflowID = "workflow_id" // Table holds the table name of the generalqueue in the database. Table = "generalQueue" ) @@ -56,6 +58,7 @@ var Columns = []string{ FieldUpdatedAt, FieldProcessedAt, FieldUserID, + FieldWorkflowID, } // ValidColumn reports if the column name is valid (part of the table columns). @@ -152,3 +155,8 @@ func ByProcessedAt(opts ...sql.OrderTermOption) OrderOption { func ByUserID(opts ...sql.OrderTermOption) OrderOption { return sql.OrderByField(FieldUserID, opts...).ToFunc() } + +// ByWorkflowID orders the results by the workflow_id field. +func ByWorkflowID(opts ...sql.OrderTermOption) OrderOption { + return sql.OrderByField(FieldWorkflowID, opts...).ToFunc() +} diff --git a/schema/ent/generalqueue/where.go b/schema/ent/generalqueue/where.go index f66494e..2a2655b 100644 --- a/schema/ent/generalqueue/where.go +++ b/schema/ent/generalqueue/where.go @@ -94,6 +94,11 @@ func UserID(v int) predicate.GeneralQueue { return predicate.GeneralQueue(sql.FieldEQ(FieldUserID, v)) } +// WorkflowID applies equality check predicate on the "workflow_id" field. It's identical to WorkflowIDEQ. +func WorkflowID(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldEQ(FieldWorkflowID, v)) +} + // NameEQ applies the EQ predicate on the "name" field. func NameEQ(v string) predicate.GeneralQueue { return predicate.GeneralQueue(sql.FieldEQ(FieldName, v)) @@ -524,6 +529,81 @@ func UserIDLTE(v int) predicate.GeneralQueue { return predicate.GeneralQueue(sql.FieldLTE(FieldUserID, v)) } +// WorkflowIDEQ applies the EQ predicate on the "workflow_id" field. +func WorkflowIDEQ(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldEQ(FieldWorkflowID, v)) +} + +// WorkflowIDNEQ applies the NEQ predicate on the "workflow_id" field. +func WorkflowIDNEQ(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldNEQ(FieldWorkflowID, v)) +} + +// WorkflowIDIn applies the In predicate on the "workflow_id" field. +func WorkflowIDIn(vs ...string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldIn(FieldWorkflowID, vs...)) +} + +// WorkflowIDNotIn applies the NotIn predicate on the "workflow_id" field. +func WorkflowIDNotIn(vs ...string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldNotIn(FieldWorkflowID, vs...)) +} + +// WorkflowIDGT applies the GT predicate on the "workflow_id" field. +func WorkflowIDGT(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldGT(FieldWorkflowID, v)) +} + +// WorkflowIDGTE applies the GTE predicate on the "workflow_id" field. +func WorkflowIDGTE(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldGTE(FieldWorkflowID, v)) +} + +// WorkflowIDLT applies the LT predicate on the "workflow_id" field. +func WorkflowIDLT(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldLT(FieldWorkflowID, v)) +} + +// WorkflowIDLTE applies the LTE predicate on the "workflow_id" field. +func WorkflowIDLTE(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldLTE(FieldWorkflowID, v)) +} + +// WorkflowIDContains applies the Contains predicate on the "workflow_id" field. +func WorkflowIDContains(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldContains(FieldWorkflowID, v)) +} + +// WorkflowIDHasPrefix applies the HasPrefix predicate on the "workflow_id" field. +func WorkflowIDHasPrefix(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldHasPrefix(FieldWorkflowID, v)) +} + +// WorkflowIDHasSuffix applies the HasSuffix predicate on the "workflow_id" field. +func WorkflowIDHasSuffix(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldHasSuffix(FieldWorkflowID, v)) +} + +// WorkflowIDIsNil applies the IsNil predicate on the "workflow_id" field. +func WorkflowIDIsNil() predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldIsNull(FieldWorkflowID)) +} + +// WorkflowIDNotNil applies the NotNil predicate on the "workflow_id" field. +func WorkflowIDNotNil() predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldNotNull(FieldWorkflowID)) +} + +// WorkflowIDEqualFold applies the EqualFold predicate on the "workflow_id" field. +func WorkflowIDEqualFold(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldEqualFold(FieldWorkflowID, v)) +} + +// WorkflowIDContainsFold applies the ContainsFold predicate on the "workflow_id" field. +func WorkflowIDContainsFold(v string) predicate.GeneralQueue { + return predicate.GeneralQueue(sql.FieldContainsFold(FieldWorkflowID, v)) +} + // And groups predicates with the AND operator between them. func And(predicates ...predicate.GeneralQueue) predicate.GeneralQueue { return predicate.GeneralQueue(sql.AndPredicates(predicates...)) diff --git a/schema/ent/generalqueue_create.go b/schema/ent/generalqueue_create.go index 4cacd0a..5c8ed7f 100644 --- a/schema/ent/generalqueue_create.go +++ b/schema/ent/generalqueue_create.go @@ -124,6 +124,20 @@ func (_c *GeneralQueueCreate) SetUserID(v int) *GeneralQueueCreate { return _c } +// SetWorkflowID sets the "workflow_id" field. +func (_c *GeneralQueueCreate) SetWorkflowID(v string) *GeneralQueueCreate { + _c.mutation.SetWorkflowID(v) + return _c +} + +// SetNillableWorkflowID sets the "workflow_id" field if the given value is not nil. +func (_c *GeneralQueueCreate) SetNillableWorkflowID(v *string) *GeneralQueueCreate { + if v != nil { + _c.SetWorkflowID(*v) + } + return _c +} + // Mutation returns the GeneralQueueMutation object of the builder. func (_c *GeneralQueueCreate) Mutation() *GeneralQueueMutation { return _c.mutation @@ -274,6 +288,10 @@ func (_c *GeneralQueueCreate) createSpec() (*GeneralQueue, *sqlgraph.CreateSpec) _spec.SetField(generalqueue.FieldUserID, field.TypeInt, value) _node.UserID = value } + if value, ok := _c.mutation.WorkflowID(); ok { + _spec.SetField(generalqueue.FieldWorkflowID, field.TypeString, value) + _node.WorkflowID = value + } return _node, _spec } diff --git a/schema/ent/generalqueue_update.go b/schema/ent/generalqueue_update.go index a11bf2a..3ca4580 100644 --- a/schema/ent/generalqueue_update.go +++ b/schema/ent/generalqueue_update.go @@ -217,6 +217,26 @@ func (_u *GeneralQueueUpdate) AddUserID(v int) *GeneralQueueUpdate { return _u } +// SetWorkflowID sets the "workflow_id" field. +func (_u *GeneralQueueUpdate) SetWorkflowID(v string) *GeneralQueueUpdate { + _u.mutation.SetWorkflowID(v) + return _u +} + +// SetNillableWorkflowID sets the "workflow_id" field if the given value is not nil. +func (_u *GeneralQueueUpdate) SetNillableWorkflowID(v *string) *GeneralQueueUpdate { + if v != nil { + _u.SetWorkflowID(*v) + } + return _u +} + +// ClearWorkflowID clears the value of the "workflow_id" field. +func (_u *GeneralQueueUpdate) ClearWorkflowID() *GeneralQueueUpdate { + _u.mutation.ClearWorkflowID() + return _u +} + // Mutation returns the GeneralQueueMutation object of the builder. func (_u *GeneralQueueUpdate) Mutation() *GeneralQueueMutation { return _u.mutation @@ -328,6 +348,12 @@ func (_u *GeneralQueueUpdate) sqlSave(ctx context.Context) (_node int, err error if value, ok := _u.mutation.AddedUserID(); ok { _spec.AddField(generalqueue.FieldUserID, field.TypeInt, value) } + if value, ok := _u.mutation.WorkflowID(); ok { + _spec.SetField(generalqueue.FieldWorkflowID, field.TypeString, value) + } + if _u.mutation.WorkflowIDCleared() { + _spec.ClearField(generalqueue.FieldWorkflowID, field.TypeString) + } if _node, err = sqlgraph.UpdateNodes(ctx, _u.driver, _spec); err != nil { if _, ok := err.(*sqlgraph.NotFoundError); ok { err = &NotFoundError{generalqueue.Label} @@ -537,6 +563,26 @@ func (_u *GeneralQueueUpdateOne) AddUserID(v int) *GeneralQueueUpdateOne { return _u } +// SetWorkflowID sets the "workflow_id" field. +func (_u *GeneralQueueUpdateOne) SetWorkflowID(v string) *GeneralQueueUpdateOne { + _u.mutation.SetWorkflowID(v) + return _u +} + +// SetNillableWorkflowID sets the "workflow_id" field if the given value is not nil. +func (_u *GeneralQueueUpdateOne) SetNillableWorkflowID(v *string) *GeneralQueueUpdateOne { + if v != nil { + _u.SetWorkflowID(*v) + } + return _u +} + +// ClearWorkflowID clears the value of the "workflow_id" field. +func (_u *GeneralQueueUpdateOne) ClearWorkflowID() *GeneralQueueUpdateOne { + _u.mutation.ClearWorkflowID() + return _u +} + // Mutation returns the GeneralQueueMutation object of the builder. func (_u *GeneralQueueUpdateOne) Mutation() *GeneralQueueMutation { return _u.mutation @@ -678,6 +724,12 @@ func (_u *GeneralQueueUpdateOne) sqlSave(ctx context.Context) (_node *GeneralQue if value, ok := _u.mutation.AddedUserID(); ok { _spec.AddField(generalqueue.FieldUserID, field.TypeInt, value) } + if value, ok := _u.mutation.WorkflowID(); ok { + _spec.SetField(generalqueue.FieldWorkflowID, field.TypeString, value) + } + if _u.mutation.WorkflowIDCleared() { + _spec.ClearField(generalqueue.FieldWorkflowID, field.TypeString) + } _node = &GeneralQueue{config: _u.config} _spec.Assign = _node.assignValues _spec.ScanValues = _node.scanValues diff --git a/schema/ent/migrate/schema.go b/schema/ent/migrate/schema.go index 7c147c5..0024987 100644 --- a/schema/ent/migrate/schema.go +++ b/schema/ent/migrate/schema.go @@ -38,6 +38,7 @@ var ( {Name: "updated_at", Type: field.TypeTime}, {Name: "processed_at", Type: field.TypeTime, Nullable: true}, {Name: "user_id", Type: field.TypeInt}, + {Name: "workflow_id", Type: field.TypeString, Nullable: true}, } // GeneralQueueTable holds the schema information for the "generalQueue" table. GeneralQueueTable = &schema.Table{ diff --git a/schema/ent/mutation.go b/schema/ent/mutation.go index c62e8e3..5829ddf 100644 --- a/schema/ent/mutation.go +++ b/schema/ent/mutation.go @@ -576,6 +576,7 @@ type GeneralQueueMutation struct { processed_at *time.Time user_id *int adduser_id *int + workflow_id *string clearedFields map[string]struct{} done bool oldValue func(context.Context) (*GeneralQueue, error) @@ -1224,6 +1225,55 @@ func (m *GeneralQueueMutation) ResetUserID() { m.adduser_id = nil } +// SetWorkflowID sets the "workflow_id" field. +func (m *GeneralQueueMutation) SetWorkflowID(s string) { + m.workflow_id = &s +} + +// WorkflowID returns the value of the "workflow_id" field in the mutation. +func (m *GeneralQueueMutation) WorkflowID() (r string, exists bool) { + v := m.workflow_id + if v == nil { + return + } + return *v, true +} + +// OldWorkflowID returns the old "workflow_id" field's value of the GeneralQueue entity. +// If the GeneralQueue object wasn't provided to the builder, the object is fetched from the database. +// An error is returned if the mutation operation is not UpdateOne, or the database query fails. +func (m *GeneralQueueMutation) OldWorkflowID(ctx context.Context) (v string, err error) { + if !m.op.Is(OpUpdateOne) { + return v, errors.New("OldWorkflowID is only allowed on UpdateOne operations") + } + if m.id == nil || m.oldValue == nil { + return v, errors.New("OldWorkflowID requires an ID field in the mutation") + } + oldValue, err := m.oldValue(ctx) + if err != nil { + return v, fmt.Errorf("querying old value for OldWorkflowID: %w", err) + } + return oldValue.WorkflowID, nil +} + +// ClearWorkflowID clears the value of the "workflow_id" field. +func (m *GeneralQueueMutation) ClearWorkflowID() { + m.workflow_id = nil + m.clearedFields[generalqueue.FieldWorkflowID] = struct{}{} +} + +// WorkflowIDCleared returns if the "workflow_id" field was cleared in this mutation. +func (m *GeneralQueueMutation) WorkflowIDCleared() bool { + _, ok := m.clearedFields[generalqueue.FieldWorkflowID] + return ok +} + +// ResetWorkflowID resets all changes to the "workflow_id" field. +func (m *GeneralQueueMutation) ResetWorkflowID() { + m.workflow_id = nil + delete(m.clearedFields, generalqueue.FieldWorkflowID) +} + // Where appends a list predicates to the GeneralQueueMutation builder. func (m *GeneralQueueMutation) Where(ps ...predicate.GeneralQueue) { m.predicates = append(m.predicates, ps...) @@ -1258,7 +1308,7 @@ func (m *GeneralQueueMutation) Type() string { // order to get all numeric fields that were incremented/decremented, call // AddedFields(). func (m *GeneralQueueMutation) Fields() []string { - fields := make([]string, 0, 12) + fields := make([]string, 0, 13) if m.name != nil { fields = append(fields, generalqueue.FieldName) } @@ -1295,6 +1345,9 @@ func (m *GeneralQueueMutation) Fields() []string { if m.user_id != nil { fields = append(fields, generalqueue.FieldUserID) } + if m.workflow_id != nil { + fields = append(fields, generalqueue.FieldWorkflowID) + } return fields } @@ -1327,6 +1380,8 @@ func (m *GeneralQueueMutation) Field(name string) (ent.Value, bool) { return m.ProcessedAt() case generalqueue.FieldUserID: return m.UserID() + case generalqueue.FieldWorkflowID: + return m.WorkflowID() } return nil, false } @@ -1360,6 +1415,8 @@ func (m *GeneralQueueMutation) OldField(ctx context.Context, name string) (ent.V return m.OldProcessedAt(ctx) case generalqueue.FieldUserID: return m.OldUserID(ctx) + case generalqueue.FieldWorkflowID: + return m.OldWorkflowID(ctx) } return nil, fmt.Errorf("unknown GeneralQueue field %s", name) } @@ -1453,6 +1510,13 @@ func (m *GeneralQueueMutation) SetField(name string, value ent.Value) error { } m.SetUserID(v) return nil + case generalqueue.FieldWorkflowID: + v, ok := value.(string) + if !ok { + return fmt.Errorf("unexpected type %T for field %s", value, name) + } + m.SetWorkflowID(v) + return nil } return fmt.Errorf("unknown GeneralQueue field %s", name) } @@ -1534,6 +1598,9 @@ func (m *GeneralQueueMutation) ClearedFields() []string { if m.FieldCleared(generalqueue.FieldProcessedAt) { fields = append(fields, generalqueue.FieldProcessedAt) } + if m.FieldCleared(generalqueue.FieldWorkflowID) { + fields = append(fields, generalqueue.FieldWorkflowID) + } return fields } @@ -1560,6 +1627,9 @@ func (m *GeneralQueueMutation) ClearField(name string) error { case generalqueue.FieldProcessedAt: m.ClearProcessedAt() return nil + case generalqueue.FieldWorkflowID: + m.ClearWorkflowID() + return nil } return fmt.Errorf("unknown GeneralQueue nullable field %s", name) } @@ -1604,6 +1674,9 @@ func (m *GeneralQueueMutation) ResetField(name string) error { case generalqueue.FieldUserID: m.ResetUserID() return nil + case generalqueue.FieldWorkflowID: + m.ResetWorkflowID() + return nil } return fmt.Errorf("unknown GeneralQueue field %s", name) } diff --git a/schema/ent/schema/general_queue.go b/schema/ent/schema/general_queue.go index 73e948d..cc22f0b 100644 --- a/schema/ent/schema/general_queue.go +++ b/schema/ent/schema/general_queue.go @@ -25,6 +25,7 @@ func (GeneralQueue) Fields() []ent.Field { field.Time("updated_at"), field.Time("processed_at").Optional(), field.Int("user_id"), + field.String("workflow_id").Optional(), } } diff --git a/starter/go.mod b/starter/go.mod index 4711ecb..7fc92e9 100644 --- a/starter/go.mod +++ b/starter/go.mod @@ -13,19 +13,26 @@ require ( maragu.dev/gomponents-htmx v0.6.1 ) +replace git.gorlug.de/code/ersteller => ../ + require ( ariga.io/atlas v0.32.1-0.20250325101103-175b25e1c1b9 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect github.com/agext/levenshtein v1.2.3 // indirect github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/bmatcuk/doublestar v1.3.4 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/go-chi/chi/v5 v5.1.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/inflect v0.19.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/context v1.1.1 // indirect github.com/gorilla/mux v1.6.2 // indirect github.com/gorilla/securecookie v1.1.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect github.com/hashicorp/hcl/v2 v2.18.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect @@ -38,16 +45,27 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-sqlite3 v1.14.32 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect - github.com/rogpeppe/go-internal v1.14.1 // indirect - github.com/stretchr/testify v1.11.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/zclconf/go-cty v1.14.4 // indirect github.com/zclconf/go-cty-yaml v1.1.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect + go.opentelemetry.io/otel v1.39.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 // indirect + go.opentelemetry.io/otel/log v0.15.0 // indirect + go.opentelemetry.io/otel/metric v1.39.0 // indirect + go.opentelemetry.io/otel/sdk v1.39.0 // indirect + go.opentelemetry.io/otel/sdk/log v0.15.0 // indirect + go.opentelemetry.io/otel/trace v1.39.0 // indirect + go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/mod v0.29.0 // indirect golang.org/x/net v0.47.0 // indirect golang.org/x/sync v0.18.0 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/text v0.31.0 // indirect golang.org/x/tools v0.38.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect + google.golang.org/grpc v1.77.0 // indirect + google.golang.org/protobuf v1.36.10 // indirect ) diff --git a/starter/go.sum b/starter/go.sum index 44db347..5e44def 100644 --- a/starter/go.sum +++ b/starter/go.sum @@ -4,8 +4,6 @@ cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdB cloud.google.com/go/compute/metadata v0.9.0/go.mod h1:E0bWwX5wTnLPedCKqk3pJmVgCBSM6qQI1yTBdEb3C10= entgo.io/ent v0.14.5 h1:Rj2WOYJtCkWyFo6a+5wB3EfBRP0rnx1fMk6gGA0UUe4= entgo.io/ent v0.14.5/go.mod h1:zTzLmWtPvGpmSwtkaayM2cm5m819NdM7z7tYPq3vN0U= -git.gorlug.de/code/ersteller v0.0.0-20251115110439-18d05566e2fc h1:oin+Mm/Z9FaR0To40MDGrFsEdK7s7FDaC9vImeyvMdg= -git.gorlug.de/code/ersteller v0.0.0-20251115110439-18d05566e2fc/go.mod h1:1pFeoQnWLWVcAfSPCq0Fwr+cJ6gqiW475B8Mt2Kuk7Y= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/agext/levenshtein v1.2.3 h1:YB2fHEn0UJagG8T1rrWknE3ZQzWM06O8AMAatNn7lmo= @@ -14,15 +12,26 @@ github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= github.com/bmatcuk/doublestar v1.3.4 h1:gPypJ5xD31uhX6Tf54sDPUOBXTqKH4c9aPY66CyQrS0= github.com/bmatcuk/doublestar v1.3.4/go.mod h1:wiQtGV+rzVYxB7WIlirSN++5HPtPlXEo9MEoZQC/PmE= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/inflect v0.19.0 h1:9jCH9scKIbHeV9m12SmPilScz6krDxKRasNNSNPXu/4= github.com/go-openapi/inflect v0.19.0/go.mod h1:lHpZVlpIQqLyKwJ4N+YSc9hchQy/i12fJykb83CRBH4= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= @@ -37,6 +46,8 @@ github.com/gorilla/securecookie v1.1.2 h1:YCIWL56dvtr73r6715mJs5ZvhtnY73hBvEF8kX github.com/gorilla/securecookie v1.1.2/go.mod h1:NfCASbcHqRSY+3a8tlWJwsQap2VX5pwzwo4h3eOamfo= github.com/gorilla/sessions v1.4.0 h1:kpIYOp/oi6MG/p5PgxApU8srsSw9tuFbt46Lt7auzqQ= github.com/gorilla/sessions v1.4.0/go.mod h1:FLWm50oby91+hl7p/wRxDth9bWSuk0qVL2emc7lT5ik= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo= github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= @@ -88,6 +99,28 @@ github.com/zclconf/go-cty v1.14.4 h1:uXXczd9QDGsgu0i/QFR/hzI5NYCHLf6NQw/atrbnhq8 github.com/zclconf/go-cty v1.14.4/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE= github.com/zclconf/go-cty-yaml v1.1.0 h1:nP+jp0qPHv2IhUVqmQSzjvqAWcObN0KBkUl2rWBdig0= github.com/zclconf/go-cty-yaml v1.1.0/go.mod h1:9YLUH4g7lOhVWqUbctnVlZ5KLpg7JAprQNgxSZ1Gyxs= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.39.0 h1:8yPrr/S0ND9QEfTfdP9V+SiwT4E0G7Y5MO7p85nis48= +go.opentelemetry.io/otel v1.39.0/go.mod h1:kLlFTywNWrFyEdH0oj2xK0bFYZtHRYUdv1NklR/tgc8= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 h1:W+m0g+/6v3pa5PgVf2xoFMi5YtNR06WtS7ve5pcvLtM= +go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0/go.mod h1:JM31r0GGZ/GU94mX8hN4D8v6e40aFlUECSQ48HaLgHM= +go.opentelemetry.io/otel/log v0.15.0 h1:0VqVnc3MgyYd7QqNVIldC3dsLFKgazR6P3P3+ypkyDY= +go.opentelemetry.io/otel/log v0.15.0/go.mod h1:9c/G1zbyZfgu1HmQD7Qj84QMmwTp2QCQsZH1aeoWDE4= +go.opentelemetry.io/otel/metric v1.39.0 h1:d1UzonvEZriVfpNKEVmHXbdf909uGTOQjA0HF0Ls5Q0= +go.opentelemetry.io/otel/metric v1.39.0/go.mod h1:jrZSWL33sD7bBxg1xjrqyDjnuzTUB0x1nBERXd7Ftcs= +go.opentelemetry.io/otel/sdk v1.39.0 h1:nMLYcjVsvdui1B/4FRkwjzoRVsMK8uL/cj0OyhKzt18= +go.opentelemetry.io/otel/sdk v1.39.0/go.mod h1:vDojkC4/jsTJsE+kh+LXYQlbL8CgrEcwmt1ENZszdJE= +go.opentelemetry.io/otel/sdk/log v0.15.0 h1:WgMEHOUt5gjJE93yqfqJOkRflApNif84kxoHWS9VVHE= +go.opentelemetry.io/otel/sdk/log v0.15.0/go.mod h1:qDC/FlKQCXfH5hokGsNg9aUBGMJQsrUyeOiW5u+dKBQ= +go.opentelemetry.io/otel/sdk/log/logtest v0.14.0 h1:Ijbtz+JKXl8T2MngiwqBlPaHqc4YCaP/i13Qrow6gAM= +go.opentelemetry.io/otel/sdk/log/logtest v0.14.0/go.mod h1:dCU8aEL6q+L9cYTqcVOk8rM9Tp8WdnHOPLiBgp0SGOA= +go.opentelemetry.io/otel/sdk/metric v1.39.0 h1:cXMVVFVgsIf2YL6QkRF4Urbr/aMInf+2WKg+sEJTtB8= +go.opentelemetry.io/otel/sdk/metric v1.39.0/go.mod h1:xq9HEVH7qeX69/JnwEfp6fVq5wosJsY1mt4lLfYdVew= +go.opentelemetry.io/otel/trace v1.39.0 h1:2d2vfpEDmCJ5zVYz7ijaJdOF59xLomrvj7bjt6/qCJI= +go.opentelemetry.io/otel/trace v1.39.0/go.mod h1:88w4/PnZSazkGzz/w84VHpQafiU4EtqqlVdxWy+rNOA= +go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= @@ -105,6 +138,16 @@ golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 h1:fCvbg86sFXwdrl5LgVcTEvNC+2txB5mgROGmRL5mrls= +google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:+rXWjjaukWZun3mLfjmVnQi18E1AsFbDN9QdJ5YXLto= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 h1:gRkg/vSppuSQoDjxyiGfN4Upv/h/DQmIR10ZU8dh4Ww= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/starter/main.go b/starter/main.go index c397de5..03a29a3 100644 --- a/starter/main.go +++ b/starter/main.go @@ -2,13 +2,14 @@ package main import ( "context" + "log" + "net/http" + "time" + . "git.gorlug.de/code/ersteller" "git.gorlug.de/code/ersteller/starter/ent" "git.gorlug.de/code/ersteller/starter/env" "git.gorlug.de/code/ersteller/starter/routes" - "log" - "net/http" - "time" ) func main() { @@ -17,7 +18,7 @@ func main() { environment := env.LoadEnvironment() client, err := ent.Open("sqlite3", environment.DatabaseUrl, - ent.Log(log.Println), ent.Debug()) + ent.Log(log.Println)) if err != nil { log.Fatalf("failed opening connection to sqlite: %v", err) } diff --git a/workflow/workflow.go b/workflow/workflow.go new file mode 100644 index 0000000..4888ec5 --- /dev/null +++ b/workflow/workflow.go @@ -0,0 +1,120 @@ +package workflow + +import ( + "context" + "fmt" + "time" + + "git.gorlug.de/code/ersteller" + "git.gorlug.de/code/ersteller/queue" + "git.gorlug.de/code/ersteller/schema/ent" +) + +type StepHandler interface { + Handle(ctx context.Context, currentStep *Step, + job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, *Step, error) +} + +type StepParams struct { + Name string + Identifier string + Description string + Client *ent.Client + Processors int + Handler StepHandler + MaxRetries int +} + +type Step struct { + StepParams + Queue *queue.GeneralQueue + NextSteps map[string]*Step +} + +func NewStep(params *StepParams) *Step { + if params.MaxRetries == 0 { + params.MaxRetries = 3 + } + step := Step{ + StepParams: *params, + NextSteps: make(map[string]*Step), + } + step.Queue = queue.NewGeneralQueue(params.Identifier, + params.Client, step.HandleQueue, params.Processors) + return &step +} + +func (s *Step) AddNextStep(step *Step) { + s.NextSteps[step.Name] = step +} + +const workflowIdParam = "workflowId" + +func (s *Step) Execute(ctx context.Context, payload map[string]interface{}) error { + _, err := s.Queue.Enqueue(ctx, payload, s.MaxRetries, -1, queue.WithWorkflowId(payload[workflowIdParam].(string))) + if err != nil { + return err + } + err = s.Queue.Start(ctx) + return err +} + +func (s *Step) HandleQueue(ctx context.Context, job queue.GeneralQueueJob) (queue.GeneralQueueHandlerResult, error) { + result, nextStep, err := s.Handler.Handle(ctx, s, job) + if err != nil { + return result, err + } + if nextStep != nil { + err = nextStep.Execute(ctx, result.ResultPayload) + if err != nil { + failurePayload := result.FailurePayload + failurePayload["next_step_error"] = fmt.Sprint("Failed to execute next step: ", err.Error()) + return queue.GeneralQueueHandlerResult{ + ResultPayload: result.ResultPayload, + FailurePayload: failurePayload, + }, err + } + } + return result, nil +} + +type Workflow struct { + Name string + Identifier string + FirstStep *Step +} + +func NewWorkflow(name string, identifier string, firstStep *Step) *Workflow { + go func() {}() + return &Workflow{ + Name: name, + FirstStep: firstStep, + Identifier: identifier, + } +} + +func (w *Workflow) generateId() string { + now := time.Now() + return w.Identifier + "_" + now.Format("20060102150405") +} + +func NewCronTrigger(ctx context.Context, workflow *Workflow, d time.Duration) { + go func() { + ticker := time.NewTicker(d) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + err := workflow.FirstStep.Execute(ctx, map[string]interface{}{ + "workflowId": workflow.generateId(), + }) + if err != nil { + ersteller.Error("Failed to execute cron trigger for workflow '", workflow.Name, "': ", err.Error()) + } + } + } + }() +}