feat(db): adding org table to relational model (#10066)

# Which Problems Are Solved

As an outcome of [this
issue](https://github.com/zitadel/zitadel/issues/9599) we want to
implement relational tables in Zitadel. For that we use new tables as a
successor of the current tables used by Zitadel in `projections`, `auth`
and `admin` schemas. The new logic is based on [this
proposal](https://github.com/zitadel/zitadel/pull/9870). This issue does
not contain the switch from CQRS to the new tables. This is change will
be implemented in a later stage.

We focus on the most critical tables which is user authentication.

We need a table to manage organizations. 

### organization fields

The following fields must be managed in this table:

- `id`
- `instance_id`
- `name`
- `state` enum (active, inactive)
- `created_at`
- `updated_at`
- `deleted_at`

DISCUSS: should we add a `primary_domain` to this table so that we do
not have to join on domains to return a simple org?

We must ensure the unique constraints for this table matches the current
commands.

### organization repository

The repository must provide the following functions:

Manipulations:
- create
  - `instance_id`
  - `name`
- update
  - `name`
- delete

Queries:
- get returns single organization matching the criteria and pagination,
should return error if multiple were found
- list returns list of organizations matching the criteria, pagination

Criteria are the following:
- by id
- by name

pagination:
- by created_at
- by updated_at
- by name

### organization events

The following events must be applied on the table using a projection
(`internal/query/projection`)

- `org.added` results in create
- `org.changed` sets the `name` field
- `org.deactivated` sets the `state` field
- `org.reactivated` sets the `state` field
- `org.removed` sets the `deleted_at` field
- if answer is yes to discussion: `org.domain.primary.set` sets the
`primary_domain` field
- `instance.removed` sets the the `deleted_at` field if not already set

### acceptance criteria

- [x] migration is implemented and gets executed
- [x] domain interfaces are implemented and documented for service layer
- [x] repository is implemented and implements domain interface
- [x] testing
  - [x] the repository methods
  - [x] events get reduced correctly
  - [x] unique constraints
# Additional Context

Replace this example with links to related issues, discussions, discord
threads, or other sources with more context.
Use the Closing #issue syntax for issues that are resolved with this PR.
- Closes #https://github.com/zitadel/zitadel/issues/9936

---------

Co-authored-by: adlerhurst <27845747+adlerhurst@users.noreply.github.com>
This commit is contained in:
Iraq
2025-07-14 21:27:14 +02:00
committed by GitHub
parent 9595a1bcca
commit 8d020e56bb
28 changed files with 2238 additions and 590 deletions

View File

@@ -0,0 +1,67 @@
//go:build integration
package events_test
import (
"context"
"os"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres"
"github.com/zitadel/zitadel/internal/integration"
v2beta_org "github.com/zitadel/zitadel/pkg/grpc/org/v2beta"
"github.com/zitadel/zitadel/pkg/grpc/system"
)
const ConnString = "host=localhost port=5432 user=zitadel dbname=zitadel sslmode=disable"
var (
dbPool *pgxpool.Pool
CTX context.Context
Instance *integration.Instance
SystemClient system.SystemServiceClient
OrgClient v2beta_org.OrganizationServiceClient
)
var pool database.Pool
func TestMain(m *testing.M) {
os.Exit(func() int {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()
Instance = integration.NewInstance(ctx)
CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner)
SystemClient = integration.SystemClient()
OrgClient = Instance.Client.OrgV2beta
var err error
dbConfig, err := pgxpool.ParseConfig(ConnString)
if err != nil {
panic(err)
}
dbConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
orgState, err := conn.LoadType(ctx, "zitadel.organization_state")
if err != nil {
return err
}
conn.TypeMap().RegisterType(orgState)
return nil
}
dbPool, err = pgxpool.NewWithConfig(context.Background(), dbConfig)
if err != nil {
panic(err)
}
pool = postgres.PGxPool(dbPool)
return m.Run()
}())
}

View File

@@ -1,171 +1,138 @@
//go:build integration
package instance_test
package events_test
import (
"context"
"os"
"testing"
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres"
"github.com/zitadel/zitadel/backend/v3/storage/database/repository"
"github.com/zitadel/zitadel/internal/integration"
"github.com/zitadel/zitadel/pkg/grpc/system"
)
const ConnString = "host=localhost port=5432 user=zitadel dbname=zitadel sslmode=disable"
var (
dbPool *pgxpool.Pool
CTX context.Context
Instance *integration.Instance
SystemClient system.SystemServiceClient
)
var pool database.Pool
func TestMain(m *testing.M) {
os.Exit(func() int {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
defer cancel()
Instance = integration.NewInstance(ctx)
CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner)
SystemClient = integration.SystemClient()
var err error
dbPool, err = pgxpool.New(context.Background(), ConnString)
if err != nil {
panic(err)
}
pool = postgres.PGxPool(dbPool)
return m.Run()
}())
}
func TestServer_TestInstanceAddReduces(t *testing.T) {
instanceName := gofakeit.Name()
beforeCreate := time.Now()
_, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
func TestServer_TestInstanceReduces(t *testing.T) {
t.Run("test instance add reduces", func(t *testing.T) {
instanceName := gofakeit.Name()
beforeCreate := time.Now()
instance, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
},
},
},
})
afterCreate := time.Now()
require.NoError(t, err)
instanceRepo := repository.InstanceRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
instance.GetInstanceId(),
)
require.NoError(ttt, err)
// event instance.added
assert.Equal(ttt, instanceName, instance.Name)
// event instance.default.org.set
assert.NotNil(t, instance.DefaultOrgID)
// event instance.iam.project.set
assert.NotNil(t, instance.IAMProjectID)
// event instance.iam.console.set
assert.NotNil(t, instance.ConsoleAppID)
// event instance.default.language.set
assert.NotNil(t, instance.DefaultLanguage)
// event instance.added
assert.WithinRange(t, instance.CreatedAt, beforeCreate, afterCreate)
// event instance.added
assert.WithinRange(t, instance.UpdatedAt, beforeCreate, afterCreate)
assert.Nil(t, instance.DeletedAt)
}, retryDuration, tick)
})
afterCreate := time.Now()
require.NoError(t, err)
instanceRepo := repository.InstanceRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
)
require.NoError(ttt, err)
// event instance.added
require.Equal(ttt, instanceName, instance.Name)
// event instance.default.org.set
require.NotNil(t, instance.DefaultOrgID)
// event instance.iam.project.set
require.NotNil(t, instance.IAMProjectID)
// event instance.iam.console.set
require.NotNil(t, instance.ConsoleAppID)
// event instance.default.language.set
require.NotNil(t, instance.DefaultLanguage)
// event instance.added
assert.WithinRange(t, instance.CreatedAt, beforeCreate, afterCreate)
// event instance.added
assert.WithinRange(t, instance.UpdatedAt, beforeCreate, afterCreate)
require.Nil(t, instance.DeletedAt)
}, retryDuration, tick)
}
func TestServer_TestInstanceUpdateNameReduces(t *testing.T) {
instanceName := gofakeit.Name()
res, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
t.Run("test instance update reduces", func(t *testing.T) {
instanceName := gofakeit.Name()
res, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
},
},
},
})
require.NoError(t, err)
instanceName += "new"
beforeUpdate := time.Now()
_, err = SystemClient.UpdateInstance(CTX, &system.UpdateInstanceRequest{
InstanceId: res.InstanceId,
InstanceName: instanceName,
})
require.NoError(t, err)
afterUpdate := time.Now()
instanceRepo := repository.InstanceRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
res.InstanceId,
)
require.NoError(ttt, err)
// event instance.changed
assert.Equal(ttt, instanceName, instance.Name)
assert.WithinRange(t, instance.UpdatedAt, beforeUpdate, afterUpdate)
}, retryDuration, tick)
})
require.NoError(t, err)
instanceName += "new"
_, err = SystemClient.UpdateInstance(CTX, &system.UpdateInstanceRequest{
InstanceId: res.InstanceId,
InstanceName: instanceName,
})
require.NoError(t, err)
instanceRepo := repository.InstanceRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
)
require.NoError(ttt, err)
// event instance.changed
require.Equal(ttt, instanceName, instance.Name)
}, retryDuration, tick)
}
func TestServer_TestInstanceDeleteReduces(t *testing.T) {
instanceName := gofakeit.Name()
res, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
t.Run("test instance delete reduces", func(t *testing.T) {
instanceName := gofakeit.Name()
res, err := SystemClient.CreateInstance(CTX, &system.CreateInstanceRequest{
InstanceName: instanceName,
Owner: &system.CreateInstanceRequest_Machine_{
Machine: &system.CreateInstanceRequest_Machine{
UserName: "owner",
Name: "owner",
PersonalAccessToken: &system.CreateInstanceRequest_PersonalAccessToken{},
},
},
},
})
require.NoError(t, err)
instanceRepo := repository.InstanceRepository(pool)
// check instance exists
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
res.InstanceId,
)
require.NoError(ttt, err)
assert.Equal(ttt, instanceName, instance.Name)
}, retryDuration, tick)
_, err = SystemClient.RemoveInstance(CTX, &system.RemoveInstanceRequest{
InstanceId: res.InstanceId,
})
require.NoError(t, err)
retryDuration, tick = integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
res.InstanceId,
)
// event instance.removed
assert.Nil(t, instance)
require.NoError(t, err)
}, retryDuration, tick)
})
require.NoError(t, err)
instanceRepo := repository.InstanceRepository(pool)
// check instance exists
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
)
require.NoError(ttt, err)
require.Equal(ttt, instanceName, instance.Name)
}, retryDuration, tick)
_, err = SystemClient.RemoveInstance(CTX, &system.RemoveInstanceRequest{
InstanceId: res.InstanceId,
})
require.NoError(t, err)
retryDuration, tick = integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
instance, err := instanceRepo.Get(CTX,
instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
)
// event instance.removed
require.Nil(t, instance)
require.NoError(ttt, err)
}, retryDuration, tick)
}

