calculate and push 4 in 6 milestones

This commit is contained in:
Elio Bischof 2023-06-28 08:19:34 +02:00
parent 1b5f5e9e62
commit 51a9a54cfd
No known key found for this signature in database
GPG Key ID: 7B383FDE4DDBF1BD
22 changed files with 667 additions and 229 deletions

View File

@ -44,11 +44,12 @@ Console:
InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}"
Projections: Projections:
RequeueEvery: 1s
Customizations: Customizations:
NotificationsQuotas: NotificationsQuotas:
RequeueEvery: 1s # RequeueEvery: 1s
Telemetry: Telemetry:
RequeueEvery: 1s RequeueEvery: 10s
DefaultInstance: DefaultInstance:
LoginPolicy: LoginPolicy:

View File

@ -0,0 +1,27 @@
package command
import (
"context"
"github.com/zitadel/zitadel/internal/repository/milestone"
)
// MilestonePushed writes a new event with a new milestone.Aggregate to the eventstore
func (c *Commands) MilestonePushed(
ctx context.Context,
instanceID string,
eventType milestone.PushedEventType,
endpoints []string,
primaryDomain string,
) error {
id, err := c.idGenerator.Next()
if err != nil {
return err
}
pushedEvent, err := milestone.NewPushedEventByType(ctx, eventType, milestone.NewAggregate(id, instanceID, instanceID), endpoints, primaryDomain)
if err != nil {
return err
}
_, err = c.eventstore.Push(ctx, pushedEvent)
return err
}

View File

@ -1,25 +0,0 @@
package command
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/milestone"
)
// ReportMilestoneReached writes each *milestone.ReachedEvent directly to the event store
func (c *Commands) ReportMilestoneReached(ctx context.Context, triggeringEvent eventstore.Event, customContext interface{}) error {
aggregateId, err := c.idGenerator.Next()
if err != nil {
return err
}
_, err = c.eventstore.Push(ctx, milestone.NewReachedEvent(ctx, aggregateId, triggeringEvent, customContext))
return err
}
// ReportMilestonePushed defers a milestone.PushedEvent for each *milestone.ReachedEvent and writes it directly to the event store.
func (c *Commands) ReportMilestonePushed(ctx context.Context, endpoints []string, reachedEvent *milestone.ReachedEvent) error {
_, err := c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, reachedEvent, endpoints))
return err
}

View File

