fixup! feat(db): adding org table to relational model

This commit is contained in:
Iraq Jaber
2025-06-13 11:52:32 +02:00
parent b37937d333
commit 3e67e80c75
5 changed files with 180 additions and 121 deletions

View File

@@ -8,7 +8,7 @@ CREATE TABLE zitadel.organizations(
name TEXT NOT NULL, name TEXT NOT NULL,
instance_id TEXT NOT NULL, instance_id TEXT NOT NULL,
state zitadel.organization_state NOT NULL, state zitadel.organization_state NOT NULL,
created_at TIMESTAMP DEFAULT NOW(), created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW(),
deleted_at TIMESTAMP DEFAULT NULL deleted_at TIMESTAMPTZ DEFAULT NULL
); );

View File

@@ -19,6 +19,7 @@ import (
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres" "github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres"
"github.com/zitadel/zitadel/backend/v3/storage/database/repository" "github.com/zitadel/zitadel/backend/v3/storage/database/repository"
"github.com/zitadel/zitadel/internal/integration" "github.com/zitadel/zitadel/internal/integration"
mgmt "github.com/zitadel/zitadel/pkg/grpc/management"
"github.com/zitadel/zitadel/pkg/grpc/system" "github.com/zitadel/zitadel/pkg/grpc/system"
) )
@@ -30,6 +31,7 @@ var (
Instance *integration.Instance Instance *integration.Instance
SystemClient system.SystemServiceClient SystemClient system.SystemServiceClient
OrgClient org.OrganizationServiceClient OrgClient org.OrganizationServiceClient
MgmtClient mgmt.ManagementServiceClient
) )
var pool database.Pool var pool database.Pool
@@ -44,6 +46,7 @@ func TestMain(m *testing.M) {
CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner) CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner)
SystemClient = integration.SystemClient() SystemClient = integration.SystemClient()
OrgClient = Instance.Client.OrgV2 OrgClient = Instance.Client.OrgV2
MgmtClient = Instance.Client.Mgmt
var err error var err error
dbConfig, err := pgxpool.ParseConfig(ConnString) dbConfig, err := pgxpool.ParseConfig(ConnString)

View File

