push milestones

This commit is contained in:
Elio Bischof
2023-06-19 12:15:30 +02:00
parent 084c6fa3a7
commit 4d5a6bb288
16 changed files with 385 additions and 14 deletions

View File

@@ -7,8 +7,8 @@ import (
"github.com/zitadel/zitadel/internal/repository/quota"
)
// ReportUsage calls notification hooks and emits the notified events
func (c *Commands) ReportUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error {
// ReportQuotaUsage writes a slice of *quota.NotificationDueEvent directly to the eventstore
func (c *Commands) ReportQuotaUsage(ctx context.Context, dueNotifications []*quota.NotificationDueEvent) error {
cmds := make([]eventstore.Command, len(dueNotifications))
for idx, notification := range dueNotifications {
cmds[idx] = notification

View File

@@ -0,0 +1,27 @@
package command
import (
"context"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/eventstore"
)
// ReportTelemetryUsage writes one or many *telemetry.PushDueEvent directly to the eventstore
func (c *Commands) ReportTelemetryUsage(ctx context.Context, dueEvent ...*milestone.ReachedEvent) error {
cmds := make([]eventstore.Command, len(dueEvent))
for idx, notification := range dueEvent {
cmds[idx] = notification
}
_, err := c.eventstore.Push(ctx, cmds...)
return err
}
func (c *Commands) TelemetryPushed(ctx context.Context, dueEvent *milestone.ReachedEvent, endpoints []string) error {
_, err := c.eventstore.Push(
ctx,
milestone.NewPushedEvent(ctx, dueEvent, endpoints),
)
return err
}

View File

@@ -4,16 +4,15 @@ import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/user"
)
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, aggregateType eventstore.AggregateType, eventTypes ...eventstore.EventType) (bool, error) {
events, err := n.es.Filter(
ctx,
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
InstanceID(event.Aggregate().InstanceID).
AddQuery().
AggregateTypes(user.AggregateType).
AggregateTypes(aggregateType).
AggregateIDs(event.Aggregate().ID).
SequenceGreater(event.Sequence()).
EventTypes(eventTypes...).

View File

@@ -68,7 +68,7 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler.
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType)
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.AggregateType, quota.NotifiedEventType)
if err != nil {
return nil, err
}

View File

@@ -0,0 +1,113 @@
package handlers
import (
"context"
"net/http"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/query/projection"
)
const (
TelemetryProjectionTable = "projections.telemetry"
)
type TelemetryPusherConfig struct {
Enabled bool
Endpoints []string
}
type telemetryPusher struct {
cfg TelemetryPusherConfig
crdb.StatementHandler
commands *command.Commands
queries *NotificationQueries
metricSuccessfulDeliveriesJSON string
metricFailedDeliveriesJSON string
}
func NewTelemetryPusher(
ctx context.Context,
telemetryCfg TelemetryPusherConfig,
handlerCfg crdb.StatementHandlerConfig,
commands *command.Commands,
queries *NotificationQueries,
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON string,
) *telemetryPusher {
p := new(telemetryPusher)
handlerCfg.ProjectionName = TelemetryProjectionTable
handlerCfg.Reducers = p.reducers()
p.cfg = telemetryCfg
p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg)
p.commands = commands
p.queries = queries
p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON
p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON
projection.TelemetryPusherProjection = p
return p
}
func (t *telemetryPusher) reducers() []handler.AggregateReducer {
if !t.cfg.Enabled {
return nil
}
return []handler.AggregateReducer{
{
Aggregate: milestone.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: milestone.ReachedEventType,
Reduce: t.reduceTelemetryPushDue,
},
},
},
}
}
func (t *telemetryPusher) reduceTelemetryPushDue(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*milestone.ReachedEvent)
if !ok {
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-UjA3E", "reduce.wrong.event.type %s", milestone.ReachedEventType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, nil, milestone.AggregateType, milestone.PushedEventType)
if err != nil {
return nil, err
}
if alreadyHandled {
return crdb.NewNoOpStatement(e), nil
}
for _, endpoint := range t.cfg.Endpoints {
if err = types.SendJSON(
ctx,
webhook.Config{
CallURL: endpoint,
Method: http.MethodPost,
},
t.queries.GetFileSystemProvider,
t.queries.GetLogProvider,
e,
e,
t.metricSuccessfulDeliveriesJSON,
t.metricFailedDeliveriesJSON,
).WithoutTemplate(); err != nil {
return nil, err
}
}
err = t.commands.TelemetryPushed(ctx, e, t.cfg.Endpoints)
if err != nil {
return nil, err
}
return crdb.NewNoOpStatement(e), nil
}

