Files
zitadel/backend/v3/storage/database/repository/org.go

283 lines
9.1 KiB
Go
Raw Normal View History

feat(backend): state persisted objects (#9870) This PR initiates the rework of Zitadel's backend to state-persisted objects. This change is a step towards a more scalable and maintainable architecture. ## Changes * **New `/backend/v3` package**: A new package structure has been introduced to house the reworked backend logic. This includes: * `domain`: Contains the core business logic, commands, and repository interfaces. * `storage`: Implements the repository interfaces for database interactions with new transactional tables. * `telemetry`: Provides logging and tracing capabilities. * **Transactional Tables**: New database tables have been defined for `instances`, `instance_domains`, `organizations`, and `org_domains`. * **Projections**: New projections have been created to populate the new relational tables from the existing event store, ensuring data consistency during the migration. * **Repositories**: New repositories provide an abstraction layer for accessing and manipulating the data in the new tables. * **Setup**: A new setup step for `TransactionalTables` has been added to manage the database migrations for the new tables. This PR lays the foundation for future work to fully transition to state-persisted objects for these components, which will improve performance and simplify data access patterns. This PR initiates the rework of ZITADEL's backend to state-persisted objects. This is a foundational step towards a new architecture that will improve performance and maintainability. The following objects are migrated from event-sourced aggregates to state-persisted objects: * Instances * incl. Domains * Orgs * incl. Domains The structure of the new backend implementation follows the software architecture defined in this [wiki page](https://github.com/zitadel/zitadel/wiki/Software-Architecturel). This PR includes: * The initial implementation of the new transactional repositories for the objects listed above. 
* Projections to populate the new relational tables from the existing event store. * Adjustments to the build and test process to accommodate the new backend structure. This is a work in progress and further changes will be made to complete the migration. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Iraq Jaber <iraq+github@zitadel.com> Co-authored-by: Iraq <66622793+kkrime@users.noreply.github.com> Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
2025-09-05 10:54:34 +02:00
package repository
import (
"context"
"encoding/json"
"github.com/zitadel/zitadel/backend/v3/domain"
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
// -------------------------------------------------------------
// repository
// -------------------------------------------------------------
// Compile-time check that *org satisfies domain.OrganizationRepository.
var _ domain.OrganizationRepository = (*org)(nil)
// org is the organization repository returned by OrganizationRepository.
// It also exposes the change, condition and column helpers defined below.
type org struct {
// repository is embedded; the methods of org use its client to run statements.
repository
// shouldLoadDomains is switched on by Domains(true) and never switched off again.
// joinDomains reads it to decide whether domain rows may match the join.
shouldLoadDomains bool
// domainRepo caches the sub repository that Domains creates on first use.
domainRepo domain.OrganizationDomainRepository
}
// OrganizationRepository builds an organization repository that runs its
// statements on the given client. Domains are not loaded until Domains(true)
// has been called on the returned repository.
func OrganizationRepository(client database.QueryExecutor) domain.OrganizationRepository {
	repo := new(org)
	repo.client = client
	return repo
}
// queryOrganizationStmt is the SELECT prefix shared by Get and List.
// Besides the organization columns it selects a "domains" column: a JSONB array
// with one object per org_domains row, or NULL when no domain row was counted.
// The statement references org_domains without joining it and uses aggregates,
// so callers must add the join (see joinDomains) and a GROUP BY before executing it.
const queryOrganizationStmt = `SELECT organizations.id, organizations.name, organizations.instance_id, organizations.state, organizations.created_at, organizations.updated_at` +
` , CASE WHEN count(org_domains.domain) > 0 THEN jsonb_agg(json_build_object('domain', org_domains.domain, 'isVerified', org_domains.is_verified, 'isPrimary', org_domains.is_primary, 'validationType', org_domains.validation_type, 'createdAt', org_domains.created_at, 'updatedAt', org_domains.updated_at)) ELSE NULL::JSONB END domains` +
` FROM zitadel.organizations`
// Get implements [domain.OrganizationRepository].
//
// It builds queryOrganizationStmt with the caller's opts, followed by the
// org_domains left join and a GROUP BY on instance id and organization id
// (required because the statement aggregates the domain rows), and hands the
// statement to scanOrganization, whose result and error are returned as is.
func (o *org) Get(ctx context.Context, opts ...database.QueryOption) (*domain.Organization, error) {
	join := o.joinDomains()
	groupBy := database.WithGroupBy(o.InstanceIDColumn(), o.IDColumn())

	options := new(database.QueryOpts)
	for _, opt := range opts {
		opt(options)
	}
	// The repository-owned options are applied directly instead of being
	// appended to opts: a caller passing `s...` shares its backing array with
	// opts, and an append could overwrite elements beyond len(s) in that array.
	// The order of application (caller options first) is unchanged.
	join(options)
	groupBy(options)

	var builder database.StatementBuilder
	builder.WriteString(queryOrganizationStmt)
	options.Write(&builder)

	return scanOrganization(ctx, o.client, &builder)
}
// List implements [domain.OrganizationRepository].
//
// It builds queryOrganizationStmt with the caller's opts, followed by the
// org_domains left join and a GROUP BY on instance id and organization id
// (required because the statement aggregates the domain rows), and hands the
// statement to scanOrganizations, whose result and error are returned as is.
func (o *org) List(ctx context.Context, opts ...database.QueryOption) ([]*domain.Organization, error) {
	join := o.joinDomains()
	groupBy := database.WithGroupBy(o.InstanceIDColumn(), o.IDColumn())

	options := new(database.QueryOpts)
	for _, opt := range opts {
		opt(options)
	}
	// The repository-owned options are applied directly instead of being
	// appended to opts: a caller passing `s...` shares its backing array with
	// opts, and an append could overwrite elements beyond len(s) in that array.
	// The order of application (caller options first) is unchanged.
	join(options)
	groupBy(options)

	var builder database.StatementBuilder
	builder.WriteString(queryOrganizationStmt)
	options.Write(&builder)

	return scanOrganizations(ctx, o.client, &builder)
}
// joinDomains returns the query option that left-joins zitadel.org_domains on
// matching instance id and organization id.
//
// While domains have not been requested (shouldLoadDomains is false) the join
// additionally requires org_domains.org_id to be NULL. The intent is that the
// domain columns then come back as null; the dialect's query optimizer is
// expected to optimize the join away in that case.
func (o *org) joinDomains() database.QueryOption {
	// Domains(false) creates the sub repository on first use and leaves the
	// load flag as it is.
	domains := o.Domains(false)

	on := []database.Condition{
		database.NewColumnCondition(o.InstanceIDColumn(), domains.InstanceIDColumn()),
		database.NewColumnCondition(o.IDColumn(), domains.OrgIDColumn()),
	}
	if !o.shouldLoadDomains {
		on = append(on, database.IsNull(domains.OrgIDColumn()))
	}

	return database.WithLeftJoin("zitadel.org_domains", database.And(on...))
}
// createOrganizationStmt inserts one organization. The four placeholders are,
// in order, id, name, instance_id and state; the statement returns the
// created_at and updated_at values of the inserted row.
const createOrganizationStmt = `INSERT INTO zitadel.organizations (id, name, instance_id, state)` +
` VALUES ($1, $2, $3, $4)` +
` RETURNING created_at, updated_at`
// Create implements [domain.OrganizationRepository].
//
// It inserts organization using createOrganizationStmt and scans the returned
// created_at and updated_at values back into organization.CreatedAt and
// organization.UpdatedAt. The error of the scan is returned.
func (o *org) Create(ctx context.Context, organization *domain.Organization) error {
	var builder database.StatementBuilder
	// Argument order must match the placeholders of createOrganizationStmt.
	builder.AppendArgs(
		organization.ID,
		organization.Name,
		organization.InstanceID,
		organization.State,
	)
	builder.WriteString(createOrganizationStmt)

	row := o.client.QueryRow(ctx, builder.String(), builder.Args()...)
	return row.Scan(&organization.CreatedAt, &organization.UpdatedAt)
}
// Update implements [domain.OrganizationRepository].
//
// It applies changes to the organizations matching both id and instanceID and
// returns what the client's Exec reports. Calling it without any change
// returns database.ErrNoChanges and executes nothing.
func (o *org) Update(ctx context.Context, id domain.OrgIdentifierCondition, instanceID string, changes ...database.Change) (int64, error) {
	if len(changes) == 0 {
		return 0, database.ErrNoChanges
	}

	var builder database.StatementBuilder
	builder.WriteString(`UPDATE zitadel.organizations SET `)
	// The changes are written before the condition so the statement text and
	// its arguments stay in the same order.
	database.Changes(changes).Write(&builder)
	writeCondition(&builder, database.And(id, o.InstanceIDCondition(instanceID)))

	return o.client.Exec(ctx, builder.String(), builder.Args()...)
}
// Delete implements [domain.OrganizationRepository].
//
// It deletes the organizations matching both id and instanceID and returns
// what the client's Exec reports.
func (o *org) Delete(ctx context.Context, id domain.OrgIdentifierCondition, instanceID string) (int64, error) {
	var builder database.StatementBuilder
	builder.WriteString(`DELETE FROM zitadel.organizations`)

	scope := database.And(id, o.InstanceIDCondition(instanceID))
	writeCondition(&builder, scope)

	return o.client.Exec(ctx, builder.String(), builder.Args()...)
}
// -------------------------------------------------------------
// changes
// -------------------------------------------------------------
// SetName implements [domain.organizationChanges].
// The returned change assigns name to the column given by NameColumn.
func (o org) SetName(name string) database.Change {
	column := o.NameColumn()
	return database.NewChange(column, name)
}
// SetState implements [domain.organizationChanges].
// The returned change assigns state to the column given by StateColumn.
func (o org) SetState(state domain.OrgState) database.Change {
	column := o.StateColumn()
	return database.NewChange(column, state)
}
// -------------------------------------------------------------
// conditions
// -------------------------------------------------------------
// IDCondition implements [domain.organizationConditions].
// It compares the id column with id using database.TextOperationEqual.
func (o org) IDCondition(id string) domain.OrgIdentifierCondition {
	column := o.IDColumn()
	return database.NewTextCondition(column, database.TextOperationEqual, id)
}
// NameCondition implements [domain.organizationConditions].
// It compares the name column with name using database.TextOperationEqual.
func (o org) NameCondition(name string) domain.OrgIdentifierCondition {
	column := o.NameColumn()
	return database.NewTextCondition(column, database.TextOperationEqual, name)
}
// InstanceIDCondition implements [domain.organizationConditions].
// It compares the instance_id column with instanceID using
// database.TextOperationEqual.
func (o org) InstanceIDCondition(instanceID string) database.Condition {
	column := o.InstanceIDColumn()
	return database.NewTextCondition(column, database.TextOperationEqual, instanceID)
}
// StateCondition implements [domain.organizationConditions].
// It compares the state column with the string form of state (state.String())
// using database.TextOperationEqual.
func (o org) StateCondition(state domain.OrgState) database.Condition {
	value := state.String()
	return database.NewTextCondition(o.StateColumn(), database.TextOperationEqual, value)
}
// -------------------------------------------------------------
// columns
// -------------------------------------------------------------
// IDColumn implements [domain.organizationColumns].
// It refers to column "id" of table "organizations".
func (org) IDColumn() database.Column {
	const table, column = "organizations", "id"
	return database.NewColumn(table, column)
}
// NameColumn implements [domain.organizationColumns].
// It refers to column "name" of table "organizations".
func (org) NameColumn() database.Column {
	const table, column = "organizations", "name"
	return database.NewColumn(table, column)
}
// InstanceIDColumn implements [domain.organizationColumns].
// It refers to column "instance_id" of table "organizations".
func (org) InstanceIDColumn() database.Column {
	const table, column = "organizations", "instance_id"
	return database.NewColumn(table, column)
}
// StateColumn implements [domain.organizationColumns].
// It refers to column "state" of table "organizations".
func (org) StateColumn() database.Column {
	const table, column = "organizations", "state"
	return database.NewColumn(table, column)
}
// CreatedAtColumn implements [domain.organizationColumns].
// It refers to column "created_at" of table "organizations".
func (org) CreatedAtColumn() database.Column {
	const table, column = "organizations", "created_at"
	return database.NewColumn(table, column)
}
// UpdatedAtColumn implements [domain.organizationColumns].
// It refers to column "updated_at" of table "organizations".
func (org) UpdatedAtColumn() database.Column {
	const table, column = "organizations", "updated_at"
	return database.NewColumn(table, column)
}
// -------------------------------------------------------------
// scanners
// -------------------------------------------------------------
// rawOrganization is the scan target used by scanOrganization and
// scanOrganizations: the organization itself plus the aggregated "domains"
// column, which is kept as raw JSON and decoded into Domains after collection.
type rawOrganization struct {
// Embedded so the organization columns are collected into the organization directly.
*domain.Organization
// RawDomains holds the undecoded "domains" column; it is empty when the query returned no domains.
RawDomains json.RawMessage `json:"domains,omitempty" db:"domains"`
}
// scanOrganization runs the statement held by builder and collects its result
// into a single organization via CollectExactlyOneRow. A non-empty "domains"
// payload is unmarshalled into the organization's Domains.
//
// Errors from the query, the collection and the JSON decoding are returned
// unchanged. The rows are asserted to database.CollectableRows without a
// check, so a querier returning any other kind of rows makes this panic.
func scanOrganization(ctx context.Context, querier database.Querier, builder *database.StatementBuilder) (*domain.Organization, error) {
	stmt, args := builder.String(), builder.Args()
	rows, err := querier.Query(ctx, stmt, args...)
	if err != nil {
		return nil, err
	}

	var raw rawOrganization
	if err = rows.(database.CollectableRows).CollectExactlyOneRow(&raw); err != nil {
		return nil, err
	}

	// Nothing to decode when no domains were aggregated.
	if len(raw.RawDomains) == 0 {
		return raw.Organization, nil
	}
	if err = json.Unmarshal(raw.RawDomains, &raw.Domains); err != nil {
		return nil, err
	}
	return raw.Organization, nil
}
// scanOrganizations runs the statement held by builder and collects every
// returned row into an organization. For each row, a non-empty "domains"
// payload is unmarshalled into that organization's Domains.
//
// Errors from the query, the collection and the JSON decoding are returned
// unchanged; the first decoding error aborts the scan. The rows are asserted
// to database.CollectableRows without a check, so a querier returning any
// other kind of rows makes this panic.
func scanOrganizations(ctx context.Context, querier database.Querier, builder *database.StatementBuilder) ([]*domain.Organization, error) {
	stmt, args := builder.String(), builder.Args()
	rows, err := querier.Query(ctx, stmt, args...)
	if err != nil {
		return nil, err
	}

	var collected []*rawOrganization
	if err = rows.(database.CollectableRows).Collect(&collected); err != nil {
		return nil, err
	}

	result := make([]*domain.Organization, 0, len(collected))
	for _, raw := range collected {
		if len(raw.RawDomains) > 0 {
			if err = json.Unmarshal(raw.RawDomains, &raw.Domains); err != nil {
				return nil, err
			}
		}
		result = append(result, raw.Organization)
	}
	return result, nil
}
// -------------------------------------------------------------
// sub repositories
// -------------------------------------------------------------
// Domains implements [domain.OrganizationRepository].
//
// It returns the organization domain sub repository, creating it on the first
// call and reusing it afterwards. Passing shouldLoad as true switches domain
// loading on for this repository; passing false never switches it off again.
func (o *org) Domains(shouldLoad bool) domain.OrganizationDomainRepository {
	o.shouldLoadDomains = o.shouldLoadDomains || shouldLoad

	if o.domainRepo == nil {
		o.domainRepo = &orgDomain{
			repository: o.repository,
			org:        o,
		}
	}
	return o.domainRepo
}