View File

@@ -0,0 +1,218 @@
//go:build integration
package events_test
import (
"testing"
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/backend/v3/domain"
"github.com/zitadel/zitadel/backend/v3/storage/database/repository"
"github.com/zitadel/zitadel/internal/integration"
v2beta_org "github.com/zitadel/zitadel/pkg/grpc/org/v2beta"
)
func TestServer_TestOrganizationReduces(t *testing.T) {
instanceID := Instance.ID()
t.Run("test org add reduces", func(t *testing.T) {
beforeCreate := time.Now()
orgName := gofakeit.Name()
_, err := OrgClient.CreateOrganization(CTX, &v2beta_org.CreateOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
afterCreate := time.Now()
orgRepo := repository.OrganizationRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(tt *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(tt, err)
// event org.added
assert.NotNil(t, organization.ID)
assert.Equal(t, orgName, organization.Name)
assert.NotNil(t, organization.InstanceID)
assert.Equal(t, domain.OrgStateActive.String(), organization.State)
assert.WithinRange(t, organization.CreatedAt, beforeCreate, afterCreate)
assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
assert.Nil(t, organization.DeletedAt)
}, retryDuration, tick)
})
t.Run("test org change reduces", func(t *testing.T) {
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.CreateOrganization(CTX, &v2beta_org.CreateOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. update org name
beforeUpdate := time.Now()
orgName = orgName + "_new"
_, err = OrgClient.UpdateOrganization(CTX, &v2beta_org.UpdateOrganizationRequest{
Id: organization.Id,
Name: orgName,
})
require.NoError(t, err)
afterUpdate := time.Now()
orgRepo := repository.OrganizationRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
// event org.changed
assert.Equal(t, orgName, organization.Name)
assert.WithinRange(t, organization.UpdatedAt, beforeUpdate, afterUpdate)
}, retryDuration, tick)
})
t.Run("test org deactivate reduces", func(t *testing.T) {
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.CreateOrganization(CTX, &v2beta_org.CreateOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. deactivate org name
beforeDeactivate := time.Now()
_, err = OrgClient.DeactivateOrganization(CTX, &v2beta_org.DeactivateOrganizationRequest{
Id: organization.Id,
})
require.NoError(t, err)
afterDeactivate := time.Now()
orgRepo := repository.OrganizationRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
// event org.deactivate
assert.Equal(t, orgName, organization.Name)
assert.Equal(t, domain.OrgStateInactive.String(), organization.State)
assert.WithinRange(t, organization.UpdatedAt, beforeDeactivate, afterDeactivate)
}, retryDuration, tick)
})
t.Run("test org activate reduces", func(t *testing.T) {
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.CreateOrganization(CTX, &v2beta_org.CreateOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. deactivate org name
_, err = OrgClient.DeactivateOrganization(CTX, &v2beta_org.DeactivateOrganizationRequest{
Id: organization.Id,
})
require.NoError(t, err)
orgRepo := repository.OrganizationRepository(pool)
// 3. check org deactivated
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
assert.Equal(t, orgName, organization.Name)
assert.Equal(t, domain.OrgStateInactive.String(), organization.State)
}, retryDuration, tick)
// 4. activate org name
beforeActivate := time.Now()
_, err = OrgClient.ActivateOrganization(CTX, &v2beta_org.ActivateOrganizationRequest{
Id: organization.Id,
})
require.NoError(t, err)
afterActivate := time.Now()
retryDuration, tick = integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
// event org.reactivate
assert.Equal(t, orgName, organization.Name)
assert.Equal(t, domain.OrgStateActive.String(), organization.State)
assert.WithinRange(t, organization.UpdatedAt, beforeActivate, afterActivate)
}, retryDuration, tick)
})
t.Run("test org remove reduces", func(t *testing.T) {
orgName := gofakeit.Name()
// 1. create org
organization, err := OrgClient.CreateOrganization(CTX, &v2beta_org.CreateOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// 2. check org retrivable
orgRepo := repository.OrganizationRepository(pool)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
if organization == nil {
assert.Fail(t, "this error is here because of a race condition")
}
assert.Equal(t, orgName, organization.Name)
}, retryDuration, tick)
// 3. delete org
_, err = OrgClient.DeleteOrganization(CTX, &v2beta_org.DeleteOrganizationRequest{
Id: organization.Id,
})
require.NoError(t, err)
retryDuration, tick = integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(orgName),
instanceID,
)
require.NoError(t, err)
// event org.remove
assert.Nil(t, organization)
}, retryDuration, tick)
})
}