View File

@@ -2,8 +2,13 @@ package handlers
import (
"context"
"fmt"
"time"
"github.com/zitadel/zitadel/internal/repository/project"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/domain"
@@ -106,11 +111,67 @@ func (u *userNotifier) reducers() []handler.AggregateReducer {
Event: user.HumanPasswordChangedType,
Reduce: u.reducePasswordChanged,
},
{
Event: user.UserTokenAddedType,
Reduce: u.reduceUserTokenAdded,
},
},
},
{
Aggregate: instance.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: instance.InstanceAddedEventType,
Reduce: u.reduceInstanceAdded,
},
{
Event: instance.InstanceRemovedEventType,
Reduce: u.reduceInstanceRemoved,
},
},
},
{
Aggregate: project.AggregateType,
EventRedusers: []handler.EventReducer{
{
Event: project.ProjectAddedType,
Reduce: u.reduceProjectAdded,
},
{
Event: project.ApplicationAddedType,
Reduce: u.reduceApplicationAdded,
},
},
},
}
}
func (u *userNotifier) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) {
fmt.Println("reduceInstanceAdded")
return crdb.NewNoOpStatement(event), nil
}
func (u *userNotifier) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) {
// ignore instance.ProjectSetEventType
fmt.Println("reduceProjectAdded")
return crdb.NewNoOpStatement(event), nil
}
func (u *userNotifier) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) {
fmt.Println("reduceApplicationAdded")
return crdb.NewNoOpStatement(event), nil
}
func (u *userNotifier) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) {
fmt.Println("reduceUserTokenAdded")
return crdb.NewNoOpStatement(event), nil
}
func (u *userNotifier) reduceInstanceRemoved(event eventstore.Event) (*handler.Statement, error) {
fmt.Println("reduceInstanceRemoved")
return crdb.NewNoOpStatement(event), nil
}
func (u *userNotifier) reduceInitCodeAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*user.HumanInitialCodeAddedEvent)
if !ok {
@@ -334,7 +395,7 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType,
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
if err != nil {
return nil, err
@@ -462,7 +523,7 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
}
ctx := HandlerContext(event.Aggregate())
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.HumanPasswordChangeSentType)
if err != nil {
return nil, err
}
@@ -591,5 +652,5 @@ func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, e
if event.CreationDate().Add(expiry).Before(time.Now().UTC()) {
return true, nil
}
return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...)
return u.queries.IsAlreadyHandled(ctx, event, data, user.AggregateType, eventTypes...)
}

View File

@@ -29,6 +29,8 @@ func Start(
ctx context.Context,
userHandlerCustomConfig projection.CustomConfig,
quotaHandlerCustomConfig projection.CustomConfig,
telemetryHandlerCustomConfig projection.CustomConfig,
telemetryCfg handlers.TelemetryPusherConfig,
externalPort uint16,
externalSecure bool,
commands *command.Commands,
@@ -74,4 +76,13 @@ func Start(
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON,
).Start()
handlers.NewTelemetryPusher(
ctx,
telemetryCfg,
projection.ApplyCustomConfig(telemetryHandlerCustomConfig),
commands,
q,
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON,
).Start()
}