@ -5,6 +5,8 @@ import (
"database/sql" "database/sql"
"fmt" "fmt"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/database"
@ -49,6 +51,8 @@ type StatementHandler struct {
initialized chan bool initialized chan bool
bulkLimit uint64 bulkLimit uint64
subscribe bool
} }
func NewStatementHandler( func NewStatementHandler(
@ -57,9 +61,16 @@ func NewStatementHandler(
) StatementHandler { ) StatementHandler {
aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers)) aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers))
reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers)) reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers))
subscribe := true
for _, aggReducer := range config.Reducers { for _, aggReducer := range config.Reducers {
aggregateTypes = append(aggregateTypes, aggReducer.Aggregate) aggregateTypes = append(aggregateTypes, aggReducer.Aggregate)
if aggReducer.Aggregate == pseudo.AggregateType {
subscribe = false
}
for _, eventReducer := range aggReducer.EventRedusers { for _, eventReducer := range aggReducer.EventRedusers {
if eventReducer.Event == pseudo.TimestampEventType {
subscribe = false
}
reduces[eventReducer.Event] = eventReducer.Reduce reduces[eventReducer.Event] = eventReducer.Reduce
} }
} }
@ -80,7 +91,7 @@ func NewStatementHandler(
initialized: make(chan bool), initialized: make(chan bool),
} }
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized) h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized, subscribe)
return h return h
} }
@ -88,6 +99,9 @@ func NewStatementHandler(
func (h *StatementHandler) Start() { func (h *StatementHandler) Start() {
h.initialized <- true h.initialized <- true
close(h.initialized) close(h.initialized)
if !h.subscribe {
return
}
h.Subscribe(h.aggregates...) h.Subscribe(h.aggregates...)
} }

View File

@ -1,6 +1,7 @@
package crdb package crdb
import ( import (
"fmt"
"strconv" "strconv"
"strings" "strings"
@ -285,6 +286,24 @@ func NewCopyCol(column, from string) handler.Column {
} }
} }
func NewIsNullCond(column string) handler.Condition {
return handler.Condition{
Name: column,
Value: specialWhere(func(colName, param string) (clause string, needsParam bool) {
return fmt.Sprintf("%s IS NULL", colName), false
}),
}
}
func NewIsNotNullCond(column string) handler.Condition {
return handler.Condition{
Name: column,
Value: specialWhere(func(colName, param string) (clause string, needsParam bool) {
return fmt.Sprintf("%s IS NOT NULL", colName), false
}),
}
}
// NewCopyStatement creates a new upsert statement which updates a column from an existing row // NewCopyStatement creates a new upsert statement which updates a column from an existing row
// cols represent the columns which are objective to change. // cols represent the columns which are objective to change.
// if the value of a col is empty the data will be copied from the selected row // if the value of a col is empty the data will be copied from the selected row
@ -384,13 +403,25 @@ func columnsToQuery(cols []handler.Column) (names []string, parameters []string,
return names, parameters, values[:parameterIndex] return names, parameters, values[:parameterIndex]
} }
type specialWhere func(colName, param string) (clause string, needsParam bool)
func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) { func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) {
wheres = make([]string, len(cols)) wheres = make([]string, len(cols))
values = make([]interface{}, len(cols)) values = make([]interface{}, 0, len(cols))
for i, col := range cols { for i, col := range cols {
wheres[i] = "(" + col.Name + " = $" + strconv.Itoa(i+1+paramOffset) + ")" param := strconv.Itoa(i + 1 + paramOffset)
values[i] = col.Value special, ok := col.Value.(specialWhere)
if !ok {
wheres[i] = "(" + col.Name + " = $" + param + ")"
values = append(values, col.Value)
continue
}
clause, needsValueParam := special(col.Name, param)
wheres[i] = clause
if needsValueParam {
values = append(values, col.Value)
}
} }
return wheres, values return wheres, values

View File

@ -6,6 +6,8 @@ import (
"runtime/debug" "runtime/debug"
"time" "time"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/authz"
@ -73,6 +75,7 @@ func NewProjectionHandler(
lock Lock, lock Lock,
unlock Unlock, unlock Unlock,
initialized <-chan bool, initialized <-chan bool,
subscribe bool,
) *ProjectionHandler { ) *ProjectionHandler {
concurrentInstances := int(config.ConcurrentInstances) concurrentInstances := int(config.ConcurrentInstances)
if concurrentInstances < 1 { if concurrentInstances < 1 {
@ -97,8 +100,9 @@ func NewProjectionHandler(
go func() { go func() {
<-initialized <-initialized
go h.subscribe(ctx) if subscribe {
go h.subscribe(ctx)
}
go h.schedule(ctx) go h.schedule(ctx)
}() }()
@ -112,6 +116,13 @@ func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) er
if len(instances) > 0 { if len(instances) > 0 {
ids = instances ids = instances
} }
if h.searchQuery == nil {
return h.processTimestamp(ctx, ids...)
}
return h.processEvents(ctx, ids...)
}
func (h *ProjectionHandler) processEvents(ctx context.Context, ids ...string) error {
for { for {
events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...) events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...)
if err != nil { if err != nil {
@ -130,6 +141,11 @@ func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) er
} }
} }
func (h *ProjectionHandler) processTimestamp(ctx context.Context, instances ...string) error {
_, err := h.Process(ctx, pseudo.NewTimestampEvent(h.nowFunc(), instances...))
return err
}
// Process handles multiple events by reducing them to statements and updating the projection // Process handles multiple events by reducing them to statements and updating the projection
func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error) { func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error) {
if len(events) == 0 { if len(events) == 0 {

View File

@ -2,12 +2,20 @@ package handlers
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"github.com/zitadel/zitadel/internal/repository/milestone" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
@ -61,34 +69,62 @@ func NewTelemetryPusher(
} }
func (t *telemetryPusher) reducers() []handler.AggregateReducer { func (t *telemetryPusher) reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{ return []handler.AggregateReducer{{
{ Aggregate: pseudo.AggregateType,
Aggregate: milestone.AggregateType, EventRedusers: []handler.EventReducer{{
EventRedusers: []handler.EventReducer{ Event: pseudo.TimestampEventType,
{ Reduce: t.pushMilestones,
Event: milestone.ReachedEventType, }},
Reduce: t.reduceMilestoneReached, }}
},
},
},
}
} }
func (t *telemetryPusher) reduceMilestoneReached(event eventstore.Event) (*handler.Statement, error) { func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*milestone.ReachedEvent) ctx := call.WithTimestamp(context.Background())
timestampEvent, ok := event.(pseudo.TimestampEvent)
if !ok { if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-UjA3E", "reduce.wrong.event.type %s", milestone.ReachedEventType) return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type())
} }
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, nil, milestone.AggregateType, milestone.PushedEventType) isReached, err := query.NewNotNullQuery(query.MilestoneReachedDateColID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if alreadyHandled { isNotPushed, err := query.NewIsNullQuery(query.MilestonePushedDateColID)
return crdb.NewNoOpStatement(e), nil if err != nil {
return nil, err
} }
hasPrimaryDomain, err := query.NewNotNullQuery(query.MilestonePrimaryDomainColID)
if err != nil {
return nil, err
}
unpushedMilestones, err := t.queries.Queries.SearchMilestones(ctx, timestampEvent.InstanceIDs, &query.MilestonesSearchQueries{
SearchRequest: query.SearchRequest{
Offset: 100,
SortingColumn: query.MilestoneReachedDateColID,
Asc: true,
},
Queries: []query.SearchQuery{isReached, isNotPushed, hasPrimaryDomain},
})
if err != nil {
return nil, err
}
var errs int
for _, ms := range unpushedMilestones.Milestones {
if err = t.pushMilestone(ctx, ms); err != nil {
errs++
logging.Warnf("pushing milestone %+v failed: %s", *ms, err.Error())
}
}
if errs > 0 {
return nil, fmt.Errorf("pushing %d of %d milestones failed", errs, unpushedMilestones.Count)
}
return crdb.NewNoOpStatement(timestampEvent), nil
}
func (t *telemetryPusher) pushMilestone(ctx context.Context, ms *query.Milestone) error {
for _, endpoint := range t.endpoints { for _, endpoint := range t.endpoints {
if err = types.SendJSON( if err := types.SendJSON(
ctx, ctx,
webhook.Config{ webhook.Config{
CallURL: endpoint, CallURL: endpoint,
@ -96,18 +132,13 @@ func (t *telemetryPusher) reduceMilestoneReached(event eventstore.Event) (*handl
}, },
t.queries.GetFileSystemProvider, t.queries.GetFileSystemProvider,
t.queries.GetLogProvider, t.queries.GetLogProvider,
e, ms,
e, nil,
t.metricSuccessfulDeliveriesJSON, t.metricSuccessfulDeliveriesJSON,
t.metricFailedDeliveriesJSON, t.metricFailedDeliveriesJSON,
).WithoutTemplate(); err != nil { ).WithoutTemplate(); err != nil {
return nil, err return err
} }
} }
return t.commands.MilestonePushed(ctx, ms.InstanceID, ms.MilestoneType, t.endpoints, ms.PrimaryDomain)
err = t.commands.ReportMilestonePushed(ctx, t.endpoints, e)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
} }

