feat: new user auth api (#1168)

* fix: correct selectors for extended writemodel

* fix: no previous checks in eventstore

* start check previous

* feat: auth user commands

* feat: auth user commands

* feat: auth user commands

* feat: otp

* feat: corrections from pr merge

* feat: webauthn

* feat: comment old webauthn

* feat: refactor user, human, machine

* feat: webauth command side

* feat: command and query side in login

* feat: fix user writemodel append events

* fix: remove creation dates on command side

* fix: remove previous sequence

* previous sequence

* fix: external idps

* Update internal/api/grpc/management/user.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* Update internal/v2/command/user_human_email.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: pr changes

* fix: phone verification

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2021-01-15 09:32:59 +01:00
committed by GitHub
parent e5731b0d3b
commit 959530ddad
74 changed files with 1554 additions and 1519 deletions

View File

@@ -74,7 +74,7 @@ func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Ev
event.AggregateType, event.AggregateID, previousSequence, previousSequence).Scan(&previousSequence, &event.CreationDate)
if err != nil {
logging.LogWithFields("SQL-IP3js",
logging.LogWithFields("SQL-5M0sd",
"aggregate", event.AggregateType,
"previousSequence", previousSequence,
"aggregateId", event.AggregateID,

View File

@@ -13,11 +13,6 @@ type Aggregater interface {
ResourceOwner() string
//Version represents the semantic version of the aggregate
Version() Version
//PreviouseSequence should return the sequence of the latest event of this aggregate
// stored in the eventstore
// it's set to the first event of this push transaction,
// later events consume the sequence of the previously pushed event of the aggregate
PreviousSequence() uint64
}
func NewAggregate(
@@ -25,15 +20,13 @@ func NewAggregate(
typ AggregateType,
resourceOwner string,
version Version,
previousSequence uint64,
) *Aggregate {
return &Aggregate{
id: id,
typ: typ,
resourceOwner: resourceOwner,
version: version,
previousSequence: previousSequence,
events: []EventPusher{},
id: id,
typ: typ,
resourceOwner: resourceOwner,
version: version,
events: []EventPusher{},
}
}
@@ -43,23 +36,21 @@ func AggregateFromWriteModel(
version Version,
) *Aggregate {
return &Aggregate{
id: wm.AggregateID,
typ: typ,
resourceOwner: wm.ResourceOwner,
version: version,
previousSequence: wm.ProcessedSequence,
events: []EventPusher{},
id: wm.AggregateID,
typ: typ,
resourceOwner: wm.ResourceOwner,
version: version,
events: []EventPusher{},
}
}
//Aggregate is the basic implementation of Aggregater
type Aggregate struct {
id string `json:"-"`
typ AggregateType `json:"-"`
events []EventPusher `json:"-"`
resourceOwner string `json:"-"`
version Version `json:"-"`
previousSequence uint64 `json:"-"`
id string `json:"-"`
typ AggregateType `json:"-"`
events []EventPusher `json:"-"`
resourceOwner string `json:"-"`
version Version `json:"-"`
}
//PushEvents adds all the events to the aggregate.
@@ -93,8 +84,3 @@ func (a *Aggregate) ResourceOwner() string {
func (a *Aggregate) Version() Version {
return a.version
}
//PreviousSequence implements Aggregater
func (a *Aggregate) PreviousSequence() uint64 {
return a.previousSequence
}

View File

@@ -33,6 +33,5 @@ type EventReader interface {
ResourceOwner() string
AggregateVersion() Version
Sequence() uint64
PreviousSequence() uint64
CreationDate() time.Time
}

View File

@@ -14,11 +14,10 @@ type BaseEvent struct {
aggregateType AggregateType `json:"-"`
EventType EventType `json:"-"`
resourceOwner string `json:"-"`
aggregateVersion Version `json:"-"`
sequence uint64 `json:"-"`
previouseSequence uint64 `json:"-"`
creationDate time.Time `json:"-"`
resourceOwner string `json:"-"`
aggregateVersion Version `json:"-"`
sequence uint64 `json:"-"`
creationDate time.Time `json:"-"`
//User is the user who created the event
User string `json:"-"`
@@ -56,9 +55,6 @@ func (e *BaseEvent) AggregateVersion() Version {
func (e *BaseEvent) Sequence() uint64 {
return e.sequence
}
func (e *BaseEvent) PreviousSequence() uint64 {
return e.previouseSequence
}
func (e *BaseEvent) CreationDate() time.Time {
return e.creationDate
}

View File

@@ -66,25 +66,21 @@ func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...Aggregat
func (es *Eventstore) aggregatesToEvents(aggregates []Aggregater) ([]*repository.Event, error) {
events := make([]*repository.Event, 0, len(aggregates))
for _, aggregate := range aggregates {
var previousEvent *repository.Event
for _, event := range aggregate.Events() {
data, err := eventData(event)
if err != nil {
return nil, err
}
events = append(events, &repository.Event{
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
PreviousSequence: aggregate.PreviousSequence(),
Data: data,
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
Data: data,
})
previousEvent = events[len(events)-1]
}
}
return events, nil

View File

@@ -14,9 +14,8 @@ import (
)
type testAggregate struct {
id string
events []EventPusher
previousSequence uint64
id string
events []EventPusher
}
func (a *testAggregate) ID() string {
@@ -39,10 +38,6 @@ func (a *testAggregate) Version() Version {
return "v1"
}
func (a *testAggregate) PreviousSequence() uint64 {
return a.previousSequence
}
// testEvent implements the Event interface
type testEvent struct {
BaseEvent
@@ -425,8 +420,8 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
},
res: res{
wantErr: false,
events: linkEvents(
&repository.Event{
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -436,7 +431,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Type: "test.event",
Version: "v1",
},
&repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -446,7 +441,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Type: "test.event",
Version: "v1",
},
),
},
},
},
{
@@ -507,8 +502,8 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
res: res{
wantErr: false,
events: combineEventLists(
linkEvents(
&repository.Event{
[]*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -518,7 +513,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Type: "test.event",
Version: "v1",
},
&repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -528,7 +523,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
Type: "test.event",
Version: "v1",
},
),
},
[]*repository.Event{
{
AggregateID: "2",
@@ -695,8 +690,8 @@ func TestEventstore_Push(t *testing.T) {
fields: fields{
repo: &testRepo{
t: t,
events: linkEvents(
&repository.Event{
events: []*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -706,7 +701,7 @@ func TestEventstore_Push(t *testing.T) {
Type: "test.event",
Version: "v1",
},
&repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -716,7 +711,7 @@ func TestEventstore_Push(t *testing.T) {
Type: "test.event",
Version: "v1",
},
),
},
},
eventMapper: map[EventType]func(*repository.Event) (EventReader, error){
"test.event": func(e *repository.Event) (EventReader, error) {
@@ -766,8 +761,8 @@ func TestEventstore_Push(t *testing.T) {
repo: &testRepo{
t: t,
events: combineEventLists(
linkEvents(
&repository.Event{
[]*repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -777,7 +772,7 @@ func TestEventstore_Push(t *testing.T) {
Type: "test.event",
Version: "v1",
},
&repository.Event{
{
AggregateID: "1",
AggregateType: "test.aggregate",
Data: []byte(nil),
@@ -787,7 +782,7 @@ func TestEventstore_Push(t *testing.T) {
Type: "test.event",
Version: "v1",
},
),
},
[]*repository.Event{
{
AggregateID: "2",
@@ -1305,7 +1300,7 @@ func combineEventLists(lists ...[]*repository.Event) []*repository.Event {
func linkEvents(events ...*repository.Event) []*repository.Event {
for i := 1; i < len(events); i++ {
events[i].PreviousEvent = events[i-1]
// events[i].PreviousEvent = events[i-1]
}
return events
}
@@ -1337,9 +1332,6 @@ func compareEvents(t *testing.T, want, got *repository.Event) {
if want.Version != got.Version {
t.Errorf("wrong version got %q want %q", got.Version, want.Version)
}
if (want.PreviousEvent == nil) != (got.PreviousEvent == nil) {
t.Errorf("linking failed got was linked: %v want was linked: %v", (got.PreviousEvent != nil), (want.PreviousEvent != nil))
}
if want.PreviousSequence != got.PreviousSequence {
t.Errorf("wrong previous sequence got %d want %d", got.PreviousSequence, want.PreviousSequence)
}

View File

@@ -16,17 +16,13 @@ type Event struct {
// if it's 0 then it's the first event of this aggregate
PreviousSequence uint64
//PreviousEvent is needed in push to update PreviousSequence
// it implements a linked list
PreviousEvent *Event
//CreationDate is the time the event is created
// it's used for human readability.
// Don't use it for event ordering,
// time drifts in different services could cause integrity problems
CreationDate time.Time
//KeyType describes the cause of the event (e.g. user.added)
//Type describes the cause of the event (e.g. user.added)
// it should always be in past-form
Type EventType

View File

@@ -17,7 +17,10 @@ import (
)
const (
crdbInsert = "WITH input_event ( " +
//as soon as stored procedures are possible in crdb
// we could move the code to migrations and coll the procedure
// traking issue: https://github.com/cockroachdb/cockroach/issues/17511
crdbInsert = "WITH data ( " +
" event_type, " +
" aggregate_type, " +
" aggregate_id, " +
@@ -27,47 +30,35 @@ const (
" editor_user, " +
" editor_service, " +
" resource_owner, " +
" previous_sequence, " +
// variables below are calculated
" max_event_seq " +
") AS ( " +
" ( " +
//the following select will return no row if no previous event defined
" SELECT " +
" $1::VARCHAR, " +
" $2::VARCHAR, " +
" $3::VARCHAR, " +
" $4::VARCHAR, " +
" COALESCE($5::TIMESTAMPTZ, NOW()), " +
" $6::JSONB, " +
" $7::VARCHAR, " +
" $8::VARCHAR, " +
" resource_owner, " +
" $10::BIGINT, " +
" MAX(event_sequence) AS max_event_seq " +
" FROM eventstore.events " +
" WHERE " +
" aggregate_type = $2::VARCHAR " +
" AND aggregate_id = $3::VARCHAR " +
" GROUP BY resource_owner " +
" ) UNION (" +
// if no previous event we use the given data
" VALUES (" +
" $1::VARCHAR, " +
" $2::VARCHAR, " +
" $3::VARCHAR, " +
" $4::VARCHAR, " +
" COALESCE($5::TIMESTAMPTZ, NOW()), " +
" $6::JSONB, " +
" $7::VARCHAR, " +
" $8::VARCHAR, " +
" $9::VARCHAR, " +
" $10::BIGINT, " +
" NULL::BIGINT " +
" ) " +
" ) " +
// ensure only 1 row in input_event
" LIMIT 1 " +
" previous_sequence" +
") AS (" +
//previous_data selects the needed data of the latest event of the aggregate
// and buffers it (crdb inmemory)
" WITH previous_data AS (" +
" SELECT MAX(event_sequence) AS seq, resource_owner " +
" FROM eventstore.events " +
//TODO: remove LIMIT 1 as soon as data cleaned up (only 1 resource_owner per aggregate)
" WHERE aggregate_type = $2 AND aggregate_id = $3 GROUP BY resource_owner LIMIT 1" +
" )" +
// defines the data to be inserted
" SELECT " +
" $1::VARCHAR AS event_type, " +
" $2::VARCHAR AS aggregate_type, " +
" $3::VARCHAR AS aggregate_id, " +
" $4::VARCHAR AS aggregate_version, " +
" NOW() AS creation_date, " +
" $5::JSONB AS event_data, " +
" $6::VARCHAR AS editor_user, " +
" $7::VARCHAR AS editor_service, " +
" CASE WHEN EXISTS (SELECT * FROM previous_data) " +
" THEN (SELECT resource_owner FROM previous_data) " +
" ELSE $8::VARCHAR " +
" end AS resource_owner, " +
" CASE WHEN EXISTS (SELECT * FROM previous_data) " +
" THEN (SELECT seq FROM previous_data) " +
" ELSE NULL " +
" end AS previous_sequence" +
") " +
"INSERT INTO eventstore.events " +
" ( " +
@@ -94,9 +85,9 @@ const (
" editor_service, " +
" resource_owner, " +
" previous_sequence " +
" FROM input_event " +
" FROM data " +
" ) " +
"RETURNING id, event_sequence, previous_sequence, creation_date, resource_owner "
"RETURNING id, event_sequence, previous_sequence, creation_date, resource_owner"
)
type CRDB struct {
@@ -119,28 +110,17 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed")
}
var previousSequence Sequence
for _, event := range events {
previousSequence := Sequence(event.PreviousSequence)
if event.PreviousEvent != nil {
if event.PreviousEvent.AggregateType != event.AggregateType || event.PreviousEvent.AggregateID != event.AggregateID {
return caos_errs.ThrowPreconditionFailed(nil, "SQL-J55uR", "aggregate of linked events unequal")
}
previousSequence = Sequence(event.PreviousEvent.Sequence)
}
err = stmt.QueryRowContext(ctx,
event.Type,
event.AggregateType,
event.AggregateID,
event.Version,
&sql.NullTime{
Time: event.CreationDate,
Valid: !event.CreationDate.IsZero(),
},
Data(event.Data),
event.EditorUser,
event.EditorService,
event.ResourceOwner,
previousSequence,
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate, &event.ResourceOwner)
event.PreviousSequence = uint64(previousSequence)

View File

@@ -284,11 +284,11 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
res res
}{
{
name: "push 1 event with check previous",
name: "push 1 event",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "1", true, 0),
generateEvent(t, "1"),
},
},
res: res{
@@ -300,83 +300,14 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
}},
},
{
name: "fail push 1 event with check previous wrong sequence",
name: "push two events on agg",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "2", true, 5),
generateEvent(t, "6"),
generateEvent(t, "6"),
},
},
res: res{
wantErr: true,
eventsRes: eventsRes{
pushedEventsCount: 0,
aggID: []string{"2"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "push 1 event without check previous",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "3", false, 0),
},
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 1,
aggID: []string{"3"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "push 1 event without check previous wrong sequence",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "4", false, 5),
},
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 1,
aggID: []string{"4"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "fail on push two events on agg without linking",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "5", true, 0),
generateEvent(t, "5", true, 0),
},
},
res: res{
wantErr: true,
eventsRes: eventsRes{
pushedEventsCount: 0,
aggID: []string{"5"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "push two events on agg with linking",
args: args{
ctx: context.Background(),
events: linkEvents(
generateEvent(t, "6", true, 0),
generateEvent(t, "6", true, 0),
),
},
res: res{
wantErr: false,
eventsRes: eventsRes{
@@ -386,51 +317,12 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
},
},
},
{
name: "push two events on agg with linking without check previous",
args: args{
ctx: context.Background(),
events: linkEvents(
generateEvent(t, "7", false, 0),
generateEvent(t, "7", false, 0),
),
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 2,
aggID: []string{"7"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "push two events on agg with linking mixed check previous",
args: args{
ctx: context.Background(),
events: linkEvents(
generateEvent(t, "8", false, 0),
generateEvent(t, "8", true, 0),
generateEvent(t, "8", false, 0),
generateEvent(t, "8", true, 0),
generateEvent(t, "8", true, 0),
),
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 5,
aggID: []string{"8"},
aggType: repository.AggregateType(t.Name()),
},
},
},
{
name: "failed push because context canceled",
args: args{
ctx: canceledCtx(),
events: []*repository.Event{
generateEvent(t, "9", true, 0),
generateEvent(t, "9"),
},
},
res: res{
@@ -485,11 +377,11 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
res res
}{
{
name: "push two aggregates both check previous",
name: "push two aggregates",
args: args{
events: []*repository.Event{
generateEvent(t, "100", true, 0),
generateEvent(t, "101", true, 0),
generateEvent(t, "100"),
generateEvent(t, "101"),
},
},
res: res{
@@ -502,18 +394,14 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
},
},
{
name: "push two aggregates both check previous multiple events",
name: "push two aggregates both multiple events",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "102", true, 0),
generateEvent(t, "102", true, 0),
),
linkEvents(
generateEvent(t, "103", true, 0),
generateEvent(t, "103", true, 0),
),
),
events: []*repository.Event{
generateEvent(t, "102"),
generateEvent(t, "102"),
generateEvent(t, "103"),
generateEvent(t, "103"),
},
},
res: res{
wantErr: false,
@@ -525,64 +413,28 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
},
},
{
name: "fail push linked events of different aggregates",
name: "push two aggregates mixed multiple events",
args: args{
events: linkEvents(
generateEvent(t, "104", false, 0),
generateEvent(t, "105", false, 0),
),
},
res: res{
wantErr: true,
eventsRes: eventsRes{
pushedEventsCount: 0,
aggID: []string{"104", "105"},
aggType: []repository.AggregateType{repository.AggregateType(t.Name())},
events: []*repository.Event{
generateEvent(t, "106"),
generateEvent(t, "106"),
generateEvent(t, "106"),
generateEvent(t, "106"),
generateEvent(t, "107"),
generateEvent(t, "107"),
generateEvent(t, "107"),
generateEvent(t, "107"),
generateEvent(t, "108"),
generateEvent(t, "108"),
generateEvent(t, "108"),
generateEvent(t, "108"),
},
},
},
{
name: "push two aggregates mixed check previous multiple events",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "106", true, 0),
generateEvent(t, "106", false, 0),
generateEvent(t, "106", false, 0),
generateEvent(t, "106", true, 0),
),
linkEvents(
generateEvent(t, "107", false, 0),
generateEvent(t, "107", true, 0),
generateEvent(t, "107", false, 0),
generateEvent(t, "107", true, 0),
),
linkEvents(
generateEvent(t, "108", true, 0),
generateEvent(t, "108", false, 0),
generateEvent(t, "108", false, 0),
generateEvent(t, "108", true, 0),
),
),
},
},
{
name: "failed push same aggregate in two transactions",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "109", true, 0),
),
linkEvents(
generateEvent(t, "109", true, 0),
),
),
},
res: res{
wantErr: true,
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 0,
aggID: []string{"109"},
pushedEventsCount: 12,
aggID: []string{"106", "107", "108"},
aggType: []repository.AggregateType{repository.AggregateType(t.Name())},
},
},
@@ -633,25 +485,19 @@ func TestCRDB_Push_Parallel(t *testing.T) {
name: "clients push different aggregates",
args: args{
events: [][]*repository.Event{
linkEvents(
generateEvent(t, "200", false, 0),
generateEvent(t, "200", true, 0),
generateEvent(t, "200", false, 0),
),
linkEvents(
generateEvent(t, "201", false, 0),
generateEvent(t, "201", true, 0),
generateEvent(t, "201", false, 0),
),
combineEventLists(
linkEvents(
generateEvent(t, "202", false, 0),
),
linkEvents(
generateEvent(t, "203", true, 0),
generateEvent(t, "203", false, 0),
),
),
{
generateEvent(t, "200"),
generateEvent(t, "200"),
generateEvent(t, "200"),
generateEvent(t, "201"),
generateEvent(t, "201"),
generateEvent(t, "201"),
},
{
generateEvent(t, "202"),
generateEvent(t, "203"),
generateEvent(t, "203"),
},
},
},
res: res{
@@ -664,41 +510,31 @@ func TestCRDB_Push_Parallel(t *testing.T) {
},
},
{
name: "clients push same aggregates no check previous",
name: "clients push same aggregates",
args: args{
events: [][]*repository.Event{
linkEvents(
generateEvent(t, "204", false, 0),
generateEvent(t, "204", false, 0),
),
linkEvents(
generateEvent(t, "204", false, 0),
generateEvent(t, "204", false, 0),
),
combineEventLists(
linkEvents(
generateEvent(t, "205", false, 0),
generateEvent(t, "205", false, 0),
generateEvent(t, "205", false, 0),
),
linkEvents(
generateEvent(t, "206", false, 0),
generateEvent(t, "206", false, 0),
generateEvent(t, "206", false, 0),
),
),
combineEventLists(
linkEvents(
generateEvent(t, "204", false, 0),
),
linkEvents(
generateEvent(t, "205", false, 0),
generateEvent(t, "205", false, 0),
),
linkEvents(
generateEvent(t, "206", false, 0),
),
),
{
generateEvent(t, "204"),
generateEvent(t, "204"),
},
{
generateEvent(t, "204"),
generateEvent(t, "204"),
},
{
generateEvent(t, "205"),
generateEvent(t, "205"),
generateEvent(t, "205"),
generateEvent(t, "206"),
generateEvent(t, "206"),
generateEvent(t, "206"),
},
{
generateEvent(t, "204"),
generateEvent(t, "205"),
generateEvent(t, "205"),
generateEvent(t, "206"),
},
},
},
res: res{
@@ -711,24 +547,24 @@ func TestCRDB_Push_Parallel(t *testing.T) {
},
},
{
name: "clients push different aggregates one with check previous",
name: "clients push different aggregates",
args: args{
events: [][]*repository.Event{
linkEvents(
generateEvent(t, "207", false, 0),
generateEvent(t, "207", false, 0),
generateEvent(t, "207", false, 0),
generateEvent(t, "207", false, 0),
generateEvent(t, "207", false, 0),
generateEvent(t, "207", false, 0),
),
linkEvents(
generateEvent(t, "208", true, 0),
generateEvent(t, "208", true, 0),
generateEvent(t, "208", true, 0),
generateEvent(t, "208", true, 0),
generateEvent(t, "208", true, 0),
),
{
generateEvent(t, "207"),
generateEvent(t, "207"),
generateEvent(t, "207"),
generateEvent(t, "207"),
generateEvent(t, "207"),
generateEvent(t, "207"),
},
{
generateEvent(t, "208"),
generateEvent(t, "208"),
generateEvent(t, "208"),
generateEvent(t, "208"),
generateEvent(t, "208"),
},
},
},
res: res{
@@ -741,21 +577,21 @@ func TestCRDB_Push_Parallel(t *testing.T) {
},
},
{
name: "clients push different aggregates all with check previous on first event fail",
name: "clients push same aggregates",
args: args{
events: [][]*repository.Event{
linkEvents(
generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 1 }`)),
generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 1.1 }`)),
),
linkEvents(
generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 2 }`)),
generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 2.1 }`)),
),
linkEvents(
generateEventWithData(t, "210", true, 0, []byte(`{ "transaction": 3 }`)),
generateEventWithData(t, "210", false, 0, []byte(`{ "transaction": 30.1 }`)),
),
{
generateEventWithData(t, "210", []byte(`{ "transaction": 1 }`)),
generateEventWithData(t, "210", []byte(`{ "transaction": 1.1 }`)),
},
{
generateEventWithData(t, "210", []byte(`{ "transaction": 2 }`)),
generateEventWithData(t, "210", []byte(`{ "transaction": 2.1 }`)),
},
{
generateEventWithData(t, "210", []byte(`{ "transaction": 3 }`)),
generateEventWithData(t, "210", []byte(`{ "transaction": 30.1 }`)),
},
},
},
res: res{
@@ -850,9 +686,9 @@ func TestCRDB_Filter(t *testing.T) {
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
generateEvent(t, "300"),
generateEvent(t, "300"),
generateEvent(t, "300"),
},
},
res: res{
@@ -873,10 +709,10 @@ func TestCRDB_Filter(t *testing.T) {
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "305", false, 0),
generateEvent(t, "303"),
generateEvent(t, "303"),
generateEvent(t, "303"),
generateEvent(t, "305"),
},
},
res: res{
@@ -938,9 +774,9 @@ func TestCRDB_LatestSequence(t *testing.T) {
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "400", false, 0),
generateEvent(t, "400", false, 0),
generateEvent(t, "400", false, 0),
generateEvent(t, "400"),
generateEvent(t, "400"),
generateEvent(t, "400"),
},
},
res: res{
@@ -960,9 +796,9 @@ func TestCRDB_LatestSequence(t *testing.T) {
},
fields: fields{
existingEvents: []*repository.Event{
generateEvent(t, "401", false, 0),
generateEvent(t, "401", false, 0),
generateEvent(t, "401", false, 0),
generateEvent(t, "401"),
generateEvent(t, "401"),
generateEvent(t, "401"),
},
},
res: res{
@@ -1016,8 +852,8 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
name: "two events of same aggregate same resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "500", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "500", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "500", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "500", func(e *repository.Event) { e.ResourceOwner = "caos" }),
},
},
fields: fields{
@@ -1032,8 +868,8 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
name: "two events of different aggregate same resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "501", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "502", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "501", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "502", func(e *repository.Event) { e.ResourceOwner = "caos" }),
},
},
fields: fields{
@@ -1048,8 +884,8 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
name: "two events of different aggregate different resource owner",
args: args{
events: []*repository.Event{
generateEvent(t, "503", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "504", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "503", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "504", func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
},
},
fields: fields{
@@ -1063,16 +899,12 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
{
name: "events of different aggregate different resource owner",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "505", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "505", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
),
linkEvents(
generateEvent(t, "506", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "506", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
),
),
events: []*repository.Event{
generateEvent(t, "505", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "505", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "506", func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "506", func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
},
},
fields: fields{
aggregateIDs: []string{"505", "506"},
@@ -1085,16 +917,12 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
{
name: "events of different aggregate different resource owner per event",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "507", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "507", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
linkEvents(
generateEvent(t, "508", false, 0, func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "508", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
),
events: []*repository.Event{
generateEvent(t, "507", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "507", func(e *repository.Event) { e.ResourceOwner = "ignored" }),
generateEvent(t, "508", func(e *repository.Event) { e.ResourceOwner = "zitadel" }),
generateEvent(t, "508", func(e *repository.Event) { e.ResourceOwner = "ignored" }),
},
},
fields: fields{
aggregateIDs: []string{"507", "508"},
@@ -1107,16 +935,12 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
{
name: "events of one aggregate different resource owner per event",
args: args{
events: combineEventLists(
linkEvents(
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
linkEvents(
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
generateEvent(t, "509", false, 0, func(e *repository.Event) { e.ResourceOwner = "ignored" }),
),
),
events: []*repository.Event{
generateEvent(t, "509", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "509", func(e *repository.Event) { e.ResourceOwner = "ignored" }),
generateEvent(t, "509", func(e *repository.Event) { e.ResourceOwner = "ignored" }),
generateEvent(t, "509", func(e *repository.Event) { e.ResourceOwner = "ignored" }),
},
},
fields: fields{
aggregateIDs: []string{"509"},
@@ -1180,34 +1004,16 @@ func canceledCtx() context.Context {
return ctx
}
func combineEventLists(lists ...[]*repository.Event) []*repository.Event {
combined := make([]*repository.Event, 0)
for _, list := range lists {
combined = append(combined, list...)
}
return combined
}
func linkEvents(events ...*repository.Event) []*repository.Event {
for i := 1; i < len(events); i++ {
events[i].PreviousEvent = events[i-1]
}
return events
}
func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64, opts ...func(*repository.Event)) *repository.Event {
func generateEvent(t *testing.T, aggregateID string, opts ...func(*repository.Event)) *repository.Event {
t.Helper()
e := &repository.Event{
AggregateID: aggregateID,
AggregateType: repository.AggregateType(t.Name()),
CheckPreviousSequence: checkPrevious,
EditorService: "svc",
EditorUser: "user",
PreviousEvent: nil,
PreviousSequence: previousSeq,
ResourceOwner: "ro",
Type: "test.created",
Version: "v1",
AggregateID: aggregateID,
AggregateType: repository.AggregateType(t.Name()),
EditorService: "svc",
EditorUser: "user",
ResourceOwner: "ro",
Type: "test.created",
Version: "v1",
}
for _, opt := range opts {
@@ -1217,19 +1023,16 @@ func generateEvent(t *testing.T, aggregateID string, checkPrevious bool, previou
return e
}
func generateEventWithData(t *testing.T, aggregateID string, checkPrevious bool, previousSeq uint64, data []byte) *repository.Event {
func generateEventWithData(t *testing.T, aggregateID string, data []byte) *repository.Event {
t.Helper()
return &repository.Event{
AggregateID: aggregateID,
AggregateType: repository.AggregateType(t.Name()),
CheckPreviousSequence: checkPrevious,
EditorService: "svc",
EditorUser: "user",
PreviousEvent: nil,
PreviousSequence: previousSeq,
ResourceOwner: "ro",
Type: "test.created",
Version: "v1",
Data: data,
AggregateID: aggregateID,
AggregateType: repository.AggregateType(t.Name()),
EditorService: "svc",
EditorUser: "user",
ResourceOwner: "ro",
Type: "test.created",
Version: "v1",
Data: data,
}
}

View File

@@ -27,9 +27,13 @@ func TestMain(m *testing.M) {
}
testCRDBClient, err = sql.Open("postgres", ts.PGURL().String())
if err != nil {
logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to connect to db")
}
if err = testCRDBClient.Ping(); err != nil {
logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to ping db")
}
defer func() {
testCRDBClient.Close()

View File

@@ -112,12 +112,11 @@ func eventsScanner(scanner scan, dest interface{}) (err error) {
)
if err != nil {
logging.Log("SQL-kn1Sw").WithError(err).Warn("unable to scan row")
return z_errors.ThrowInternal(err, "SQL-J0hFS", "unable to scan row")
logging.Log("SQL-3mofs").WithError(err).Warn("unable to scan row")
return z_errors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row")
}
event.PreviousSequence = uint64(previousSequence)
event.Data = make([]byte, len(data))
copy(event.Data, data)

View File

@@ -329,9 +329,9 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
generateEvent(t, "300", false, 0),
generateEvent(t, "300"),
generateEvent(t, "300"),
generateEvent(t, "300"),
},
},
res: res{
@@ -352,10 +352,10 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "301", false, 0),
generateEvent(t, "302", false, 0),
generateEvent(t, "302", false, 0),
generateEvent(t, "303", false, 0, func(e *repository.Event) { e.AggregateType = "not in list" }),
generateEvent(t, "301"),
generateEvent(t, "302"),
generateEvent(t, "302"),
generateEvent(t, "303", func(e *repository.Event) { e.AggregateType = "not in list" }),
},
},
res: res{
@@ -377,11 +377,11 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "303", false, 0),
generateEvent(t, "304", false, 0, func(e *repository.Event) { e.AggregateType = "not in list" }),
generateEvent(t, "305", false, 0),
generateEvent(t, "303"),
generateEvent(t, "303"),
generateEvent(t, "303"),
generateEvent(t, "304", func(e *repository.Event) { e.AggregateType = "not in list" }),
generateEvent(t, "305"),
},
},
res: res{
@@ -402,11 +402,11 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "306", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "308", false, 0, func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.ResourceOwner = "orgID" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.ResourceOwner = "orgID" }),
generateEvent(t, "306", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "307", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "308", func(e *repository.Event) { e.ResourceOwner = "caos" }),
generateEvent(t, "309", func(e *repository.Event) { e.ResourceOwner = "orgID" }),
generateEvent(t, "309", func(e *repository.Event) { e.ResourceOwner = "orgID" }),
},
},
res: res{
@@ -428,11 +428,11 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "307", false, 0, func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "308", false, 0, func(e *repository.Event) { e.EditorService = "ADMIN-API" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
generateEvent(t, "309", false, 0, func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
generateEvent(t, "307", func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "307", func(e *repository.Event) { e.EditorService = "MANAGEMENT-API" }),
generateEvent(t, "308", func(e *repository.Event) { e.EditorService = "ADMIN-API" }),
generateEvent(t, "309", func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
generateEvent(t, "309", func(e *repository.Event) { e.EditorService = "AUTHAPI" }),
},
},
res: res{
@@ -455,13 +455,13 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", false, 0, func(e *repository.Event) { e.EditorUser = "nobody" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.EditorUser = "fforootd" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.EditorUser = "fforootd" }),
generateEvent(t, "310", func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", func(e *repository.Event) { e.EditorUser = "adlerhurst" }),
generateEvent(t, "310", func(e *repository.Event) { e.EditorUser = "nobody" }),
generateEvent(t, "311", func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "311", func(e *repository.Event) { e.EditorUser = "" }),
generateEvent(t, "312", func(e *repository.Event) { e.EditorUser = "fforootd" }),
generateEvent(t, "312", func(e *repository.Event) { e.EditorUser = "fforootd" }),
},
},
res: res{
@@ -483,15 +483,15 @@ func Test_query_events_with_crdb(t *testing.T) {
fields: fields{
client: testCRDBClient,
existingEvents: []*repository.Event{
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "311", false, 0, func(e *repository.Event) { e.Type = "user.locked" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "312", false, 0, func(e *repository.Event) { e.Type = "user.reactivated" }),
generateEvent(t, "313", false, 0, func(e *repository.Event) { e.Type = "user.locked" }),
generateEvent(t, "311", func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "311", func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "311", func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "311", func(e *repository.Event) { e.Type = "user.locked" }),
generateEvent(t, "312", func(e *repository.Event) { e.Type = "user.created" }),
generateEvent(t, "312", func(e *repository.Event) { e.Type = "user.updated" }),
generateEvent(t, "312", func(e *repository.Event) { e.Type = "user.deactivated" }),
generateEvent(t, "312", func(e *repository.Event) { e.Type = "user.reactivated" }),
generateEvent(t, "313", func(e *repository.Event) { e.Type = "user.locked" }),
},
},
res: res{