@@ -3,7 +3,6 @@
package events_test package events_test
import ( import (
"fmt"
"testing" "testing"
"time" "time"
@@ -11,147 +10,182 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/backend/v3/domain"
"github.com/zitadel/zitadel/backend/v3/storage/database" "github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/repository" "github.com/zitadel/zitadel/backend/v3/storage/database/repository"
"github.com/zitadel/zitadel/internal/integration" "github.com/zitadel/zitadel/internal/integration"
"github.com/zitadel/zitadel/pkg/grpc/management"
"github.com/zitadel/zitadel/pkg/grpc/org/v2" "github.com/zitadel/zitadel/pkg/grpc/org/v2"
) )
// const ConnString = "host=localhost port=5432 user=zitadel dbname=zitadel sslmode=disable"
// var (
// dbPool *pgxpool.Pool
// CTX context.Context
// Organization *integration.Organization
// SystemClient system.SystemServiceClient
// )
// var pool database.Pool
// func TestMain(m *testing.M) {
// os.Exit(func() int {
// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
// defer cancel()
// Organization = integration.NewOrganization(ctx)
// CTX = Organization.WithAuthorization(ctx, integration.UserTypeIAMOwner)
// SystemClient = integration.SystemClient()
// var err error
// dbPool, err = pgxpool.New(context.Background(), ConnString)
// if err != nil {
// panic(err)
// }
// pool = postgres.PGxPool(dbPool)
// return m.Run()
// }())
// }
func TestServer_TestOrganizationAddReduces(t *testing.T) { func TestServer_TestOrganizationAddReduces(t *testing.T) {
beforeCreate := time.Now()
orgName := gofakeit.Name() orgName := gofakeit.Name()
// beforeCreate := time.Now()
_, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{ _, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
Name: orgName, Name: orgName,
}) })
require.NoError(t, err) require.NoError(t, err)
// afterCreate := time.Now() afterCreate := time.Now()
orgRepo := repository.OrgRepository(pool) orgRepo := repository.OrgRepository(pool)
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) { assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX, organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName), orgRepo.NameCondition(database.TextOperationEqual, orgName),
) )
require.NoError(ttt, err) require.NoError(t, err)
fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization)
// // event instance.added activeState := domain.State[0]
// require.Equal(ttt, instanceName, organization.Name)
// // event instance.default.org.set // event org.added
// require.NotNil(t, organization.DefaultOrgID) require.NotNil(t, organization.ID)
// // event instance.iam.project.set require.Equal(t, orgName, organization.Name)
// require.NotNil(t, organization.IAMProjectID) require.NotNil(t, organization.InstanceID)
// // event instance.iam.console.set require.Equal(t, activeState, organization.State)
// require.NotNil(t, organization.ConsoleAppID) assert.WithinRange(t, organization.CreatedAt, beforeCreate, afterCreate)
// // event instance.default.language.set assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
// require.NotNil(t, organization.DefaultLanguage) require.Nil(t, organization.DeletedAt)
// // event instance.added
// assert.WithinRange(t, organization.CreatedAt, beforeCreate, afterCreate)
// // event instance.added
// assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
// require.Nil(t, organization.DeletedAt)
}, retryDuration, tick) }, retryDuration, tick)
} }
// func TestServer_TestOrganizationUpdateNameReduces(t *testing.T) { func TestServer_TestOrganizationChangeReduces(t *testing.T) {
// instanceName := gofakeit.Name() beforeCreate := time.Now()
// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{ orgName := gofakeit.Name()
// OrganizationName: instanceName,
// Owner: &system.CreateOrganizationRequest_Machine_{
// Machine: &system.CreateOrganizationRequest_Machine{
// UserName: "owner",
// Name: "owner",
// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{},
// },
// },
// })
// require.NoError(t, err)
// instanceName += "new" // 1. create org
// _, err = SystemClient.UpdateOrganization(CTX, &system.UpdateOrganizationRequest{ _, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
// OrganizationId: res.OrganizationId, Name: orgName,
// OrganizationName: instanceName, })
// }) require.NoError(t, err)
// require.NoError(t, err)
// instanceRepo := repository.OrganizationRepository(pool) // 2. update org name
// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) orgName = orgName + "_new"
// assert.EventuallyWithT(t, func(ttt *assert.CollectT) { _, err = MgmtClient.UpdateOrg(CTX, &management.UpdateOrgRequest{
// instance, err := instanceRepo.Get(CTX, Name: orgName,
// instanceRepo.NameCondition(database.TextOperationEqual, instanceName), })
// ) require.NoError(t, err)
// require.NoError(ttt, err) afterCreate := time.Now()
// // event instance.changed
// require.Equal(ttt, instanceName, instance.Name)
// }, retryDuration, tick)
// }
// func TestServer_TestOrganizationDeleteReduces(t *testing.T) { orgRepo := repository.OrgRepository(pool)
// instanceName := gofakeit.Name()
// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{
// OrganizationName: instanceName,
// Owner: &system.CreateOrganizationRequest_Machine_{
// Machine: &system.CreateOrganizationRequest_Machine{
// UserName: "owner",
// Name: "owner",
// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{},
// },
// },
// })
// require.NoError(t, err)
// _, err = SystemClient.RemoveOrganization(CTX, &system.RemoveOrganizationRequest{ retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
// OrganizationId: res.OrganizationId, assert.EventuallyWithT(t, func(t *assert.CollectT) {
// }) organization, err := orgRepo.Get(CTX,
// require.NoError(t, err) orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
// instanceRepo := repository.OrganizationRepository(pool) // event org.changed
// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute) require.Equal(t, orgName, organization.Name)
// assert.EventuallyWithT(t, func(ttt *assert.CollectT) { assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
// instance, err := instanceRepo.Get(CTX, }, retryDuration, tick)
// instanceRepo.NameCondition(database.TextOperationEqual, instanceName), }
// )
// // event instance.removed func TestServer_TestOrganizationDeactivateReduces(t *testing.T) {
// require.Nil(t, instance) beforeCreate := time.Now()
// require.NoError(ttt, err) orgName := gofakeit.Name()
// }, retryDuration, tick)
// } // 1. create org
organization, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. deactivate org name
_ = Instance.DeactivateOrganization(CTX, organization.OrganizationId)
require.NoError(t, err)
afterCreate := time.Now()
orgRepo := repository.OrgRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
deactiveState := domain.State[1]
// event org.deactivate
require.Equal(t, orgName, organization.Name)
require.Equal(t, deactiveState, organization.State)
assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
}, retryDuration, tick)
}
func TestServer_TestOrganizationActivateReduces(t *testing.T) {
beforeCreate := time.Now()
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. deactivate org name
_ = Instance.DeactivateOrganization(CTX, organization.OrganizationId)
require.NoError(t, err)
// 2. activate org name
_ = Instance.ReactivateOrganization(CTX, organization.OrganizationId)
require.NoError(t, err)
afterCreate := time.Now()
orgRepo := repository.OrgRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
deactiveState := domain.State[1]
// event org.reactivate
require.Equal(t, orgName, organization.Name)
require.Equal(t, deactiveState, organization.State)
assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
}, retryDuration, tick)
}
func TestServer_TestOrganizationRemoveReduces(t *testing.T) {
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. check org retrivable
orgRepo := repository.OrgRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
require.Equal(t, orgName, organization.Name)
}, retryDuration, tick)
// 3. delete org
_ = Instance.RemoveOrganization(CTX, organization.OrganizationId)
require.NoError(t, err)
retryDuration, tick = integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
// event org.remove
require.Nil(t, organization)
}, retryDuration, tick)
}