View File

@ -76,13 +76,15 @@ func Start(
metricSuccessfulDeliveriesJSON, metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON, metricFailedDeliveriesJSON,
).Start() ).Start()
handlers.NewTelemetryPusher( if telemetryCfg.Enabled {
ctx, handlers.NewTelemetryPusher(
telemetryCfg, ctx,
projection.ApplyCustomConfig(telemetryHandlerCustomConfig), telemetryCfg,
commands, projection.ApplyCustomConfig(telemetryHandlerCustomConfig),
q, commands,
metricSuccessfulDeliveriesJSON, q,
metricFailedDeliveriesJSON, metricSuccessfulDeliveriesJSON,
).Start() metricFailedDeliveriesJSON,
).Start()
}
} }

139
internal/query/milestone.go Normal file
View File

@ -0,0 +1,139 @@
package query
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/api/authz"
sq "github.com/Masterminds/squirrel"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
type Milestones struct {
SearchResponse
Milestones []*Milestone
}
type Milestone struct {
InstanceID string
MilestoneType milestone.PushedEventType
ReachedDate time.Time
PushedDate time.Time
PrimaryDomain string
}
type MilestonesSearchQueries struct {
SearchRequest
Queries []SearchQuery
}
func (q *MilestonesSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
query = q.SearchRequest.toQuery(query)
for _, q := range q.Queries {
query = q.toQuery(query)
}
return query
}
var (
milestonesTable = table{
name: projection.MilestonesProjectionTable,
instanceIDCol: projection.MilestoneColumnInstanceID,
}
MilestoneInstanceIDColID = Column{
name: projection.MilestoneColumnInstanceID,
table: milestonesTable,
}
MilestoneTypeColID = Column{
name: projection.MilestoneColumnMilestoneType,
table: milestonesTable,
}
MilestonePrimaryDomainColID = Column{
name: projection.MilestoneColumnPrimaryDomain,
table: milestonesTable,
}
MilestoneReachedDateColID = Column{
name: projection.MilestoneColumnReachedDate,
table: milestonesTable,
}
MilestonePushedDateColID = Column{
name: projection.MilestoneColumnPushedDate,
table: milestonesTable,
}
)
// SearchMilestones tries to defer the instanceID from the passed context if no instanceIDs are passed
func (q *Queries) SearchMilestones(ctx context.Context, instanceIDs []string, queries *MilestonesSearchQueries) (_ *Milestones, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
query, scan := prepareMilestonesQuery(ctx, q.client)
if len(instanceIDs) == 0 {
instanceIDs = []string{authz.GetInstance(ctx).InstanceID()}
}
stmt, args, err := queries.toQuery(query).
Where(sq.Eq{
MilestoneInstanceIDColID.identifier(): fmt.Sprintf("IN (%s)", strings.Join(instanceIDs, ",")),
}).ToSql()
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-A9i5k", "Errors.Query.SQLStatement")
}
rows, err := q.client.QueryContext(ctx, stmt, args...)
if err != nil {
return nil, err
}
milestones, err := scan(rows)
if err != nil {
return nil, err
}
milestones.LatestSequence, err = q.latestSequence(ctx, milestonesTable)
return milestones, err
}
func prepareMilestonesQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(*sql.Rows) (*Milestones, error)) {
return sq.Select(
MilestonePrimaryDomainColID.identifier(),
MilestoneReachedDateColID.identifier(),
MilestonePushedDateColID.identifier(),
MilestoneTypeColID.identifier(),
countColumn.identifier(),
).
From(notificationPolicyTable.identifier() + db.Timetravel(call.Took(ctx))).
PlaceholderFormat(sq.Dollar),
func(rows *sql.Rows) (*Milestones, error) {
milestones := make([]*Milestone, 0)
var count uint64
for rows.Next() {
m := new(Milestone)
err := rows.Scan(
&m.PrimaryDomain,
&m.ReachedDate,
&m.MilestoneType,
&count,
)
if err != nil {
return nil, err
}
milestones = append(milestones, m)
}
if err := rows.Close(); err != nil {
return nil, errors.ThrowInternal(err, "QUERY-CK9mI", "Errors.Query.CloseRows")
}
return &Milestones{
Milestones: milestones,
SearchResponse: SearchResponse{
Count: count,
},
}, nil
}
}

