mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:17:32 +00:00
feat: org command sides (#96)
* start org * refactor(eventstore): filter in sql for querier * feat(eventstore): Aggregate precondition preconditions are checked right before insert. Insert is still transaction save * feat(eventstore): check preconditions in repository * test(eventstore): test precondition in models * test(eventstore): precondition-tests * start org * refactor(eventstore): filter in sql for querier * feat(eventstore): Aggregate precondition preconditions are checked right before insert. Insert is still transaction save * feat(admin): start implement org * feat(eventstore): check preconditions in repository * fix(eventstore): data as NULL if empty refactor(eventstore): naming in sequence methods * feat(admin): org command side * feat(management): start org-repo * feat(org): member * fix: replace ObjectRoot.ID with ObjectRoot.AggregateID * aggregateID * add remove,change member * refactor(org): namings * refactor(eventstore): querier as type * fix(precondition): rename validation from precondition to validation * test(eventstore): isErr func instead of wantErr bool * fix(tests): Data * fix(eventstore): correct check for existing events in push, simplify insert statement * fix(eventstore): aggregate id public * test(org): eventsourcing * test(org): eventstore * test(org): deactivate, reactivate, orgbyid * test(org): getMemberByIDs * tests * running tests * add user repo to admin * thorw not found if no org found * eventstore tests done * lauft * validate if user is already member of org * modules * delete unused file * add member validation test * return error if unable to validat member * generate org id once, set resourceowner of org * Update internal/admin/repository/eventsourcing/eventstore/org.go * Update internal/admin/repository/eventsourcing/eventstore/org.go * Update internal/org/repository/eventsourcing/member_model.go * Update internal/org/repository/eventsourcing/org.go * Update internal/org/repository/eventsourcing/org.go * Update internal/org/repository/eventsourcing/org_member.go * Update internal/org/repository/eventsourcing/org_member.go * Update internal/org/repository/eventsourcing/org_model.go * Update internal/org/repository/eventsourcing/org.go * Update internal/org/repository/eventsourcing/org_model.go * Update internal/org/repository/eventsourcing/org_model.go * typo * correct user events * usercreate for setuporg instead of userregister * set data * mod * mod * tests * cleanup code * code styling * return member on add and change * change username in startup * girignore * orgID as parameter in re-/deactive org * startup config * migration for admin_api-user * probes fro admin * move unique org Co-authored-by: Fabiennne <fabienne.gerschwiler@gmail.com>
This commit is contained in:
25
internal/eventstore/internal/repository/sql/data.go
Normal file
25
internal/eventstore/internal/repository/sql/data.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package sql
|
||||
|
||||
import "database/sql/driver"
|
||||
|
||||
// Data represents a byte array that may be null.
|
||||
// Data implements the sql.Scanner interface
|
||||
type Data []byte
|
||||
|
||||
// Scan implements the Scanner interface.
|
||||
func (data *Data) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
*data = nil
|
||||
return nil
|
||||
}
|
||||
*data = Data(value.([]byte))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value implements the driver Valuer interface.
|
||||
func (data Data) Value() (driver.Value, error) {
|
||||
if len(data) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return []byte(data), nil
|
||||
}
|
@@ -24,16 +24,13 @@ var (
|
||||
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 AND aggregate_type = ANY\(\$2\) ORDER BY event_sequence LIMIT \$3`).String()
|
||||
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence`).String()
|
||||
|
||||
expectedInsertStatement = regexp.MustCompile(`insert into eventstore\.events ` +
|
||||
expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
|
||||
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence\) ` +
|
||||
`select \$1, \$2, \$3, \$4, coalesce\(\$5, now\(\)\), \$6, \$7, \$8, \$9, ` +
|
||||
`case \(select exists\(select event_sequence from eventstore\.events where aggregate_type = \$10 AND aggregate_id = \$11\)\) ` +
|
||||
`WHEN true then \(select event_sequence from eventstore\.events where aggregate_type = \$12 AND aggregate_id = \$13 order by event_sequence desc limit 1\) ` +
|
||||
`ELSE NULL ` +
|
||||
`end ` +
|
||||
`where \(` +
|
||||
`\(select count\(id\) from eventstore\.events where event_sequence >= COALESCE\(\$14, 0\) AND aggregate_type = \$15 AND aggregate_id = \$16\) = 1 OR ` +
|
||||
`\(\(select count\(id\) from eventstore\.events where aggregate_type = \$17 and aggregate_id = \$18\) = 0 AND COALESCE\(\$19, 0\) = 0\)\) RETURNING id, event_sequence, creation_date`).String()
|
||||
`SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10 ` +
|
||||
`WHERE EXISTS \(SELECT 1 WHERE ` +
|
||||
`EXISTS \(SELECT 1 FROM eventstore\.events WHERE event_sequence = COALESCE\(\$11, 0\) AND aggregate_type = \$12 AND aggregate_id = \$13\) OR ` +
|
||||
`NOT EXISTS \(SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$14 AND aggregate_id = \$15\) AND COALESCE\(\$16, 0\) = 0\) ` +
|
||||
`RETURNING id, event_sequence, creation_date`).String()
|
||||
)
|
||||
|
||||
type dbMock struct {
|
||||
@@ -105,9 +102,7 @@ func (db *dbMock) expectRollback(err error) *dbMock {
|
||||
func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returnedSequence uint64) *dbMock {
|
||||
db.mock.ExpectQuery(expectedInsertStatement).
|
||||
WithArgs(
|
||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
|
||||
e.AggregateType, e.AggregateID,
|
||||
e.AggregateType, e.AggregateID,
|
||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
||||
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
|
||||
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
|
||||
).
|
||||
@@ -122,9 +117,7 @@ func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returned
|
||||
func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock {
|
||||
db.mock.ExpectQuery(expectedInsertStatement).
|
||||
WithArgs(
|
||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
|
||||
e.AggregateType, e.AggregateID,
|
||||
e.AggregateType, e.AggregateID,
|
||||
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence),
|
||||
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
|
||||
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
|
||||
).
|
||||
|
@@ -54,6 +54,7 @@ func filter(querier Querier, searchQuery *es_models.SearchQuery) (events []*es_m
|
||||
for rows.Next() {
|
||||
event := new(models.Event)
|
||||
var previousSequence Sequence
|
||||
data := make(Data, 0)
|
||||
|
||||
err = rows.Scan(
|
||||
&event.ID,
|
||||
@@ -61,7 +62,7 @@ func filter(querier Querier, searchQuery *es_models.SearchQuery) (events []*es_m
|
||||
&event.Type,
|
||||
&event.Sequence,
|
||||
&previousSequence,
|
||||
&event.Data,
|
||||
&data,
|
||||
&event.EditorService,
|
||||
&event.EditorUser,
|
||||
&event.ResourceOwner,
|
||||
@@ -76,6 +77,10 @@ func filter(querier Querier, searchQuery *es_models.SearchQuery) (events []*es_m
|
||||
}
|
||||
|
||||
event.PreviousSequence = uint64(previousSequence)
|
||||
|
||||
event.Data = make([]byte, len(data))
|
||||
copy(event.Data, data)
|
||||
|
||||
events = append(events, event)
|
||||
}
|
||||
|
||||
|
@@ -11,19 +11,14 @@ import (
|
||||
"github.com/cockroachdb/cockroach-go/crdb"
|
||||
)
|
||||
|
||||
const insertStmt = "insert into eventstore.events " +
|
||||
const insertStmt = "INSERT INTO eventstore.events " +
|
||||
"(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " +
|
||||
"select $1, $2, $3, $4, coalesce($5, now()), $6, $7, $8, $9, " +
|
||||
// case is to set the highest sequence or NULL in previous_sequence
|
||||
"case (select exists(select event_sequence from eventstore.events where aggregate_type = $10 AND aggregate_id = $11)) " +
|
||||
"WHEN true then (select event_sequence from eventstore.events where aggregate_type = $12 AND aggregate_id = $13 order by event_sequence desc limit 1) " +
|
||||
"ELSE NULL " +
|
||||
"end " +
|
||||
"where (" +
|
||||
// exactly one event of requested aggregate must have a >= sequence (last inserted event)
|
||||
"(select count(id) from eventstore.events where event_sequence >= COALESCE($14, 0) AND aggregate_type = $15 AND aggregate_id = $16) = 1 OR " +
|
||||
// previous sequence = 0, no events must exist for the requested aggregate
|
||||
"((select count(id) from eventstore.events where aggregate_type = $17 and aggregate_id = $18) = 0 AND COALESCE($19, 0) = 0)) " +
|
||||
"SELECT $1, $2, $3, $4, COALESCE($5, now()), $6, $7, $8, $9, $10 " +
|
||||
"WHERE EXISTS (SELECT 1 WHERE " +
|
||||
// exactly one event of requested aggregate must have the given previous sequence as sequence (last inserted event)
|
||||
"EXISTS (SELECT 1 FROM eventstore.events WHERE event_sequence = COALESCE($11, 0) AND aggregate_type = $12 AND aggregate_id = $13) OR " +
|
||||
// if previous sequence = 0, no events must exist for the requested aggregate
|
||||
"NOT EXISTS (SELECT 1 FROM eventstore.events WHERE aggregate_type = $14 AND aggregate_id = $15) AND COALESCE($16, 0) = 0) " +
|
||||
"RETURNING id, event_sequence, creation_date"
|
||||
|
||||
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
|
||||
@@ -74,14 +69,7 @@ func precondtion(tx *sql.Tx, aggregate *models.Aggregate) error {
|
||||
|
||||
func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Event) error {
|
||||
for _, event := range events {
|
||||
if event.Data == nil || len(event.Data) == 0 {
|
||||
//json decoder failes with EOF if json text is empty
|
||||
event.Data = []byte("{}")
|
||||
}
|
||||
|
||||
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, event.Data, event.EditorUser, event.EditorService, event.ResourceOwner,
|
||||
event.AggregateType, event.AggregateID,
|
||||
event.AggregateType, event.AggregateID,
|
||||
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, Data(event.Data), event.EditorUser, event.EditorService, event.ResourceOwner, previousSequence,
|
||||
previousSequence, event.AggregateType, event.AggregateID,
|
||||
event.AggregateType, event.AggregateID, previousSequence)
|
||||
|
||||
@@ -99,6 +87,7 @@ func insertEvents(stmt *sql.Stmt, previousSequence Sequence, events []*models.Ev
|
||||
}
|
||||
|
||||
if !rowInserted {
|
||||
logging.LogWithFields("SQL-5aATu", "aggregate", event.AggregateType, "id", event.AggregateID).Info("wrong sequence")
|
||||
return caos_errs.ThrowAlreadyExists(nil, "SQL-GKcAa", "wrong sequence")
|
||||
}
|
||||
|
||||
|
@@ -89,7 +89,6 @@ func TestSQL_PushAggregates(t *testing.T) {
|
||||
ResourceOwner: "ro",
|
||||
PreviousSequence: 34,
|
||||
Type: "eventTyp",
|
||||
Data: []byte("{}"),
|
||||
AggregateVersion: "v0.0.1",
|
||||
},
|
||||
"asdfölk-234", 45).
|
||||
@@ -101,7 +100,6 @@ func TestSQL_PushAggregates(t *testing.T) {
|
||||
ResourceOwner: "ro2",
|
||||
PreviousSequence: 45,
|
||||
Type: "eventTyp",
|
||||
Data: []byte("{}"),
|
||||
AggregateVersion: "v0.0.1",
|
||||
}, "asdfölk-233", 46).
|
||||
expectReleaseSavepoint(nil).
|
||||
@@ -151,7 +149,6 @@ func TestSQL_PushAggregates(t *testing.T) {
|
||||
EditorUser: "usr",
|
||||
ResourceOwner: "ro",
|
||||
PreviousSequence: 34,
|
||||
Data: []byte("{}"),
|
||||
Type: "eventTyp",
|
||||
AggregateVersion: "v0.0.1",
|
||||
}, "asdfölk-233", 47).
|
||||
@@ -162,7 +159,6 @@ func TestSQL_PushAggregates(t *testing.T) {
|
||||
EditorUser: "usr",
|
||||
ResourceOwner: "ro",
|
||||
PreviousSequence: 40,
|
||||
Data: []byte("{}"),
|
||||
Type: "eventTyp",
|
||||
AggregateVersion: "v0.0.1",
|
||||
}, "asdfölk-233", 48).
|
||||
@@ -410,8 +406,11 @@ func Test_precondtion(t *testing.T) {
|
||||
t.FailNow()
|
||||
}
|
||||
err = precondtion(tx, tt.args.aggregate)
|
||||
if (tt.isErr != nil && err == nil) || (tt.isErr != nil && !tt.isErr(err)) {
|
||||
t.Error("precondtion() wrong error", err)
|
||||
if (err != nil) && (tt.isErr == nil) {
|
||||
t.Errorf("no error expected got: %v", err)
|
||||
}
|
||||
if tt.isErr != nil && !tt.isErr(err) {
|
||||
t.Errorf("precondtion() wrong error %T, %v", err, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@@ -5,16 +5,16 @@ import (
|
||||
)
|
||||
|
||||
// Sequence represents a number that may be null.
|
||||
// Sequence implements the sql.Scanner interface so
|
||||
// Sequence implements the sql.Scanner interface
|
||||
type Sequence uint64
|
||||
|
||||
// Scan implements the Scanner interface.
|
||||
func (n *Sequence) Scan(value interface{}) error {
|
||||
func (seq *Sequence) Scan(value interface{}) error {
|
||||
if value == nil {
|
||||
*n = 0
|
||||
*seq = 0
|
||||
return nil
|
||||
}
|
||||
*n = Sequence(value.(int64))
|
||||
*seq = Sequence(value.(int64))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -15,7 +15,7 @@ func (at AggregateType) String() string {
|
||||
type Aggregates []*Aggregate
|
||||
|
||||
type Aggregate struct {
|
||||
id string
|
||||
ID string
|
||||
typ AggregateType
|
||||
PreviousSequence uint64
|
||||
version Version
|
||||
@@ -45,7 +45,7 @@ func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate,
|
||||
CreationDate: time.Now(),
|
||||
Data: data,
|
||||
Type: typ,
|
||||
AggregateID: a.id,
|
||||
AggregateID: a.ID,
|
||||
AggregateType: a.typ,
|
||||
AggregateVersion: a.version,
|
||||
EditorService: a.editorService,
|
||||
@@ -66,7 +66,7 @@ func (a *Aggregate) Validate() error {
|
||||
if a == nil {
|
||||
return errors.ThrowPreconditionFailed(nil, "MODEL-yi5AC", "aggregate is nil")
|
||||
}
|
||||
if a.id == "" {
|
||||
if a.ID == "" {
|
||||
return errors.ThrowPreconditionFailed(nil, "MODEL-FSjKV", "id not set")
|
||||
}
|
||||
if string(a.typ) == "" {
|
||||
|
@@ -22,7 +22,7 @@ func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ Aggr
|
||||
resourceOwner := ctxData.OrgID
|
||||
|
||||
aggregate := &Aggregate{
|
||||
id: id,
|
||||
ID: id,
|
||||
typ: typ,
|
||||
PreviousSequence: previousSequence,
|
||||
version: version,
|
||||
|
@@ -83,7 +83,7 @@ func TestAggregateCreator_NewAggregate(t *testing.T) {
|
||||
creator: &AggregateCreator{serviceName: "admin"},
|
||||
wantErr: false,
|
||||
want: &Aggregate{
|
||||
id: "hodor",
|
||||
ID: "hodor",
|
||||
Events: make([]*Event, 0, 2),
|
||||
typ: "user",
|
||||
version: "v1.0.0",
|
||||
|
@@ -114,7 +114,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "no type error",
|
||||
wantErr: true,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
version: "v1.0.0",
|
||||
editorService: "svc",
|
||||
editorUser: "hodor",
|
||||
@@ -135,7 +135,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "invalid version error",
|
||||
wantErr: true,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
typ: "user",
|
||||
editorService: "svc",
|
||||
editorUser: "hodor",
|
||||
@@ -156,7 +156,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "no query in precondition error",
|
||||
wantErr: true,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
typ: "user",
|
||||
version: "v1.0.0",
|
||||
editorService: "svc",
|
||||
@@ -182,7 +182,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "no func in precondition error",
|
||||
wantErr: true,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
typ: "user",
|
||||
version: "v1.0.0",
|
||||
editorService: "svc",
|
||||
@@ -208,7 +208,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "validation without precondition ok",
|
||||
wantErr: false,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
typ: "user",
|
||||
version: "v1.0.0",
|
||||
editorService: "svc",
|
||||
@@ -231,7 +231,7 @@ func TestAggregate_Validate(t *testing.T) {
|
||||
name: "validation with precondition ok",
|
||||
wantErr: false,
|
||||
fields: fields{aggregate: &Aggregate{
|
||||
id: "aggID",
|
||||
ID: "aggID",
|
||||
typ: "user",
|
||||
version: "v1.0.0",
|
||||
editorService: "svc",
|
||||
|
@@ -1,6 +1,8 @@
|
||||
package models
|
||||
|
||||
import "github.com/caos/zitadel/internal/errors"
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
)
|
||||
|
||||
type SearchQuery struct {
|
||||
Limit uint64
|
||||
@@ -33,6 +35,10 @@ func (q *SearchQuery) AggregateIDFilter(id string) *SearchQuery {
|
||||
return q.setFilter(NewFilter(Field_AggregateID, id, Operation_Equals))
|
||||
}
|
||||
|
||||
func (q *SearchQuery) AggregateIDsFilter(ids ...string) *SearchQuery {
|
||||
return q.setFilter(NewFilter(Field_AggregateID, ids, Operation_In))
|
||||
}
|
||||
|
||||
func (q *SearchQuery) AggregateTypeFilter(types ...AggregateType) *SearchQuery {
|
||||
return q.setFilter(NewFilter(Field_AggregateType, types, Operation_In))
|
||||
}
|
||||
|
Reference in New Issue
Block a user