From 5132ebe07c4b7f87e8b8c9845df9bd41723ae651 Mon Sep 17 00:00:00 2001 From: Fabi <38692350+fgerschwiler@users.noreply.github.com> Date: Tue, 15 Mar 2022 07:19:02 +0100 Subject: [PATCH] feat: add tenant column to eventstore (#3314) * feat: add tenant column to eventstore * feat: read tenant from context on push and filter * Update 07_events_table.sql * pass tenant to queryFactory * fix some query tests * init in tests * add missing sql files Co-authored-by: Livio Amstutz --- cmd/admin/initialise/init.go | 21 ++- cmd/admin/initialise/sql/09_events_table.sql | 5 +- .../initialise/sql/10_system_sequence.sql | 1 + .../sql/11_unique_constraints_table.sql | 5 + cmd/admin/initialise/verify_database.go | 9 +- cmd/admin/initialise/verify_database_test.go | 26 ++-- cmd/admin/initialise/verify_grant.go | 11 +- cmd/admin/initialise/verify_grant_test.go | 45 ++----- cmd/admin/initialise/verify_user.go | 11 +- cmd/admin/initialise/verify_user_test.go | 54 +++----- cmd/admin/initialise/verify_zitadel.go | 49 +++++-- internal/api/authz/context.go | 1 + internal/api/authz/context_mock.go | 8 +- internal/command/org_domain.go | 2 +- internal/command/user_grant_test.go | 66 +++++----- internal/eventstore/aggregate.go | 4 + internal/eventstore/event_base.go | 1 + internal/eventstore/eventstore.go | 10 +- internal/eventstore/eventstore_test.go | 45 +++++-- internal/eventstore/example_test.go | 7 +- .../eventstore/handler/crdb/failed_stmt.go | 4 +- .../eventstore/handler/crdb/handler_stmt.go | 2 +- .../eventstore/handler/handler_projection.go | 32 ++--- internal/eventstore/handler/statement.go | 2 +- internal/eventstore/local_crdb_test.go | 124 +++--------------- internal/eventstore/read_model.go | 4 + internal/eventstore/repository/event.go | 3 + .../eventstore/repository/search_query.go | 2 + internal/eventstore/repository/sql/crdb.go | 31 +++-- .../repository/sql/local_crdb_test.go | 121 ++--------------- internal/eventstore/repository/sql/query.go | 10 +- .../eventstore/repository/sql/query_test.go | 20 +-- internal/eventstore/repository/sql/setup.go | 2 +- internal/eventstore/search_query.go | 18 ++- internal/eventstore/search_query_test.go | 29 +++- internal/eventstore/subscription.go | 1 + .../internal/repository/sql/db_mock_test.go | 22 ++-- .../v1/internal/repository/sql/filter.go | 4 +- .../v1/internal/repository/sql/query.go | 8 +- .../v1/internal/repository/sql/query_test.go | 15 ++- internal/eventstore/v1/models/aggregate.go | 2 + .../eventstore/v1/models/aggregate_creator.go | 2 + internal/eventstore/v1/models/event.go | 1 + internal/eventstore/v1/models/field.go | 1 + internal/eventstore/v1/models/object.go | 4 + internal/eventstore/v1/models/search_query.go | 16 +++ .../eventstore/v1/models/search_query_old.go | 4 + internal/eventstore/v1/query/handler.go | 4 +- internal/eventstore/v1/spooler/config.go | 5 +- internal/eventstore/v1/spooler/spooler.go | 15 +-- internal/eventstore/write_model.go | 4 + 51 files changed, 414 insertions(+), 479 deletions(-) create mode 100644 cmd/admin/initialise/sql/10_system_sequence.sql create mode 100644 cmd/admin/initialise/sql/11_unique_constraints_table.sql diff --git a/cmd/admin/initialise/init.go b/cmd/admin/initialise/init.go index e8af457b14..a45e008912 100644 --- a/cmd/admin/initialise/init.go +++ b/cmd/admin/initialise/init.go @@ -5,12 +5,13 @@ import ( _ "embed" "github.com/caos/logging" - "github.com/caos/zitadel/internal/database" "github.com/spf13/cobra" "github.com/spf13/viper" //sql import _ "github.com/lib/pq" + + "github.com/caos/zitadel/internal/database" ) func New() *cobra.Command { @@ -33,9 +34,9 @@ The user provided by flags needs priviledge to return err } if err := initialise(config, - verifyUser(config.Database), - verifyDatabase(config.Database), - verifyGrant(config.Database), + VerifyUser(config.Database.User.Username, config.Database.User.Password), + VerifyDatabase(config.Database.Database), + VerifyGrant(config.Database.Database, config.Database.User.Username), ); err != nil { return err } @@ -55,12 +56,18 @@ func initialise(config Config, steps ...func(*sql.DB) error) error { if err != nil { return err } + err = Initialise(db, steps...) + if err != nil { + return err + } + return db.Close() +} +func Initialise(db *sql.DB, steps ...func(*sql.DB) error) error { for _, step := range steps { - if err = step(db); err != nil { + if err := step(db); err != nil { return err } } - - return db.Close() + return nil } diff --git a/cmd/admin/initialise/sql/09_events_table.sql b/cmd/admin/initialise/sql/09_events_table.sql index 4a2497ab92..e31cc8c722 100644 --- a/cmd/admin/initialise/sql/09_events_table.sql +++ b/cmd/admin/initialise/sql/09_events_table.sql @@ -12,13 +12,14 @@ CREATE TABLE eventstore.events ( , editor_user TEXT NOT NULL , editor_service TEXT NOT NULL , resource_owner TEXT NOT NULL + , tenant TEXT , PRIMARY KEY (event_sequence DESC) USING HASH WITH BUCKET_COUNT = 10 , INDEX agg_type_agg_id (aggregate_type, aggregate_id) , INDEX agg_type (aggregate_type) , INDEX agg_type_seq (aggregate_type, event_sequence DESC) - STORING (id, event_type, aggregate_id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, resource_owner, previous_aggregate_type_sequence) + STORING (id, event_type, aggregate_id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, resource_owner, tenant, previous_aggregate_type_sequence) , INDEX max_sequence (aggregate_type, aggregate_id, event_sequence DESC) , CONSTRAINT previous_sequence_unique UNIQUE (previous_aggregate_sequence DESC) , CONSTRAINT prev_agg_type_seq_unique UNIQUE(previous_aggregate_type_sequence) -) \ No newline at end of file +) diff --git a/cmd/admin/initialise/sql/10_system_sequence.sql b/cmd/admin/initialise/sql/10_system_sequence.sql new file mode 100644 index 0000000000..25c3821e50 --- /dev/null +++ b/cmd/admin/initialise/sql/10_system_sequence.sql @@ -0,0 +1 @@ +CREATE SEQUENCE eventstore.system_seq diff --git a/cmd/admin/initialise/sql/11_unique_constraints_table.sql b/cmd/admin/initialise/sql/11_unique_constraints_table.sql new file mode 100644 index 0000000000..7f34148bc3 --- /dev/null +++ b/cmd/admin/initialise/sql/11_unique_constraints_table.sql @@ -0,0 +1,5 @@ +CREATE TABLE eventstore.unique_constraints ( + unique_type TEXT, + unique_field TEXT, + PRIMARY KEY (unique_type, unique_field) +) diff --git a/cmd/admin/initialise/verify_database.go b/cmd/admin/initialise/verify_database.go index 32722202f4..178775149f 100644 --- a/cmd/admin/initialise/verify_database.go +++ b/cmd/admin/initialise/verify_database.go @@ -5,7 +5,6 @@ import ( _ "embed" "fmt" - "github.com/caos/zitadel/internal/database" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -36,16 +35,16 @@ The user provided by flags needs priviledge to if err := viper.Unmarshal(&config); err != nil { return err } - return initialise(config, verifyDatabase(config.Database)) + return initialise(config, VerifyDatabase(config.Database.Database)) }, } } -func verifyDatabase(config database.Config) func(*sql.DB) error { +func VerifyDatabase(database string) func(*sql.DB) error { return func(db *sql.DB) error { return verify(db, - exists(searchDatabase, config.Database), - exec(fmt.Sprintf(databaseStmt, config.Database)), + exists(searchDatabase, database), + exec(fmt.Sprintf(databaseStmt, database)), ) } } diff --git a/cmd/admin/initialise/verify_database_test.go b/cmd/admin/initialise/verify_database_test.go index aaf9dca4b0..2dee733a6b 100644 --- a/cmd/admin/initialise/verify_database_test.go +++ b/cmd/admin/initialise/verify_database_test.go @@ -4,14 +4,12 @@ import ( "database/sql" "errors" "testing" - - "github.com/caos/zitadel/internal/database" ) func Test_verifyDB(t *testing.T) { type args struct { - db db - config database.Config + db db + database string } tests := []struct { name string @@ -21,10 +19,8 @@ func Test_verifyDB(t *testing.T) { { name: "exists fails", args: args{ - db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", sql.ErrConnDone, "zitadel")), - config: database.Config{ - Database: "zitadel", - }, + db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", sql.ErrConnDone, "zitadel")), + database: "zitadel", }, targetErr: sql.ErrConnDone, }, @@ -35,9 +31,7 @@ func Test_verifyDB(t *testing.T) { expectExists("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", false, "zitadel"), expectExec("CREATE DATABASE zitadel", sql.ErrTxDone), ), - config: database.Config{ - Database: "zitadel", - }, + database: "zitadel", }, targetErr: sql.ErrTxDone, }, @@ -48,9 +42,7 @@ func Test_verifyDB(t *testing.T) { expectExists("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", false, "zitadel"), expectExec("CREATE DATABASE zitadel", nil), ), - config: database.Config{ - Database: "zitadel", - }, + database: "zitadel", }, targetErr: nil, }, @@ -60,16 +52,14 @@ func Test_verifyDB(t *testing.T) { db: prepareDB(t, expectExists("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", true, "zitadel"), ), - config: database.Config{ - Database: "zitadel", - }, + database: "zitadel", }, targetErr: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := verifyDatabase(tt.args.config)(tt.args.db.db); !errors.Is(err, tt.targetErr) { + if err := VerifyDatabase(tt.args.database)(tt.args.db.db); !errors.Is(err, tt.targetErr) { t.Errorf("verifyDB() error = %v, want: %v", err, tt.targetErr) } if err := tt.args.db.mock.ExpectationsWereMet(); err != nil { diff --git a/cmd/admin/initialise/verify_grant.go b/cmd/admin/initialise/verify_grant.go index 84240cf17b..c00969c92f 100644 --- a/cmd/admin/initialise/verify_grant.go +++ b/cmd/admin/initialise/verify_grant.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/caos/logging" - "github.com/caos/zitadel/internal/database" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -31,17 +30,17 @@ Prereqesits: if err := viper.Unmarshal(&config); err != nil { return err } - return initialise(config, verifyGrant(config.Database)) + return initialise(config, VerifyGrant(config.Database.Database, config.Database.User.Username)) }, } } -func verifyGrant(config database.Config) func(*sql.DB) error { +func VerifyGrant(database, username string) func(*sql.DB) error { return func(db *sql.DB) error { - logging.WithFields("user", config.Username).Info("verify grant") + logging.WithFields("user", username).Info("verify grant") return verify(db, - exists(fmt.Sprintf(searchGrant, config.Database), config.Username), - exec(fmt.Sprintf(grantStmt, config.Database, config.Username)), + exists(fmt.Sprintf(searchGrant, database), username), + exec(fmt.Sprintf(grantStmt, database, username)), ) } } diff --git a/cmd/admin/initialise/verify_grant_test.go b/cmd/admin/initialise/verify_grant_test.go index 1455c77c82..a8619927d9 100644 --- a/cmd/admin/initialise/verify_grant_test.go +++ b/cmd/admin/initialise/verify_grant_test.go @@ -4,14 +4,13 @@ import ( "database/sql" "errors" "testing" - - "github.com/caos/zitadel/internal/database" ) func Test_verifyGrant(t *testing.T) { type args struct { - db db - config database.Config + db db + database string + username string } tests := []struct { name string @@ -21,13 +20,9 @@ func Test_verifyGrant(t *testing.T) { { name: "exists fails", args: args{ - db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE zitadel] where grantee = $1 AND privilege_type = 'ALL'", sql.ErrConnDone, "zitadel-user")), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE zitadel] where grantee = $1 AND privilege_type = 'ALL'", sql.ErrConnDone, "zitadel-user")), + database: "zitadel", + username: "zitadel-user", }, targetErr: sql.ErrConnDone, }, @@ -38,12 +33,8 @@ func Test_verifyGrant(t *testing.T) { expectExists("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE zitadel] where grantee = $1 AND privilege_type = 'ALL'", false, "zitadel-user"), expectExec("GRANT ALL ON DATABASE zitadel TO zitadel-user", sql.ErrTxDone), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + database: "zitadel", + username: "zitadel-user", }, targetErr: sql.ErrTxDone, }, @@ -54,12 +45,8 @@ func Test_verifyGrant(t *testing.T) { expectExists("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE zitadel] where grantee = $1 AND privilege_type = 'ALL'", false, "zitadel-user"), expectExec("GRANT ALL ON DATABASE zitadel TO zitadel-user", nil), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + database: "zitadel", + username: "zitadel-user", }, targetErr: nil, }, @@ -69,20 +56,16 @@ func Test_verifyGrant(t *testing.T) { db: prepareDB(t, expectExists("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE zitadel] where grantee = $1 AND privilege_type = 'ALL'", true, "zitadel-user"), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + database: "zitadel", + username: "zitadel-user", }, targetErr: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := verifyGrant(tt.args.config)(tt.args.db.db); !errors.Is(err, tt.targetErr) { - t.Errorf("verifyGrant() error = %v, want: %v", err, tt.targetErr) + if err := VerifyGrant(tt.args.database, tt.args.username)(tt.args.db.db); !errors.Is(err, tt.targetErr) { + t.Errorf("VerifyGrant() error = %v, want: %v", err, tt.targetErr) } if err := tt.args.db.mock.ExpectationsWereMet(); err != nil { t.Error(err) diff --git a/cmd/admin/initialise/verify_user.go b/cmd/admin/initialise/verify_user.go index 93cab468ab..f57351cb50 100644 --- a/cmd/admin/initialise/verify_user.go +++ b/cmd/admin/initialise/verify_user.go @@ -5,7 +5,6 @@ import ( _ "embed" "github.com/caos/logging" - "github.com/caos/zitadel/internal/database" "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -35,17 +34,17 @@ The user provided by flags needs priviledge to if err := viper.Unmarshal(&config); err != nil { return err } - return initialise(config, verifyUser(config.Database)) + return initialise(config, VerifyUser(config.Database.User.Username, config.Database.User.Password)) }, } } -func verifyUser(config database.Config) func(*sql.DB) error { +func VerifyUser(username, password string) func(*sql.DB) error { return func(db *sql.DB) error { - logging.WithFields("username", config.Username).Info("verify user") + logging.WithFields("username", username).Info("verify user") return verify(db, - exists(searchUser, config.Username), - exec(createUserStmt, config.Username, &sql.NullString{String: config.Password, Valid: config.Password != ""}), + exists(searchUser, username), + exec(createUserStmt, username, &sql.NullString{String: password, Valid: password != ""}), ) } } diff --git a/cmd/admin/initialise/verify_user_test.go b/cmd/admin/initialise/verify_user_test.go index 7330c60369..6d7b272f5a 100644 --- a/cmd/admin/initialise/verify_user_test.go +++ b/cmd/admin/initialise/verify_user_test.go @@ -4,14 +4,13 @@ import ( "database/sql" "errors" "testing" - - "github.com/caos/zitadel/internal/database" ) func Test_verifyUser(t *testing.T) { type args struct { - db db - config database.Config + db db + username string + password string } tests := []struct { name string @@ -21,13 +20,9 @@ func Test_verifyUser(t *testing.T) { { name: "exists fails", args: args{ - db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", sql.ErrConnDone, "zitadel-user")), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + db: prepareDB(t, expectQueryErr("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", sql.ErrConnDone, "zitadel-user")), + username: "zitadel-user", + password: "", }, targetErr: sql.ErrConnDone, }, @@ -38,12 +33,8 @@ func Test_verifyUser(t *testing.T) { expectExists("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", false, "zitadel-user"), expectExec("CREATE USER $1 WITH PASSWORD $2", sql.ErrTxDone, "zitadel-user", nil), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + username: "zitadel-user", + password: "", }, targetErr: sql.ErrTxDone, }, @@ -54,12 +45,8 @@ func Test_verifyUser(t *testing.T) { expectExists("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", false, "zitadel-user"), expectExec("CREATE USER $1 WITH PASSWORD $2", nil, "zitadel-user", nil), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + username: "zitadel-user", + password: "", }, targetErr: nil, }, @@ -70,13 +57,8 @@ func Test_verifyUser(t *testing.T) { expectExists("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", false, "zitadel-user"), expectExec("CREATE USER $1 WITH PASSWORD $2", nil, "zitadel-user", "password"), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - Password: "password", - }, - }, + username: "zitadel-user", + password: "password", }, targetErr: nil, }, @@ -86,20 +68,16 @@ func Test_verifyUser(t *testing.T) { db: prepareDB(t, expectExists("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", true, "zitadel-user"), ), - config: database.Config{ - Database: "zitadel", - User: database.User{ - Username: "zitadel-user", - }, - }, + username: "zitadel-user", + password: "", }, targetErr: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := verifyUser(tt.args.config)(tt.args.db.db); !errors.Is(err, tt.targetErr) { - t.Errorf("verifyGrant() error = %v, want: %v", err, tt.targetErr) + if err := VerifyUser(tt.args.username, tt.args.password)(tt.args.db.db); !errors.Is(err, tt.targetErr) { + t.Errorf("VerifyGrant() error = %v, want: %v", err, tt.targetErr) } if err := tt.args.db.mock.ExpectationsWereMet(); err != nil { t.Error(err) diff --git a/cmd/admin/initialise/verify_zitadel.go b/cmd/admin/initialise/verify_zitadel.go index b1702830c8..261dbfd0ef 100644 --- a/cmd/admin/initialise/verify_zitadel.go +++ b/cmd/admin/initialise/verify_zitadel.go @@ -12,16 +12,19 @@ import ( ) const ( - eventstoreSchema = "eventstore" - eventsTable = "events" - projectionsSchema = "projections" - systemSchema = "system" - encryptionKeysTable = "encryption_key" + eventstoreSchema = "eventstore" + eventsTable = "events" + uniqueConstraintsTable = "unique_constraints" + projectionsSchema = "projections" + systemSchema = "system" + encryptionKeysTable = "encryption_keys" ) var ( - searchTable = "SELECT table_name FROM [SHOW TABLES] WHERE table_name = $1" - searchSchema = "SELECT schema_name FROM [SHOW SCHEMAS] WHERE schema_name = $1" + searchSchema = "SELECT schema_name FROM [SHOW SCHEMAS] WHERE schema_name = $1" + searchTable = "SELECT table_name FROM [SHOW TABLES] WHERE table_name = $1" + searchSystemSequence = "SELECT sequence_name FROM [SHOW SEQUENCES] WHERE sequence_name = 'system_seq'" + //go:embed sql/04_eventstore.sql createEventstoreStmt string //go:embed sql/05_projections.sql @@ -34,6 +37,10 @@ var ( enableHashShardedIdx string //go:embed sql/09_events_table.sql createEventsStmt string + //go:embed sql/10_system_sequence.sql + createSystemSequenceStmt string + //go:embed sql/11_unique_constraints_table.sql + createUniqueConstraints string ) func newZitadel() *cobra.Command { @@ -55,13 +62,7 @@ Prereqesits: } } -func verifyZitadel(config database.Config) error { - logging.WithFields("database", config.Database).Info("verify database") - db, err := database.Connect(config) - if err != nil { - return err - } - +func VerifyZitadel(db *sql.DB) error { if err := verify(db, exists(searchSchema, systemSchema), exec(createSystemStmt)); err != nil { return err } @@ -82,6 +83,26 @@ func verifyZitadel(config database.Config) error { return err } + if err := verify(db, exists(searchSystemSequence), exec(createSystemSequenceStmt)); err != nil { + return err + } + + if err := verify(db, exists(searchTable, uniqueConstraintsTable), exec(createUniqueConstraints)); err != nil { + return err + } + return nil +} + +func verifyZitadel(config database.Config) error { + logging.WithFields("database", config.Database).Info("verify database") + db, err := database.Connect(config) + if err != nil { + return err + } + if err := VerifyZitadel(db); err != nil { + return nil + } + return db.Close() } diff --git a/internal/api/authz/context.go b/internal/api/authz/context.go index c16f4e2c2f..2e90a39bdd 100644 --- a/internal/api/authz/context.go +++ b/internal/api/authz/context.go @@ -20,6 +20,7 @@ const ( type CtxData struct { UserID string OrgID string + TenantID string //TODO: Set Tenant ID on some context ProjectID string AgentID string PreferredLanguage string diff --git a/internal/api/authz/context_mock.go b/internal/api/authz/context_mock.go index bf0051c17c..0dde79922f 100644 --- a/internal/api/authz/context_mock.go +++ b/internal/api/authz/context_mock.go @@ -2,11 +2,11 @@ package authz import "context" -func NewMockContext(orgID, userID string) context.Context { - return context.WithValue(context.Background(), dataKey, CtxData{UserID: userID, OrgID: orgID}) +func NewMockContext(tenantID, orgID, userID string) context.Context { + return context.WithValue(context.Background(), dataKey, CtxData{UserID: userID, OrgID: orgID, TenantID: tenantID}) } -func NewMockContextWithPermissions(orgID, userID string, permissions []string) context.Context { - ctx := context.WithValue(context.Background(), dataKey, CtxData{UserID: userID, OrgID: orgID}) +func NewMockContextWithPermissions(tenantID, orgID, userID string, permissions []string) context.Context { + ctx := context.WithValue(context.Background(), dataKey, CtxData{UserID: userID, OrgID: orgID, TenantID: tenantID}) return context.WithValue(ctx, requestPermissionsKey, permissions) } diff --git a/internal/command/org_domain.go b/internal/command/org_domain.go index 79f22b7125..b1153dcd24 100644 --- a/internal/command/org_domain.go +++ b/internal/command/org_domain.go @@ -195,7 +195,7 @@ func (c *Commands) addOrgDomain(ctx context.Context, orgAgg *eventstore.Aggregat for _, userID := range claimedUserIDs { userEvents, _, err := c.userDomainClaimed(ctx, userID) if err != nil { - logging.LogWithFields("COMMAND-nn8Jf", "userid", userID).WithError(err).Warn("could not claim user") + logging.WithFields("userid", userID).WithError(err).Warn("could not claim user") continue } events = append(events, userEvents...) diff --git a/internal/command/user_grant_test.go b/internal/command/user_grant_test.go index ed4c227e69..61f2ce385c 100644 --- a/internal/command/user_grant_test.go +++ b/internal/command/user_grant_test.go @@ -47,7 +47,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", }, @@ -90,7 +90,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -138,7 +138,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -180,7 +180,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -223,7 +223,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -283,7 +283,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -348,7 +348,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "usergrant1"), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -429,7 +429,7 @@ func TestCommandSide_AddUserGrant(t *testing.T) { idGenerator: id_mock.NewIDGeneratorExpectIDs(t, "usergrant1"), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", ProjectID: "project1", @@ -500,7 +500,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ UserID: "user1", }, @@ -549,7 +549,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -573,7 +573,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -605,7 +605,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -662,7 +662,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -722,7 +722,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -776,7 +776,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -831,7 +831,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -903,7 +903,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("org", "user", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "org", "user", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -983,7 +983,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -1079,7 +1079,7 @@ func TestCommandSide_ChangeUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrant: &domain.UserGrant{ ObjectRoot: models.ObjectRoot{ AggregateID: "usergrant1", @@ -1183,7 +1183,7 @@ func TestCommandSide_DeactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1215,7 +1215,7 @@ func TestCommandSide_DeactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1303,7 +1303,7 @@ func TestCommandSide_DeactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1391,7 +1391,7 @@ func TestCommandSide_ReactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1423,7 +1423,7 @@ func TestCommandSide_ReactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1515,7 +1515,7 @@ func TestCommandSide_ReactivateUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1588,7 +1588,7 @@ func TestCommandSide_RemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1620,7 +1620,7 @@ func TestCommandSide_RemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1687,7 +1687,7 @@ func TestCommandSide_RemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1727,7 +1727,7 @@ func TestCommandSide_RemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantID: "usergrant1", resourceOwner: "org1", }, @@ -1800,7 +1800,7 @@ func TestCommandSide_BulkRemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantIDs: []string{"usergrant1", "usergrant2"}, resourceOwner: "org1", }, @@ -1832,7 +1832,7 @@ func TestCommandSide_BulkRemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantIDs: []string{"usergrant1", "usergrant2"}, resourceOwner: "org1", }, @@ -1913,7 +1913,7 @@ func TestCommandSide_BulkRemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantIDs: []string{"usergrant1", "usergrant2"}, resourceOwner: "org1", }, @@ -1967,7 +1967,7 @@ func TestCommandSide_BulkRemoveUserGrant(t *testing.T) { ), }, args: args{ - ctx: authz.NewMockContextWithPermissions("", "", []string{domain.RoleProjectOwner}), + ctx: authz.NewMockContextWithPermissions("", "", "", []string{domain.RoleProjectOwner}), userGrantIDs: []string{"usergrant1", "usergrant2"}, resourceOwner: "org1", }, diff --git a/internal/eventstore/aggregate.go b/internal/eventstore/aggregate.go index c3741339af..98f3b05e50 100644 --- a/internal/eventstore/aggregate.go +++ b/internal/eventstore/aggregate.go @@ -21,6 +21,7 @@ func NewAggregate( ID: id, Type: typ, ResourceOwner: authz.GetCtxData(ctx).OrgID, + Tenant: authz.GetCtxData(ctx).TenantID, Version: version, } @@ -49,6 +50,7 @@ func AggregateFromWriteModel( ID: wm.AggregateID, Type: typ, ResourceOwner: wm.ResourceOwner, + Tenant: wm.Tenant, Version: version, } } @@ -61,6 +63,8 @@ type Aggregate struct { Type AggregateType `json:"-"` //ResourceOwner is the org this aggregates belongs to ResourceOwner string `json:"-"` + //Tenant is the system this aggregate belongs to + Tenant string `json:"-"` //Version is the semver this aggregate represents Version Version `json:"-"` } diff --git a/internal/eventstore/event_base.go b/internal/eventstore/event_base.go index d97654888e..46f3a9304e 100644 --- a/internal/eventstore/event_base.go +++ b/internal/eventstore/event_base.go @@ -79,6 +79,7 @@ func BaseEventFromRepo(event *repository.Event) *BaseEvent { ID: event.AggregateID, Type: AggregateType(event.AggregateType), ResourceOwner: event.ResourceOwner.String, + Tenant: event.Tenant.String, Version: Version(event.Version), }, EventType: EventType(event.Type), diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index 9bc195ae3c..b6f80741ba 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -7,6 +7,7 @@ import ( "reflect" "sync" + "github.com/caos/zitadel/internal/api/authz" "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/repository" ) @@ -40,7 +41,7 @@ func (es *Eventstore) Health(ctx context.Context) error { //Push pushes the events in a single transaction // an event needs at least an aggregate func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error) { - events, constraints, err := commandsToRepository(cmds) + events, constraints, err := commandsToRepository(authz.GetCtxData(ctx).TenantID, cmds) if err != nil { return nil, err } @@ -58,7 +59,7 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error return eventReaders, nil } -func commandsToRepository(cmds []Command) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) { +func commandsToRepository(tenantID string, cmds []Command) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) { events = make([]*repository.Event, len(cmds)) for i, cmd := range cmds { data, err := EventData(cmd) @@ -81,6 +82,7 @@ func commandsToRepository(cmds []Command) (events []*repository.Event, constrain AggregateID: cmd.Aggregate().ID, AggregateType: repository.AggregateType(cmd.Aggregate().Type), ResourceOwner: sql.NullString{String: cmd.Aggregate().ResourceOwner, Valid: cmd.Aggregate().ResourceOwner != ""}, + Tenant: sql.NullString{String: tenantID, Valid: tenantID != ""}, EditorService: cmd.EditorService(), EditorUser: cmd.EditorUser(), Type: repository.EventType(cmd.Type()), @@ -111,7 +113,7 @@ func uniqueConstraintsToRepository(constraints []*EventUniqueConstraint) (unique //Filter filters the stored events based on the searchQuery // and maps the events to the defined event structs func (es *Eventstore) Filter(ctx context.Context, queryFactory *SearchQueryBuilder) ([]Event, error) { - query, err := queryFactory.build() + query, err := queryFactory.build(authz.GetCtxData(ctx).TenantID) if err != nil { return nil, err } @@ -168,7 +170,7 @@ func (es *Eventstore) FilterToReducer(ctx context.Context, searchQuery *SearchQu //LatestSequence filters the latest sequence for the given search query func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryBuilder) (uint64, error) { - query, err := queryFactory.build() + query, err := queryFactory.build(authz.GetCtxData(ctx).TenantID) if err != nil { return 0, err } diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index d055989ba5..48c9310673 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -29,8 +29,8 @@ func newTestEvent(id, description string, data func() interface{}, checkPrevious data: data, shouldCheckPrevious: checkPrevious, BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"), - NewAggregate(authz.NewMockContext("caos", "adlerhurst"), id, "test.aggregate", "v1"), + service.WithService(authz.NewMockContext("tenant", "resourceOwner", "editorUser"), "editorService"), + NewAggregate(authz.NewMockContext("zitadel", "caos", "adlerhurst"), id, "test.aggregate", "v1"), "test.event", ), } @@ -344,7 +344,8 @@ func Test_eventData(t *testing.T) { func TestEventstore_aggregatesToEvents(t *testing.T) { type args struct { - events []Command + tenantID string + events []Command } type res struct { wantErr bool @@ -358,6 +359,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { { name: "one aggregate one event", args: args{ + tenantID: "tenant", events: []Command{ newTestEvent( "1", @@ -378,6 +380,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "tenant", Valid: true}, Type: "test.event", Version: "v1", }, @@ -387,6 +390,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { { name: "one aggregate multiple events", args: args{ + tenantID: "tenant", events: []Command{ newTestEvent( "1", @@ -414,6 +418,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "tenant", Valid: true}, Type: "test.event", Version: "v1", }, @@ -424,6 +429,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "tenant", Valid: true}, Type: "test.event", Version: "v1", }, @@ -433,6 +439,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { { name: "invalid data", args: args{ + tenantID: "tenant", events: []Command{ newTestEvent( "1", @@ -453,9 +460,9 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { events: []Command{ &testEvent{ BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"), + service.WithService(authz.NewMockContext("tenant", "resourceOwner", "editorUser"), "editorService"), NewAggregate( - authz.NewMockContext("caos", "adlerhurst"), + authz.NewMockContext("zitadel", "caos", "adlerhurst"), "", "test.aggregate", "v1", @@ -478,9 +485,9 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { events: []Command{ &testEvent{ BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"), + service.WithService(authz.NewMockContext("tenant", "resourceOwner", "editorUser"), "editorService"), NewAggregate( - authz.NewMockContext("caos", "adlerhurst"), + authz.NewMockContext("zitadel", "caos", "adlerhurst"), "id", "", "v1", @@ -503,9 +510,9 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { events: []Command{ &testEvent{ BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"), + service.WithService(authz.NewMockContext("tenant", "resourceOwner", "editorUser"), "editorService"), NewAggregate( - authz.NewMockContext("caos", "adlerhurst"), + authz.NewMockContext("zitadel", "caos", "adlerhurst"), "id", "test.aggregate", "", @@ -528,9 +535,9 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { events: []Command{ &testEvent{ BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("resourceOwner", "editorUser"), "editorService"), + service.WithService(authz.NewMockContext("tenant", "resourceOwner", "editorUser"), "editorService"), NewAggregate( - authz.NewMockContext("caos", "adlerhurst"), + authz.NewMockContext("zitadel", "caos", "adlerhurst"), "id", "test.aggregate", "v1", @@ -553,9 +560,9 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { events: []Command{ &testEvent{ BaseEvent: *NewBaseEventForPush( - service.WithService(authz.NewMockContext("", "editorUser"), "editorService"), + service.WithService(authz.NewMockContext("tenant", "", "editorUser"), "editorService"), NewAggregate( - authz.NewMockContext("", "adlerhurst"), + authz.NewMockContext("zitadel", "", "adlerhurst"), "id", "test.aggregate", "v1", @@ -578,6 +585,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "", Valid: false}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -622,6 +630,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -632,6 +641,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -644,6 +654,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -654,7 +665,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - events, _, err := commandsToRepository(tt.args.events) + events, _, err := commandsToRepository(tt.args.tenantID, tt.args.events) if (err != nil) != tt.res.wantErr { t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr) return @@ -761,6 +772,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -804,6 +816,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -814,6 +827,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -868,6 +882,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -878,6 +893,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, @@ -890,6 +906,7 @@ func TestEventstore_Push(t *testing.T) { EditorService: "editorService", EditorUser: "editorUser", ResourceOwner: sql.NullString{String: "caos", Valid: true}, + Tenant: sql.NullString{String: "zitadel"}, Type: "test.event", Version: "v1", }, diff --git a/internal/eventstore/example_test.go b/internal/eventstore/example_test.go index 86583b4919..08ac86e608 100644 --- a/internal/eventstore/example_test.go +++ b/internal/eventstore/example_test.go @@ -3,7 +3,6 @@ package eventstore_test import ( "context" "encoding/json" - "fmt" "testing" "time" @@ -18,7 +17,7 @@ import ( // ------------------------------------------------------------ func NewUserAggregate(id string) *eventstore.Aggregate { return eventstore.NewAggregate( - authz.NewMockContext("caos", "adlerhurst"), + authz.NewMockContext("zitadel", "caos", "adlerhurst"), id, "test.user", "v1", @@ -308,7 +307,7 @@ func TestUserReadModel(t *testing.T) { events = append(events, nil) - fmt.Printf("%+v\n", events) + t.Logf("%+v\n", events) users := UsersReadModel{} @@ -316,5 +315,5 @@ func TestUserReadModel(t *testing.T) { if err != nil { t.Errorf("unexpected error on filter to reducer: %v", err) } - fmt.Printf("%+v", users) + t.Logf("%+v", users) } diff --git a/internal/eventstore/handler/crdb/failed_stmt.go b/internal/eventstore/handler/crdb/failed_stmt.go index c509f8e95a..15040672c9 100644 --- a/internal/eventstore/handler/crdb/failed_stmt.go +++ b/internal/eventstore/handler/crdb/failed_stmt.go @@ -23,12 +23,12 @@ const ( func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt *handler.Statement, execErr error) (shouldContinue bool) { failureCount, err := h.failureCount(tx, stmt.Sequence) if err != nil { - logging.LogWithFields("CRDB-WJaFk", "projection", h.ProjectionName, "seq", stmt.Sequence).WithError(err).Warn("unable to get failure count") + logging.WithFields("projection", h.ProjectionName, "seq", stmt.Sequence).WithError(err).Warn("unable to get failure count") return false } failureCount += 1 err = h.setFailureCount(tx, stmt.Sequence, failureCount, execErr) - logging.LogWithFields("CRDB-cI0dB", "projection", h.ProjectionName, "seq", stmt.Sequence).OnError(err).Warn("unable to update failure count") + logging.WithFields("projection", h.ProjectionName, "seq", stmt.Sequence).OnError(err).Warn("unable to update failure count") return failureCount >= h.maxFailureCount } diff --git a/internal/eventstore/handler/crdb/handler_stmt.go b/internal/eventstore/handler/crdb/handler_stmt.go index d73baed817..dceba869d7 100644 --- a/internal/eventstore/handler/crdb/handler_stmt.go +++ b/internal/eventstore/handler/crdb/handler_stmt.go @@ -214,7 +214,7 @@ func (h *StatementHandler) executeStmts( continue } if stmt.PreviousSequence > 0 && stmt.PreviousSequence != sequences[stmt.AggregateType] { - logging.LogWithFields("CRDB-jJBJn", "projection", h.ProjectionName, "aggregateType", stmt.AggregateType, "seq", stmt.Sequence, "prevSeq", stmt.PreviousSequence, "currentSeq", sequences[stmt.AggregateType]).Warn("sequences do not match") + logging.WithFields("projection", h.ProjectionName, "aggregateType", stmt.AggregateType, "seq", stmt.Sequence, "prevSeq", stmt.PreviousSequence, "currentSeq", sequences[stmt.AggregateType]).Warn("sequences do not match") break } err := h.executeStmt(tx, stmt) diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index b7b7cee968..ea243ed552 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -69,12 +69,12 @@ func NewProjectionHandler(config ProjectionHandlerConfig) *ProjectionHandler { if !h.shouldBulk.Stop() { <-h.shouldBulk.C } - logging.LogWithFields("HANDL-mC9Xx", "projection", h.ProjectionName).Info("starting handler without requeue") + logging.WithFields("projection", h.ProjectionName).Info("starting handler without requeue") return h } else if config.RequeueEvery < 500*time.Millisecond { - logging.LogWithFields("HANDL-IEFsG", "projection", h.ProjectionName).Fatal("requeue every must be greater 500ms or <= 0") + logging.WithFields("projection", h.ProjectionName).Fatal("requeue every must be greater 500ms or <= 0") } - logging.LogWithFields("HANDL-fAC5O", "projection", h.ProjectionName).Info("starting handler") + logging.WithFields("projection", h.ProjectionName).Info("starting handler") return h } @@ -107,7 +107,7 @@ func (h *ProjectionHandler) Process( //handle panic defer func() { cause := recover() - logging.LogWithFields("HANDL-utWkv", "projection", h.ProjectionName, "cause", cause, "stack", string(debug.Stack())).Error("projection handler paniced") + logging.WithFields("projection", h.ProjectionName, "cause", cause, "stack", string(debug.Stack())).Error("projection handler paniced") }() execBulk := h.prepareExecuteBulk(query, reduce, update) @@ -121,7 +121,7 @@ func (h *ProjectionHandler) Process( return case event := <-h.Handler.EventQueue: if err := h.processEvent(ctx, event, reduce); err != nil { - logging.LogWithFields("HANDL-TUk5J", "projection", h.ProjectionName).WithError(err).Warn("process failed") + logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed") continue } h.triggerShouldPush(0) @@ -139,7 +139,7 @@ func (h *ProjectionHandler) Process( return case event := <-h.Handler.EventQueue: if err := h.processEvent(ctx, event, reduce); err != nil { - logging.LogWithFields("HANDL-horKq", "projection", h.ProjectionName).WithError(err).Warn("process failed") + logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("process failed") continue } h.triggerShouldPush(0) @@ -161,7 +161,7 @@ func (h *ProjectionHandler) processEvent( ) error { stmt, err := reduce(event) if err != nil { - logging.Log("EVENT-PTr4j").WithError(err).Warn("unable to process event") + logging.New().WithError(err).Warn("unable to process event") return err } @@ -185,16 +185,16 @@ func (h *ProjectionHandler) bulk( errs := lock(ctx, h.requeueAfter) //wait until projection is locked if err, ok := <-errs; err != nil || !ok { - logging.LogWithFields("HANDL-XDJ4i", "projection", h.ProjectionName).OnError(err).Warn("initial lock failed") + logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("initial lock failed") return err } go h.cancelOnErr(ctx, errs, cancel) execErr := executeBulk(ctx) - logging.LogWithFields("EVENT-gwiu4", "projection", h.ProjectionName).OnError(execErr).Warn("unable to execute") + logging.WithFields("projection", h.ProjectionName).OnError(execErr).Warn("unable to execute") unlockErr := unlock() - logging.LogWithFields("EVENT-boPv1", "projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock") + logging.WithFields("projection", h.ProjectionName).OnError(unlockErr).Warn("unable to unlock") if execErr != nil { return execErr @@ -208,7 +208,7 @@ func (h *ProjectionHandler) cancelOnErr(ctx context.Context, errs <-chan error, select { case err := <-errs: if err != nil { - logging.LogWithFields("HANDL-cVop2", "projection", h.ProjectionName).WithError(err).Warn("bulk canceled") + logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("bulk canceled") cancel() return } @@ -235,7 +235,7 @@ func (h *ProjectionHandler) prepareExecuteBulk( default: hasLimitExeeded, err := h.fetchBulkStmts(ctx, query, reduce) if err != nil || len(h.stmts) == 0 { - logging.LogWithFields("HANDL-CzQvn", "projection", h.ProjectionName).OnError(err).Warn("unable to fetch stmts") + logging.WithFields("projection", h.ProjectionName).OnError(err).Warn("unable to fetch stmts") return err } @@ -258,19 +258,19 @@ func (h *ProjectionHandler) fetchBulkStmts( ) (limitExeeded bool, err error) { eventQuery, eventsLimit, err := query() if err != nil { - logging.LogWithFields("HANDL-x6qvs", "projection", h.ProjectionName).WithError(err).Warn("unable to create event query") + logging.WithFields("projection", h.ProjectionName).WithError(err).Warn("unable to create event query") return false, err } events, err := h.Eventstore.Filter(ctx, eventQuery) if err != nil { - logging.LogWithFields("HANDL-X8vlo", "projection", h.ProjectionName).WithError(err).Info("Unable to bulk fetch events") + logging.WithFields("projection", h.ProjectionName).WithError(err).Info("Unable to bulk fetch events") return false, err } for _, event := range events { if err = h.processEvent(ctx, event, reduce); err != nil { - logging.LogWithFields("HANDL-PaKlz", "projection", h.ProjectionName, "seq", event.Sequence()).WithError(err).Warn("unable to process event in bulk") + logging.WithFields("projection", h.ProjectionName, "seq", event.Sequence()).WithError(err).Warn("unable to process event in bulk") return false, err } } @@ -313,5 +313,5 @@ func (h *ProjectionHandler) shutdown() { if !h.shouldPush.Stop() { <-h.shouldPush.C } - logging.Log("EVENT-XG5Og").Info("stop processing") + logging.New().Info("stop processing") } diff --git a/internal/eventstore/handler/statement.go b/internal/eventstore/handler/statement.go index 887d050eb6..ff7e2b41af 100644 --- a/internal/eventstore/handler/statement.go +++ b/internal/eventstore/handler/statement.go @@ -55,7 +55,7 @@ func NewCol(name string, value interface{}) Column { func NewJSONCol(name string, value interface{}) Column { marshalled, err := json.Marshal(value) if err != nil { - logging.LogWithFields("HANDL-oFvsl", "column", name).WithError(err).Panic("unable to marshal column") + logging.WithFields("column", name).WithError(err).Panic("unable to marshal column") } return NewCol(name, marshalled) diff --git a/internal/eventstore/local_crdb_test.go b/internal/eventstore/local_crdb_test.go index fe8ffedfaa..64ee62f23b 100644 --- a/internal/eventstore/local_crdb_test.go +++ b/internal/eventstore/local_crdb_test.go @@ -2,33 +2,34 @@ package eventstore_test import ( "database/sql" - "fmt" - "io/ioutil" "os" - "path/filepath" - "sort" - "strconv" - "strings" "testing" "github.com/caos/logging" "github.com/cockroachdb/cockroach-go/v2/testserver" + + "github.com/caos/zitadel/cmd/admin/initialise" ) var ( - migrationsPath = os.ExpandEnv("${GOPATH}/src/github.com/caos/zitadel/migrations/cockroach") testCRDBClient *sql.DB ) func TestMain(m *testing.M) { ts, err := testserver.NewTestServer() if err != nil { - logging.LogWithFields("REPOS-RvjLG", "error", err).Fatal("unable to start db") + logging.WithFields("error", err).Fatal("unable to start db") } testCRDBClient, err = sql.Open("postgres", ts.PGURL().String()) if err != nil { - logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to connect to db") + logging.WithFields("error", err).Fatal("unable to connect to db") + } + if err != nil { + logging.WithFields("error", err).Fatal("unable to connect to db") + } + if err = testCRDBClient.Ping(); err != nil { + logging.WithFields("error", err).Fatal("unable to ping db") } defer func() { @@ -36,108 +37,21 @@ func TestMain(m *testing.M) { ts.Stop() }() - if err = executeMigrations(); err != nil { - logging.LogWithFields("REPOS-jehDD", "error", err).Fatal("migrations failed") + if err = initDB(testCRDBClient); err != nil { + logging.WithFields("error", err).Fatal("migrations failed") } os.Exit(m.Run()) } -func executeMigrations() error { - files, err := migrationFilePaths() +func initDB(db *sql.DB) error { + username := "zitadel" + database := "zitadel" + err := initialise.Initialise(db, initialise.VerifyUser(username, ""), + initialise.VerifyDatabase(database), + initialise.VerifyGrant(database, username)) if err != nil { return err } - sort.Sort(files) - if err = setPasswordNULL(); err != nil { - return err - } - if err = createFlywayHistory(); err != nil { - return err - } - for _, file := range files { - migrationData, err := ioutil.ReadFile(file) - if err != nil { - return err - } - migration := os.ExpandEnv(string(migrationData)) - if _, err = testCRDBClient.Exec(migration); err != nil { - return fmt.Errorf("exec file: %v || err: %w", file, err) - } - } - return nil -} - -func setPasswordNULL() error { - passwordNames := []string{ - "eventstorepassword", - "managementpassword", - "adminapipassword", - "authpassword", - "notificationpassword", - "authzpassword", - "queriespassword", - } - for _, name := range passwordNames { - if err := os.Setenv(name, "NULL"); err != nil { - return err - } - } - return nil -} - -func createFlywayHistory() error { - _, err := testCRDBClient.Exec("CREATE TABLE defaultdb.flyway_schema_history(id TEXT, PRIMARY KEY(id));") - return err -} - -type migrationPaths []string - -type version struct { - major int - minor int -} - -func versionFromPath(s string) version { - v := s[strings.Index(s, "/V")+2 : strings.Index(s, "__")] - splitted := strings.Split(v, ".") - res := version{} - var err error - if len(splitted) >= 1 { - res.major, err = strconv.Atoi(splitted[0]) - if err != nil { - panic(err) - } - } - - if len(splitted) >= 2 { - res.minor, err = strconv.Atoi(splitted[1]) - if err != nil { - panic(err) - } - } - - return res -} - -func (a migrationPaths) Len() int { return len(a) } -func (a migrationPaths) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a migrationPaths) Less(i, j int) bool { - versionI := versionFromPath(a[i]) - versionJ := versionFromPath(a[j]) - - return versionI.major < versionJ.major || - (versionI.major == versionJ.major && versionI.minor < versionJ.minor) -} - -func migrationFilePaths() (migrationPaths, error) { - files := make(migrationPaths, 0) - err := filepath.Walk(migrationsPath, func(path string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() || !strings.HasSuffix(info.Name(), ".sql") { - return err - } - files = append(files, path) - return nil - }) - return files, err + return initialise.VerifyZitadel(db) } diff --git a/internal/eventstore/read_model.go b/internal/eventstore/read_model.go index 81a0d25a0d..7af27f3259 100644 --- a/internal/eventstore/read_model.go +++ b/internal/eventstore/read_model.go @@ -12,6 +12,7 @@ type ReadModel struct { ChangeDate time.Time `json:"-"` Events []Event `json:"-"` ResourceOwner string `json:"-"` + Tenant string `json:"-"` } //AppendEvents adds all the events to the read model. @@ -34,6 +35,9 @@ func (rm *ReadModel) Reduce() error { if rm.ResourceOwner == "" { rm.ResourceOwner = rm.Events[0].Aggregate().ResourceOwner } + if rm.Tenant == "" { + rm.Tenant = rm.Events[0].Aggregate().Tenant + } if rm.CreationDate.IsZero() { rm.CreationDate = rm.Events[0].CreationDate() diff --git a/internal/eventstore/repository/event.go b/internal/eventstore/repository/event.go index 428339b089..3d613fe8b7 100644 --- a/internal/eventstore/repository/event.go +++ b/internal/eventstore/repository/event.go @@ -56,6 +56,9 @@ type Event struct { // an aggregate can only be managed by one organisation // use the ID of the org ResourceOwner sql.NullString + //Tenant is the system where this event belongs to + // use the ID of the tenant + Tenant sql.NullString } //EventType is the description of the change diff --git a/internal/eventstore/repository/search_query.go b/internal/eventstore/repository/search_query.go index b62bac3987..0190036daf 100644 --- a/internal/eventstore/repository/search_query.go +++ b/internal/eventstore/repository/search_query.go @@ -66,6 +66,8 @@ const ( FieldSequence //FieldResourceOwner represents the resource owner field FieldResourceOwner + //FieldTenant represents the tenant field + FieldTenant //FieldEditorService represents the editor service field FieldEditorService //FieldEditorUser represents the editor user field diff --git a/internal/eventstore/repository/sql/crdb.go b/internal/eventstore/repository/sql/crdb.go index 8e5d91bfff..ef3ea2935c 100644 --- a/internal/eventstore/repository/sql/crdb.go +++ b/internal/eventstore/repository/sql/crdb.go @@ -9,10 +9,11 @@ import ( "strings" "github.com/caos/logging" - caos_errs "github.com/caos/zitadel/internal/errors" - "github.com/caos/zitadel/internal/eventstore/repository" "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/lib/pq" + + caos_errs "github.com/caos/zitadel/internal/errors" + "github.com/caos/zitadel/internal/eventstore/repository" ) const ( @@ -29,6 +30,7 @@ const ( " SELECT MAX(event_sequence) seq, 1 join_me" + " FROM eventstore.events" + " WHERE aggregate_type = $2" + + " AND (CASE WHEN $9::STRING IS NULL THEN tenant is null else tenant = $9::STRING END)" + ") AS agg_type " + // combined with "LEFT JOIN " + @@ -37,6 +39,7 @@ const ( " SELECT event_sequence seq, resource_owner ro, 1 join_me" + " FROM eventstore.events" + " WHERE aggregate_type = $2 AND aggregate_id = $3" + + " AND (CASE WHEN $9::STRING IS NULL THEN tenant is null else tenant = $9::STRING END)" + " ORDER BY event_sequence DESC" + " LIMIT 1" + ") AS agg USING(join_me)" + @@ -51,6 +54,8 @@ const ( " editor_user," + " editor_service," + " resource_owner," + + " tenant," + + " event_sequence," + " previous_aggregate_sequence," + " previous_aggregate_type_sequence" + ") " + @@ -64,11 +69,13 @@ const ( " $5::JSONB AS event_data," + " $6::VARCHAR AS editor_user," + " $7::VARCHAR AS editor_service," + - " IFNULL((resource_owner), $8::VARCHAR) AS resource_owner," + + " IFNULL((resource_owner), $8::VARCHAR) AS resource_owner," + + " $9::VARCHAR AS tenant," + + " NEXTVAL(CONCAT('eventstore.', IFNULL($9, 'system'), '_seq'))," + " aggregate_sequence AS previous_aggregate_sequence," + " aggregate_type_sequence AS previous_aggregate_type_sequence " + "FROM previous_data " + - "RETURNING id, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, creation_date, resource_owner" + "RETURNING id, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, creation_date, resource_owner, tenant" uniqueInsert = `INSERT INTO eventstore.unique_constraints ( @@ -113,17 +120,20 @@ func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueCons event.EditorUser, event.EditorService, event.ResourceOwner, - ).Scan(&event.ID, &event.Sequence, &previousAggregateSequence, &previousAggregateTypeSequence, &event.CreationDate, &event.ResourceOwner) + event.Tenant, + ).Scan(&event.ID, &event.Sequence, &previousAggregateSequence, &previousAggregateTypeSequence, &event.CreationDate, &event.ResourceOwner, &event.Tenant) event.PreviousAggregateSequence = uint64(previousAggregateSequence) event.PreviousAggregateTypeSequence = uint64(previousAggregateTypeSequence) if err != nil { - logging.LogWithFields("SQL-NOqH7", + logging.WithFields( "aggregate", event.AggregateType, "aggregateId", event.AggregateID, "aggregateType", event.AggregateType, - "eventType", event.Type).WithError(err).Info("query failed") + "eventType", event.Type, + "tenant", event.Tenant, + ).WithError(err).Info("query failed") return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event") } } @@ -152,7 +162,7 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC if uniqueConstraint.Action == repository.UniqueConstraintAdd { _, err := tx.ExecContext(ctx, uniqueInsert, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField) if err != nil { - logging.LogWithFields("SQL-IP3js", + logging.WithFields( "unique_type", uniqueConstraint.UniqueType, "unique_field", uniqueConstraint.UniqueField).WithError(err).Info("insert unique constraint failed") @@ -165,7 +175,7 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC } else if uniqueConstraint.Action == repository.UniqueConstraintRemoved { _, err := tx.ExecContext(ctx, uniqueDelete, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField) if err != nil { - logging.LogWithFields("SQL-M0vsf", + logging.WithFields( "unique_type", uniqueConstraint.UniqueType, "unique_field", uniqueConstraint.UniqueField).WithError(err).Info("delete unique constraint failed") return caos_errs.ThrowInternal(err, "SQL-6n88i", "unable to remove unique constraint ") @@ -219,6 +229,7 @@ func (db *CRDB) eventQuery() string { ", editor_service" + ", editor_user" + ", resource_owner" + + ", tenant" + ", aggregate_type" + ", aggregate_id" + ", aggregate_version" + @@ -239,6 +250,8 @@ func (db *CRDB) columnName(col repository.Field) string { return "event_sequence" case repository.FieldResourceOwner: return "resource_owner" + case repository.FieldTenant: + return "tenant" case repository.FieldEditorService: return "editor_service" case repository.FieldEditorUser: diff --git a/internal/eventstore/repository/sql/local_crdb_test.go b/internal/eventstore/repository/sql/local_crdb_test.go index e67c314c4a..799026f932 100644 --- a/internal/eventstore/repository/sql/local_crdb_test.go +++ b/internal/eventstore/repository/sql/local_crdb_test.go @@ -2,37 +2,31 @@ package sql import ( "database/sql" - "fmt" - "io/ioutil" "os" - "path/filepath" - "sort" - "strconv" - "strings" "testing" "github.com/caos/logging" "github.com/cockroachdb/cockroach-go/v2/testserver" + + "github.com/caos/zitadel/cmd/admin/initialise" ) var ( - migrationsPath = os.ExpandEnv("${GOPATH}/src/github.com/caos/zitadel/migrations/cockroach") testCRDBClient *sql.DB ) func TestMain(m *testing.M) { ts, err := testserver.NewTestServer() if err != nil { - logging.LogWithFields("REPOS-RvjLG", "error", err).Fatal("unable to start db") + logging.WithFields("error", err).Fatal("unable to start db") } testCRDBClient, err = sql.Open("postgres", ts.PGURL().String()) - if err != nil { - logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to connect to db") + logging.WithFields("error", err).Fatal("unable to connect to db") } if err = testCRDBClient.Ping(); err != nil { - logging.LogWithFields("REPOS-CF6dQ", "error", err).Fatal("unable to ping db") + logging.WithFields("error", err).Fatal("unable to ping db") } defer func() { @@ -40,113 +34,26 @@ func TestMain(m *testing.M) { ts.Stop() }() - if err = executeMigrations(); err != nil { - logging.LogWithFields("REPOS-jehDD", "error", err).Fatal("migrations failed") + if err = initDB(testCRDBClient); err != nil { + logging.WithFields("error", err).Fatal("migrations failed") } os.Exit(m.Run()) } -func executeMigrations() error { - files, err := migrationFilePaths() +func initDB(db *sql.DB) error { + username := "zitadel" + database := "zitadel" + err := initialise.Initialise(db, initialise.VerifyUser(username, ""), + initialise.VerifyDatabase(database), + initialise.VerifyGrant(database, username)) if err != nil { return err } - sort.Sort(files) - if err = setPasswordNULL(); err != nil { - return err - } - if err = createFlywayHistory(); err != nil { - return err - } - for _, file := range files { - migrationData, err := ioutil.ReadFile(file) - if err != nil { - return err - } - migration := os.ExpandEnv(string(migrationData)) - if _, err = testCRDBClient.Exec(migration); err != nil { - return fmt.Errorf("exec file: %v || err: %w", file, err) - } - } - return nil -} - -func setPasswordNULL() error { - passwordNames := []string{ - "eventstorepassword", - "managementpassword", - "adminapipassword", - "authpassword", - "notificationpassword", - "authzpassword", - "queriespassword", - } - for _, name := range passwordNames { - if err := os.Setenv(name, "NULL"); err != nil { - return err - } - } - return nil -} - -func createFlywayHistory() error { - _, err := testCRDBClient.Exec("CREATE TABLE defaultdb.flyway_schema_history(id TEXT, PRIMARY KEY(id));") - return err + return initialise.VerifyZitadel(db) } func fillUniqueData(unique_type, field string) error { _, err := testCRDBClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field) VALUES ($1, $2)", unique_type, field) return err } - -type migrationPaths []string - -type version struct { - major int - minor int -} - -func versionFromPath(s string) version { - v := s[strings.Index(s, "/V")+2 : strings.Index(s, "__")] - splitted := strings.Split(v, ".") - res := version{} - var err error - if len(splitted) >= 1 { - res.major, err = strconv.Atoi(splitted[0]) - if err != nil { - panic(err) - } - } - - if len(splitted) >= 2 { - res.minor, err = strconv.Atoi(splitted[1]) - if err != nil { - panic(err) - } - } - - return res -} - -func (a migrationPaths) Len() int { return len(a) } -func (a migrationPaths) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a migrationPaths) Less(i, j int) bool { - versionI := versionFromPath(a[i]) - versionJ := versionFromPath(a[j]) - - return versionI.major < versionJ.major || - (versionI.major == versionJ.major && versionI.minor < versionJ.minor) -} - -func migrationFilePaths() (migrationPaths, error) { - files := make(migrationPaths, 0) - err := filepath.Walk(migrationsPath, func(path string, info os.FileInfo, err error) error { - if err != nil || info.IsDir() || !strings.HasSuffix(info.Name(), ".sql") { - return err - } - files = append(files, path) - return nil - }) - return files, err -} diff --git a/internal/eventstore/repository/sql/query.go b/internal/eventstore/repository/sql/query.go index 285ad46e73..be6672de14 100644 --- a/internal/eventstore/repository/sql/query.go +++ b/internal/eventstore/repository/sql/query.go @@ -9,9 +9,10 @@ import ( "strings" "github.com/caos/logging" + "github.com/lib/pq" + z_errors "github.com/caos/zitadel/internal/errors" "github.com/caos/zitadel/internal/eventstore/repository" - "github.com/lib/pq" ) type querier interface { @@ -48,7 +49,7 @@ func query(ctx context.Context, criteria querier, searchQuery *repository.Search rows, err := criteria.db().QueryContext(ctx, query, values...) if err != nil { - logging.Log("SQL-HP3Uk").WithError(err).Info("query failed") + logging.New().WithError(err).Info("query failed") return z_errors.ThrowInternal(err, "SQL-KyeAx", "unable to filter events") } defer rows.Close() @@ -108,13 +109,14 @@ func eventsScanner(scanner scan, dest interface{}) (err error) { &event.EditorService, &event.EditorUser, &event.ResourceOwner, + &event.Tenant, &event.AggregateType, &event.AggregateID, &event.Version, ) if err != nil { - logging.Log("SQL-3mofs").WithError(err).Warn("unable to scan row") + logging.New().WithError(err).Warn("unable to scan row") return z_errors.ThrowInternal(err, "SQL-M0dsf", "unable to scan row") } @@ -147,7 +149,7 @@ func prepareCondition(criteria querier, filters [][]*repository.Filter) (clause var err error value, err = json.Marshal(value) if err != nil { - logging.Log("SQL-BSsNy").WithError(err).Warn("unable to marshal search value") + logging.New().WithError(err).Warn("unable to marshal search value") continue } } diff --git a/internal/eventstore/repository/sql/query_test.go b/internal/eventstore/repository/sql/query_test.go index 60ed92698b..8f0e7dd6c1 100644 --- a/internal/eventstore/repository/sql/query_test.go +++ b/internal/eventstore/repository/sql/query_test.go @@ -130,13 +130,13 @@ func Test_prepareColumns(t *testing.T) { dest: &[]*repository.Event{}, }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", expected: []*repository.Event{ {AggregateID: "hodor", AggregateType: "user", Sequence: 5, Data: make(Data, 0)}, }, }, fields: fields{ - dbRow: []interface{}{time.Time{}, repository.EventType(""), uint64(5), Sequence(0), Sequence(0), Data(nil), "", "", sql.NullString{String: ""}, repository.AggregateType("user"), "hodor", repository.Version("")}, + dbRow: []interface{}{time.Time{}, repository.EventType(""), uint64(5), Sequence(0), Sequence(0), Data(nil), "", "", sql.NullString{String: ""}, sql.NullString{String: ""}, repository.AggregateType("user"), "hodor", repository.Version("")}, }, }, { @@ -146,7 +146,7 @@ func Test_prepareColumns(t *testing.T) { dest: []*repository.Event{}, }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", dbErr: errors.IsErrorInvalidArgument, }, }, @@ -158,7 +158,7 @@ func Test_prepareColumns(t *testing.T) { dbErr: sql.ErrConnDone, }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", dbErr: errors.IsInternal, }, }, @@ -592,7 +592,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, []driver.Value{repository.AggregateType("user")}, ), }, @@ -621,7 +621,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence LIMIT \$2`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence LIMIT \$2`, []driver.Value{repository.AggregateType("user"), uint64(5)}, ), }, @@ -650,7 +650,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`, []driver.Value{repository.AggregateType("user"), uint64(5)}, ), }, @@ -679,7 +679,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQueryErr(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, []driver.Value{repository.AggregateType("user")}, sql.ErrConnDone), }, @@ -708,7 +708,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`, []driver.Value{repository.AggregateType("user")}, &repository.Event{Sequence: 100}), }, @@ -776,7 +776,7 @@ func Test_query_events_mocked(t *testing.T) { }, fields: fields{ mock: newMockClient(t).expectQuery(t, - `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY event_sequence DESC LIMIT \$4`, + `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY event_sequence DESC LIMIT \$4`, []driver.Value{repository.AggregateType("user"), repository.AggregateType("org"), "asdf42", uint64(5)}, ), }, diff --git a/internal/eventstore/repository/sql/setup.go b/internal/eventstore/repository/sql/setup.go index 4f57d4f2cc..b73cb0dc4a 100644 --- a/internal/eventstore/repository/sql/setup.go +++ b/internal/eventstore/repository/sql/setup.go @@ -52,7 +52,7 @@ func (db *CRDB) Step20(ctx context.Context, latestSequence uint64) error { if err = tx.Commit(); err != nil { return err } - logging.LogWithFields("SQL-bXVwS", "currentSeq", currentSequence, "events", len(events)).Info("events updated") + logging.WithFields("currentSeq", currentSequence, "events", len(events)).Info("events updated") } return nil } diff --git a/internal/eventstore/search_query.go b/internal/eventstore/search_query.go index 5d8a752c2b..927f1099b3 100644 --- a/internal/eventstore/search_query.go +++ b/internal/eventstore/search_query.go @@ -12,6 +12,7 @@ type SearchQueryBuilder struct { limit uint64 desc bool resourceOwner string + tenant string queries []*SearchQuery } @@ -67,6 +68,12 @@ func (factory *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQu return factory } +//Tenant defines the tenant (system) of the events +func (factory *SearchQueryBuilder) Tenant(tenant string) *SearchQueryBuilder { + factory.tenant = tenant + return factory +} + //OrderDesc changes the sorting order of the returned events to descending func (factory *SearchQueryBuilder) OrderDesc() *SearchQueryBuilder { factory.desc = true @@ -138,12 +145,13 @@ func (query *SearchQuery) Builder() *SearchQueryBuilder { return query.builder } -func (builder *SearchQueryBuilder) build() (*repository.SearchQuery, error) { +func (builder *SearchQueryBuilder) build(tenantID string) (*repository.SearchQuery, error) { if builder == nil || len(builder.queries) < 1 || builder.columns.Validate() != nil { return nil, errors.ThrowPreconditionFailed(nil, "MODEL-4m9gs", "builder invalid") } + builder.tenant = tenantID filters := make([][]*repository.Filter, len(builder.queries)) for i, query := range builder.queries { @@ -155,6 +163,7 @@ func (builder *SearchQueryBuilder) build() (*repository.SearchQuery, error) { query.eventSequenceGreaterFilter, query.eventSequenceLessFilter, query.builder.resourceOwnerFilter, + query.builder.tenantFilter, } { if filter := f(); filter != nil { if err := filter.Validate(); err != nil { @@ -238,6 +247,13 @@ func (builder *SearchQueryBuilder) resourceOwnerFilter() *repository.Filter { return repository.NewFilter(repository.FieldResourceOwner, builder.resourceOwner, repository.OperationEquals) } +func (builder *SearchQueryBuilder) tenantFilter() *repository.Filter { + if builder.tenant == "" { + return nil + } + return repository.NewFilter(repository.FieldTenant, builder.tenant, repository.OperationEquals) +} + func (query *SearchQuery) eventDataFilter() *repository.Filter { if len(query.eventData) == 0 { return nil diff --git a/internal/eventstore/search_query_test.go b/internal/eventstore/search_query_test.go index 3693f66374..ac137a12e6 100644 --- a/internal/eventstore/search_query_test.go +++ b/internal/eventstore/search_query_test.go @@ -226,6 +226,7 @@ func TestSearchQuerybuilderBuild(t *testing.T) { type args struct { columns Columns setters []func(*SearchQueryBuilder) *SearchQueryBuilder + tenant string } type res struct { isErr func(err error) bool @@ -620,6 +621,32 @@ func TestSearchQuerybuilderBuild(t *testing.T) { }, }, }, + { + name: "filter aggregate type and tenant", + args: args{ + columns: ColumnsEvent, + setters: []func(*SearchQueryBuilder) *SearchQueryBuilder{ + testAddQuery( + testSetAggregateTypes("user"), + ), + }, + tenant: "tenant", + }, + res: res{ + isErr: nil, + query: &repository.SearchQuery{ + Columns: repository.ColumnsEvent, + Desc: false, + Limit: 0, + Filters: [][]*repository.Filter{ + { + repository.NewFilter(repository.FieldAggregateType, repository.AggregateType("user"), repository.OperationEquals), + repository.NewFilter(repository.FieldTenant, "tenant", repository.OperationEquals), + }, + }, + }, + }, + }, { name: "column invalid", args: args{ @@ -641,7 +668,7 @@ func TestSearchQuerybuilderBuild(t *testing.T) { for _, f := range tt.args.setters { builder = f(builder) } - query, err := builder.build() + query, err := builder.build(tt.args.tenant) if tt.res.isErr != nil && !tt.res.isErr(err) { t.Errorf("wrong error(%T): %v", err, err) return diff --git a/internal/eventstore/subscription.go b/internal/eventstore/subscription.go index 94e231b989..6204116f60 100644 --- a/internal/eventstore/subscription.go +++ b/internal/eventstore/subscription.go @@ -122,6 +122,7 @@ func mapEventToV1Event(event Event) *models.Event { AggregateType: models.AggregateType(event.Aggregate().Type), AggregateID: event.Aggregate().ID, ResourceOwner: event.Aggregate().ResourceOwner, + Tenant: event.Aggregate().Tenant, EditorService: event.EditorService(), EditorUser: event.EditorUser(), Data: event.DataAsBytes(), diff --git a/internal/eventstore/v1/internal/repository/sql/db_mock_test.go b/internal/eventstore/v1/internal/repository/sql/db_mock_test.go index daf6227929..9254a1f7f4 100644 --- a/internal/eventstore/v1/internal/repository/sql/db_mock_test.go +++ b/internal/eventstore/v1/internal/repository/sql/db_mock_test.go @@ -11,11 +11,11 @@ import ( ) const ( - selectEscaped = `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore\.events WHERE aggregate_type = \$1` + selectEscaped = `SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore\.events WHERE aggregate_type = \$1` ) var ( - eventColumns = []string{"creation_date", "event_type", "event_sequence", "previous_aggregate_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "aggregate_type", "aggregate_id", "aggregate_version"} + eventColumns = []string{"creation_date", "event_type", "event_sequence", "previous_aggregate_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "tenant", "aggregate_type", "aggregate_id", "aggregate_version"} expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence LIMIT \$2`).String() expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence DESC`).String() expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 ORDER BY event_sequence LIMIT \$3`).String() @@ -23,10 +23,10 @@ var ( expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence`).String() expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` + - `\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_aggregate_sequence, previous_aggregate_type_sequence\) ` + - `SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10 ` + + `\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, tenant, previous_aggregate_sequence, previous_aggregate_type_sequence\) ` + + `SELECT \$1, \$2, \$3, \$4, COALESCE\(\$5, now\(\)\), \$6, \$7, \$8, \$9, \$10, \$11 ` + `WHERE EXISTS \(` + - `SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$11 AND aggregate_id = \$12 HAVING MAX\(event_sequence\) = \$13 OR \(\$14::BIGINT IS NULL AND COUNT\(\*\) = 0\)\) ` + + `SELECT 1 FROM eventstore\.events WHERE aggregate_type = \$12 AND aggregate_id = \$13 HAVING MAX\(event_sequence\) = \$14 OR \(\$14::BIGINT IS NULL AND COUNT\(\*\) = 0\)\) ` + `RETURNING event_sequence, creation_date`).String() ) @@ -99,7 +99,7 @@ func (db *dbMock) expectRollback(err error) *dbMock { func (db *dbMock) expectInsertEvent(e *models.Event, returnedSequence uint64) *dbMock { db.mock.ExpectQuery(expectedInsertStatement). WithArgs( - e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence), + e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, e.Tenant, Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence), ). WillReturnRows( @@ -113,7 +113,7 @@ func (db *dbMock) expectInsertEvent(e *models.Event, returnedSequence uint64) *d func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock { db.mock.ExpectQuery(expectedInsertStatement). WithArgs( - e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, Sequence(e.PreviousSequence), + e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), Data(e.Data), e.EditorUser, e.EditorService, e.ResourceOwner, e.Tenant, Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence), Sequence(e.PreviousSequence), ). WillReturnError(sql.ErrTxDone) @@ -124,7 +124,7 @@ func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock { func (db *dbMock) expectFilterEventsLimit(aggregateType string, limit uint64, eventCount int) *dbMock { rows := sqlmock.NewRows(eventColumns) for i := 0; i < eventCount; i++ { - rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0") + rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "tenant", "aggType", "aggID", "v1.0.0") } db.mock.ExpectQuery(expectedFilterEventsLimitFormat). WithArgs(aggregateType, limit). @@ -135,7 +135,7 @@ func (db *dbMock) expectFilterEventsLimit(aggregateType string, limit uint64, ev func (db *dbMock) expectFilterEventsDesc(aggregateType string, eventCount int) *dbMock { rows := sqlmock.NewRows(eventColumns) for i := eventCount; i > 0; i-- { - rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0") + rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "tenant", "aggType", "aggID", "v1.0.0") } db.mock.ExpectQuery(expectedFilterEventsDescFormat). WillReturnRows(rows) @@ -145,7 +145,7 @@ func (db *dbMock) expectFilterEventsDesc(aggregateType string, eventCount int) * func (db *dbMock) expectFilterEventsAggregateIDLimit(aggregateType, aggregateID string, limit uint64) *dbMock { rows := sqlmock.NewRows(eventColumns) for i := limit; i > 0; i-- { - rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0") + rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "tenant", "aggType", "aggID", "v1.0.0") } db.mock.ExpectQuery(expectedFilterEventsAggregateIDLimit). WithArgs(aggregateType, aggregateID, limit). @@ -156,7 +156,7 @@ func (db *dbMock) expectFilterEventsAggregateIDLimit(aggregateType, aggregateID func (db *dbMock) expectFilterEventsAggregateIDTypeLimit(aggregateType, aggregateID string, limit uint64) *dbMock { rows := sqlmock.NewRows(eventColumns) for i := limit; i > 0; i-- { - rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0") + rows.AddRow(time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "tenant", "aggType", "aggID", "v1.0.0") } db.mock.ExpectQuery(expectedFilterEventsAggregateIDTypeLimit). WithArgs(aggregateType, aggregateID, limit). diff --git a/internal/eventstore/v1/internal/repository/sql/filter.go b/internal/eventstore/v1/internal/repository/sql/filter.go index 0b365eac77..993e926c76 100644 --- a/internal/eventstore/v1/internal/repository/sql/filter.go +++ b/internal/eventstore/v1/internal/repository/sql/filter.go @@ -26,7 +26,7 @@ func filter(querier Querier, searchQuery *es_models.SearchQueryFactory) (events rows, err := querier.Query(query, values...) if err != nil { - logging.Log("SQL-HP3Uk").WithError(err).Info("query failed") + logging.New().WithError(err).Info("query failed") return nil, errors.ThrowInternal(err, "SQL-IJuyR", "unable to filter events") } defer rows.Close() @@ -55,7 +55,7 @@ func (db *SQL) LatestSequence(ctx context.Context, queryFactory *es_models.Searc sequence := new(Sequence) err := rowScanner(row.Scan, sequence) if err != nil { - logging.Log("SQL-WsxTg").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Info("query failed") + logging.New().WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Info("query failed") return 0, errors.ThrowInternal(err, "SQL-Yczyx", "unable to filter latest sequence") } return uint64(*sequence), nil diff --git a/internal/eventstore/v1/internal/repository/sql/query.go b/internal/eventstore/v1/internal/repository/sql/query.go index 539488cd72..ef4f38c1eb 100644 --- a/internal/eventstore/v1/internal/repository/sql/query.go +++ b/internal/eventstore/v1/internal/repository/sql/query.go @@ -23,6 +23,7 @@ const ( ", editor_service" + ", editor_user" + ", resource_owner" + + ", tenant" + ", aggregate_type" + ", aggregate_id" + ", aggregate_version" + @@ -32,7 +33,7 @@ const ( func buildQuery(queryFactory *es_models.SearchQueryFactory) (query string, limit uint64, values []interface{}, rowScanner func(s scan, dest interface{}) error) { searchQuery, err := queryFactory.Build() if err != nil { - logging.Log("SQL-cshKu").WithError(err).Warn("search query factory invalid") + logging.New().WithError(err).Warn("search query factory invalid") return "", 0, nil, nil } query, rowScanner = prepareColumns(searchQuery.Columns) @@ -116,13 +117,14 @@ func prepareColumns(columns es_models.Columns) (string, func(s scan, dest interf &event.EditorService, &event.EditorUser, &event.ResourceOwner, + &event.Tenant, &event.AggregateType, &event.AggregateID, &event.AggregateVersion, ) if err != nil { - logging.Log("SQL-kn1Sw").WithError(err).Warn("unable to scan row") + logging.New().WithError(err).Warn("unable to scan row") return z_errors.ThrowInternal(err, "SQL-J0hFS", "unable to scan row") } @@ -175,6 +177,8 @@ func getField(field es_models.Field) string { return "event_sequence" case es_models.Field_ResourceOwner: return "resource_owner" + case es_models.Field_Tenant: + return "tenant" case es_models.Field_EditorService: return "editor_service" case es_models.Field_EditorUser: diff --git a/internal/eventstore/v1/internal/repository/sql/query_test.go b/internal/eventstore/v1/internal/repository/sql/query_test.go index b61fa860f6..4a7cae5ec6 100644 --- a/internal/eventstore/v1/internal/repository/sql/query_test.go +++ b/internal/eventstore/v1/internal/repository/sql/query_test.go @@ -80,6 +80,7 @@ func Test_getField(t *testing.T) { es_models.Field_AggregateID: "aggregate_id", es_models.Field_LatestSequence: "event_sequence", es_models.Field_ResourceOwner: "resource_owner", + es_models.Field_Tenant: "tenant", es_models.Field_EditorService: "editor_service", es_models.Field_EditorUser: "editor_user", es_models.Field_EventType: "event_type", @@ -234,8 +235,8 @@ func Test_prepareColumns(t *testing.T) { dest: new(es_models.Event), }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", - dbRow: []interface{}{time.Time{}, es_models.EventType(""), uint64(5), Sequence(0), Data(nil), "", "", "", es_models.AggregateType("user"), "hodor", es_models.Version("")}, + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + dbRow: []interface{}{time.Time{}, es_models.EventType(""), uint64(5), Sequence(0), Data(nil), "", "", "", "", es_models.AggregateType("user"), "hodor", es_models.Version("")}, expected: es_models.Event{AggregateID: "hodor", AggregateType: "user", Sequence: 5, Data: make(Data, 0)}, }, }, @@ -246,7 +247,7 @@ func Test_prepareColumns(t *testing.T) { dest: new(uint64), }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", dbErr: errors.IsErrorInvalidArgument, }, }, @@ -258,7 +259,7 @@ func Test_prepareColumns(t *testing.T) { dbErr: sql.ErrConnDone, }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events", dbErr: errors.IsInternal, }, }, @@ -429,7 +430,7 @@ func Test_buildQuery(t *testing.T) { queryFactory: es_models.NewSearchQueryFactory("user").OrderDesc(), }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC", rowScanner: true, values: []interface{}{es_models.AggregateType("user")}, }, @@ -440,7 +441,7 @@ func Test_buildQuery(t *testing.T) { queryFactory: es_models.NewSearchQueryFactory("user").Limit(5), }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence LIMIT $2", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence LIMIT $2", rowScanner: true, values: []interface{}{es_models.AggregateType("user"), uint64(5)}, limit: 5, @@ -452,7 +453,7 @@ func Test_buildQuery(t *testing.T) { queryFactory: es_models.NewSearchQueryFactory("user").Limit(5).OrderDesc(), }, res: res{ - query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2", + query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, tenant, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = $1 ORDER BY event_sequence DESC LIMIT $2", rowScanner: true, values: []interface{}{es_models.AggregateType("user"), uint64(5)}, limit: 5, diff --git a/internal/eventstore/v1/models/aggregate.go b/internal/eventstore/v1/models/aggregate.go index a0a5c88779..c6e0849068 100644 --- a/internal/eventstore/v1/models/aggregate.go +++ b/internal/eventstore/v1/models/aggregate.go @@ -23,6 +23,7 @@ type Aggregate struct { editorService string editorUser string resourceOwner string + tenant string Events []*Event Precondition *precondition } @@ -55,6 +56,7 @@ func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate, EditorService: a.editorService, EditorUser: a.editorUser, ResourceOwner: a.resourceOwner, + Tenant: a.tenant, } a.Events = append(a.Events, e) diff --git a/internal/eventstore/v1/models/aggregate_creator.go b/internal/eventstore/v1/models/aggregate_creator.go index c602741259..31942a9275 100644 --- a/internal/eventstore/v1/models/aggregate_creator.go +++ b/internal/eventstore/v1/models/aggregate_creator.go @@ -20,6 +20,7 @@ func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ Aggr ctxData := authz.GetCtxData(ctx) editorUser := ctxData.UserID resourceOwner := ctxData.OrgID + tenant := ctxData.TenantID aggregate := &Aggregate{ ID: id, @@ -30,6 +31,7 @@ func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ Aggr editorService: c.serviceName, editorUser: editorUser, resourceOwner: resourceOwner, + tenant: tenant, } for _, opt := range opts { diff --git a/internal/eventstore/v1/models/event.go b/internal/eventstore/v1/models/event.go index d366836b71..a497d5c090 100644 --- a/internal/eventstore/v1/models/event.go +++ b/internal/eventstore/v1/models/event.go @@ -28,6 +28,7 @@ type Event struct { EditorService string EditorUser string ResourceOwner string + Tenant string } func eventData(i interface{}) ([]byte, error) { diff --git a/internal/eventstore/v1/models/field.go b/internal/eventstore/v1/models/field.go index a14057faf7..d4df34c3ea 100644 --- a/internal/eventstore/v1/models/field.go +++ b/internal/eventstore/v1/models/field.go @@ -11,4 +11,5 @@ const ( Field_EditorUser Field_EventType Field_CreationDate + Field_Tenant ) diff --git a/internal/eventstore/v1/models/object.go b/internal/eventstore/v1/models/object.go index 931014f73b..fb5a06e7eb 100644 --- a/internal/eventstore/v1/models/object.go +++ b/internal/eventstore/v1/models/object.go @@ -8,6 +8,7 @@ type ObjectRoot struct { AggregateID string `json:"-"` Sequence uint64 `json:"-"` ResourceOwner string `json:"-"` + Tenant string `json:"-"` CreationDate time.Time `json:"-"` ChangeDate time.Time `json:"-"` } @@ -21,6 +22,9 @@ func (o *ObjectRoot) AppendEvent(event *Event) { if o.ResourceOwner == "" { o.ResourceOwner = event.ResourceOwner } + if o.Tenant == "" { + o.Tenant = event.Tenant + } o.ChangeDate = event.CreationDate if o.CreationDate.IsZero() { diff --git a/internal/eventstore/v1/models/search_query.go b/internal/eventstore/v1/models/search_query.go index 232a17b3ba..e07e510a9e 100644 --- a/internal/eventstore/v1/models/search_query.go +++ b/internal/eventstore/v1/models/search_query.go @@ -17,6 +17,7 @@ type SearchQueryFactory struct { sequenceTo uint64 eventTypes []EventType resourceOwner string + tenant string creationDate time.Time } @@ -62,6 +63,8 @@ func FactoryFromSearchQuery(query *SearchQuery) *SearchQueryFactory { } case Field_ResourceOwner: factory = factory.ResourceOwner(filter.value.(string)) + case Field_Tenant: + factory = factory.Tenant(filter.value.(string)) case Field_EventType: factory = factory.EventTypes(filter.value.([]EventType)...) case Field_EditorService, Field_EditorUser: @@ -120,6 +123,11 @@ func (factory *SearchQueryFactory) ResourceOwner(resourceOwner string) *SearchQu return factory } +func (factory *SearchQueryFactory) Tenant(tenant string) *SearchQueryFactory { + factory.tenant = tenant + return factory +} + func (factory *SearchQueryFactory) CreationDateNewer(time time.Time) *SearchQueryFactory { factory.creationDate = time return factory @@ -151,6 +159,7 @@ func (factory *SearchQueryFactory) Build() (*searchQuery, error) { factory.sequenceToFilter, factory.eventTypeFilter, factory.resourceOwnerFilter, + factory.tenantFilter, factory.creationDateNewerFilter, } { if filter := f(); filter != nil { @@ -222,6 +231,13 @@ func (factory *SearchQueryFactory) resourceOwnerFilter() *Filter { return NewFilter(Field_ResourceOwner, factory.resourceOwner, Operation_Equals) } +func (factory *SearchQueryFactory) tenantFilter() *Filter { + if factory.tenant == "" { + return nil + } + return NewFilter(Field_Tenant, factory.tenant, Operation_Equals) +} + func (factory *SearchQueryFactory) creationDateNewerFilter() *Filter { if factory.creationDate.IsZero() { return nil diff --git a/internal/eventstore/v1/models/search_query_old.go b/internal/eventstore/v1/models/search_query_old.go index b9137c017e..ade859813a 100644 --- a/internal/eventstore/v1/models/search_query_old.go +++ b/internal/eventstore/v1/models/search_query_old.go @@ -69,6 +69,10 @@ func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery { return q.setFilter(NewFilter(Field_ResourceOwner, resourceOwner, Operation_Equals)) } +func (q *SearchQuery) TenantFilter(tenant string) *SearchQuery { + return q.setFilter(NewFilter(Field_Tenant, tenant, Operation_Equals)) +} + func (q *SearchQuery) CreationDateNewerFilter(time time.Time) *SearchQuery { return q.setFilter(NewFilter(Field_CreationDate, time, Operation_Greater)) } diff --git a/internal/eventstore/v1/query/handler.go b/internal/eventstore/v1/query/handler.go index 9b56ea55ae..dc745aea60 100755 --- a/internal/eventstore/v1/query/handler.go +++ b/internal/eventstore/v1/query/handler.go @@ -43,7 +43,7 @@ func ReduceEvent(handler Handler, event *models.Event) { }() currentSequence, err := handler.CurrentSequence() if err != nil { - logging.Log("HANDL-BmpkC").WithError(err).Warn("unable to get current sequence") + logging.New().WithError(err).Warn("unable to get current sequence") return } @@ -65,7 +65,7 @@ func ReduceEvent(handler Handler, event *models.Event) { return } if unprocessedEvent.Sequence < currentSequence { - logging.LogWithFields("QUERY-DOYVN", + logging.WithFields( "unprocessed", unprocessedEvent.Sequence, "current", currentSequence, "view", handler.ViewModel()). diff --git a/internal/eventstore/v1/spooler/config.go b/internal/eventstore/v1/spooler/config.go index 9d864be695..1d5d1f02f2 100644 --- a/internal/eventstore/v1/spooler/config.go +++ b/internal/eventstore/v1/spooler/config.go @@ -1,11 +1,12 @@ package spooler import ( - "github.com/caos/zitadel/internal/eventstore/v1" "math/rand" "os" "github.com/caos/logging" + + v1 "github.com/caos/zitadel/internal/eventstore/v1" "github.com/caos/zitadel/internal/eventstore/v1/query" "github.com/caos/zitadel/internal/id" ) @@ -21,7 +22,7 @@ func (c *Config) New() *Spooler { lockID, err := os.Hostname() if err != nil || lockID == "" { lockID, err = id.SonyFlakeGenerator.Next() - logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID") + logging.OnError(err).Panic("unable to generate lockID") } //shuffle the handlers for better balance when running multiple pods diff --git a/internal/eventstore/v1/spooler/spooler.go b/internal/eventstore/v1/spooler/spooler.go index 1b632d6d58..0a28808878 100644 --- a/internal/eventstore/v1/spooler/spooler.go +++ b/internal/eventstore/v1/spooler/spooler.go @@ -2,15 +2,14 @@ package spooler import ( "context" - - "github.com/getsentry/sentry-go" - - "github.com/caos/zitadel/internal/eventstore/v1" "strconv" "sync" "time" "github.com/caos/logging" + "github.com/getsentry/sentry-go" + + v1 "github.com/caos/zitadel/internal/eventstore/v1" "github.com/caos/zitadel/internal/eventstore/v1/models" "github.com/caos/zitadel/internal/eventstore/v1/query" "github.com/caos/zitadel/internal/telemetry/tracing" @@ -38,7 +37,7 @@ type spooledHandler struct { } func (s *Spooler) Start() { - defer logging.LogWithFields("SPOOL-N0V1g", "lockerID", s.lockID, "workers", s.workers).Info("spooler started") + defer logging.WithFields("lockerID", s.lockID, "workers", s.workers).Info("spooler started") if s.workers < 1 { return } @@ -116,7 +115,7 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo for i, event := range events { select { case <-ctx.Done(): - logging.LogWithFields("SPOOL-FTKwH", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled") + logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).Debug("context canceled") return nil default: if err := s.Reduce(event); err != nil { @@ -130,7 +129,7 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo } } err := s.OnSuccess() - logging.LogWithFields("SPOOL-49ods", "view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func") + logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func") return err } @@ -141,7 +140,7 @@ func (s *spooledHandler) query(ctx context.Context) ([]*models.Event, error) { } factory := models.FactoryFromSearchQuery(query) sequence, err := s.eventstore.LatestSequence(ctx, factory) - logging.Log("SPOOL-7SciK").OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("unable to query latest sequence") + logging.OnError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("unable to query latest sequence") var processedSequence uint64 for _, filter := range query.Filters { if filter.GetField() == models.Field_LatestSequence { diff --git a/internal/eventstore/write_model.go b/internal/eventstore/write_model.go index 7ad372fabc..f99df62bf9 100644 --- a/internal/eventstore/write_model.go +++ b/internal/eventstore/write_model.go @@ -10,6 +10,7 @@ type WriteModel struct { ProcessedSequence uint64 `json:"-"` Events []Event `json:"-"` ResourceOwner string `json:"-"` + Tenant string `json:"-"` ChangeDate time.Time `json:"-"` } @@ -32,6 +33,9 @@ func (wm *WriteModel) Reduce() error { if wm.ResourceOwner == "" { wm.ResourceOwner = wm.Events[0].Aggregate().ResourceOwner } + if wm.Tenant == "" { + wm.Tenant = wm.Events[0].Aggregate().Tenant + } wm.ProcessedSequence = wm.Events[len(wm.Events)-1].Sequence() wm.ChangeDate = wm.Events[len(wm.Events)-1].CreationDate()