From 51a9a54cfd56ab46df5e87f05c381fff2ddcda65 Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Wed, 28 Jun 2023 08:19:34 +0200 Subject: [PATCH] calculate and push 4 in 6 milestones --- e2e/config/localhost/zitadel.yaml | 5 +- internal/command/milestone.go | 27 +++ internal/command/telemetry.go | 25 -- .../eventstore/handler/crdb/handler_stmt.go | 16 +- internal/eventstore/handler/crdb/statement.go | 37 ++- .../eventstore/handler/handler_projection.go | 20 +- .../{quotanotifier.go => quota_notifier.go} | 0 ...telemetrypusher.go => telemetry_pusher.go} | 91 ++++--- .../{usernotifier.go => user_notifier.go} | 0 internal/notification/projections.go | 20 +- internal/query/milestone.go | 139 +++++++++++ internal/query/projection/milestones.go | 125 +++++----- internal/query/projection/projection.go | 3 + internal/query/search_query.go | 21 ++ internal/query/user.go | 1 + internal/repository/milestone/aggregate.go | 5 +- internal/repository/milestone/events.go | 228 +++++++++++++++--- internal/repository/milestone/eventstore.go | 8 +- .../repository/milestone/milestone_string.go | 30 --- internal/repository/milestone/milestones.go | 25 -- internal/repository/pseudo/aggregate.go | 5 + internal/repository/pseudo/events.go | 65 +++++ 22 files changed, 667 insertions(+), 229 deletions(-) create mode 100644 internal/command/milestone.go delete mode 100644 internal/command/telemetry.go rename internal/notification/handlers/{quotanotifier.go => quota_notifier.go} (100%) rename internal/notification/handlers/{telemetrypusher.go => telemetry_pusher.go} (53%) rename internal/notification/handlers/{usernotifier.go => user_notifier.go} (100%) create mode 100644 internal/query/milestone.go delete mode 100644 internal/repository/milestone/milestone_string.go delete mode 100644 internal/repository/milestone/milestones.go create mode 100644 internal/repository/pseudo/aggregate.go create mode 100644 internal/repository/pseudo/events.go diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index e241c5e8a9..8fbb06624c 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -44,11 +44,12 @@ Console: InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" Projections: + RequeueEvery: 1s Customizations: NotificationsQuotas: - RequeueEvery: 1s +# RequeueEvery: 1s Telemetry: - RequeueEvery: 1s + RequeueEvery: 10s DefaultInstance: LoginPolicy: diff --git a/internal/command/milestone.go b/internal/command/milestone.go new file mode 100644 index 0000000000..cfc7df83f0 --- /dev/null +++ b/internal/command/milestone.go @@ -0,0 +1,27 @@ +package command + +import ( + "context" + + "github.com/zitadel/zitadel/internal/repository/milestone" +) + +// MilestonePushed writes a new event with a new milestone.Aggregate to the eventstore +func (c *Commands) MilestonePushed( + ctx context.Context, + instanceID string, + eventType milestone.PushedEventType, + endpoints []string, + primaryDomain string, +) error { + id, err := c.idGenerator.Next() + if err != nil { + return err + } + pushedEvent, err := milestone.NewPushedEventByType(ctx, eventType, milestone.NewAggregate(id, instanceID, instanceID), endpoints, primaryDomain) + if err != nil { + return err + } + _, err = c.eventstore.Push(ctx, pushedEvent) + return err +} diff --git a/internal/command/telemetry.go b/internal/command/telemetry.go deleted file mode 100644 index 467c252f4c..0000000000 --- a/internal/command/telemetry.go +++ /dev/null @@ -1,25 +0,0 @@ -package command - -import ( - "context" - - "github.com/zitadel/zitadel/internal/eventstore" - - "github.com/zitadel/zitadel/internal/repository/milestone" -) - -// ReportMilestoneReached writes each *milestone.ReachedEvent directly to the event store -func (c *Commands) ReportMilestoneReached(ctx context.Context, triggeringEvent eventstore.Event, customContext interface{}) error { - aggregateId, err := c.idGenerator.Next() - if err != nil { - return err - } - _, err = c.eventstore.Push(ctx, milestone.NewReachedEvent(ctx, aggregateId, triggeringEvent, customContext)) - return err -} - -// ReportMilestonePushed defers a milestone.PushedEvent for each *milestone.ReachedEvent and writes it directly to the event store. -func (c *Commands) ReportMilestonePushed(ctx context.Context, endpoints []string, reachedEvent *milestone.ReachedEvent) error { - _, err := c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, reachedEvent, endpoints)) - return err -} diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index 8f6d9481f0..8f1f37bd26 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -5,6 +5,8 @@ import ( "database/sql" "fmt" + "github.com/zitadel/zitadel/internal/repository/pseudo" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" @@ -49,6 +51,8 @@ type StatementHandler struct { initialized chan bool bulkLimit uint64 + + subscribe bool } func NewStatementHandler( @@ -57,9 +61,16 @@ func NewStatementHandler( ) StatementHandler { aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers)) reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers)) + subscribe := true for _, aggReducer := range config.Reducers { aggregateTypes = append(aggregateTypes, aggReducer.Aggregate) + if aggReducer.Aggregate == pseudo.AggregateType { + subscribe = false + } for _, eventReducer := range aggReducer.EventRedusers { + if eventReducer.Event == pseudo.TimestampEventType { + subscribe = false + } reduces[eventReducer.Event] = eventReducer.Reduce } } @@ -80,7 +91,7 @@ func NewStatementHandler( initialized: make(chan bool), } - h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized) + h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized, subscribe) return h } @@ -88,6 +99,9 @@ func NewStatementHandler( func (h *StatementHandler) Start() { h.initialized <- true close(h.initialized) + if !h.subscribe { + return + } h.Subscribe(h.aggregates...) } diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index 8601630cdd..7d1b5cc644 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -1,6 +1,7 @@ package crdb import ( + "fmt" "strconv" "strings" @@ -285,6 +286,24 @@ func NewCopyCol(column, from string) handler.Column { } } +func NewIsNullCond(column string) handler.Condition { + return handler.Condition{ + Name: column, + Value: specialWhere(func(colName, param string) (clause string, needsParam bool) { + return fmt.Sprintf("%s IS NULL", colName), false + }), + } +} + +func NewIsNotNullCond(column string) handler.Condition { + return handler.Condition{ + Name: column, + Value: specialWhere(func(colName, param string) (clause string, needsParam bool) { + return fmt.Sprintf("%s IS NOT NULL", colName), false + }), + } +} + // NewCopyStatement creates a new upsert statement which updates a column from an existing row // cols represent the columns which are objective to change. // if the value of a col is empty the data will be copied from the selected row @@ -384,13 +403,25 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string, return names, parameters, values[:parameterIndex] } +type specialWhere func(colName, param string) (clause string, needsParam bool) + func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { wheres = make([]string, len(cols)) - values = make([]interface{}, len(cols)) + values = make([]interface{}, 0, len(cols)) for i, col := range cols { - wheres[i] = "(" + col.Name + " = $" + strconv.Itoa(i+1+paramOffset) + ")" - values[i] = col.Value + param := strconv.Itoa(i + 1 + paramOffset) + special, ok := col.Value.(specialWhere) + if !ok { + wheres[i] = "(" + col.Name + " = $" + param + ")" + values = append(values, col.Value) + continue + } + clause, needsValueParam := special(col.Name, param) + wheres[i] = clause + if needsValueParam { + values = append(values, col.Value) + } } return wheres, values diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index 808da1dbb8..23fe674479 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -6,6 +6,8 @@ import ( "runtime/debug" "time" + "github.com/zitadel/zitadel/internal/repository/pseudo" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" @@ -73,6 +75,7 @@ func NewProjectionHandler( lock Lock, unlock Unlock, initialized <-chan bool, + subscribe bool, ) *ProjectionHandler { concurrentInstances := int(config.ConcurrentInstances) if concurrentInstances < 1 { @@ -97,8 +100,9 @@ func NewProjectionHandler( go func() { <-initialized - go h.subscribe(ctx) - + if subscribe { + go h.subscribe(ctx) + } go h.schedule(ctx) }() @@ -112,6 +116,13 @@ func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) er if len(instances) > 0 { ids = instances } + if h.searchQuery == nil { + return h.processTimestamp(ctx, ids...) + } + return h.processEvents(ctx, ids...) +} + +func (h *ProjectionHandler) processEvents(ctx context.Context, ids ...string) error { for { events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...) if err != nil { @@ -130,6 +141,11 @@ func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) er } } +func (h *ProjectionHandler) processTimestamp(ctx context.Context, instances ...string) error { + _, err := h.Process(ctx, pseudo.NewTimestampEvent(h.nowFunc(), instances...)) + return err +} + // Process handles multiple events by reducing them to statements and updating the projection func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error) { if len(events) == 0 { diff --git a/internal/notification/handlers/quotanotifier.go b/internal/notification/handlers/quota_notifier.go similarity index 100% rename from internal/notification/handlers/quotanotifier.go rename to internal/notification/handlers/quota_notifier.go diff --git a/internal/notification/handlers/telemetrypusher.go b/internal/notification/handlers/telemetry_pusher.go similarity index 53% rename from internal/notification/handlers/telemetrypusher.go rename to internal/notification/handlers/telemetry_pusher.go index dc87382fe6..43638070d8 100644 --- a/internal/notification/handlers/telemetrypusher.go +++ b/internal/notification/handlers/telemetry_pusher.go @@ -2,12 +2,20 @@ package handlers import ( "context" + "fmt" "net/http" - "github.com/zitadel/zitadel/internal/repository/milestone" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/internal/query" + + "github.com/zitadel/zitadel/internal/api/call" + + "github.com/zitadel/zitadel/internal/repository/pseudo" + + "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/command" - "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" @@ -61,34 +69,62 @@ func NewTelemetryPusher( } func (t *telemetryPusher) reducers() []handler.AggregateReducer { - return []handler.AggregateReducer{ - { - Aggregate: milestone.AggregateType, - EventRedusers: []handler.EventReducer{ - { - Event: milestone.ReachedEventType, - Reduce: t.reduceMilestoneReached, - }, - }, - }, - } + return []handler.AggregateReducer{{ + Aggregate: pseudo.AggregateType, + EventRedusers: []handler.EventReducer{{ + Event: pseudo.TimestampEventType, + Reduce: t.pushMilestones, + }}, + }} } -func (t *telemetryPusher) reduceMilestoneReached(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*milestone.ReachedEvent) +func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) { + ctx := call.WithTimestamp(context.Background()) + timestampEvent, ok := event.(pseudo.TimestampEvent) if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-UjA3E", "reduce.wrong.event.type %s", milestone.ReachedEventType) + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type()) } - ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, nil, milestone.AggregateType, milestone.PushedEventType) + + isReached, err := query.NewNotNullQuery(query.MilestoneReachedDateColID) if err != nil { return nil, err } - if alreadyHandled { - return crdb.NewNoOpStatement(e), nil + isNotPushed, err := query.NewIsNullQuery(query.MilestonePushedDateColID) + if err != nil { + return nil, err } + hasPrimaryDomain, err := query.NewNotNullQuery(query.MilestonePrimaryDomainColID) + if err != nil { + return nil, err + } + unpushedMilestones, err := t.queries.Queries.SearchMilestones(ctx, timestampEvent.InstanceIDs, &query.MilestonesSearchQueries{ + SearchRequest: query.SearchRequest{ + Offset: 100, + SortingColumn: query.MilestoneReachedDateColID, + Asc: true, + }, + Queries: []query.SearchQuery{isReached, isNotPushed, hasPrimaryDomain}, + }) + if err != nil { + return nil, err + } + var errs int + for _, ms := range unpushedMilestones.Milestones { + if err = t.pushMilestone(ctx, ms); err != nil { + errs++ + logging.Warnf("pushing milestone %+v failed: %s", *ms, err.Error()) + } + } + if errs > 0 { + return nil, fmt.Errorf("pushing %d of %d milestones failed", errs, unpushedMilestones.Count) + } + + return crdb.NewNoOpStatement(timestampEvent), nil +} + +func (t *telemetryPusher) pushMilestone(ctx context.Context, ms *query.Milestone) error { for _, endpoint := range t.endpoints { - if err = types.SendJSON( + if err := types.SendJSON( ctx, webhook.Config{ CallURL: endpoint, @@ -96,18 +132,13 @@ func (t *telemetryPusher) reduceMilestoneReached(event eventstore.Event) (*handl }, t.queries.GetFileSystemProvider, t.queries.GetLogProvider, - e, - e, + ms, + nil, t.metricSuccessfulDeliveriesJSON, t.metricFailedDeliveriesJSON, ).WithoutTemplate(); err != nil { - return nil, err + return err } } - - err = t.commands.ReportMilestonePushed(ctx, t.endpoints, e) - if err != nil { - return nil, err - } - return crdb.NewNoOpStatement(e), nil + return t.commands.MilestonePushed(ctx, ms.InstanceID, ms.MilestoneType, t.endpoints, ms.PrimaryDomain) } diff --git a/internal/notification/handlers/usernotifier.go b/internal/notification/handlers/user_notifier.go similarity index 100% rename from internal/notification/handlers/usernotifier.go rename to internal/notification/handlers/user_notifier.go diff --git a/internal/notification/projections.go b/internal/notification/projections.go index aad38e3afd..a2b06aadd4 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -76,13 +76,15 @@ func Start( metricSuccessfulDeliveriesJSON, metricFailedDeliveriesJSON, ).Start() - handlers.NewTelemetryPusher( - ctx, - telemetryCfg, - projection.ApplyCustomConfig(telemetryHandlerCustomConfig), - commands, - q, - metricSuccessfulDeliveriesJSON, - metricFailedDeliveriesJSON, - ).Start() + if telemetryCfg.Enabled { + handlers.NewTelemetryPusher( + ctx, + telemetryCfg, + projection.ApplyCustomConfig(telemetryHandlerCustomConfig), + commands, + q, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON, + ).Start() + } } diff --git a/internal/query/milestone.go b/internal/query/milestone.go new file mode 100644 index 0000000000..d224521dc9 --- /dev/null +++ b/internal/query/milestone.go @@ -0,0 +1,139 @@ +package query + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/zitadel/zitadel/internal/repository/milestone" + + "github.com/zitadel/zitadel/internal/api/authz" + + sq "github.com/Masterminds/squirrel" + + "github.com/zitadel/zitadel/internal/api/call" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/telemetry/tracing" +) + +type Milestones struct { + SearchResponse + Milestones []*Milestone +} + +type Milestone struct { + InstanceID string + MilestoneType milestone.PushedEventType + ReachedDate time.Time + PushedDate time.Time + PrimaryDomain string +} + +type MilestonesSearchQueries struct { + SearchRequest + Queries []SearchQuery +} + +func (q *MilestonesSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder { + query = q.SearchRequest.toQuery(query) + for _, q := range q.Queries { + query = q.toQuery(query) + } + return query +} + +var ( + milestonesTable = table{ + name: projection.MilestonesProjectionTable, + instanceIDCol: projection.MilestoneColumnInstanceID, + } + MilestoneInstanceIDColID = Column{ + name: projection.MilestoneColumnInstanceID, + table: milestonesTable, + } + MilestoneTypeColID = Column{ + name: projection.MilestoneColumnMilestoneType, + table: milestonesTable, + } + MilestonePrimaryDomainColID = Column{ + name: projection.MilestoneColumnPrimaryDomain, + table: milestonesTable, + } + MilestoneReachedDateColID = Column{ + name: projection.MilestoneColumnReachedDate, + table: milestonesTable, + } + MilestonePushedDateColID = Column{ + name: projection.MilestoneColumnPushedDate, + table: milestonesTable, + } +) + +// SearchMilestones tries to defer the instanceID from the passed context if no instanceIDs are passed +func (q *Queries) SearchMilestones(ctx context.Context, instanceIDs []string, queries *MilestonesSearchQueries) (_ *Milestones, err error) { + ctx, span := tracing.NewSpan(ctx) + defer func() { span.EndWithError(err) }() + query, scan := prepareMilestonesQuery(ctx, q.client) + if len(instanceIDs) == 0 { + instanceIDs = []string{authz.GetInstance(ctx).InstanceID()} + } + stmt, args, err := queries.toQuery(query). + Where(sq.Eq{ + MilestoneInstanceIDColID.identifier(): fmt.Sprintf("IN (%s)", strings.Join(instanceIDs, ",")), + }).ToSql() + if err != nil { + return nil, errors.ThrowInternal(err, "QUERY-A9i5k", "Errors.Query.SQLStatement") + } + rows, err := q.client.QueryContext(ctx, stmt, args...) + if err != nil { + return nil, err + } + milestones, err := scan(rows) + if err != nil { + return nil, err + } + milestones.LatestSequence, err = q.latestSequence(ctx, milestonesTable) + return milestones, err + +} + +func prepareMilestonesQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(*sql.Rows) (*Milestones, error)) { + return sq.Select( + MilestonePrimaryDomainColID.identifier(), + MilestoneReachedDateColID.identifier(), + MilestonePushedDateColID.identifier(), + MilestoneTypeColID.identifier(), + countColumn.identifier(), + ). + From(notificationPolicyTable.identifier() + db.Timetravel(call.Took(ctx))). + PlaceholderFormat(sq.Dollar), + func(rows *sql.Rows) (*Milestones, error) { + milestones := make([]*Milestone, 0) + var count uint64 + for rows.Next() { + m := new(Milestone) + err := rows.Scan( + &m.PrimaryDomain, + &m.ReachedDate, + &m.MilestoneType, + &count, + ) + if err != nil { + return nil, err + } + milestones = append(milestones, m) + } + if err := rows.Close(); err != nil { + return nil, errors.ThrowInternal(err, "QUERY-CK9mI", "Errors.Query.CloseRows") + } + return &Milestones{ + Milestones: milestones, + SearchResponse: SearchResponse{ + Count: count, + }, + }, nil + } +} diff --git a/internal/query/projection/milestones.go b/internal/query/projection/milestones.go index 0491db27cc..c9633844e0 100644 --- a/internal/query/projection/milestones.go +++ b/internal/query/projection/milestones.go @@ -6,9 +6,6 @@ import ( "encoding/json" "fmt" - "github.com/zitadel/zitadel/internal/repository/project" - "github.com/zitadel/zitadel/internal/repository/user" - "github.com/zitadel/zitadel/internal/repository/milestone" "github.com/zitadel/zitadel/internal/errors" @@ -16,6 +13,8 @@ import ( "github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/project" + "github.com/zitadel/zitadel/internal/repository/user" ) const ( @@ -23,25 +22,25 @@ const ( MilestoneColumnInstanceID = "instance_id" MilestoneColumnMilestoneType = "milestone_type" - MilestoneColumnReachedAt = "reached_at" - MilestoneColumnPushedAt = "pushed_at" MilestoneColumnPrimaryDomain = "primary_domain" + MilestoneColumnReachedDate = "reached_date" + MilestoneColumnPushedDate = "pushed_date" ) type milestoneProjection struct { crdb.StatementHandler } -func newMilestoneInstanceProjection(ctx context.Context, config crdb.StatementHandlerConfig) *milestoneProjection { +func newMilestoneProjection(ctx context.Context, config crdb.StatementHandlerConfig) *milestoneProjection { p := new(milestoneProjection) config.ProjectionName = MilestonesProjectionTable config.Reducers = p.reducers() config.InitCheck = crdb.NewMultiTableCheck( crdb.NewTable([]*crdb.Column{ crdb.NewColumn(MilestoneColumnInstanceID, crdb.ColumnTypeText), - crdb.NewColumn(MilestoneColumnMilestoneType, crdb.ColumnTypeEnum), - crdb.NewColumn(MilestoneColumnReachedAt, crdb.ColumnTypeTimestamp, crdb.Nullable()), - crdb.NewColumn(MilestoneColumnPushedAt, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnMilestoneType, crdb.ColumnTypeText), + crdb.NewColumn(MilestoneColumnReachedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnPushedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()), }, crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnMilestoneType), @@ -66,7 +65,7 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { }, { Event: instance.InstanceRemovedEventType, - Reduce: p.reduceInstanceRemoved, + Reduce: p.milestoneReached(milestone.PushedInstanceDeletedEventType), }, }, }, @@ -75,11 +74,11 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { EventRedusers: []handler.EventReducer{ { Event: project.ProjectAddedType, - Reduce: p.reduceProjectAdded, + Reduce: p.milestoneReached(milestone.PushedProjectCreatedEventType), }, { Event: project.ApplicationAddedType, - Reduce: p.reduceApplicationAdded, + Reduce: p.milestoneReached(milestone.PushedApplicationCreatedEventType), }, }, }, @@ -92,57 +91,72 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { }, }, }, - { - Aggregate: milestone.AggregateType, - EventRedusers: []handler.EventReducer{ - { - Event: milestone.PushedEventType, - Reduce: p.milestonePushed, - }, - }, - }, } } -func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) { - e, ok := event.(*instance.DomainPrimarySetEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType) - } - - var statements []func(eventstore.Event) crdb.Exec - for _, ms := range milestone.All() { - statements = append(statements, crdb.AddUpsertStatement( - []handler.Column{ - handler.NewCol(MilestoneColumnInstanceID, nil), - handler.NewCol(MilestoneColumnMilestoneType, nil), - }, - []handler.Column{ - handler.NewCol(MilestoneColumnInstanceID, e.Aggregate().InstanceID), - handler.NewCol(MilestoneColumnMilestoneType, ms), - handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain), - }, - )) - } - - return crdb.NewMultiStatement(e, statements...), nil -} - func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) { printEvent(event) - - return crdb.NewNoOpStatement(event), nil + e, ok := event.(*instance.InstanceAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-JbHGS", "reduce.wrong.event.type %s", instance.InstanceAddedEventType) + } + allTypes := milestone.PushedEventTypes() + statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes)) + for _, ms := range allTypes { + createColumns := []handler.Column{ + handler.NewCol(MilestoneColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(MilestoneColumnMilestoneType, ms), + } + if ms == milestone.PushedInstanceCreatedEventType { + createColumns = append(createColumns, handler.NewCol(MilestoneColumnReachedDate, event.CreationDate())) + } + statements = append(statements, crdb.AddCreateStatement(createColumns)) + } + return crdb.NewMultiStatement(e, statements...), nil } -func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { +func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) { printEvent(event) - // ignore instance.ProjectSetEventType - return crdb.NewNoOpStatement(event), nil + e, ok := event.(*instance.DomainPrimarySetEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType) + } + allTypes := milestone.PushedEventTypes() + statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes)) + for _, ms := range allTypes { + statements = append(statements, crdb.AddUpdateStatement( + []handler.Column{ + handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnMilestoneType, ms), + crdb.NewIsNullCond(MilestoneColumnPushedDate), + }, + )) + } + return crdb.NewMultiStatement(e, statements...), nil } -func (p *milestoneProjection) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) - return crdb.NewNoOpStatement(event), nil +func (p *milestoneProjection) milestoneReached(eventType milestone.PushedEventType) func(event eventstore.Event) (*handler.Statement, error) { + return func(event eventstore.Event) (*handler.Statement, error) { + printEvent(event) + if event.EditorUser() == "" || event.EditorService() == "" { + return crdb.NewNoOpStatement(event), nil + } + return crdb.NewUpdateStatement( + event, + []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnMilestoneType, eventType), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + crdb.NewIsNullCond(MilestoneColumnPushedDate), + }, + ), nil + } } func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) { @@ -155,11 +169,6 @@ func (p *milestoneProjection) reduceInstanceRemoved(event eventstore.Event) (*ha return crdb.NewNoOpStatement(event), nil } -func (p *milestoneProjection) milestonePushed(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) - return crdb.NewNoOpStatement(event), nil -} - func printEvent(event eventstore.Event) { var pretty bytes.Buffer if err := json.Indent(&pretty, event.DataAsBytes(), "", " "); err != nil { diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 7e09aacfca..d6d31e93f3 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -67,6 +67,7 @@ var ( TelemetryPusherProjection interface{} DeviceAuthProjection *deviceAuthProjection SessionProjection *sessionProjection + MilestoneProjection *milestoneProjection ) type projection interface { @@ -144,6 +145,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es *eventstore.Eventsto NotificationPolicyProjection = newNotificationPolicyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["notification_policies"])) DeviceAuthProjection = newDeviceAuthProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["device_auth"])) SessionProjection = newSessionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sessions"])) + MilestoneProjection = newMilestoneProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["milestones"])) newProjectionsList() return nil } @@ -241,5 +243,6 @@ func newProjectionsList() { NotificationPolicyProjection, DeviceAuthProjection, SessionProjection, + MilestoneProjection, } } diff --git a/internal/query/search_query.go b/internal/query/search_query.go index 0f4314dd05..16d37502d2 100644 --- a/internal/query/search_query.go +++ b/internal/query/search_query.go @@ -66,6 +66,27 @@ func (q *NotNullQuery) comp() sq.Sqlizer { return sq.NotEq{q.Column.identifier(): nil} } +type IsNullQuery struct { + Column Column +} + +func NewIsNullQuery(col Column) (*IsNullQuery, error) { + if col.isZero() { + return nil, ErrMissingColumn + } + return &IsNullQuery{ + Column: col, + }, nil +} + +func (q *IsNullQuery) toQuery(query sq.SelectBuilder) sq.SelectBuilder { + return query.Where(q.comp()) +} + +func (q *IsNullQuery) comp() sq.Sqlizer { + return sq.Eq{q.Column.identifier(): nil} +} + type orQuery struct { queries []SearchQuery } diff --git a/internal/query/user.go b/internal/query/user.go index 1550a4678b..7923e5da53 100644 --- a/internal/query/user.go +++ b/internal/query/user.go @@ -338,6 +338,7 @@ func (q *Queries) GetUserByID(ctx context.Context, shouldTriggerBulk bool, userI defer func() { span.EndWithError(err) }() if shouldTriggerBulk { + // TODO: Why are these errors not handled? projection.UserProjection.Trigger(ctx) projection.LoginNameProjection.Trigger(ctx) } diff --git a/internal/repository/milestone/aggregate.go b/internal/repository/milestone/aggregate.go index 568ed78307..2304faf8ef 100644 --- a/internal/repository/milestone/aggregate.go +++ b/internal/repository/milestone/aggregate.go @@ -13,15 +13,14 @@ type Aggregate struct { eventstore.Aggregate } -// Each data point receives its own aggregate -func newAggregate(id, instanceId, resourceOwner string) *Aggregate { +func NewAggregate(id, resourceOwner, instanceID string) *Aggregate { return &Aggregate{ Aggregate: eventstore.Aggregate{ Type: AggregateType, Version: AggregateVersion, ID: id, - InstanceID: instanceId, ResourceOwner: resourceOwner, + InstanceID: instanceID, }, } } diff --git a/internal/repository/milestone/events.go b/internal/repository/milestone/events.go index 44ef15b19d..9bdd099b2d 100644 --- a/internal/repository/milestone/events.go +++ b/internal/repository/milestone/events.go @@ -1,66 +1,216 @@ -//go: - package milestone import ( "context" - "encoding/json" - "time" + "fmt" - "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/eventstore/repository" ) +type PushedEventType eventstore.EventType + const ( - eventTypePrefix = eventstore.EventType("milestone.") - PushedEventType = eventTypePrefix + "pushed" + eventTypePrefix = PushedEventType("milestone.pushed.") + PushedInstanceCreatedEventType = eventTypePrefix + "instance.created" + PushedAuthenticationSucceededOnInstanceEventType = eventTypePrefix + "instance.authentication.succeeded" + PushedProjectCreatedEventType = eventTypePrefix + "project.created" + PushedApplicationCreatedEventType = eventTypePrefix + "application.created" + PushedAuthenticationSucceededOnApplicationEventType = eventTypePrefix + "application.authentication.succeeded" + PushedInstanceDeletedEventType = eventTypePrefix + "instance.deleted" ) -type PushedEvent struct { +func PushedEventTypes() []PushedEventType { + return []PushedEventType{ + PushedInstanceCreatedEventType, + PushedAuthenticationSucceededOnInstanceEventType, + PushedProjectCreatedEventType, + PushedApplicationCreatedEventType, + PushedAuthenticationSucceededOnApplicationEventType, + PushedInstanceDeletedEventType, + } +} + +type PushedEvent interface { + eventstore.Command + IsMilestoneEvent() +} + +type basePushedEvent struct { eventstore.BaseEvent `json:"-"` - Milestone Milestone `json:"milestone"` - Reached time.Time `json:"reached"` - Endpoints []string `json:"endpoints"` - PrimaryDomain string `json:"primaryDomain"` + PrimaryDomain string `json:"primaryDomain"` + Endpoints []string `json:"endpoints"` } -func (e *PushedEvent) Data() interface{} { - return e +func (b *basePushedEvent) Data() interface{} { + return b } -func (e *PushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { +func (b *basePushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { return nil } -func NewPushedEvent( +func (b *basePushedEvent) SetBaseEvent(base *eventstore.BaseEvent) { + b.BaseEvent = *base +} + +func NewPushedEventByType( ctx context.Context, - newAggregate *Aggregate, - milestone Milestone, - reached time.Time, + eventType PushedEventType, + aggregate *Aggregate, endpoints []string, primaryDomain string, -) *PushedEvent { - return &PushedEvent{ - BaseEvent: *eventstore.NewBaseEventForPush( - ctx, - &newAggregate.Aggregate, - PushedEventType, - ), - Milestone: milestone, - Reached: reached, - Endpoints: endpoints, - PrimaryDomain: primaryDomain, +) (PushedEvent, error) { + switch eventType { + case PushedInstanceCreatedEventType: + return NewInstanceCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + case PushedAuthenticationSucceededOnInstanceEventType: + return NewAuthenticationSucceededOnInstancePushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + case PushedProjectCreatedEventType: + return NewProjectCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + case PushedApplicationCreatedEventType: + return NewApplicationCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + case PushedAuthenticationSucceededOnApplicationEventType: + return NewAuthenticationSucceededOnApplicationPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + case PushedInstanceDeletedEventType: + return NewInstanceDeletedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil + } + return nil, fmt.Errorf("unknown event type %s", eventType) +} + +type InstanceCreatedPushedEvent struct{ basePushedEvent } + +func (e *InstanceCreatedPushedEvent) IsMilestoneEvent() {} + +func NewInstanceCreatedPushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *InstanceCreatedPushedEvent { + return &InstanceCreatedPushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedInstanceCreatedEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, } } -func PushedEventMapper(event *repository.Event) (eventstore.Event, error) { - e := &PushedEvent{ - BaseEvent: *eventstore.BaseEventFromRepo(event), +type AuthenticationSucceededOnInstancePushedEvent struct{ basePushedEvent } + +func (e *AuthenticationSucceededOnInstancePushedEvent) IsMilestoneEvent() {} + +func NewAuthenticationSucceededOnInstancePushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *AuthenticationSucceededOnInstancePushedEvent { + return &AuthenticationSucceededOnInstancePushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedAuthenticationSucceededOnInstanceEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, + } +} + +type ProjectCreatedPushedEvent struct{ basePushedEvent } + +func (e *ProjectCreatedPushedEvent) IsMilestoneEvent() {} + +func NewProjectCreatedPushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *ProjectCreatedPushedEvent { + return &ProjectCreatedPushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedProjectCreatedEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, + } +} + +type ApplicationCreatedPushedEvent struct{ basePushedEvent } + +func (e *ApplicationCreatedPushedEvent) IsMilestoneEvent() {} + +func NewApplicationCreatedPushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *ApplicationCreatedPushedEvent { + return &ApplicationCreatedPushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedApplicationCreatedEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, + } +} + +type AuthenticationSucceededOnApplicationPushedEvent struct{ basePushedEvent } + +func (e *AuthenticationSucceededOnApplicationPushedEvent) IsMilestoneEvent() {} + +func NewAuthenticationSucceededOnApplicationPushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *AuthenticationSucceededOnApplicationPushedEvent { + return &AuthenticationSucceededOnApplicationPushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedAuthenticationSucceededOnApplicationEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, + } +} + +type InstanceDeletedPushedEvent struct{ basePushedEvent } + +func (e *InstanceDeletedPushedEvent) IsMilestoneEvent() {} + +func NewInstanceDeletedPushedEvent( + ctx context.Context, + aggregate *Aggregate, + endpoints []string, + primaryDomain string, +) *InstanceDeletedPushedEvent { + return &InstanceDeletedPushedEvent{ + basePushedEvent: basePushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate.Aggregate, + eventstore.EventType(PushedInstanceDeletedEventType), + ), + Endpoints: endpoints, + PrimaryDomain: primaryDomain, + }, } - err := json.Unmarshal(event.Data, e) - if err != nil { - return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal milestone pushed") - } - return e, nil } diff --git a/internal/repository/milestone/eventstore.go b/internal/repository/milestone/eventstore.go index a7d229638d..20d82fb018 100644 --- a/internal/repository/milestone/eventstore.go +++ b/internal/repository/milestone/eventstore.go @@ -5,6 +5,10 @@ import ( ) func RegisterEventMappers(es *eventstore.Eventstore) { - es.RegisterFilterEventMapper(AggregateType, ReachedEventType, ReachedEventMapper). - RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper) + es.RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedProjectCreatedEventType), eventstore.GenericEventMapper[InstanceCreatedPushedEvent]). + RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedAuthenticationSucceededOnInstanceEventType), eventstore.GenericEventMapper[AuthenticationSucceededOnInstancePushedEvent]). + RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedProjectCreatedEventType), eventstore.GenericEventMapper[ProjectCreatedPushedEvent]). + RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedApplicationCreatedEventType), eventstore.GenericEventMapper[ApplicationCreatedPushedEvent]). + RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedAuthenticationSucceededOnApplicationEventType), eventstore.GenericEventMapper[AuthenticationSucceededOnApplicationPushedEvent]). + RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedInstanceDeletedEventType), eventstore.GenericEventMapper[InstanceDeletedPushedEvent]) } diff --git a/internal/repository/milestone/milestone_string.go b/internal/repository/milestone/milestone_string.go deleted file mode 100644 index 7511151970..0000000000 --- a/internal/repository/milestone/milestone_string.go +++ /dev/null @@ -1,30 +0,0 @@ -// Code generated by "stringer -type=Milestone"; DO NOT EDIT. - -package milestone - -import "strconv" - -func _() { - // An "invalid array index" compiler error signifies that the constant values have changed. - // Re-run the stringer command to generate them again. - var x [1]struct{} - _ = x[unknown-0] - _ = x[InstanceCreated-1] - _ = x[AuthenticationSucceededOnInstance-2] - _ = x[ProjectCreated-3] - _ = x[ApplicationCreated-4] - _ = x[AuthenticationSucceededOnApplication-5] - _ = x[InstanceDeleted-6] - _ = x[milestonesCount-7] -} - -const _Milestone_name = "unknownInstanceCreatedAuthenticationSucceededOnInstanceProjectCreatedApplicationCreatedAuthenticationSucceededOnApplicationInstanceDeletedmilestonesCount" - -var _Milestone_index = [...]uint8{0, 7, 22, 55, 69, 87, 123, 138, 153} - -func (i Milestone) String() string { - if i < 0 || i >= Milestone(len(_Milestone_index)-1) { - return "Milestone(" + strconv.FormatInt(int64(i), 10) + ")" - } - return _Milestone_name[_Milestone_index[i]:_Milestone_index[i+1]] -} diff --git a/internal/repository/milestone/milestones.go b/internal/repository/milestone/milestones.go deleted file mode 100644 index f05227a1f4..0000000000 --- a/internal/repository/milestone/milestones.go +++ /dev/null @@ -1,25 +0,0 @@ -//go:generate stringer -type=Milestone - -package milestone - -type Milestone int - -const ( - unknown Milestone = iota - InstanceCreated - AuthenticationSucceededOnInstance - ProjectCreated - ApplicationCreated - AuthenticationSucceededOnApplication - InstanceDeleted - - milestonesCount -) - -func All() []Milestone { - milestones := make([]Milestone, milestonesCount-1) - for i := 1; i < int(milestonesCount); i++ { - milestones[i] = Milestone(i) - } - return milestones -} diff --git a/internal/repository/pseudo/aggregate.go b/internal/repository/pseudo/aggregate.go new file mode 100644 index 0000000000..342b9c4554 --- /dev/null +++ b/internal/repository/pseudo/aggregate.go @@ -0,0 +1,5 @@ +package pseudo + +const ( + AggregateType = "pseudo" +) diff --git a/internal/repository/pseudo/events.go b/internal/repository/pseudo/events.go new file mode 100644 index 0000000000..8d653005d8 --- /dev/null +++ b/internal/repository/pseudo/events.go @@ -0,0 +1,65 @@ +package pseudo + +import ( + "time" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + eventTypePrefix = eventstore.EventType("pseudo.") + TimestampEventType = eventTypePrefix + "timestamp" +) + +var _ eventstore.Event = (*TimestampEvent)(nil) + +type TimestampEvent struct { + Timestamp time.Time + InstanceIDs []string +} + +func (t TimestampEvent) Aggregate() eventstore.Aggregate { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) EditorService() string { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) EditorUser() string { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) Type() eventstore.EventType { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) Sequence() uint64 { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) CreationDate() time.Time { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) PreviousAggregateSequence() uint64 { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) PreviousAggregateTypeSequence() uint64 { + panic("TimestampEvent is not a real event") +} + +func (t TimestampEvent) DataAsBytes() []byte { + panic("TimestampEvent is not a real event") +} + +func NewTimestampEvent( + timestamp time.Time, + instanceIDs ...string, +) *TimestampEvent { + return &TimestampEvent{ + Timestamp: timestamp, + InstanceIDs: instanceIDs, + } +}