From 4d5a6bb28887f7ba4f78a4ad1ff5792f6059ded3 Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Mon, 19 Jun 2023 12:15:30 +0200 Subject: [PATCH] push milestones --- cmd/defaults.yaml | 4 +- cmd/start/config.go | 3 + cmd/start/start.go | 4 +- docs/docs/self-hosting/manage/production.md | 4 +- internal/command/quota_report.go | 4 +- internal/command/telemetry.go | 27 +++++ .../notification/handlers/already_handled.go | 5 +- .../notification/handlers/quotanotifier.go | 2 +- .../notification/handlers/telemetrypusher.go | 113 ++++++++++++++++++ .../notification/handlers/usernotifier.go | 67 ++++++++++- internal/notification/projections.go | 11 ++ internal/query/projection/projection.go | 1 + internal/repository/milestone/aggregate.go | 27 +++++ internal/repository/milestone/events.go | 102 ++++++++++++++++ internal/repository/milestone/eventstore.go | 10 ++ .../milestone/serializable_event.go | 15 +++ 16 files changed, 385 insertions(+), 14 deletions(-) create mode 100644 internal/command/telemetry.go create mode 100644 internal/notification/handlers/telemetrypusher.go create mode 100644 internal/repository/milestone/aggregate.go create mode 100644 internal/repository/milestone/events.go create mode 100644 internal/repository/milestone/eventstore.go create mode 100644 internal/repository/milestone/serializable_event.go diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index ddd8a44b72..bf8d093f3e 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -15,7 +15,9 @@ Tracing: MetricPrefix: zitadel Telemetry: - # As long as Enabled is true, usage data is marked to be sent to the configured Telemetry.Endpoints + # As long as Enabled is true, usage data that is marked as due is also tried to be sent to the configured Telemetry.Endpoints. + # Data is marked as due, even if Enabled is false. + # This means that switching this to true makes ZITADEL try to send past data that was marked to send. Enabled: false # Push telemetry data to all these endpoints at least once. # If one endpoint returns an unsuccessful response code or times out, diff --git a/cmd/start/config.go b/cmd/start/config.go index 3517ef7bb2..f7387cecda 100644 --- a/cmd/start/config.go +++ b/cmd/start/config.go @@ -3,6 +3,8 @@ package start import ( "time" + "github.com/zitadel/zitadel/internal/notification/handlers" + "github.com/mitchellh/mapstructure" "github.com/spf13/viper" "github.com/zitadel/logging" @@ -65,6 +67,7 @@ type Config struct { Eventstore *eventstore.Config LogStore *logstore.Configs Quotas *QuotasConfig + Telemetry *handlers.TelemetryPusherConfig } type QuotasConfig struct { diff --git a/cmd/start/start.go b/cmd/start/start.go index 75516fba4d..3b48ee659d 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -201,14 +201,14 @@ func startZitadel(config *Config, masterKey string, server chan<- *Server) error return err } - usageReporter := logstore.UsageReporterFunc(commands.ReportUsage) + usageReporter := logstore.UsageReporterFunc(commands.ReportQuotaUsage) actionsLogstoreSvc := logstore.New(queries, usageReporter, actionsExecutionDBEmitter, actionsExecutionStdoutEmitter) if actionsLogstoreSvc.Enabled() { logging.Warn("execution logs are currently in beta") } actions.SetLogstoreService(actionsLogstoreSvc) - notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) + notification.Start(ctx, config.Projections.Customizations["notifications"], config.Projections.Customizations["notificationsquotas"], config.Projections.Customizations["telemetry"], *config.Telemetry, config.ExternalPort, config.ExternalSecure, commands, queries, eventstoreClient, assets.AssetAPIFromDomain(config.ExternalSecure, config.ExternalPort), config.SystemDefaults.Notifications.FileSystemPath, keys.User, keys.SMTP, keys.SMS) router := mux.NewRouter() tlsConfig, err := config.TLS.Config() diff --git a/docs/docs/self-hosting/manage/production.md b/docs/docs/self-hosting/manage/production.md index 284209200f..eb1ae27002 100644 --- a/docs/docs/self-hosting/manage/production.md +++ b/docs/docs/self-hosting/manage/production.md @@ -63,11 +63,11 @@ For example, systemd has journald and kubernetes has fluentd and fluentbit. ## Telemetry -If you want to have some usage data pushed to external systems, enable telemetry in the ZITADEL configuration. +If you want to have some data about reached usage milestones pushed to external systems, enable telemetry in the ZITADEL configuration. Include https://zitadel.com/usage in the list of endpoints if you want to help the ZITADEL maintainers improve the product's usability by giving them some insights. Don't forget to opt in by setting Telemetry.Enabled to true. -The following table describes the data points that are sent to the endpoints: +The following table describes the milestones that are sent to the endpoints: | Trigger | Description | |-----------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------| diff --git a/internal/command/quota_report.go b/internal/command/quota_report.go index b18b9e9b51..19855452ed 100644 --- a/internal/command/quota_report.go +++ b/internal/command/quota_report.go @@ -7,8 +7,8 @@ import ( "github.com/zitadel/zitadel/internal/repository/quota" ) -// ReportUsage calls notification hooks and emits the notified events -func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error { +// ReportQuotaUsage writes a slice of *quota.NotificationDueEvent directly to the eventstore +func (c *Commands) ReportQuotaUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error { cmds := make([]eventstore.Command, len(dueNotifications)) for idx, notification := range dueNotifications { cmds[idx] = notification diff --git a/internal/command/telemetry.go b/internal/command/telemetry.go new file mode 100644 index 0000000000..b8e22fb0a6 --- /dev/null +++ b/internal/command/telemetry.go @@ -0,0 +1,27 @@ +package command + +import ( + "context" + + "github.com/zitadel/zitadel/internal/repository/milestone" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +// ReportTelemetryUsage writes one or many *telemetry.PushDueEvent directly to the eventstore +func (c *Commands) ReportTelemetryUsage(ctx context.Context, dueEvent ...*milestone.ReachedEvent) error { + cmds := make([]eventstore.Command, len(dueEvent)) + for idx, notification := range dueEvent { + cmds[idx] = notification + } + _, err := c.eventstore.Push(ctx, cmds...) + return err +} + +func (c *Commands) TelemetryPushed(ctx context.Context, dueEvent *milestone.ReachedEvent, endpoints []string) error { + _, err := c.eventstore.Push( + ctx, + milestone.NewPushedEvent(ctx, dueEvent, endpoints), + ) + return err +} diff --git a/internal/notification/handlers/already_handled.go b/internal/notification/handlers/already_handled.go index abf7eb901b..556a3e33c9 100644 --- a/internal/notification/handlers/already_handled.go +++ b/internal/notification/handlers/already_handled.go @@ -4,16 +4,15 @@ import ( "context" "github.com/zitadel/zitadel/internal/eventstore" - "github.com/zitadel/zitadel/internal/repository/user" ) -func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) { +func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, aggregateType eventstore.AggregateType, eventTypes ...eventstore.EventType) (bool, error) { events, err := n.es.Filter( ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). InstanceID(event.Aggregate().InstanceID). AddQuery(). - AggregateTypes(user.AggregateType). + AggregateTypes(aggregateType). AggregateIDs(event.Aggregate().ID). SequenceGreater(event.Sequence()). EventTypes(eventTypes...). diff --git a/internal/notification/handlers/quotanotifier.go b/internal/notification/handlers/quotanotifier.go index e53998490a..21daa92ffc 100644 --- a/internal/notification/handlers/quotanotifier.go +++ b/internal/notification/handlers/quotanotifier.go @@ -68,7 +68,7 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler. return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.AggregateType, quota.NotifiedEventType) if err != nil { return nil, err } diff --git a/internal/notification/handlers/telemetrypusher.go b/internal/notification/handlers/telemetrypusher.go new file mode 100644 index 0000000000..5e87158482 --- /dev/null +++ b/internal/notification/handlers/telemetrypusher.go @@ -0,0 +1,113 @@ +package handlers + +import ( + "context" + "net/http" + + "github.com/zitadel/zitadel/internal/repository/milestone" + + "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" + "github.com/zitadel/zitadel/internal/notification/channels/webhook" + _ "github.com/zitadel/zitadel/internal/notification/statik" + "github.com/zitadel/zitadel/internal/notification/types" + "github.com/zitadel/zitadel/internal/query/projection" +) + +const ( + TelemetryProjectionTable = "projections.telemetry" +) + +type TelemetryPusherConfig struct { + Enabled bool + Endpoints []string +} + +type telemetryPusher struct { + cfg TelemetryPusherConfig + crdb.StatementHandler + commands *command.Commands + queries *NotificationQueries + metricSuccessfulDeliveriesJSON string + metricFailedDeliveriesJSON string +} + +func NewTelemetryPusher( + ctx context.Context, + telemetryCfg TelemetryPusherConfig, + handlerCfg crdb.StatementHandlerConfig, + commands *command.Commands, + queries *NotificationQueries, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON string, +) *telemetryPusher { + p := new(telemetryPusher) + handlerCfg.ProjectionName = TelemetryProjectionTable + handlerCfg.Reducers = p.reducers() + p.cfg = telemetryCfg + p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg) + p.commands = commands + p.queries = queries + p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON + p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON + projection.TelemetryPusherProjection = p + return p +} + +func (t *telemetryPusher) reducers() []handler.AggregateReducer { + if !t.cfg.Enabled { + return nil + } + return []handler.AggregateReducer{ + { + Aggregate: milestone.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: milestone.ReachedEventType, + Reduce: t.reduceTelemetryPushDue, + }, + }, + }, + } +} + +func (t *telemetryPusher) reduceTelemetryPushDue(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*milestone.ReachedEvent) + if !ok { + return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-UjA3E", "reduce.wrong.event.type %s", milestone.ReachedEventType) + } + ctx := HandlerContext(event.Aggregate()) + alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, nil, milestone.AggregateType, milestone.PushedEventType) + if err != nil { + return nil, err + } + if alreadyHandled { + return crdb.NewNoOpStatement(e), nil + } + for _, endpoint := range t.cfg.Endpoints { + if err = types.SendJSON( + ctx, + webhook.Config{ + CallURL: endpoint, + Method: http.MethodPost, + }, + t.queries.GetFileSystemProvider, + t.queries.GetLogProvider, + e, + e, + t.metricSuccessfulDeliveriesJSON, + t.metricFailedDeliveriesJSON, + ).WithoutTemplate(); err != nil { + return nil, err + } + } + + err = t.commands.TelemetryPushed(ctx, e, t.cfg.Endpoints) + if err != nil { + return nil, err + } + return crdb.NewNoOpStatement(e), nil +} diff --git a/internal/notification/handlers/usernotifier.go b/internal/notification/handlers/usernotifier.go index 5cd8102361..fd912a4d44 100644 --- a/internal/notification/handlers/usernotifier.go +++ b/internal/notification/handlers/usernotifier.go @@ -2,8 +2,13 @@ package handlers import ( "context" + "fmt" "time" + "github.com/zitadel/zitadel/internal/repository/project" + + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/domain" @@ -106,11 +111,67 @@ func (u *userNotifier) reducers() []handler.AggregateReducer { Event: user.HumanPasswordChangedType, Reduce: u.reducePasswordChanged, }, + { + Event: user.UserTokenAddedType, + Reduce: u.reduceUserTokenAdded, + }, + }, + }, + { + Aggregate: instance.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: instance.InstanceAddedEventType, + Reduce: u.reduceInstanceAdded, + }, + { + Event: instance.InstanceRemovedEventType, + Reduce: u.reduceInstanceRemoved, + }, + }, + }, + { + Aggregate: project.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: project.ProjectAddedType, + Reduce: u.reduceProjectAdded, + }, + { + Event: project.ApplicationAddedType, + Reduce: u.reduceApplicationAdded, + }, }, }, } } +func (u *userNotifier) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) { + fmt.Println("reduceInstanceAdded") + return crdb.NewNoOpStatement(event), nil +} + +func (u *userNotifier) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) { + // ignore instance.ProjectSetEventType + fmt.Println("reduceProjectAdded") + return crdb.NewNoOpStatement(event), nil +} + +func (u *userNotifier) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) { + fmt.Println("reduceApplicationAdded") + return crdb.NewNoOpStatement(event), nil +} + +func (u *userNotifier) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) { + fmt.Println("reduceUserTokenAdded") + return crdb.NewNoOpStatement(event), nil +} + +func (u *userNotifier) reduceInstanceRemoved(event eventstore.Event) (*handler.Statement, error) { + fmt.Println("reduceInstanceRemoved") + return crdb.NewNoOpStatement(event), nil +} + func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) { e, ok := event.(*user.HumanInitialCodeAddedEvent) if !ok { @@ -334,7 +395,7 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.UserDomainClaimedType, user.UserDomainClaimedSentType) if err != nil { return nil, err @@ -462,7 +523,7 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType) } ctx := HandlerContext(event.Aggregate()) - alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType) + alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.HumanPasswordChangeSentType) if err != nil { return nil, err } @@ -591,5 +652,5 @@ func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, e if event.CreationDate().Add(expiry).Before(time.Now().UTC()) { return true, nil } - return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...) + return u.queries.IsAlreadyHandled(ctx, event, data, user.AggregateType, eventTypes...) } diff --git a/internal/notification/projections.go b/internal/notification/projections.go index f4abf3b7bc..aad38e3afd 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -29,6 +29,8 @@ func Start( ctx context.Context, userHandlerCustomConfig projection.CustomConfig, quotaHandlerCustomConfig projection.CustomConfig, + telemetryHandlerCustomConfig projection.CustomConfig, + telemetryCfg handlers.TelemetryPusherConfig, externalPort uint16, externalSecure bool, commands *command.Commands, @@ -74,4 +76,13 @@ func Start( metricSuccessfulDeliveriesJSON, metricFailedDeliveriesJSON, ).Start() + handlers.NewTelemetryPusher( + ctx, + telemetryCfg, + projection.ApplyCustomConfig(telemetryHandlerCustomConfig), + commands, + q, + metricSuccessfulDeliveriesJSON, + metricFailedDeliveriesJSON, + ).Start() } diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 45670b8bac..7e09aacfca 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -64,6 +64,7 @@ var ( NotificationPolicyProjection *notificationPolicyProjection NotificationsProjection interface{} NotificationsQuotaProjection interface{} + TelemetryPusherProjection interface{} DeviceAuthProjection *deviceAuthProjection SessionProjection *sessionProjection ) diff --git a/internal/repository/milestone/aggregate.go b/internal/repository/milestone/aggregate.go new file mode 100644 index 0000000000..568ed78307 --- /dev/null +++ b/internal/repository/milestone/aggregate.go @@ -0,0 +1,27 @@ +package milestone + +import ( + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + AggregateType = "milestone" + AggregateVersion = "v1" +) + +type Aggregate struct { + eventstore.Aggregate +} + +// Each data point receives its own aggregate +func newAggregate(id, instanceId, resourceOwner string) *Aggregate { + return &Aggregate{ + Aggregate: eventstore.Aggregate{ + Type: AggregateType, + Version: AggregateVersion, + ID: id, + InstanceID: instanceId, + ResourceOwner: resourceOwner, + }, + } +} diff --git a/internal/repository/milestone/events.go b/internal/repository/milestone/events.go new file mode 100644 index 0000000000..f6018eae8a --- /dev/null +++ b/internal/repository/milestone/events.go @@ -0,0 +1,102 @@ +package milestone + +import ( + "context" + "encoding/json" + + "github.com/zitadel/zitadel/internal/errors" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/repository" +) + +const ( + eventTypePrefix = eventstore.EventType("milestone.") + ReachedEventType = eventTypePrefix + "reached" + PushedEventType = eventTypePrefix + "pushed" +) + +type ReachedEvent struct { + eventstore.BaseEvent `json:"-"` + MilestoneEvent SerializableEvent `json:"milestoneEvent"` +} + +func (n *ReachedEvent) Data() interface{} { + return n +} + +func (n *ReachedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil +} + +func NewReachedEvent( + ctx context.Context, + newAggregateID string, + milestoneEvent eventstore.BaseEvent, +) *ReachedEvent { + triggeringEventsAggregate := milestoneEvent.Aggregate() + return &ReachedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &newAggregate(newAggregateID, triggeringEventsAggregate.InstanceID, triggeringEventsAggregate.ResourceOwner).Aggregate, + ReachedEventType, + ), + MilestoneEvent: newSerializableEvent(milestoneEvent), + } +} + +func ReachedEventMapper(event *repository.Event) (eventstore.Event, error) { + e := &ReachedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(event), + } + + err := json.Unmarshal(event.Data, e) + if err != nil { + return nil, errors.ThrowInternal(err, "QUOTA-k56rT", "unable to unmarshal milestone reached") + } + + return e, nil +} + +type PushedEvent struct { + eventstore.BaseEvent `json:"-"` + ReachedEventSequence uint64 `json:"reachedEventSequence"` + Endpoints []string `json:"endpoints"` +} + +func (e *PushedEvent) Data() interface{} { + return e +} + +func (e *PushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint { + return nil +} + +func NewPushedEvent( + ctx context.Context, + reachedEvent *ReachedEvent, + endpoints []string, +) *PushedEvent { + aggregate := reachedEvent.Aggregate() + return &PushedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + &aggregate, + PushedEventType, + ), + ReachedEventSequence: reachedEvent.Sequence(), + Endpoints: endpoints, + } +} + +func PushedEventMapper(event *repository.Event) (eventstore.Event, error) { + e := &PushedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(event), + } + + err := json.Unmarshal(event.Data, e) + if err != nil { + return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal milestone pushed") + } + + return e, nil +} diff --git a/internal/repository/milestone/eventstore.go b/internal/repository/milestone/eventstore.go new file mode 100644 index 0000000000..a7d229638d --- /dev/null +++ b/internal/repository/milestone/eventstore.go @@ -0,0 +1,10 @@ +package milestone + +import ( + "github.com/zitadel/zitadel/internal/eventstore" +) + +func RegisterEventMappers(es *eventstore.Eventstore) { + es.RegisterFilterEventMapper(AggregateType, ReachedEventType, ReachedEventMapper). + RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper) +} diff --git a/internal/repository/milestone/serializable_event.go b/internal/repository/milestone/serializable_event.go new file mode 100644 index 0000000000..535af16fbc --- /dev/null +++ b/internal/repository/milestone/serializable_event.go @@ -0,0 +1,15 @@ +package milestone + +import "github.com/zitadel/zitadel/internal/eventstore" + +type SerializableEvent struct { + eventstore.BaseEvent `json:",inline"` + Data []byte `json:"data"` +} + +func newSerializableEvent(triggeringEvent eventstore.BaseEvent) SerializableEvent { + return SerializableEvent{ + BaseEvent: triggeringEvent, + Data: triggeringEvent.DataAsBytes(), + } +}