View File

@ -6,9 +6,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/repository/user"
"github.com/zitadel/zitadel/internal/repository/milestone" "github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/errors"
@ -16,6 +13,8 @@ import (
"github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/repository/instance" "github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/repository/user"
) )
const ( const (
@ -23,25 +22,25 @@ const (
MilestoneColumnInstanceID = "instance_id" MilestoneColumnInstanceID = "instance_id"
MilestoneColumnMilestoneType = "milestone_type" MilestoneColumnMilestoneType = "milestone_type"
MilestoneColumnReachedAt = "reached_at"
MilestoneColumnPushedAt = "pushed_at"
MilestoneColumnPrimaryDomain = "primary_domain" MilestoneColumnPrimaryDomain = "primary_domain"
MilestoneColumnReachedDate = "reached_date"
MilestoneColumnPushedDate = "pushed_date"
) )
type milestoneProjection struct { type milestoneProjection struct {
crdb.StatementHandler crdb.StatementHandler
} }
func newMilestoneInstanceProjection(ctx context.Context, config crdb.StatementHandlerConfig) *milestoneProjection { func newMilestoneProjection(ctx context.Context, config crdb.StatementHandlerConfig) *milestoneProjection {
p := new(milestoneProjection) p := new(milestoneProjection)
config.ProjectionName = MilestonesProjectionTable config.ProjectionName = MilestonesProjectionTable
config.Reducers = p.reducers() config.Reducers = p.reducers()
config.InitCheck = crdb.NewMultiTableCheck( config.InitCheck = crdb.NewMultiTableCheck(
crdb.NewTable([]*crdb.Column{ crdb.NewTable([]*crdb.Column{
crdb.NewColumn(MilestoneColumnInstanceID, crdb.ColumnTypeText), crdb.NewColumn(MilestoneColumnInstanceID, crdb.ColumnTypeText),
crdb.NewColumn(MilestoneColumnMilestoneType, crdb.ColumnTypeEnum), crdb.NewColumn(MilestoneColumnMilestoneType, crdb.ColumnTypeText),
crdb.NewColumn(MilestoneColumnReachedAt, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnReachedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(MilestoneColumnPushedAt, crdb.ColumnTypeTimestamp, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPushedDate, crdb.ColumnTypeTimestamp, crdb.Nullable()),
crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()), crdb.NewColumn(MilestoneColumnPrimaryDomain, crdb.ColumnTypeText, crdb.Nullable()),
}, },
crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnMilestoneType), crdb.NewPrimaryKey(MilestoneColumnInstanceID, MilestoneColumnMilestoneType),
@ -66,7 +65,7 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
}, },
{ {
Event: instance.InstanceRemovedEventType, Event: instance.InstanceRemovedEventType,
Reduce: p.reduceInstanceRemoved, Reduce: p.milestoneReached(milestone.PushedInstanceDeletedEventType),
}, },
}, },
}, },
@ -75,11 +74,11 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
EventRedusers: []handler.EventReducer{ EventRedusers: []handler.EventReducer{
{ {
Event: project.ProjectAddedType, Event: project.ProjectAddedType,
Reduce: p.reduceProjectAdded, Reduce: p.milestoneReached(milestone.PushedProjectCreatedEventType),
}, },
{ {
Event: project.ApplicationAddedType, Event: project.ApplicationAddedType,
Reduce: p.reduceApplicationAdded, Reduce: p.milestoneReached(milestone.PushedApplicationCreatedEventType),
}, },
}, },
}, },
@ -92,57 +91,72 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
}, },
}, },
}, },
{
Aggregate: milestone.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: milestone.PushedEventType,
Reduce: p.milestonePushed,
},
},
},
} }
} }
func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*instance.DomainPrimarySetEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType)
}
var statements []func(eventstore.Event) crdb.Exec
for _, ms := range milestone.All() {
statements = append(statements, crdb.AddUpsertStatement(
[]handler.Column{
handler.NewCol(MilestoneColumnInstanceID, nil),
handler.NewCol(MilestoneColumnMilestoneType, nil),
},
[]handler.Column{
handler.NewCol(MilestoneColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(MilestoneColumnMilestoneType, ms),
handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain),
},
))
}
return crdb.NewMultiStatement(e, statements...), nil
}
func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) { func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) {
printEvent(event) printEvent(event)
e, ok := event.(*instance.InstanceAddedEvent)
return crdb.NewNoOpStatement(event), nil if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-JbHGS", "reduce.wrong.event.type %s", instance.InstanceAddedEventType)
}
allTypes := milestone.PushedEventTypes()
statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes))
for _, ms := range allTypes {
createColumns := []handler.Column{
handler.NewCol(MilestoneColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(MilestoneColumnMilestoneType, ms),
}
if ms == milestone.PushedInstanceCreatedEventType {
createColumns = append(createColumns, handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()))
}
statements = append(statements, crdb.AddCreateStatement(createColumns))
}
return crdb.NewMultiStatement(e, statements...), nil
} }
func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) {
printEvent(event) printEvent(event)
// ignore instance.ProjectSetEventType e, ok := event.(*instance.DomainPrimarySetEvent)
return crdb.NewNoOpStatement(event), nil if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType)
}
allTypes := milestone.PushedEventTypes()
statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes))
for _, ms := range allTypes {
statements = append(statements, crdb.AddUpdateStatement(
[]handler.Column{
handler.NewCol(MilestoneColumnPrimaryDomain, e.Domain),
},
[]handler.Condition{
handler.NewCond(MilestoneColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCond(MilestoneColumnMilestoneType, ms),
crdb.NewIsNullCond(MilestoneColumnPushedDate),
},
))
}
return crdb.NewMultiStatement(e, statements...), nil
} }
func (p *milestoneProjection) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) { func (p *milestoneProjection) milestoneReached(eventType milestone.PushedEventType) func(event eventstore.Event) (*handler.Statement, error) {
printEvent(event) return func(event eventstore.Event) (*handler.Statement, error) {
return crdb.NewNoOpStatement(event), nil printEvent(event)
if event.EditorUser() == "" || event.EditorService() == "" {
return crdb.NewNoOpStatement(event), nil
}
return crdb.NewUpdateStatement(
event,
[]handler.Column{
handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()),
},
[]handler.Condition{
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
handler.NewCond(MilestoneColumnMilestoneType, eventType),
crdb.NewIsNullCond(MilestoneColumnReachedDate),
crdb.NewIsNullCond(MilestoneColumnPushedDate),
},
), nil
}
} }
func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) { func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) {
@ -155,11 +169,6 @@ func (p *milestoneProjection) reduceInstanceRemoved(event eventstore.Event) (*ha
return crdb.NewNoOpStatement(event), nil return crdb.NewNoOpStatement(event), nil
} }
func (p *milestoneProjection) milestonePushed(event eventstore.Event) (*handler.Statement, error) {
printEvent(event)
return crdb.NewNoOpStatement(event), nil
}
func printEvent(event eventstore.Event) { func printEvent(event eventstore.Event) {
var pretty bytes.Buffer var pretty bytes.Buffer
if err := json.Indent(&pretty, event.DataAsBytes(), "", " "); err != nil { if err := json.Indent(&pretty, event.DataAsBytes(), "", " "); err != nil {

View File

@ -67,6 +67,7 @@ var (
TelemetryPusherProjection interface{} TelemetryPusherProjection interface{}
DeviceAuthProjection *deviceAuthProjection DeviceAuthProjection *deviceAuthProjection
SessionProjection *sessionProjection SessionProjection *sessionProjection
MilestoneProjection *milestoneProjection
) )
type projection interface { type projection interface {
@ -144,6 +145,7 @@ func Create(ctx context.Context, sqlClient *database.DB, es *eventstore.Eventsto
NotificationPolicyProjection = newNotificationPolicyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["notification_policies"])) NotificationPolicyProjection = newNotificationPolicyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["notification_policies"]))
DeviceAuthProjection = newDeviceAuthProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["device_auth"])) DeviceAuthProjection = newDeviceAuthProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["device_auth"]))
SessionProjection = newSessionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sessions"])) SessionProjection = newSessionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sessions"]))
MilestoneProjection = newMilestoneProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["milestones"]))
newProjectionsList() newProjectionsList()
return nil return nil
} }
@ -241,5 +243,6 @@ func newProjectionsList() {
NotificationPolicyProjection, NotificationPolicyProjection,
DeviceAuthProjection, DeviceAuthProjection,
SessionProjection, SessionProjection,
MilestoneProjection,
} }
} }

