mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-10 10:03:59 +00:00
test projection
This commit is contained in:
parent
69fe35411d
commit
5f7e3b2968
14
internal/query/projection/assert.go
Normal file
14
internal/query/projection/assert.go
Normal file
@ -0,0 +1,14 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"github.com/zitadel/zitadel/internal/errors"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
)
|
||||
|
||||
func assertEvent[T eventstore.Event](event eventstore.Event) (T, error) {
|
||||
e, ok := event.(T)
|
||||
if !ok {
|
||||
return e, errors.ThrowInvalidArgumentf(nil, "HANDL-1m9fS", "reduce.wrong.event.type %T", event)
|
||||
}
|
||||
return e, nil
|
||||
}
|
51
internal/query/projection/assert_test.go
Normal file
51
internal/query/projection/assert_test.go
Normal file
@ -0,0 +1,51 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/repository/instance"
|
||||
)
|
||||
|
||||
func Test_assertEvent(t *testing.T) {
|
||||
type args struct {
|
||||
event eventstore.Event
|
||||
assertFunc func(eventstore.Event) (eventstore.Event, error)
|
||||
}
|
||||
type testCase struct {
|
||||
name string
|
||||
args args
|
||||
wantErr assert.ErrorAssertionFunc
|
||||
}
|
||||
tests := []testCase{
|
||||
{
|
||||
name: "correct event type",
|
||||
args: args{
|
||||
event: instance.NewInstanceAddedEvent(context.Background(), &instance.NewAggregate("instance-id").Aggregate, "instance-name"),
|
||||
assertFunc: func(event eventstore.Event) (eventstore.Event, error) {
|
||||
return assertEvent[*instance.InstanceAddedEvent](event)
|
||||
},
|
||||
},
|
||||
wantErr: assert.NoError,
|
||||
}, {
|
||||
name: "wrong event type",
|
||||
args: args{
|
||||
event: instance.NewInstanceRemovedEvent(context.Background(), &instance.NewAggregate("instance-id").Aggregate, "instance-name", nil),
|
||||
assertFunc: func(event eventstore.Event) (eventstore.Event, error) {
|
||||
return assertEvent[*instance.InstanceAddedEvent](event)
|
||||
},
|
||||
},
|
||||
wantErr: assert.Error,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
_, err := tt.args.assertFunc(tt.args.event)
|
||||
if !tt.wantErr(t, err) {
|
||||
return
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -14,12 +14,26 @@ func testEvent(
|
||||
eventType repository.EventType,
|
||||
aggregateType repository.AggregateType,
|
||||
data []byte,
|
||||
) *repository.Event {
|
||||
return timedTestEvent(eventType, aggregateType, data, time.Now())
|
||||
}
|
||||
|
||||
func toSystemEvent(event *repository.Event) *repository.Event {
|
||||
event.EditorService = "SYSTEM"
|
||||
return event
|
||||
}
|
||||
|
||||
func timedTestEvent(
|
||||
eventType repository.EventType,
|
||||
aggregateType repository.AggregateType,
|
||||
data []byte,
|
||||
creationDate time.Time,
|
||||
) *repository.Event {
|
||||
return &repository.Event{
|
||||
Sequence: 15,
|
||||
PreviousAggregateSequence: 10,
|
||||
PreviousAggregateTypeSequence: 10,
|
||||
CreationDate: time.Now(),
|
||||
CreationDate: creationDate,
|
||||
Type: eventType,
|
||||
AggregateType: aggregateType,
|
||||
Data: data,
|
||||
|
@ -3,7 +3,6 @@ package projection
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/errors"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
|
||||
@ -63,7 +62,7 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
|
||||
},
|
||||
{
|
||||
Event: instance.InstanceRemovedEventType,
|
||||
Reduce: p.reduceReachedFunc(milestone.InstanceDeleted),
|
||||
Reduce: p.reduceInstanceRemoved,
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -72,11 +71,11 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: project.ProjectAddedType,
|
||||
Reduce: p.reduceReachedIfUserEventFunc(milestone.ProjectCreated),
|
||||
Reduce: p.reduceProjectAdded,
|
||||
},
|
||||
{
|
||||
Event: project.ApplicationAddedType,
|
||||
Reduce: p.reduceReachedIfUserEventFunc(milestone.ApplicationCreated),
|
||||
Reduce: p.reduceApplicationAdded,
|
||||
},
|
||||
{
|
||||
Event: project.OIDCConfigAddedType,
|
||||
@ -104,64 +103,17 @@ func (p *milestoneProjection) reducers() []handler.AggregateReducer {
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: milestone.PushedEventType,
|
||||
Reduce: p.reducePushed,
|
||||
Reduce: p.reduceMilestonePushed,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceReachedIfUserEventFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return func(event eventstore.Event) (*handler.Statement, error) {
|
||||
if p.isSystemEvent(event) {
|
||||
return crdb.NewNoOpStatement(event), nil
|
||||
}
|
||||
return p.reduceReachedFunc(msType)(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceReachedFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return crdb.NewUpdateStatement(event, []handler.Column{
|
||||
handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, msType),
|
||||
crdb.NewIsNullCond(MilestoneColumnReachedDate),
|
||||
}), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reducePushed(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*milestone.PushedEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-XJGXK", "reduce.wrong.event.type %s", milestone.PushedEventType)
|
||||
}
|
||||
if e.MilestoneType != milestone.InstanceDeleted {
|
||||
return crdb.NewUpdateStatement(
|
||||
event,
|
||||
[]handler.Column{
|
||||
handler.NewCol(MilestoneColumnPushedDate, event.CreationDate()),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, e.MilestoneType),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
return crdb.NewDeleteStatement(
|
||||
event,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*instance.InstanceAddedEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-JbHGS", "reduce.wrong.event.type %s", instance.InstanceAddedEventType)
|
||||
e, err := assertEvent[*instance.InstanceAddedEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
allTypes := milestone.AllTypes()
|
||||
statements := make([]func(eventstore.Event) crdb.Exec, 0, len(allTypes))
|
||||
@ -179,9 +131,9 @@ func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*hand
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*instance.DomainPrimarySetEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Sfrgf", "reduce.wrong.event.type %s", instance.InstanceDomainPrimarySetEventType)
|
||||
e, err := assertEvent[*instance.DomainPrimarySetEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
e,
|
||||
@ -195,43 +147,43 @@ func (p *milestoneProjection) reduceInstanceDomainPrimarySet(event eventstore.Ev
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceProjectAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
if _, err := assertEvent[*project.ProjectAddedEvent](event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.reduceReachedIfUserEventFunc(milestone.ProjectCreated)(event)
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceApplicationAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
if _, err := assertEvent[*project.ApplicationAddedEvent](event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.reduceReachedIfUserEventFunc(milestone.ApplicationCreated)(event)
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceOIDCConfigAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*project.OIDCConfigAddedEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Rw7Cv", "reduce.wrong.event.type %s", project.OIDCConfigAddedType)
|
||||
e, err := assertEvent[*project.OIDCConfigAddedEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.reduceAppConfigAdded(e, e.ClientID)
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceAPIConfigAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*project.APIConfigAddedEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-QudD2", "reduce.wrong.event.type %s", project.APIConfigAddedType)
|
||||
e, err := assertEvent[*project.APIConfigAddedEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.reduceAppConfigAdded(e, e.ClientID)
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceAppConfigAdded(event eventstore.Event, clientID string) (*handler.Statement, error) {
|
||||
if !p.isSystemEvent(event) {
|
||||
return crdb.NewNoOpStatement(event), nil
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
event,
|
||||
[]handler.Column{
|
||||
crdb.NewArrayAppendCol(MilestoneColumnIgnoreClientIDs, clientID),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication),
|
||||
crdb.NewIsNullCond(MilestoneColumnReachedDate),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, ok := event.(*user.UserTokenAddedEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-3xhJ7", "reduce.wrong.event.type %s", user.UserTokenAddedType)
|
||||
e, err := assertEvent[*user.UserTokenAddedEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if p.isSystemEvent(event) {
|
||||
return crdb.NewNoOpStatement(event), nil
|
||||
}
|
||||
statements := []func(eventstore.Event) crdb.Exec{
|
||||
crdb.AddUpdateStatement(
|
||||
@ -262,6 +214,77 @@ func (p *milestoneProjection) reduceUserTokenAdded(event eventstore.Event) (*han
|
||||
return crdb.NewMultiStatement(e, statements...), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceInstanceRemoved(event eventstore.Event) (*handler.Statement, error) {
|
||||
if _, err := assertEvent[*instance.InstanceRemovedEvent](event); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p.reduceReachedFunc(milestone.InstanceDeleted)(event)
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceMilestonePushed(event eventstore.Event) (*handler.Statement, error) {
|
||||
e, err := assertEvent[*milestone.PushedEvent](event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if e.MilestoneType != milestone.InstanceDeleted {
|
||||
return crdb.NewUpdateStatement(
|
||||
event,
|
||||
[]handler.Column{
|
||||
handler.NewCol(MilestoneColumnPushedDate, event.CreationDate()),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, e.MilestoneType),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
return crdb.NewDeleteStatement(
|
||||
event,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceReachedIfUserEventFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return func(event eventstore.Event) (*handler.Statement, error) {
|
||||
if p.isSystemEvent(event) {
|
||||
return crdb.NewNoOpStatement(event), nil
|
||||
}
|
||||
return p.reduceReachedFunc(msType)(event)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceReachedFunc(msType milestone.Type) func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return func(event eventstore.Event) (*handler.Statement, error) {
|
||||
return crdb.NewUpdateStatement(event, []handler.Column{
|
||||
handler.NewCol(MilestoneColumnReachedDate, event.CreationDate()),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, msType),
|
||||
crdb.NewIsNullCond(MilestoneColumnReachedDate),
|
||||
}), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) reduceAppConfigAdded(event eventstore.Event, clientID string) (*handler.Statement, error) {
|
||||
if !p.isSystemEvent(event) {
|
||||
return crdb.NewNoOpStatement(event), nil
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
event,
|
||||
[]handler.Column{
|
||||
crdb.NewArrayAppendCol(MilestoneColumnIgnoreClientIDs, clientID),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(MilestoneColumnInstanceID, event.Aggregate().InstanceID),
|
||||
handler.NewCond(MilestoneColumnType, milestone.AuthenticationSucceededOnApplication),
|
||||
crdb.NewIsNullCond(MilestoneColumnReachedDate),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *milestoneProjection) isSystemEvent(event eventstore.Event) bool {
|
||||
return event.EditorUser() == "" || event.EditorService() == "" || event.EditorService() == "SYSTEM"
|
||||
}
|
||||
|
423
internal/query/projection/milestones_test.go
Normal file
423
internal/query/projection/milestones_test.go
Normal file
@ -0,0 +1,423 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/errors"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/repository"
|
||||
"github.com/zitadel/zitadel/internal/repository/instance"
|
||||
"github.com/zitadel/zitadel/internal/repository/milestone"
|
||||
"github.com/zitadel/zitadel/internal/repository/project"
|
||||
"github.com/zitadel/zitadel/internal/repository/user"
|
||||
)
|
||||
|
||||
func TestMilestonesProjection_reduces(t *testing.T) {
|
||||
type args struct {
|
||||
event func(t *testing.T) eventstore.Event
|
||||
}
|
||||
now := time.Now()
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
reduce func(event eventstore.Event) (*handler.Statement, error)
|
||||
want wantReduce
|
||||
}{
|
||||
{
|
||||
name: "reduceInstanceAdded",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(instance.InstanceAddedEventType),
|
||||
instance.AggregateType,
|
||||
[]byte(`{}`),
|
||||
now,
|
||||
), instance.InstanceAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceInstanceAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("instance"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type, reached_date) VALUES ($1, $2, $3)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.InstanceCreated,
|
||||
now,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnInstance,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.ProjectCreated,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.ApplicationCreated,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnApplication,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO projections.milestones (instance_id, type) VALUES ($1, $2)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
milestone.InstanceDeleted,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceInstancePrimaryDomainSet",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(instance.InstanceDomainPrimarySetEventType),
|
||||
instance.AggregateType,
|
||||
[]byte(`{"domain": "my.domain"}`),
|
||||
), instance.DomainPrimarySetEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceInstanceDomainPrimarySet,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("instance"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET primary_domain = $1 WHERE (instance_id = $2) AND (last_pushed_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
"my.domain",
|
||||
"instance-id",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceProjectAdded",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(project.ProjectAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{}`),
|
||||
now,
|
||||
), project.ProjectAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceProjectAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.ProjectCreated,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceApplicationAdded",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(project.ApplicationAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{}`),
|
||||
now,
|
||||
), project.ApplicationAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceApplicationAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.ApplicationCreated,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceOIDCConfigAdded user event",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(project.OIDCConfigAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{}`),
|
||||
), project.OIDCConfigAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceOIDCConfigAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceOIDCConfigAdded system event",
|
||||
args: args{
|
||||
event: getEvent(toSystemEvent(testEvent(
|
||||
repository.EventType(project.OIDCConfigAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{"clientId": "client-id"}`),
|
||||
)), project.OIDCConfigAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceOIDCConfigAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET ignore_client_ids = array_append(ignore_client_ids, $1) WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
"client-id",
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnApplication,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceAPIConfigAdded user event",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(project.APIConfigAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{}`),
|
||||
), project.APIConfigAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceAPIConfigAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceAPIConfigAdded system event",
|
||||
args: args{
|
||||
event: getEvent(toSystemEvent(testEvent(
|
||||
repository.EventType(project.APIConfigAddedType),
|
||||
project.AggregateType,
|
||||
[]byte(`{"clientId": "client-id"}`),
|
||||
)), project.APIConfigAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceAPIConfigAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("project"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET ignore_client_ids = array_append(ignore_client_ids, $1) WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
"client-id",
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnApplication,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceUserTokenAdded system event",
|
||||
args: args{
|
||||
event: getEvent(toSystemEvent(timedTestEvent(
|
||||
repository.EventType(user.UserTokenAddedType),
|
||||
user.AggregateType,
|
||||
[]byte(`{"applicationId": "client-id"}`),
|
||||
now,
|
||||
)), user.UserTokenAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceUserTokenAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("user"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceUserTokenAdded user event",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(user.UserTokenAddedType),
|
||||
user.AggregateType,
|
||||
[]byte(`{"applicationId": "client-id"}`),
|
||||
now,
|
||||
), user.UserTokenAddedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceUserTokenAdded,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("user"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
// TODO: This can be optimized to only use one statement with OR
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnInstance,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (NOT (ignore_client_ids @> $4)) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.AuthenticationSucceededOnApplication,
|
||||
database.StringArray{"client-id"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceInstanceRemoved",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(instance.InstanceRemovedEventType),
|
||||
instance.AggregateType,
|
||||
[]byte(`{}`),
|
||||
now,
|
||||
), instance.InstanceRemovedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceInstanceRemoved,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("instance"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET reached_date = $1 WHERE (instance_id = $2) AND (type = $3) AND (reached_date IS NULL)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.InstanceDeleted,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceMilestonePushed normal milestone",
|
||||
args: args{
|
||||
event: getEvent(timedTestEvent(
|
||||
repository.EventType(milestone.PushedEventType),
|
||||
milestone.AggregateType,
|
||||
[]byte(`{"type": "ProjectCreated"}`),
|
||||
now,
|
||||
), milestone.PushedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceMilestonePushed,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("milestone"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "UPDATE projections.milestones SET last_pushed_date = $1 WHERE (instance_id = $2) AND (type = $3)",
|
||||
expectedArgs: []interface{}{
|
||||
now,
|
||||
"instance-id",
|
||||
milestone.ProjectCreated,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceMilestonePushed instance deleted milestone",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(milestone.PushedEventType),
|
||||
milestone.AggregateType,
|
||||
[]byte(`{"type": "InstanceDeleted"}`),
|
||||
), milestone.PushedEventMapper),
|
||||
},
|
||||
reduce: (&milestoneProjection{}).reduceMilestonePushed,
|
||||
want: wantReduce{
|
||||
aggregateType: eventstore.AggregateType("milestone"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "DELETE FROM projections.milestones WHERE (instance_id = $1)",
|
||||
expectedArgs: []interface{}{
|
||||
"instance-id",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
event := baseEvent(t)
|
||||
got, err := tt.reduce(event)
|
||||
if _, ok := err.(errors.InvalidArgument); !ok {
|
||||
t.Errorf("no wrong event mapping: %v, got: %v", err, got)
|
||||
}
|
||||
|
||||
event = tt.args.event(t)
|
||||
got, err = tt.reduce(event)
|
||||
assertReduce(t, got, err, MilestonesProjectionTable, tt.want)
|
||||
})
|
||||
}
|
||||
}
|
@ -31,6 +31,8 @@ func (p *PushedEvent) SetBaseEvent(b *eventstore.BaseEvent) {
|
||||
p.BaseEvent = b
|
||||
}
|
||||
|
||||
var PushedEventMapper = eventstore.GenericEventMapper[PushedEvent]
|
||||
|
||||
func NewPushedEvent(
|
||||
ctx context.Context,
|
||||
aggregate *Aggregate,
|
||||
|
@ -5,5 +5,5 @@ import (
|
||||
)
|
||||
|
||||
func RegisterEventMappers(es *eventstore.Eventstore) {
|
||||
es.RegisterFilterEventMapper(AggregateType, PushedEventType, eventstore.GenericEventMapper[PushedEvent])
|
||||
es.RegisterFilterEventMapper(AggregateType, PushedEventType, PushedEventMapper)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user