From 2b8dac40de70c5ec647464046f045178ebac6b1c Mon Sep 17 00:00:00 2001 From: Elio Bischof Date: Wed, 28 Jun 2023 17:43:19 +0200 Subject: [PATCH] fix scheduled pseudo event projection --- docs/docs/self-hosting/deploy/macos.mdx | 2 +- e2e/config/localhost/zitadel.yaml | 4 ++-- internal/command/milestone.go | 3 +-- .../eventstore/handler/crdb/handler_stmt.go | 5 ++-- .../eventstore/handler/handler_projection.go | 6 ++--- .../notification/handlers/telemetry_pusher.go | 24 ++++++++++++------- internal/query/projection/milestones.go | 8 +++---- internal/repository/milestone/aggregate.go | 8 +++++-- internal/repository/pseudo/events.go | 11 +++++---- 9 files changed, 42 insertions(+), 29 deletions(-) diff --git a/docs/docs/self-hosting/deploy/macos.mdx b/docs/docs/self-hosting/deploy/macos.mdx index 6541577df9..e30bbacdbf 100644 --- a/docs/docs/self-hosting/deploy/macos.mdx +++ b/docs/docs/self-hosting/deploy/macos.mdx @@ -40,7 +40,7 @@ LATEST=$(curl -i https://github.com/zitadel/zitadel/releases/latest | grep locat ## Run ZITADEL ```bash -ZITADEL_EXTERNALSECURE=false ZITADEL_ANALYTICS_PUSH_ALLENABLED=true zitadel start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled +ZITADEL_EXTERNALSECURE=false ZITADEL_TELEMETRY_ENABLED=true zitadel start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled ``` diff --git a/e2e/config/localhost/zitadel.yaml b/e2e/config/localhost/zitadel.yaml index e838e081ca..2b72fb0d2d 100644 --- a/e2e/config/localhost/zitadel.yaml +++ b/e2e/config/localhost/zitadel.yaml @@ -45,12 +45,12 @@ Console: InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}" Projections: - RequeueEvery: 1s + RequeueEvery: 20s Customizations: NotificationsQuotas: # RequeueEvery: 1s Telemetry: - RequeueEvery: 10s + RequeueEvery: 20s DefaultInstance: LoginPolicy: diff --git a/internal/command/milestone.go b/internal/command/milestone.go index 83a1cdabc2..70f119459b 100644 --- a/internal/command/milestone.go +++ b/internal/command/milestone.go @@ -9,7 +9,6 @@ import ( // MilestonePushed writes a new milestone.PushedEvent with a new milestone.Aggregate to the eventstore func (c *Commands) MilestonePushed( ctx context.Context, - instanceID string, msType milestone.Type, endpoints []string, primaryDomain string, @@ -18,6 +17,6 @@ func (c *Commands) MilestonePushed( if err != nil { return err } - _, err = c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, milestone.NewAggregate(id, instanceID, instanceID), msType, endpoints, primaryDomain)) + _, err = c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, milestone.NewAggregate(ctx, id), msType, endpoints, primaryDomain)) return err } diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index bae2bd8f8a..c7c8c1c569 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -5,14 +5,13 @@ import ( "database/sql" "fmt" - "github.com/zitadel/zitadel/internal/repository/pseudo" - "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/repository/pseudo" ) var ( @@ -102,7 +101,7 @@ func NewStatementHandler( func (h *StatementHandler) Start() { h.initialized <- true close(h.initialized) - if h.reduceScheduledPseudoEvent { + if !h.reduceScheduledPseudoEvent { h.Subscribe(h.aggregates...) } } diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index 0462108642..d7aed5ef3b 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -102,7 +102,7 @@ func NewProjectionHandler( go func() { <-initialized - if h.reduceScheduledPseudoEvent { + if !h.reduceScheduledPseudoEvent { go h.subscribe(ctx) } go h.schedule(ctx) @@ -177,9 +177,9 @@ func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string return nil, false, err } if h.reduceScheduledPseudoEvent { - events[0] = pseudo.NewScheduledEvent(ctx, time.Now(), instances...) + events = []eventstore.Event{pseudo.NewScheduledEvent(ctx, time.Now(), events[0], instances...)} } - return events, int(eventsLimit) == len(events), err + return events, int(eventsLimit) == len(events) && !h.reduceScheduledPseudoEvent, err } func (h *ProjectionHandler) subscribe(ctx context.Context) { diff --git a/internal/notification/handlers/telemetry_pusher.go b/internal/notification/handlers/telemetry_pusher.go index 49b1b1e38b..cbbac02620 100644 --- a/internal/notification/handlers/telemetry_pusher.go +++ b/internal/notification/handlers/telemetry_pusher.go @@ -2,27 +2,25 @@ package handlers import ( "context" + "encoding/json" "fmt" "net/http" "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/query" - + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/call" - - "github.com/zitadel/zitadel/internal/repository/pseudo" - - "github.com/zitadel/zitadel/internal/errors" - "github.com/zitadel/zitadel/internal/command" + "github.com/zitadel/zitadel/internal/errors" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler" "github.com/zitadel/zitadel/internal/eventstore/handler/crdb" "github.com/zitadel/zitadel/internal/notification/channels/webhook" _ "github.com/zitadel/zitadel/internal/notification/statik" "github.com/zitadel/zitadel/internal/notification/types" + "github.com/zitadel/zitadel/internal/query" "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/repository/pseudo" ) const ( @@ -78,7 +76,16 @@ func (t *telemetryPusher) reducers() []handler.AggregateReducer { }} } +func printEvent(event eventstore.Event) { + bytes, err := json.MarshalIndent(event, "", " ") + if err != nil { + panic(err) + } + fmt.Println(event.Type(), string(bytes)) +} + func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) { + printEvent(event) ctx := call.WithTimestamp(context.Background()) scheduledEvent, ok := event.(*pseudo.ScheduledEvent) if !ok { @@ -123,6 +130,7 @@ func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.State } func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.ScheduledEvent, ms *query.Milestone) error { + ctx = authz.WithInstanceID(ctx, ms.InstanceID) for _, endpoint := range t.endpoints { if err := types.SendJSON( ctx, @@ -140,5 +148,5 @@ func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.Sched return err } } - return t.commands.MilestonePushed(ctx, ms.InstanceID, ms.Type, t.endpoints, ms.PrimaryDomain) + return t.commands.MilestonePushed(ctx, ms.Type, t.endpoints, ms.PrimaryDomain) } diff --git a/internal/query/projection/milestones.go b/internal/query/projection/milestones.go index 5223731ddd..f13f640f1e 100644 --- a/internal/query/projection/milestones.go +++ b/internal/query/projection/milestones.go @@ -1,7 +1,6 @@ package projection import ( - "bytes" "context" "encoding/json" "fmt" @@ -147,6 +146,7 @@ func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Ev func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) { return func(event eventstore.Event) (*handler.Statement, error) { + printEvent(event) if event.EditorUser() == "" || event.EditorService() == "" { return crdb.NewNoOpStatement(event), nil } @@ -188,9 +188,9 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han } func printEvent(event eventstore.Event) { - var pretty bytes.Buffer - if err := json.Indent(&pretty, event.DataAsBytes(), "", " "); err != nil { + bytes, err := json.MarshalIndent(event, "", " ") + if err != nil { panic(err) } - fmt.Println(event.Type(), pretty.String()) + fmt.Println(event.Type(), string(bytes)) } diff --git a/internal/repository/milestone/aggregate.go b/internal/repository/milestone/aggregate.go index 2304faf8ef..bb9ca99cb3 100644 --- a/internal/repository/milestone/aggregate.go +++ b/internal/repository/milestone/aggregate.go @@ -1,6 +1,9 @@ package milestone import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/eventstore" ) @@ -13,13 +16,14 @@ type Aggregate struct { eventstore.Aggregate } -func NewAggregate(id, resourceOwner, instanceID string) *Aggregate { +func NewAggregate(ctx context.Context, id string) *Aggregate { + instanceID := authz.GetInstance(ctx).InstanceID() return &Aggregate{ Aggregate: eventstore.Aggregate{ Type: AggregateType, Version: AggregateVersion, ID: id, - ResourceOwner: resourceOwner, + ResourceOwner: instanceID, InstanceID: instanceID, }, } diff --git a/internal/repository/pseudo/events.go b/internal/repository/pseudo/events.go index c1e5900fa1..5a139d848e 100644 --- a/internal/repository/pseudo/events.go +++ b/internal/repository/pseudo/events.go @@ -16,13 +16,15 @@ var _ eventstore.Event = (*ScheduledEvent)(nil) type ScheduledEvent struct { *eventstore.BaseEvent `json:"-"` - Timestamp time.Time `json:"-"` - InstanceIDs []string `json:"-"` + Timestamp time.Time `json:"timestamp"` + InstanceIDs []string `json:"instanceIDs"` + TriggeringEvent eventstore.Event `json:"triggeringEvent"` } func NewScheduledEvent( ctx context.Context, timestamp time.Time, + triggeringEvent eventstore.Event, instanceIDs ...string, ) *ScheduledEvent { return &ScheduledEvent{ @@ -31,7 +33,8 @@ func NewScheduledEvent( &NewAggregate().Aggregate, ScheduledEventType, ), - Timestamp: timestamp, - InstanceIDs: instanceIDs, + Timestamp: timestamp, + InstanceIDs: instanceIDs, + TriggeringEvent: triggeringEvent, } }