View File

@@ -64,6 +64,7 @@ var (
NotificationPolicyProjection *notificationPolicyProjection
NotificationsProjection interface{}
NotificationsQuotaProjection interface{}
TelemetryPusherProjection interface{}
DeviceAuthProjection *deviceAuthProjection
SessionProjection *sessionProjection
)

View File

@@ -0,0 +1,27 @@
package milestone
import (
"github.com/zitadel/zitadel/internal/eventstore"
)
const (
AggregateType = "milestone"
AggregateVersion = "v1"
)
type Aggregate struct {
eventstore.Aggregate
}
// Each data point receives its own aggregate
func newAggregate(id, instanceId, resourceOwner string) *Aggregate {
return &Aggregate{
Aggregate: eventstore.Aggregate{
Type: AggregateType,
Version: AggregateVersion,
ID: id,
InstanceID: instanceId,
ResourceOwner: resourceOwner,
},
}
}

View File

@@ -0,0 +1,102 @@
package milestone
import (
"context"
"encoding/json"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
const (
eventTypePrefix = eventstore.EventType("milestone.")
ReachedEventType = eventTypePrefix + "reached"
PushedEventType = eventTypePrefix + "pushed"
)
type ReachedEvent struct {
eventstore.BaseEvent `json:"-"`
MilestoneEvent SerializableEvent `json:"milestoneEvent"`
}
func (n *ReachedEvent) Data() interface{} {
return n
}
func (n *ReachedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return nil
}
func NewReachedEvent(
ctx context.Context,
newAggregateID string,
milestoneEvent eventstore.BaseEvent,
) *ReachedEvent {
triggeringEventsAggregate := milestoneEvent.Aggregate()
return &ReachedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&newAggregate(newAggregateID, triggeringEventsAggregate.InstanceID, triggeringEventsAggregate.ResourceOwner).Aggregate,
ReachedEventType,
),
MilestoneEvent: newSerializableEvent(milestoneEvent),
}
}
func ReachedEventMapper(event *repository.Event) (eventstore.Event, error) {
e := &ReachedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "QUOTA-k56rT", "unable to unmarshal milestone reached")
}
return e, nil
}
type PushedEvent struct {
eventstore.BaseEvent `json:"-"`
ReachedEventSequence uint64 `json:"reachedEventSequence"`
Endpoints []string `json:"endpoints"`
}
func (e *PushedEvent) Data() interface{} {
return e
}
func (e *PushedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
return nil
}
func NewPushedEvent(
ctx context.Context,
reachedEvent *ReachedEvent,
endpoints []string,
) *PushedEvent {
aggregate := reachedEvent.Aggregate()
return &PushedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
&aggregate,
PushedEventType,
),
ReachedEventSequence: reachedEvent.Sequence(),
Endpoints: endpoints,
}
}
func PushedEventMapper(event *repository.Event) (eventstore.Event, error) {
e := &PushedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := json.Unmarshal(event.Data, e)
if err != nil {
return nil, errors.ThrowInternal(err, "QUOTA-4n8vs", "unable to unmarshal milestone pushed")
}
return e, nil
}

View File

@@ -0,0 +1,10 @@
package milestone
import (
"github.com/zitadel/zitadel/internal/eventstore"
)
func RegisterEventMappers(es *eventstore.Eventstore) {
es.RegisterFilterEventMapper(AggregateType, ReachedEventType, ReachedEventMapper).
RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper)
}

View File

@@ -0,0 +1,15 @@
package milestone
import "github.com/zitadel/zitadel/internal/eventstore"
type SerializableEvent struct {
eventstore.BaseEvent `json:",inline"`
Data []byte `json:"data"`
}
func newSerializableEvent(triggeringEvent eventstore.BaseEvent) SerializableEvent {
return SerializableEvent{
BaseEvent: triggeringEvent,
Data: triggeringEvent.DataAsBytes(),
}
}