View File

@ -66,6 +66,27 @@ func (q *NotNullQuery) comp() sq.Sqlizer {
return sq.NotEq{q.Column.identifier(): nil} return sq.NotEq{q.Column.identifier(): nil}
} }
type IsNullQuery struct {
Column Column
}
func NewIsNullQuery(col Column) (*IsNullQuery, error) {
if col.isZero() {
return nil, ErrMissingColumn
}
return &IsNullQuery{
Column: col,
}, nil
}
func (q *IsNullQuery) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(q.comp())
}
func (q *IsNullQuery) comp() sq.Sqlizer {
return sq.Eq{q.Column.identifier(): nil}
}
type orQuery struct { type orQuery struct {
queries []SearchQuery queries []SearchQuery
} }

View File

@ -338,6 +338,7 @@ func (q *Queries) GetUserByID(ctx context.Context, shouldTriggerBulk bool, userI
defer func() { span.EndWithError(err) }() defer func() { span.EndWithError(err) }()
if shouldTriggerBulk { if shouldTriggerBulk {
// TODO: Why are these errors not handled?
projection.UserProjection.Trigger(ctx) projection.UserProjection.Trigger(ctx)
projection.LoginNameProjection.Trigger(ctx) projection.LoginNameProjection.Trigger(ctx)
} }

View File

@ -13,15 +13,14 @@ type Aggregate struct {
eventstore.Aggregate eventstore.Aggregate
} }
// Each data point receives its own aggregate func NewAggregate(id, resourceOwner, instanceID string) *Aggregate {
func newAggregate(id, instanceId, resourceOwner string) *Aggregate {
return &Aggregate{ return &Aggregate{
Aggregate: eventstore.Aggregate{ Aggregate: eventstore.Aggregate{
Type: AggregateType, Type: AggregateType,
Version: AggregateVersion, Version: AggregateVersion,
ID: id, ID: id,
InstanceID: instanceId,
ResourceOwner: resourceOwner, ResourceOwner: resourceOwner,
InstanceID: instanceID,
}, },
} }
} }

