diff --git a/backend/v3/domain/organization.go b/backend/v3/domain/organization.go new file mode 100644 index 0000000000..3d99aaacb2 --- /dev/null +++ b/backend/v3/domain/organization.go @@ -0,0 +1,73 @@ +package domain + +import ( + "context" + "time" + + "github.com/zitadel/zitadel/backend/v3/storage/database" +) + +var State []string = []string{ + "ACTIVE", + "INACTIVE", +} + +type Organization struct { + ID string `json:"id,omitempty" db:"id"` + Name string `json:"name,omitempty" db:"name"` + InstanceID string `json:"instance_id,omitempty" db:"instance_id"` + State string `json:"state,omitempty" db:"state"` + CreatedAt time.Time `json:"created_at,omitempty" db:"created_at"` + UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at"` + DeletedAt *time.Time `json:"deleted_at,omitempty" db:"deleted_at"` +} + +// organizationColumns define all the columns of the instance table. +type organizationColumns interface { + // IDColumn returns the column for the id field. + IDColumn() database.Column + // NameColumn returns the column for the name field. + NameColumn() database.Column + // InstanceIDColumn returns the column for the default org id field + InstanceIDColumn() database.Column + // StateColumn returns the column for the name field. + StateColumn() database.Column + // CreatedAtColumn returns the column for the created at field. + CreatedAtColumn() database.Column + // UpdatedAtColumn returns the column for the updated at field. + UpdatedAtColumn() database.Column + // DeletedAtColumn returns the column for the deleted at field. + DeletedAtColumn() database.Column +} + +// organizationConditions define all the conditions for the instance table. +type organizationConditions interface { + // IDCondition returns an equal filter on the id field. + IDCondition(instanceID string) database.Condition + // NameCondition returns a filter on the name field. + NameCondition(op database.TextOperation, name string) database.Condition +} + +// organizationChanges define all the changes for the instance table. +type organizationChanges interface { + // SetName sets the name column. + SetName(name string) database.Change +} + +// OrganizationRepository is the interface for the instance repository. +type OrganizationRepository interface { + organizationColumns + organizationConditions + organizationChanges + + Get(ctx context.Context, opts ...database.Condition) (*Organization, error) + List(ctx context.Context, opts ...database.Condition) ([]Organization, error) + + Create(ctx context.Context, instance *Organization) error + Update(ctx context.Context, condition database.Condition, changes ...database.Change) (int64, error) + Delete(ctx context.Context, condition database.Condition) error +} + +type CreateOrganization struct { + Name string `json:"name"` +} diff --git a/backend/v3/storage/database/dialect/postgres/migration/002_organization_table.go b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table.go new file mode 100644 index 0000000000..2f0a04eee0 --- /dev/null +++ b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table.go @@ -0,0 +1,16 @@ +package migration + +import ( + _ "embed" +) + +var ( + //go:embed 002_organization_table/up.sql + up002OrganizationTable string + //go:embed 002_organization_table/down.sql + down002OrganizationTable string +) + +func init() { + registerSQLMigration(2, up002OrganizationTable, down002OrganizationTable) +} diff --git a/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/down.sql b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/down.sql new file mode 100644 index 0000000000..654858cdac --- /dev/null +++ b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/down.sql @@ -0,0 +1,2 @@ +DROP TABLE zitadel.organizations; +DROP Type zitadel.organization_state; diff --git a/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/up.sql b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/up.sql new file mode 100644 index 0000000000..16024775b7 --- /dev/null +++ b/backend/v3/storage/database/dialect/postgres/migration/002_organization_table/up.sql @@ -0,0 +1,14 @@ +CREATE TYPE zitadel.organization_state AS ENUM ( + 'ACTIVE', + 'INACTIVE' +); + +CREATE TABLE zitadel.organizations( + id TEXT NOT NULL PRIMARY KEY, + name TEXT NOT NULL, + instance_id TEXT NOT NULL, + state zitadel.organization_state NOT NULL, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW(), + deleted_at TIMESTAMP DEFAULT NULL +); diff --git a/backend/v3/storage/database/events_testing/instance_test.go b/backend/v3/storage/database/events_testing/instance_test.go index 0446130431..ea0c2f5519 100644 --- a/backend/v3/storage/database/events_testing/instance_test.go +++ b/backend/v3/storage/database/events_testing/instance_test.go @@ -1,6 +1,6 @@ //go:build integration -package instance_test +package events_test import ( "context" @@ -9,9 +9,11 @@ import ( "time" "github.com/brianvoe/gofakeit/v6" + "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/zitadel/zitadel/pkg/grpc/org/v2" "github.com/zitadel/zitadel/backend/v3/storage/database" "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres" @@ -27,6 +29,7 @@ var ( CTX context.Context Instance *integration.Instance SystemClient system.SystemServiceClient + OrgClient org.OrganizationServiceClient ) var pool database.Pool @@ -40,9 +43,23 @@ func TestMain(m *testing.M) { CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner) SystemClient = integration.SystemClient() + OrgClient = Instance.Client.OrgV2 var err error - dbPool, err = pgxpool.New(context.Background(), ConnString) + dbConfig, err := pgxpool.ParseConfig(ConnString) + if err != nil { + panic(err) + } + dbConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + orgState, err := conn.LoadType(ctx, "zitadel.organization_state") + if err != nil { + return err + } + conn.TypeMap().RegisterType(orgState) + return nil + } + + dbPool, err = pgxpool.NewWithConfig(context.Background(), dbConfig) if err != nil { panic(err) } diff --git a/backend/v3/storage/database/events_testing/organization_test.go b/backend/v3/storage/database/events_testing/organization_test.go new file mode 100644 index 0000000000..6b1c75cc76 --- /dev/null +++ b/backend/v3/storage/database/events_testing/organization_test.go @@ -0,0 +1,157 @@ +//go:build integration + +package events_test + +import ( + "fmt" + "testing" + "time" + + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zitadel/zitadel/backend/v3/storage/database" + "github.com/zitadel/zitadel/backend/v3/storage/database/repository" + "github.com/zitadel/zitadel/internal/integration" + "github.com/zitadel/zitadel/pkg/grpc/org/v2" +) + +// const ConnString = "host=localhost port=5432 user=zitadel dbname=zitadel sslmode=disable" + +// var ( +// dbPool *pgxpool.Pool +// CTX context.Context +// Organization *integration.Organization +// SystemClient system.SystemServiceClient +// ) + +// var pool database.Pool + +// func TestMain(m *testing.M) { +// os.Exit(func() int { +// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) +// defer cancel() + +// Organization = integration.NewOrganization(ctx) + +// CTX = Organization.WithAuthorization(ctx, integration.UserTypeIAMOwner) +// SystemClient = integration.SystemClient() + +// var err error +// dbPool, err = pgxpool.New(context.Background(), ConnString) +// if err != nil { +// panic(err) +// } + +// pool = postgres.PGxPool(dbPool) + +// return m.Run() +// }()) +// } + +func TestServer_TestOrganizationAddReduces(t *testing.T) { + orgName := gofakeit.Name() + // beforeCreate := time.Now() + + _, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{ + Name: orgName, + }) + require.NoError(t, err) + // afterCreate := time.Now() + + orgRepo := repository.OrgRepository(pool) + organization, err := orgRepo.Get(CTX, + orgRepo.NameCondition(database.TextOperationEqual, orgName), + ) + require.NoError(t, err) + fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization) + + retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) + assert.EventuallyWithT(t, func(ttt *assert.CollectT) { + organization, err := orgRepo.Get(CTX, + orgRepo.NameCondition(database.TextOperationEqual, orgName), + ) + require.NoError(ttt, err) + fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization) + // // event instance.added + // require.Equal(ttt, instanceName, organization.Name) + // // event instance.default.org.set + // require.NotNil(t, organization.DefaultOrgID) + // // event instance.iam.project.set + // require.NotNil(t, organization.IAMProjectID) + // // event instance.iam.console.set + // require.NotNil(t, organization.ConsoleAppID) + // // event instance.default.language.set + // require.NotNil(t, organization.DefaultLanguage) + // // event instance.added + // assert.WithinRange(t, organization.CreatedAt, beforeCreate, afterCreate) + // // event instance.added + // assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate) + // require.Nil(t, organization.DeletedAt) + }, retryDuration, tick) +} + +// func TestServer_TestOrganizationUpdateNameReduces(t *testing.T) { +// instanceName := gofakeit.Name() +// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{ +// OrganizationName: instanceName, +// Owner: &system.CreateOrganizationRequest_Machine_{ +// Machine: &system.CreateOrganizationRequest_Machine{ +// UserName: "owner", +// Name: "owner", +// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{}, +// }, +// }, +// }) +// require.NoError(t, err) + +// instanceName += "new" +// _, err = SystemClient.UpdateOrganization(CTX, &system.UpdateOrganizationRequest{ +// OrganizationId: res.OrganizationId, +// OrganizationName: instanceName, +// }) +// require.NoError(t, err) + +// instanceRepo := repository.OrganizationRepository(pool) +// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) +// assert.EventuallyWithT(t, func(ttt *assert.CollectT) { +// instance, err := instanceRepo.Get(CTX, +// instanceRepo.NameCondition(database.TextOperationEqual, instanceName), +// ) +// require.NoError(ttt, err) +// // event instance.changed +// require.Equal(ttt, instanceName, instance.Name) +// }, retryDuration, tick) +// } + +// func TestServer_TestOrganizationDeleteReduces(t *testing.T) { +// instanceName := gofakeit.Name() +// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{ +// OrganizationName: instanceName, +// Owner: &system.CreateOrganizationRequest_Machine_{ +// Machine: &system.CreateOrganizationRequest_Machine{ +// UserName: "owner", +// Name: "owner", +// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{}, +// }, +// }, +// }) +// require.NoError(t, err) + +// _, err = SystemClient.RemoveOrganization(CTX, &system.RemoveOrganizationRequest{ +// OrganizationId: res.OrganizationId, +// }) +// require.NoError(t, err) + +// instanceRepo := repository.OrganizationRepository(pool) +// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) +// assert.EventuallyWithT(t, func(ttt *assert.CollectT) { +// instance, err := instanceRepo.Get(CTX, +// instanceRepo.NameCondition(database.TextOperationEqual, instanceName), +// ) +// // event instance.removed +// require.Nil(t, instance) +// require.NoError(ttt, err) +// }, retryDuration, tick) +// } diff --git a/backend/v3/storage/database/repository/org.go b/backend/v3/storage/database/repository/org.go index 2dea176730..2da2bf2f29 100644 --- a/backend/v3/storage/database/repository/org.go +++ b/backend/v3/storage/database/repository/org.go @@ -2,8 +2,11 @@ package repository import ( "context" + "errors" "time" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/zitadel/zitadel/backend/v3/domain" "github.com/zitadel/zitadel/backend/v3/storage/database" ) @@ -12,11 +15,13 @@ import ( // repository // ------------------------------------------------------------- +var _ domain.OrganizationRepository = (*org)(nil) + type org struct { repository } -func OrgRepository(client database.QueryExecutor) domain.OrgRepository { +func OrgRepository(client database.QueryExecutor) domain.OrganizationRepository { return &org{ repository: repository{ client: client, @@ -24,116 +29,208 @@ func OrgRepository(client database.QueryExecutor) domain.OrgRepository { } } -// Create implements [domain.OrgRepository]. -func (o *org) Create(ctx context.Context, org *domain.Org) error { - org.CreatedAt = time.Now() - org.UpdatedAt = org.CreatedAt - return nil +const queryOrganizationStmt = `SELECT id, name, instance_id, state, created_at, updated_at, deleted_at` + + ` FROM zitadel.organizations` + +// Get implements [domain.OrganizationRepository]. +func (o *org) Get(ctx context.Context, opts ...database.Condition) (*domain.Organization, error) { + builder := database.StatementBuilder{} + + builder.WriteString(queryOrganizationStmt) + + // return only non deleted isntances + opts = append(opts, database.IsNull(o.DeletedAtColumn())) + andCondition := database.And(opts...) + o.writeCondition(&builder, andCondition) + + rows, err := o.client.Query(ctx, builder.String(), builder.Args()...) + if err != nil { + return nil, err + } + defer rows.Close() + + return scanOrganization(rows) } -// Delete implements [domain.OrgRepository]. -func (o *org) Delete(ctx context.Context, condition database.Condition) error { - return nil +// List implements [domain.OrganizationRepository]. +func (o *org) List(ctx context.Context, opts ...database.Condition) ([]domain.Organization, error) { + builder := database.StatementBuilder{} + + builder.WriteString(queryOrganizationStmt) + + // return only non deleted isntances + opts = append(opts, database.IsNull(o.DeletedAtColumn())) + andCondition := database.And(opts...) + o.writeCondition(&builder, andCondition) + + rows, err := o.client.Query(ctx, builder.String(), builder.Args()...) + if err != nil { + return nil, err + } + defer rows.Close() + + return scanOrganizations(rows) } -// Get implements [domain.OrgRepository]. -func (o *org) Get(ctx context.Context, opts ...database.QueryOption) (*domain.Org, error) { - panic("unimplemented") +const createOrganizationStmt = `INSERT INTO zitadel.organizations (id, name, instance_id, state)` + + ` VALUES ($1, $2, $3, $4)` + + ` RETURNING created_at, updated_at` + +// Create implements [domain.OrganizationRepository]. +func (o *org) Create(ctx context.Context, organization *domain.Organization) error { + builder := database.StatementBuilder{} + builder.AppendArgs(organization.ID, organization.Name, organization.InstanceID, organization.State) + builder.WriteString(createOrganizationStmt) + + err := o.client.QueryRow(ctx, builder.String(), builder.Args()...).Scan(&organization.CreatedAt, &organization.UpdatedAt) + if err != nil { + var pgErr *pgconn.PgError + if errors.As(err, &pgErr) { + // constraint violation + if pgErr.Code == "23514" { + if pgErr.ConstraintName == "organizations_name_check" { + return errors.New("instnace name not provided") + } + if pgErr.ConstraintName == "organizations_id_check" { + return errors.New("instnace id not provided") + } + } + // duplicate + if pgErr.Code == "23505" { + if pgErr.ConstraintName == "organizations_pkey" { + return errors.New("instnace id already exists") + } + } + } + } + return err } -// List implements [domain.OrgRepository]. -func (o *org) List(ctx context.Context, opts ...database.QueryOption) ([]*domain.Org, error) { - panic("unimplemented") +// Update implements [domain.OrganizationRepository]. +func (o org) Update(ctx context.Context, condition database.Condition, changes ...database.Change) (int64, error) { + builder := database.StatementBuilder{} + builder.WriteString(`UPDATE zitadel.organizations SET `) + database.Changes(changes).Write(&builder) + o.writeCondition(&builder, condition) + + stmt := builder.String() + + rowsAffected, err := o.client.Exec(ctx, stmt, builder.Args()...) + return rowsAffected, err } -// Update implements [domain.OrgRepository]. -func (o *org) Update(ctx context.Context, condition database.Condition, changes ...database.Change) error { - panic("unimplemented") -} +// Delete implements [domain.OrganizationRepository]. +func (o org) Delete(ctx context.Context, condition database.Condition) error { + if condition == nil { + return errors.New("Delete must contain a condition") // (otherwise ALL organizations will be deleted) + } + builder := database.StatementBuilder{} + builder.WriteString(`UPDATE zitadel.organizations SET deleted_at = $1`) + builder.AppendArgs(time.Now()) -func (o *org) Member() domain.MemberRepository { - return &orgMember{o} -} - -func (o *org) Domain() domain.DomainRepository { - return &orgDomain{o} + o.writeCondition(&builder, condition) + _, err := o.client.Exec(ctx, builder.String(), builder.Args()...) + return err } // ------------------------------------------------------------- // changes // ------------------------------------------------------------- -// SetName implements [domain.orgChanges]. -func (o *org) SetName(name string) database.Change { - return database.NewChange(o.NameColumn(), name) -} - -// SetState implements [domain.orgChanges]. -func (o *org) SetState(state domain.OrgState) database.Change { - return database.NewChange(o.StateColumn(), state) +// SetName implements [domain.organizationChanges]. +func (i org) SetName(name string) database.Change { + return database.NewChange(i.NameColumn(), name) } // ------------------------------------------------------------- // conditions // ------------------------------------------------------------- -// IDCondition implements [domain.orgConditions]. -func (o *org) IDCondition(orgID string) database.Condition { - return database.NewTextCondition(o.IDColumn(), database.TextOperationEqual, orgID) +// IDCondition implements [domain.organizationConditions]. +func (o org) IDCondition(id string) database.Condition { + return database.NewTextCondition(o.IDColumn(), database.TextOperationEqual, id) } -// InstanceIDCondition implements [domain.orgConditions]. -func (o *org) InstanceIDCondition(instanceID string) database.Condition { - return database.NewTextCondition(o.InstanceIDColumn(), database.TextOperationEqual, instanceID) -} - -// NameCondition implements [domain.orgConditions]. -func (o *org) NameCondition(op database.TextOperation, name string) database.Condition { +// NameCondition implements [domain.organizationConditions]. +func (o org) NameCondition(op database.TextOperation, name string) database.Condition { return database.NewTextCondition(o.NameColumn(), op, name) } -// StateCondition implements [domain.orgConditions]. -func (o *org) StateCondition(op database.NumberOperation, state domain.OrgState) database.Condition { - return database.NewNumberCondition(o.StateColumn(), op, state) -} - // ------------------------------------------------------------- // columns // ------------------------------------------------------------- -// CreatedAtColumn implements [domain.orgColumns]. -func (o *org) CreatedAtColumn() database.Column { - return database.NewColumn("created_at") -} - -// DeletedAtColumn implements [domain.orgColumns]. -func (o *org) DeletedAtColumn() database.Column { - return database.NewColumn("deleted_at") -} - -// IDColumn implements [domain.orgColumns]. -func (o *org) IDColumn() database.Column { +// IDColumn implements [domain.organizationColumns]. +func (org) IDColumn() database.Column { return database.NewColumn("id") } -// InstanceIDColumn implements [domain.orgColumns]. -func (o *org) InstanceIDColumn() database.Column { - return database.NewColumn("instance_id") -} - -// NameColumn implements [domain.orgColumns]. -func (o *org) NameColumn() database.Column { +// NameColumn implements [domain.organizationColumns]. +func (org) NameColumn() database.Column { return database.NewColumn("name") } -// StateColumn implements [domain.orgColumns]. -func (o *org) StateColumn() database.Column { +// InstanceIDColumn implements [domain.organizationColumns]. +func (org) InstanceIDColumn() database.Column { + return database.NewColumn("instance_id") +} + +// StateColumn implements [domain.organizationColumns]. +func (org) StateColumn() database.Column { return database.NewColumn("state") } -// UpdatedAtColumn implements [domain.orgColumns]. -func (o *org) UpdatedAtColumn() database.Column { +// CreatedAtColumn implements [domain.organizationColumns]. +func (org) CreatedAtColumn() database.Column { + return database.NewColumn("created_at") +} + +// UpdatedAtColumn implements [domain.organizationColumns]. +func (org) UpdatedAtColumn() database.Column { return database.NewColumn("updated_at") } -var _ domain.OrgRepository = (*org)(nil) +// DeletedAtColumn implements [domain.organizationColumns]. +func (org) DeletedAtColumn() database.Column { + return database.NewColumn("deleted_at") +} + +func (o *org) writeCondition( + builder *database.StatementBuilder, + condition database.Condition, +) { + if condition == nil { + return + } + builder.WriteString(" WHERE ") + condition.Write(builder) +} + +func scanOrganization(rows database.Rows) (*domain.Organization, error) { + organization, err := pgx.CollectOneRow[domain.Organization](rows, pgx.RowToStructByNameLax[domain.Organization]) + if err != nil { + // if no results returned, this is not a error + // it just means the organization was not found + // the caller should check if the returned organization is nil + if err.Error() == "no rows in result set" { + return nil, nil + } + return nil, err + } + + return &organization, nil +} + +func scanOrganizations(rows database.Rows) ([]domain.Organization, error) { + organizations, err := pgx.CollectRows[domain.Organization](rows, pgx.RowToStructByNameLax[domain.Organization]) + if err != nil { + // if no results returned, this is not a error + // it just means the organization was not found + // the caller should check if the returned organization is nil + if err.Error() == "no rows in result set" { + return nil, nil + } + return nil, err + } + return organizations, nil +} diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index 2f8bb29e17..73353dbd5b 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -87,6 +87,22 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { return err } } + + // Checken & Egg problem: + // conn.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS zitadel") + // conn.Exec(ctx, ` + // CREATE TYPE zitadel.organization_state AS ENUM ( + // 'ACTIVE', + // 'INACTIVE' + // )`, + // ) + + // orgState, err := conn.LoadType(ctx, "zitadel.organization_state") + // if err != nil { + // return err + // } + // conn.TypeMap().RegisterType(orgState) + return nil } } diff --git a/internal/query/projection/org_relational.go b/internal/query/projection/org_relational.go new file mode 100644 index 0000000000..81d9ac69ed --- /dev/null +++ b/internal/query/projection/org_relational.go @@ -0,0 +1,207 @@ +package projection + +import ( + "context" + + repoDomain "github.com/zitadel/zitadel/backend/v3/domain" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/eventstore" + old_handler "github.com/zitadel/zitadel/internal/eventstore/handler" + "github.com/zitadel/zitadel/internal/eventstore/handler/v2" + "github.com/zitadel/zitadel/internal/repository/instance" + "github.com/zitadel/zitadel/internal/repository/org" + "github.com/zitadel/zitadel/internal/zerrors" +) + +const ( + OrgRelationProjectionTable = "zitadel.organizations" +) + +type orgRelationalProjection struct{} + +func (*orgRelationalProjection) Name() string { + return OrgRelationProjectionTable +} + +func newOrgRelationalProjection(ctx context.Context, config handler.Config) *handler.Handler { + return handler.NewHandler(ctx, &config, new(orgRelationalProjection)) +} + +// Init implements [handler.initializer] +func (p *orgRelationalProjection) Init() *old_handler.Check { + return handler.NewTableCheck( + handler.NewTable([]*handler.InitColumn{ + handler.NewColumn(OrgColumnID, handler.ColumnTypeText), + handler.NewColumn(OrgColumnName, handler.ColumnTypeText), + handler.NewColumn(OrgColumnInstanceID, handler.ColumnTypeText), + handler.NewColumn(State, handler.ColumnTypeEnum), + handler.NewColumn(CreatedAt, handler.ColumnTypeTimestamp), + handler.NewColumn(UpdatedAt, handler.ColumnTypeTimestamp), + handler.NewColumn(DeletedAt, handler.ColumnTypeTimestamp), + }, + handler.NewPrimaryKey(OrgColumnInstanceID, OrgColumnID), + // handler.WithIndex(handler.NewIndex("domain", []string{OrgColumnDomain})), + handler.WithIndex(handler.NewIndex("name", []string{OrgColumnName})), + ), + ) +} + +func (p *orgRelationalProjection) Reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: org.AggregateType, + EventReducers: []handler.EventReducer{ + { + Event: org.OrgAddedEventType, + Reduce: p.reduceOrgRelationalAdded, + }, + { + Event: org.OrgChangedEventType, + Reduce: p.reduceOrgRelationalChanged, + }, + { + Event: org.OrgDeactivatedEventType, + Reduce: p.reduceOrgRelationalDeactivated, + }, + { + Event: org.OrgReactivatedEventType, + Reduce: p.reduceOrgRelationalReactivated, + }, + { + Event: org.OrgRemovedEventType, + Reduce: p.reduceOrgRelationalRemoved, + }, + // TODO + // { + // Event: org.OrgDomainPrimarySetEventType, + // Reduce: p.reducePrimaryDomainSetRelational, + // }, + }, + }, + { + Aggregate: instance.AggregateType, + EventReducers: []handler.EventReducer{ + { + Event: instance.InstanceRemovedEventType, + Reduce: reduceInstanceRemovedHelper(OrgColumnInstanceID), + }, + }, + }, + } +} + +func (p *orgRelationalProjection) reduceOrgRelationalAdded(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*org.OrgAddedEvent) + if !ok { + return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-uYq5R", "reduce.wrong.event.type %s", org.OrgAddedEventType) + } + + // need to convert old state (int) to new state (enum) + state := repoDomain.State[domain.OrgStateActive-1] + + return handler.NewCreateStatement( + e, + []handler.Column{ + handler.NewCol(OrgColumnID, e.Aggregate().ID), + handler.NewCol(OrgColumnName, e.Name), + handler.NewCol(OrgColumnInstanceID, e.Aggregate().InstanceID), + handler.NewCol(State, state), + handler.NewCol(CreatedAt, e.CreationDate()), + handler.NewCol(UpdatedAt, e.CreationDate()), + }, + ), nil +} + +func (p *orgRelationalProjection) reduceOrgRelationalChanged(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*org.OrgChangedEvent) + if !ok { + return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Bg9om", "reduce.wrong.event.type %s", org.OrgChangedEventType) + } + if e.Name == "" { + return handler.NewNoOpStatement(e), nil + } + return handler.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(OrgColumnName, e.Name), + handler.NewCol(UpdatedAt, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(OrgColumnID, e.Aggregate().ID), + handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID), + }, + ), nil +} + +func (p *orgRelationalProjection) reduceOrgRelationalDeactivated(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*org.OrgDeactivatedEvent) + if !ok { + return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-BApK5", "reduce.wrong.event.type %s", org.OrgDeactivatedEventType) + } + return handler.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(State, domain.OrgStateInactive), + handler.NewCol(UpdatedAt, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(OrgColumnID, e.Aggregate().ID), + handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID), + }, + ), nil +} + +func (p *orgRelationalProjection) reduceOrgRelationalReactivated(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*org.OrgReactivatedEvent) + if !ok { + return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-o38DE", "reduce.wrong.event.type %s", org.OrgReactivatedEventType) + } + return handler.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(State, domain.OrgStateActive), + handler.NewCol(UpdatedAt, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(OrgColumnID, e.Aggregate().ID), + handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID), + }, + ), nil +} + +// TODO +// func (p *orgRelationalProjection) reducePrimaryDomainSetRelational(event eventstore.Event) (*handler.Statement, error) { +// e, ok := event.(*org.DomainPrimarySetEvent) +// if !ok { +// return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-3Tbkt", "reduce.wrong.event.type %s", org.OrgDomainPrimarySetEventType) +// } +// return handler.NewUpdateStatement( +// e, +// []handler.Column{ +// handler.NewCol(OrgColumnChangeDate, e.CreationDate()), +// handler.NewCol(OrgColumnSequence, e.Sequence()), +// handler.NewCol(OrgColumnDomain, e.Domain), +// }, +// []handler.Condition{ +// handler.NewCond(OrgColumnID, e.Aggregate().ID), +// handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID), +// }, +// ), nil +// } + +func (p *orgRelationalProjection) reduceOrgRelationalRemoved(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*org.OrgRemovedEvent) + if !ok { + return nil, zerrors.ThrowInvalidArgumentf(nil, "PROJE-DGm9g", "reduce.wrong.event.type %s", org.OrgRemovedEventType) + } + return handler.NewUpdateStatement( + e, + []handler.Column{ + handler.NewCol(DeletedAt, e.CreationDate()), + }, + []handler.Condition{ + handler.NewCond(OrgColumnID, e.Aggregate().ID), + handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID), + }, + ), nil +} diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 7b700750f8..ba4861975e 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -56,6 +56,7 @@ var ( UserAuthMethodProjection *handler.Handler InstanceProjection *handler.Handler InstanceRelationalProjection *handler.Handler + OrganizationRelationalProjection *handler.Handler SecretGeneratorProjection *handler.Handler SMTPConfigProjection *handler.Handler SMSConfigProjection *handler.Handler @@ -151,7 +152,8 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore, UserMetadataProjection = newUserMetadataProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_metadata"])) UserAuthMethodProjection = newUserAuthMethodProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_auth_method"])) InstanceProjection = newInstanceProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances"])) - InstanceRelationalProjection = newInstanceRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances_relational"])) + InstanceRelationalProjection = newInstanceRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["organizations_relational"])) + OrganizationRelationalProjection = newOrgRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances_relational"])) SecretGeneratorProjection = newSecretGeneratorProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["secret_generators"])) SMTPConfigProjection = newSMTPConfigProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["smtp_configs"])) SMSConfigProjection = newSMSConfigProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sms_config"])) @@ -307,6 +309,7 @@ func newProjectionsList() { UserAuthMethodProjection, InstanceProjection, InstanceRelationalProjection, + OrganizationRelationalProjection, SecretGeneratorProjection, SMTPConfigProjection, SMSConfigProjection, diff --git a/internal/query/projection/relational_common.go b/internal/query/projection/relational_common.go index 0140ea3559..7b0d957661 100644 --- a/internal/query/projection/relational_common.go +++ b/internal/query/projection/relational_common.go @@ -1,6 +1,7 @@ package projection const ( + State = "state" CreatedAt = "created_at" UpdatedAt = "updated_at" DeletedAt = "deleted_at"