View File

@@ -277,6 +277,24 @@ func (i *Instance) DeactivateOrganization(ctx context.Context, orgID string) *mg
return resp return resp
} }
func (i *Instance) ReactivateOrganization(ctx context.Context, orgID string) *mgmt.ReactivateOrgResponse {
resp, err := i.Client.Mgmt.ReactivateOrg(
SetOrgID(ctx, orgID),
&mgmt.ReactivateOrgRequest{},
)
logging.OnError(err).Fatal("reactivate org")
return resp
}
func (i *Instance) RemoveOrganization(ctx context.Context, orgID string) *mgmt.RemoveOrgResponse {
resp, err := i.Client.Mgmt.RemoveOrg(
SetOrgID(ctx, orgID),
&mgmt.RemoveOrgRequest{},
)
logging.OnError(err).Fatal("reactivate org")
return resp
}
func SetOrgID(ctx context.Context, orgID string) context.Context { func SetOrgID(ctx context.Context, orgID string) context.Context {
md, ok := metadata.FromOutgoingContext(ctx) md, ok := metadata.FromOutgoingContext(ctx)
if !ok { if !ok {

View File

@@ -138,10 +138,13 @@ func (p *orgRelationalProjection) reduceOrgRelationalDeactivated(event eventstor
if !ok { if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-BApK5", "reduce.wrong.event.type %s", org.OrgDeactivatedEventType) return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-BApK5", "reduce.wrong.event.type %s", org.OrgDeactivatedEventType)
} }
// need to convert old state (int) to new state (enum)
state := repoDomain.State[domain.OrgStateInactive-1]
return handler.NewUpdateStatement( return handler.NewUpdateStatement(
e, e,
[]handler.Column{ []handler.Column{
handler.NewCol(State, domain.OrgStateInactive), handler.NewCol(State, state),
handler.NewCol(UpdatedAt, e.CreationDate()), handler.NewCol(UpdatedAt, e.CreationDate()),
}, },
[]handler.Condition{ []handler.Condition{
@@ -197,6 +200,7 @@ func (p *orgRelationalProjection) reduceOrgRelationalRemoved(event eventstore.Ev
return handler.NewUpdateStatement( return handler.NewUpdateStatement(
e, e,
[]handler.Column{ []handler.Column{
handler.NewCol(UpdatedAt, e.CreationDate()),
handler.NewCol(DeletedAt, e.CreationDate()), handler.NewCol(DeletedAt, e.CreationDate()),
}, },
[]handler.Condition{ []handler.Condition{