View File

@ -1,66 +1,216 @@
//go:
package milestone package milestone
import ( import (
"context" "context"
"encoding/json" "fmt"
"time"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
) )
type PushedEventType eventstore.EventType
const ( const (
eventTypePrefix = eventstore.EventType("milestone.") eventTypePrefix = PushedEventType("milestone.pushed.")
PushedEventType = eventTypePrefix + "pushed" PushedInstanceCreatedEventType = eventTypePrefix + "instance.created"
PushedAuthenticationSucceededOnInstanceEventType = eventTypePrefix + "instance.authentication.succeeded"
PushedProjectCreatedEventType = eventTypePrefix + "project.created"
PushedApplicationCreatedEventType = eventTypePrefix + "application.created"
PushedAuthenticationSucceededOnApplicationEventType = eventTypePrefix + "application.authentication.succeeded"
PushedInstanceDeletedEventType = eventTypePrefix + "instance.deleted"
) )
type PushedEvent struct { func PushedEventTypes() []PushedEventType {
return []PushedEventType{
PushedInstanceCreatedEventType,
PushedAuthenticationSucceededOnInstanceEventType,
PushedProjectCreatedEventType,
PushedApplicationCreatedEventType,
PushedAuthenticationSucceededOnApplicationEventType,
PushedInstanceDeletedEventType,
}
}
type PushedEvent interface {
eventstore.Command
IsMilestoneEvent()
}
type basePushedEvent struct {
eventstore.BaseEvent `json:"-"` eventstore.BaseEvent `json:"-"`
Milestone Milestone `json:"milestone"` PrimaryDomain string `json:"primaryDomain"`
Reached time.Time `json:"reached"` Endpoints []string `json:"endpoints"`
Endpoints []string `json:"endpoints"`
PrimaryDomain string `json:"primaryDomain"`
} }
func (e *PushedEvent) Data() interface{} { func (b *basePushedEvent) Data() interface{} {
return e return b
} }
func (e *PushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { func (b *basePushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return nil return nil
} }
func NewPushedEvent( func (b *basePushedEvent) SetBaseEvent(base *eventstore.BaseEvent) {
b.BaseEvent = *base
}
func NewPushedEventByType(
ctx context.Context, ctx context.Context,
newAggregate *Aggregate, eventType PushedEventType,
milestone Milestone, aggregate *Aggregate,
reached time.Time,
endpoints []string, endpoints []string,
primaryDomain string, primaryDomain string,
) *PushedEvent { ) (PushedEvent, error) {
return &PushedEvent{ switch eventType {
BaseEvent: *eventstore.NewBaseEventForPush( case PushedInstanceCreatedEventType:
ctx, return NewInstanceCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
&newAggregate.Aggregate, case PushedAuthenticationSucceededOnInstanceEventType:
PushedEventType, return NewAuthenticationSucceededOnInstancePushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
), case PushedProjectCreatedEventType:
Milestone: milestone, return NewProjectCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
Reached: reached, case PushedApplicationCreatedEventType:
Endpoints: endpoints, return NewApplicationCreatedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
PrimaryDomain: primaryDomain, case PushedAuthenticationSucceededOnApplicationEventType:
return NewAuthenticationSucceededOnApplicationPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
case PushedInstanceDeletedEventType:
return NewInstanceDeletedPushedEvent(ctx, aggregate, endpoints, primaryDomain), nil
}
return nil, fmt.Errorf("unknown event type %s", eventType)
}
type InstanceCreatedPushedEvent struct{ basePushedEvent }
func (e *InstanceCreatedPushedEvent) IsMilestoneEvent() {}
func NewInstanceCreatedPushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *InstanceCreatedPushedEvent {
return &InstanceCreatedPushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedInstanceCreatedEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
} }
} }
func PushedEventMapper(event *repository.Event) (eventstore.Event, error) { type AuthenticationSucceededOnInstancePushedEvent struct{ basePushedEvent }
e := &PushedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event), func (e *AuthenticationSucceededOnInstancePushedEvent) IsMilestoneEvent() {}
func NewAuthenticationSucceededOnInstancePushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *AuthenticationSucceededOnInstancePushedEvent {
return &AuthenticationSucceededOnInstancePushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedAuthenticationSucceededOnInstanceEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
}
}
type ProjectCreatedPushedEvent struct{ basePushedEvent }
func (e *ProjectCreatedPushedEvent) IsMilestoneEvent() {}
func NewProjectCreatedPushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *ProjectCreatedPushedEvent {
return &ProjectCreatedPushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedProjectCreatedEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
}
}
type ApplicationCreatedPushedEvent struct{ basePushedEvent }
func (e *ApplicationCreatedPushedEvent) IsMilestoneEvent() {}
func NewApplicationCreatedPushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *ApplicationCreatedPushedEvent {
return &ApplicationCreatedPushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedApplicationCreatedEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
}
}
type AuthenticationSucceededOnApplicationPushedEvent struct{ basePushedEvent }
func (e *AuthenticationSucceededOnApplicationPushedEvent) IsMilestoneEvent() {}
func NewAuthenticationSucceededOnApplicationPushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *AuthenticationSucceededOnApplicationPushedEvent {
return &AuthenticationSucceededOnApplicationPushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedAuthenticationSucceededOnApplicationEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
}
}
type InstanceDeletedPushedEvent struct{ basePushedEvent }
func (e *InstanceDeletedPushedEvent) IsMilestoneEvent() {}
func NewInstanceDeletedPushedEvent(
ctx context.Context,
aggregate *Aggregate,
endpoints []string,
primaryDomain string,
) *InstanceDeletedPushedEvent {
return &InstanceDeletedPushedEvent{
basePushedEvent: basePushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate.Aggregate,
eventstore.EventType(PushedInstanceDeletedEventType),
),
Endpoints: endpoints,
PrimaryDomain: primaryDomain,
},
} }
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal milestone pushed")
}
return e, nil
} }

