diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index 2b72fb0d2d..a678e60fa6 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -45,7 +45,7 @@ Console: InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" Projections: - RequeueEvery: 20s + RequeueEvery: 60s Customizations: NotificationsQuotas: # RequeueEvery: 1s diff --git a/internal/eventstore/handler/crdb/statement.go b/internal/eventstore/handler/crdb/statement.go index 7d1b5cc644..4073fc5711 100644 --- a/internal/eventstore/handler/crdb/statement.go +++ b/internal/eventstore/handler/crdb/statement.go @@ -288,20 +288,14 @@ func NewCopyCol(column, from string) handler.Column { func NewIsNullCond(column string) handler.Condition { return handler.Condition{ - Name: column, - Value: specialWhere(func(colName, param string) (clause string, needsParam bool) { - return fmt.Sprintf("%s IS NULL", colName), false + Value: specialWhere(func(param string) (clause string, needsParam bool) { + return fmt.Sprintf("%s IS NULL", column), false }), } } -func NewIsNotNullCond(column string) handler.Condition { - return handler.Condition{ - Name: column, - Value: specialWhere(func(colName, param string) (clause string, needsParam bool) { - return fmt.Sprintf("%s IS NOT NULL", colName), false - }), - } +func NewExpressionCond(expr specialWhere) handler.Condition { + return handler.Condition{Value: expr} } // NewCopyStatement creates a new upsert statement which updates a column from an existing row @@ -403,7 +397,7 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string, return names, parameters, values[:parameterIndex] } -type specialWhere func(colName, param string) (clause string, needsParam bool) +type specialWhere func(param string) (clause string, needsParam bool) func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { wheres = make([]string, len(cols)) @@ -417,7 +411,7 @@ func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []stri values = append(values, col.Value) continue } - clause, needsValueParam := special(col.Name, param) + clause, needsValueParam := special(param) wheres[i] = clause if needsValueParam { values = append(values, col.Value) diff --git a/internal/notification/handlers/telemetry_pusher.go b/internal/notification/handlers/telemetry_pusher.go index cbbac02620..fa3f34e86d 100644 --- a/internal/notification/handlers/telemetry_pusher.go +++ b/internal/notification/handlers/telemetry_pusher.go @@ -76,6 +76,7 @@ func (t *telemetryPusher) reducers() []handler.AggregateReducer { }} } +// TODO: Remove func printEvent(event eventstore.Event) { bytes, err := json.MarshalIndent(event, "", " ") if err != nil { diff --git a/internal/query/projection/milestones.go b/internal/query/projection/milestones.go index f13f640f1e..22a4e39bb8 100644 --- a/internal/query/projection/milestones.go +++ b/internal/query/projection/milestones.go @@ -19,11 +19,12 @@ import ( const ( MilestonesProjectionTable = "projections.milestones" - MilestoneColumnInstanceID = "instance_id" - MilestoneColumnType = "type" - MilestoneColumnPrimaryDomain = "primary_domain" - MilestoneColumnReachedDate = "reached_date" - MilestoneColumnPushedDate = "pushed_date" + MilestoneColumnInstanceID = "instance_id" + MilestoneColumnType = "type" + MilestoneColumnPrimaryDomain = "primary_domain" + MilestoneColumnReachedDate = "reached_date" + MilestoneColumnPushedDate = "last_pushed_date" + MilestoneColumnIgnoredProject = "ignore_project" ) type milestoneProjection struct { @@ -41,6 +42,7 @@ func newMilestoneProjection(ctx context.Context, config crdb.StatementHandlerCon crdb.NewColumn(MilestoneColumnReachedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPushedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()), + crdb.NewColumn(MilestoneColumnIgnoredProject, crdb.ColumnTypeText, crdb.Nullable()), }, crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnType), ), @@ -73,7 +75,7 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { EventRedusers: []handler.EventReducer{ { Event: project.ProjectAddedType, - Reduce: p.milestoneReached(milestone.ProjectCreated), + Reduce: p.reduceProjectAdded, }, { Event: project.ApplicationAddedType, @@ -102,6 +104,41 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer { } } +func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { + return func(event eventstore.Event) (*handler.Statement, error) { + printEvent(event) + if isSystemEvent(event) { + return crdb.NewNoOpStatement(event), nil + } + return crdb.NewUpdateStatement(event, []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, msType), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }), nil + } +} + +func (p *milestoneProjection) reducePushed(event eventstore.Event) (*handler.Statement, error) { + printEvent(event) + e, ok := event.(*milestone.PushedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-XJGXK", "reduce.wrong.event.type %s", milestone.PushedEventType) + } + return crdb.NewUpdateStatement( + event, + []handler.Column{ + handler.NewCol(MilestoneColumnPushedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, e.MilestoneType), + }, + ), nil +} + func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) { e, ok := event.(*instance.InstanceAddedEvent) if !ok { @@ -127,66 +164,74 @@ func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Ev if !ok { return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType) } - allTypes := milestone.AllTypes() - statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes)) - for _, msType := range allTypes { - statements = append(statements, crdb.AddUpdateStatement( - []handler.Column{ - handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain), - }, - []handler.Condition{ - handler.NewCond(MilestoneColumnInstanceID, e.Aggregate().InstanceID), - handler.NewCond(MilestoneColumnType, msType), - crdb.NewIsNullCond(MilestoneColumnPushedDate), - }, - )) - } - return crdb.NewMultiStatement(e, statements...), nil + return crdb.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, e.Aggregate().InstanceID), + crdb.NewIsNullCond(MilestoneColumnPushedDate), + }, + ), nil } -func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { - return func(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) - if event.EditorUser() == "" || event.EditorService() == "" { - return crdb.NewNoOpStatement(event), nil - } - return crdb.NewUpdateStatement( - event, - []handler.Column{ - handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), - }, - []handler.Condition{ - handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), - handler.NewCond(MilestoneColumnType, msType), - crdb.NewIsNullCond(MilestoneColumnReachedDate), - crdb.NewIsNullCond(MilestoneColumnPushedDate), - }, - ), nil - } -} - -func (p *milestoneProjection) reducePushed(event eventstore.Event) (*handler.Statement, error) { - printEvent(event) - e, ok := event.(*milestone.PushedEvent) - if !ok { - return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-XJGXK", "reduce.wrong.event.type %s", milestone.PushedEventType) +func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { + if !isSystemEvent(event) { + return p.milestoneReached(milestone.ProjectCreated)(event) } return crdb.NewUpdateStatement( event, []handler.Column{ - handler.NewCol(MilestoneColumnPushedDate, event.CreationDate()), + handler.NewCol(MilestoneColumnIgnoredProject, event.Aggregate().ID), }, []handler.Condition{ handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), - handler.NewCond(MilestoneColumnType, e.MilestoneType), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication), + crdb.NewIsNullCond(MilestoneColumnReachedDate), }, ), nil } func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) { - return crdb.NewNoOpStatement(event), nil + printEvent(event) + e, ok := event.(*user.UserTokenAddedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-3xhJ7", "reduce.wrong.event.type %s", user.UserTokenAddedType) + } + return crdb.NewMultiStatement( + e, + crdb.AddUpdateStatement( + []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnInstance), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }, + ), + /* crdb.AddUpdateStatement( + []handler.Column{ + handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID), + handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication), + crdb.NewExpressionCond(func(param string) (clause string, needsParam bool) { + return fmt.Sprintf("%s") + }), + crdb.NewIsNullCond(MilestoneColumnReachedDate), + }, + ),*/ + ), nil } +func isSystemEvent(event eventstore.Event) bool { + return event.EditorUser() == "" || event.EditorService() == "" +} + +// TODO: Remove func printEvent(event eventstore.Event) { bytes, err := json.MarshalIndent(event, "", " ") if err != nil { diff --git a/internal/repository/pseudo/events.go b/internal/repository/pseudo/events.go index 5a139d848e..12b84d499d 100644 --- a/internal/repository/pseudo/events.go +++ b/internal/repository/pseudo/events.go @@ -16,9 +16,12 @@ var _ eventstore.Event = (*ScheduledEvent)(nil) type ScheduledEvent struct { *eventstore.BaseEvent `json:"-"` - Timestamp time.Time `json:"timestamp"` - InstanceIDs []string `json:"instanceIDs"` - TriggeringEvent eventstore.Event `json:"triggeringEvent"` + // TODO: `json:"-"` + Timestamp time.Time `json:"timestamp"` + // TODO: `json:"-"` + InstanceIDs []string `json:"instanceIDs"` + // TODO: `json:"-"` + TriggeringEvent eventstore.Event `json:"triggeringEvent"` } func NewScheduledEvent(