fix scheduled pseudo event projection

This commit is contained in:
Elio Bischof 2023-06-28 17:43:19 +02:00
parent ec8b587ba6
commit 2b8dac40de
No known key found for this signature in database
GPG Key ID: 7B383FDE4DDBF1BD
9 changed files with 42 additions and 29 deletions

View File

@ -40,7 +40,7 @@ LATEST=$(curl -i https://github.com/zitadel/zitadel/releases/latest | grep locat
## Run ZITADEL
```bash
ZITADEL_EXTERNALSECURE=false ZITADEL_ANALYTICS_PUSH_ALLENABLED=true zitadel start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled
ZITADEL_EXTERNALSECURE=false ZITADEL_TELEMETRY_ENABLED=true zitadel start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled
```
<DefaultUser components={props.components} />

View File

@ -45,12 +45,12 @@ Console:
InstanceManagementURL: "https://example.com/instances/{{.InstanceID}}"
Projections:
RequeueEvery: 1s
RequeueEvery: 20s
Customizations:
NotificationsQuotas:
# RequeueEvery: 1s
Telemetry:
RequeueEvery: 10s
RequeueEvery: 20s
DefaultInstance:
LoginPolicy:

View File

@ -9,7 +9,6 @@ import (
// MilestonePushed writes a new milestone.PushedEvent with a new milestone.Aggregate to the eventstore
func (c *Commands) MilestonePushed(
ctx context.Context,
instanceID string,
msType milestone.Type,
endpoints []string,
primaryDomain string,
@ -18,6 +17,6 @@ func (c *Commands) MilestonePushed(
if err != nil {
return err
}
_, err = c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, milestone.NewAggregate(id, instanceID, instanceID), msType, endpoints, primaryDomain))
_, err = c.eventstore.Push(ctx, milestone.NewPushedEvent(ctx, milestone.NewAggregate(ctx, id), msType, endpoints, primaryDomain))
return err
}

View File

@ -5,14 +5,13 @@ import (
"database/sql"
"fmt"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/repository/pseudo"
)
var (
@ -102,7 +101,7 @@ func NewStatementHandler(
func (h *StatementHandler) Start() {
h.initialized <- true
close(h.initialized)
if h.reduceScheduledPseudoEvent {
if !h.reduceScheduledPseudoEvent {
h.Subscribe(h.aggregates...)
}
}

View File

@ -102,7 +102,7 @@ func NewProjectionHandler(
go func() {
<-initialized
if h.reduceScheduledPseudoEvent {
if !h.reduceScheduledPseudoEvent {
go h.subscribe(ctx)
}
go h.schedule(ctx)
@ -177,9 +177,9 @@ func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string
return nil, false, err
}
if h.reduceScheduledPseudoEvent {
events[0] = pseudo.NewScheduledEvent(ctx, time.Now(), instances...)
events = []eventstore.Event{pseudo.NewScheduledEvent(ctx, time.Now(), events[0], instances...)}
}
return events, int(eventsLimit) == len(events), err
return events, int(eventsLimit) == len(events) && !h.reduceScheduledPseudoEvent, err
}
func (h *ProjectionHandler) subscribe(ctx context.Context) {

View File

@ -2,27 +2,25 @@ package handlers
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/repository/pseudo"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/notification/types"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/pseudo"
)
const (
@ -78,7 +76,16 @@ func (t *telemetryPusher) reducers() []handler.AggregateReducer {
}}
}
func printEvent(event eventstore.Event) {
bytes, err := json.MarshalIndent(event, "", " ")
if err != nil {
panic(err)
}
fmt.Println(event.Type(), string(bytes))
}
func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
printEvent(event)
ctx := call.WithTimestamp(context.Background())
scheduledEvent, ok := event.(*pseudo.ScheduledEvent)
if !ok {
@ -123,6 +130,7 @@ func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.State
}
func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.ScheduledEvent, ms *query.Milestone) error {
ctx = authz.WithInstanceID(ctx, ms.InstanceID)
for _, endpoint := range t.endpoints {
if err := types.SendJSON(
ctx,
@ -140,5 +148,5 @@ func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.Sched
return err
}
}
return t.commands.MilestonePushed(ctx, ms.InstanceID, ms.Type, t.endpoints, ms.PrimaryDomain)
return t.commands.MilestonePushed(ctx, ms.Type, t.endpoints, ms.PrimaryDomain)
}

View File

@ -1,7 +1,6 @@
package projection
import (
"bytes"
"context"
"encoding/json"
"fmt"
@ -147,6 +146,7 @@ func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Ev
func (p *milestoneProjection) milestoneReached(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) {
return func(event eventstore.Event) (*handler.Statement, error) {
printEvent(event)
if event.EditorUser() == "" || event.EditorService() == "" {
return crdb.NewNoOpStatement(event), nil
}
@ -188,9 +188,9 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han
}
func printEvent(event eventstore.Event) {
var pretty bytes.Buffer
if err := json.Indent(&pretty, event.DataAsBytes(), "", " "); err != nil {
bytes, err := json.MarshalIndent(event, "", " ")
if err != nil {
panic(err)
}
fmt.Println(event.Type(), pretty.String())
fmt.Println(event.Type(), string(bytes))
}

View File

@ -1,6 +1,9 @@
package milestone
import (
"context"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
)
@ -13,13 +16,14 @@ type Aggregate struct {
eventstore.Aggregate
}
func NewAggregate(id, resourceOwner, instanceID string) *Aggregate {
func NewAggregate(ctx context.Context, id string) *Aggregate {
instanceID := authz.GetInstance(ctx).InstanceID()
return &Aggregate{
Aggregate: eventstore.Aggregate{
Type: AggregateType,
Version: AggregateVersion,
ID: id,
ResourceOwner: resourceOwner,
ResourceOwner: instanceID,
InstanceID: instanceID,
},
}

View File

@ -16,13 +16,15 @@ var _ eventstore.Event = (*ScheduledEvent)(nil)
type ScheduledEvent struct {
*eventstore.BaseEvent `json:"-"`
Timestamp time.Time `json:"-"`
InstanceIDs []string `json:"-"`
Timestamp time.Time `json:"timestamp"`
InstanceIDs []string `json:"instanceIDs"`
TriggeringEvent eventstore.Event `json:"triggeringEvent"`
}
func NewScheduledEvent(
ctx context.Context,
timestamp time.Time,
triggeringEvent eventstore.Event,
instanceIDs ...string,
) *ScheduledEvent {
return &ScheduledEvent{
@ -31,7 +33,8 @@ func NewScheduledEvent(
&NewAggregate().Aggregate,
ScheduledEventType,
),
Timestamp: timestamp,
InstanceIDs: instanceIDs,
Timestamp: timestamp,
InstanceIDs: instanceIDs,
TriggeringEvent: triggeringEvent,
}
}