View File

@ -5,6 +5,10 @@ import (
) )
func RegisterEventMappers(es *eventstore.Eventstore) { func RegisterEventMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(AggregateType, ReachedEventType, ReachedEventMapper). es.RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedProjectCreatedEventType), eventstore.GenericEventMapper[InstanceCreatedPushedEvent]).
RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper) RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedAuthenticationSucceededOnInstanceEventType), eventstore.GenericEventMapper[AuthenticationSucceededOnInstancePushedEvent]).
RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedProjectCreatedEventType), eventstore.GenericEventMapper[ProjectCreatedPushedEvent]).
RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedApplicationCreatedEventType), eventstore.GenericEventMapper[ApplicationCreatedPushedEvent]).
RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedAuthenticationSucceededOnApplicationEventType), eventstore.GenericEventMapper[AuthenticationSucceededOnApplicationPushedEvent]).
RegisterFilterEventMapper(AggregateType, eventstore.EventType(PushedInstanceDeletedEventType), eventstore.GenericEventMapper[InstanceDeletedPushedEvent])
} }

View File

@ -1,30 +0,0 @@
// Code generated by "stringer -type=Milestone"; DO NOT EDIT.
package milestone
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[unknown-0]
_ = x[InstanceCreated-1]
_ = x[AuthenticationSucceededOnInstance-2]
_ = x[ProjectCreated-3]
_ = x[ApplicationCreated-4]
_ = x[AuthenticationSucceededOnApplication-5]
_ = x[InstanceDeleted-6]
_ = x[milestonesCount-7]
}
const _Milestone_name = "unknownInstanceCreatedAuthenticationSucceededOnInstanceProjectCreatedApplicationCreatedAuthenticationSucceededOnApplicationInstanceDeletedmilestonesCount"
var _Milestone_index = [...]uint8{0, 7, 22, 55, 69, 87, 123, 138, 153}
func (i Milestone) String() string {
if i < 0 || i >= Milestone(len(_Milestone_index)-1) {
return "Milestone(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Milestone_name[_Milestone_index[i]:_Milestone_index[i+1]]
}

View File

@ -1,25 +0,0 @@
//go:generate stringer -type=Milestone
package milestone
type Milestone int
const (
unknown Milestone = iota
InstanceCreated
AuthenticationSucceededOnInstance
ProjectCreated
ApplicationCreated
AuthenticationSucceededOnApplication
InstanceDeleted
milestonesCount
)
func All() []Milestone {
milestones := make([]Milestone, milestonesCount-1)
for i := 1; i < int(milestonesCount); i++ {
milestones[i] = Milestone(i)
}
return milestones
}

View File

@ -0,0 +1,5 @@
package pseudo
const (
AggregateType = "pseudo"
)

View File

@ -0,0 +1,65 @@
package pseudo
import (
"time"
"github.com/zitadel/zitadel/internal/eventstore"
)
const (
eventTypePrefix = eventstore.EventType("pseudo.")
TimestampEventType = eventTypePrefix + "timestamp"
)
var _ eventstore.Event = (*TimestampEvent)(nil)
type TimestampEvent struct {
Timestamp time.Time
InstanceIDs []string
}
func (t TimestampEvent) Aggregate() eventstore.Aggregate {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) EditorService() string {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) EditorUser() string {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) Type() eventstore.EventType {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) Sequence() uint64 {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) CreationDate() time.Time {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) PreviousAggregateSequence() uint64 {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) PreviousAggregateTypeSequence() uint64 {
panic("TimestampEvent is not a real event")
}
func (t TimestampEvent) DataAsBytes() []byte {
panic("TimestampEvent is not a real event")
}
func NewTimestampEvent(
timestamp time.Time,
instanceIDs ...string,
) *TimestampEvent {
return &TimestampEvent{
Timestamp: timestamp,
InstanceIDs: instanceIDs,
}
}