Tim Möhlmann 3aba942162
feat: add debug events API (#8533)
# Which Problems Are Solved

Add a debug API which allows pushing a set of events to be reduced in a
dedicated projection.
The events can carry a sleep duration which simulates a slow query
during projection handling.

# How the Problems Are Solved

- `CreateDebugEvents` allows pushing multiple events which simulate the
lifecycle of a resource. Each event has a `projectionSleep` field, which
issues a `pg_sleep()` statement query in the projection handler :
  - Add
  - Change
  - Remove
- `ListDebugEventsStates` list the current state of the projection,
optionally with a Trigger
- `GetDebugEventsStateByID` get the current state of the aggregate ID in
the projection, optionally with a Trigger


# Additional Changes

- none

# Additional Context

-  Allows reproduction of https://github.com/zitadel/zitadel/issues/8517
2024-09-11 08:24:00 +00:00

126 lines
3.1 KiB
Go

package debug_events
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
const (
AddedEventType = eventTypePrefix + "added"
ChangedEventType = eventTypePrefix + "changed"
RemovedEventType = eventTypePrefix + "removed"
)
type AddedEvent struct {
eventstore.BaseEvent `json:"-"`
ProjectionSleep time.Duration `json:"projectionSleep,omitempty"`
Blob *string `json:"blob,omitempty"`
}
func (e *AddedEvent) Payload() interface{} {
return e
}
func (e *AddedEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
return nil
}
func NewAddedEvent(ctx context.Context, aggregate *eventstore.Aggregate, projectionSleep time.Duration, blob *string) *AddedEvent {
return &AddedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
AddedEventType,
),
Blob: blob,
ProjectionSleep: projectionSleep,
}
}
func DebugAddedEventMapper(event eventstore.Event) (eventstore.Event, error) {
debugAdded := &AddedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := event.Unmarshal(debugAdded)
if err != nil {
return nil, zerrors.ThrowInternal(err, "ORG-Bren2", "unable to unmarshal debug added")
}
return debugAdded, nil
}
type ChangedEvent struct {
eventstore.BaseEvent `json:"-"`
ProjectionSleep time.Duration `json:"projectionSleep,omitempty"`
Blob *string `json:"blob,omitempty"`
}
func (e *ChangedEvent) Payload() interface{} {
return e
}
func (e *ChangedEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
return nil
}
func NewChangedEvent(ctx context.Context, aggregate *eventstore.Aggregate, projectionSleep time.Duration, blob *string) *ChangedEvent {
return &ChangedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
ChangedEventType,
),
ProjectionSleep: projectionSleep,
Blob: blob,
}
}
func DebugChangedEventMapper(event eventstore.Event) (eventstore.Event, error) {
debugChanged := &ChangedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}
err := event.Unmarshal(debugChanged)
if err != nil {
return nil, zerrors.ThrowInternal(err, "ORG-Bren2", "unable to unmarshal debug added")
}
return debugChanged, nil
}
type RemovedEvent struct {
eventstore.BaseEvent `json:"-"`
ProjectionSleep time.Duration `json:"projectionSleep,omitempty"`
}
func (e *RemovedEvent) Payload() interface{} {
return nil
}
func (e *RemovedEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
return nil
}
func NewRemovedEvent(ctx context.Context, aggregate *eventstore.Aggregate, projectionSleep time.Duration) *RemovedEvent {
return &RemovedEvent{
BaseEvent: *eventstore.NewBaseEventForPush(
ctx,
aggregate,
RemovedEventType,
),
ProjectionSleep: projectionSleep,
}
}
func DebugRemovedEventMapper(event eventstore.Event) (eventstore.Event, error) {
return &RemovedEvent{
BaseEvent: *eventstore.BaseEventFromRepo(event),
}, nil
}
func AggregateFromWriteModel(ctx context.Context, wm *eventstore.WriteModel) *eventstore.Aggregate {
return eventstore.AggregateFromWriteModelCtx(ctx, wm, AggregateType, AggregateVersion)
}