diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index a678e60fa6..ebc4d6e1c1 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -45,12 +45,11 @@ Console: InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" Projections: - RequeueEvery: 60s Customizations: NotificationsQuotas: -# RequeueEvery: 1s + RequeueEvery: 1s Telemetry: - RequeueEvery: 20s + RequeueEvery: 30s DefaultInstance: LoginPolicy: diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index c7c8c1c569..02cbf70b71 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -128,16 +128,18 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string break } } + if h.reduceScheduledPseudoEvent { + queryBuilder. + AddQuery(). + SequenceGreater(seq). + InstanceID(instanceID) + continue + } queryBuilder. AddQuery(). SequenceGreater(seq). - InstanceID(instanceID) - - if !h.reduceScheduledPseudoEvent { - queryBuilder. - AddQuery(). - AggregateTypes(aggregateType) - } + InstanceID(instanceID). + AggregateTypes(aggregateType) } } diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index fd042c61bb..0f2747ae6c 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -315,6 +315,16 @@ func NewLessThanCond(column string, value interface{}) handler.Condition { } } +func NewContainsCond(column string, value interface{}) handler.Condition { + return handler.Condition{ + Name: column, + Value: value, + ParameterOpt: func(placeholder string) string { + return fmt.Sprintf(" @> ARRAY[%s]", placeholder) + }, + } +} + // NewCopyStatement creates a new upsert statement which updates a column from an existing row // cols represent the columns which are objective to change. // if the value of a col is empty the data will be copied from the selected row diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index c0e6728caa..d7aed5ef3b 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -100,13 +100,13 @@ func NewProjectionHandler( reduceScheduledPseudoEvent: reduceScheduledPseudoEvent, } - go func(subscribe bool) { + go func() { <-initialized - if subscribe { + if !h.reduceScheduledPseudoEvent { go h.subscribe(ctx) } go h.schedule(ctx) - }(!h.reduceScheduledPseudoEvent) + }() return h } diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index 3ab88ef4a1..349799e46a 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -342,6 +342,7 @@ func TestProjectionHandler_Process(t *testing.T) { nil, nil, nil, + false, ) index, err := h.Process(tt.args.ctx, tt.args.events...) diff --git a/internal/query/projection/milestones.go b/internal/query/projection/milestones.go index 19f40de250..802aac6f17 100644 --- a/internal/query/projection/milestones.go +++ b/internal/query/projection/milestones.go @@ -2,9 +2,6 @@ package projection import ( "context" - "encoding/json" - "fmt" - "strings" "github.com/zitadel/zitadel/internal/repository/milestone" @@ -20,12 +17,12 @@ import ( const ( MilestonesProjectionTable = "projections.milestones" - MilestoneColumnInstanceID = "instance_id" - MilestoneColumnType = "type" - MilestoneColumnPrimaryDomain = "primary_domain" - MilestoneColumnReachedDate = "reached_date" - MilestoneColumnPushedDate = "last_pushed_date" - MilestoneColumnIgnoredProject = "ignore_project" + MilestoneColumnInstanceID = "instance_id" + MilestoneColumnType = "type" + MilestoneColumnPrimaryDomain = "primary_domain" + MilestoneColumnReachedDate = "reached_date" + MilestoneColumnPushedDate = "last_pushed_date" + MilestoneColumnIgnoreClientIDs = "ignore_client_ids" ) type milestoneProjection struct { @@ -43,7 +40,7 @@ func newMilestoneProjection(ctx context.Context, config crdb.StatementHandlerCon crdb.NewColumn(MilestoneColumnReachedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPushedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()), - crdb.NewColumn(MilestoneColumnIgnoredProject, crdb.ColumnTypeText, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnIgnoreClientIDs, crdb.ColumnTypeTextArray, crdb.Nullable()), }, crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnType), ), @@ -67,7 +64,7 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { }, { Event: instance.InstanceRemovedEventType, - Reduce: p.milestoneReached(milestone.InstanceDeleted), + Reduce: p.reduceReachedFunc(milestone.InstanceDeleted), }, }, }, @@ -76,11 +73,19 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { EventRedusers: []handler.EventReducer{ { Event: project.ProjectAddedType, - Reduce: p.reduceProjectAdded, + Reduce: p.reduceReachedIfUserEventFunc(milestone.ProjectCreated), }, { Event: project.ApplicationAddedType, - Reduce: p.milestoneReached(milestone.ApplicationCreated), + Reduce: p.reduceReachedIfUserEventFunc(milestone.ApplicationCreated), + }, + { + Event: project.OIDCConfigAddedType, + Reduce: p.reduceOIDCConfigAdded, + }, + { + Event: project.APIConfigAddedType, + Reduce: p.reduceAPIConfigAdded, }, }, }, @@ -88,6 +93,8 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { Aggregate: user.AggregateType, EventRedusers: []handler.EventReducer{ { + // user.UserTokenAddedType is not emitted on creation of personal access tokens + // PATs have no effect on milestone.AuthenticationSucceededOnApplication or milestone.AuthenticationSucceededOnInstance Event: user.UserTokenAddedType, Reduce: p.reduceUserTokenAdded, }, @@ -105,12 +112,17 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { } } -func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { +func (p *milestoneProjection) reduceReachedIfUserEventFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { return func(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) - if isSystemEvent(event) { + if p.isSystemEvent(event) { return crdb.NewNoOpStatement(event), nil } + return p.reduceReachedFunc(msType)(event) + } +} + +func (p *milestoneProjection) reduceReachedFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { + return func(event eventstore.Event) (*handler.Statement, error) { return crdb.NewUpdateStatement(event, []handler.Column{ handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), }, @@ -123,7 +135,6 @@ func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event } func (p *milestoneProjection) reducePushed(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) e, ok := event.(*milestone.PushedEvent) if !ok { return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-XJGXK", "reduce.wrong.event.type %s", milestone.PushedEventType) @@ -185,14 +196,30 @@ func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Ev ), nil } -func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { - if !isSystemEvent(event) { - return p.milestoneReached(milestone.ProjectCreated)(event) +func (p *milestoneProjection) reduceOIDCConfigAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*project.OIDCConfigAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Rw7Cv", "reduce.wrong.event.type %s", project.OIDCConfigAddedType) + } + return p.reduceAppConfigAdded(e, e.ClientID) +} + +func (p *milestoneProjection) reduceAPIConfigAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*project.APIConfigAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-QudD2", "reduce.wrong.event.type %s", project.APIConfigAddedType) + } + return p.reduceAppConfigAdded(e, e.ClientID) +} + +func (p *milestoneProjection) reduceAppConfigAdded(event eventstore.Event, clientID string) (*handler.Statement, error) { + if !p.isSystemEvent(event) { + return crdb.NewNoOpStatement(event), nil } return crdb.NewUpdateStatement( event, []handler.Column{ - handler.NewCol(MilestoneColumnIgnoredProject, event.Aggregate().ID), + crdb.NewArrayAppendCol(MilestoneColumnIgnoreClientIDs, clientID), }, []handler.Condition{ handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), @@ -207,8 +234,7 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han if !ok { return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-3xhJ7", "reduce.wrong.event.type %s", user.UserTokenAddedType) } - return crdb.NewMultiStatement( - e, + statements := []func(eventstore.Event) crdb.Exec{ crdb.AddUpdateStatement( []handler.Column{ handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), @@ -219,6 +245,9 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han crdb.NewIsNullCond(MilestoneColumnReachedDate), }, ), + } + // We ignore authentications without app, for example JWT profile or PAT + if e.ApplicationID != "" { crdb.AddUpdateStatement( []handler.Column{ handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), @@ -226,22 +255,14 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han []handler.Condition{ handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication), - crdb.NewNotEqualCond(MilestoneColumnIgnoredProject, strings.Split(e.ApplicationID, "@")[0]), + crdb.NewContainsCond(MilestoneColumnIgnoreClientIDs, e.ApplicationID), crdb.NewIsNullCond(MilestoneColumnReachedDate), }, - ), - ), nil -} - -func isSystemEvent(event eventstore.Event) bool { - return event.EditorUser() == "" || event.EditorService() == "" -} - -// TODO: Remove -func printEvent(event eventstore.Event) { - bytes, err := json.MarshalIndent(event, "", " ") - if err != nil { - panic(err) + ) } - fmt.Println(event.Type(), string(bytes)) + return crdb.NewMultiStatement(e, statements...), nil +} + +func (p *milestoneProjection) isSystemEvent(event eventstore.Event) bool { + return event.EditorUser() == "" || event.EditorService() == "" || event.EditorService() == "SYSTEM" }