feat(db): adding org table to relational model

This commit is contained in:
Iraq Jaber
2025-06-12 17:27:49 +02:00
parent a77f88c8fb
commit b37937d333
11 changed files with 678 additions and 75 deletions

View File

@@ -0,0 +1,73 @@
package domain
import (
"context"
"time"
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
var State []string = []string{
"ACTIVE",
"INACTIVE",
}
type Organization struct {
ID string `json:"id,omitempty" db:"id"`
Name string `json:"name,omitempty" db:"name"`
InstanceID string `json:"instance_id,omitempty" db:"instance_id"`
State string `json:"state,omitempty" db:"state"`
CreatedAt time.Time `json:"created_at,omitempty" db:"created_at"`
UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at"`
DeletedAt *time.Time `json:"deleted_at,omitempty" db:"deleted_at"`
}
// organizationColumns define all the columns of the instance table.
type organizationColumns interface {
// IDColumn returns the column for the id field.
IDColumn() database.Column
// NameColumn returns the column for the name field.
NameColumn() database.Column
// InstanceIDColumn returns the column for the default org id field
InstanceIDColumn() database.Column
// StateColumn returns the column for the name field.
StateColumn() database.Column
// CreatedAtColumn returns the column for the created at field.
CreatedAtColumn() database.Column
// UpdatedAtColumn returns the column for the updated at field.
UpdatedAtColumn() database.Column
// DeletedAtColumn returns the column for the deleted at field.
DeletedAtColumn() database.Column
}
// organizationConditions define all the conditions for the instance table.
type organizationConditions interface {
// IDCondition returns an equal filter on the id field.
IDCondition(instanceID string) database.Condition
// NameCondition returns a filter on the name field.
NameCondition(op database.TextOperation, name string) database.Condition
}
// organizationChanges define all the changes for the instance table.
type organizationChanges interface {
// SetName sets the name column.
SetName(name string) database.Change
}
// OrganizationRepository is the interface for the instance repository.
type OrganizationRepository interface {
organizationColumns
organizationConditions
organizationChanges
Get(ctx context.Context, opts ...database.Condition) (*Organization, error)
List(ctx context.Context, opts ...database.Condition) ([]Organization, error)
Create(ctx context.Context, instance *Organization) error
Update(ctx context.Context, condition database.Condition, changes ...database.Change) (int64, error)
Delete(ctx context.Context, condition database.Condition) error
}
type CreateOrganization struct {
Name string `json:"name"`
}

View File

@@ -0,0 +1,16 @@
package migration
import (
_ "embed"
)
var (
//go:embed 002_organization_table/up.sql
up002OrganizationTable string
//go:embed 002_organization_table/down.sql
down002OrganizationTable string
)
func init() {
registerSQLMigration(2, up002OrganizationTable, down002OrganizationTable)
}

View File

@@ -0,0 +1,2 @@
DROP TABLE zitadel.organizations;
DROP Type zitadel.organization_state;

View File

@@ -0,0 +1,14 @@
CREATE TYPE zitadel.organization_state AS ENUM (
'ACTIVE',
'INACTIVE'
);
CREATE TABLE zitadel.organizations(
id TEXT NOT NULL PRIMARY KEY,
name TEXT NOT NULL,
instance_id TEXT NOT NULL,
state zitadel.organization_state NOT NULL,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
deleted_at TIMESTAMP DEFAULT NULL
);

View File

@@ -1,6 +1,6 @@
//go:build integration
package instance_test
package events_test
import (
"context"
@@ -9,9 +9,11 @@ import (
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/pkg/grpc/org/v2"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/dialect/postgres"
@@ -27,6 +29,7 @@ var (
CTX context.Context
Instance *integration.Instance
SystemClient system.SystemServiceClient
OrgClient org.OrganizationServiceClient
)
var pool database.Pool
@@ -40,9 +43,23 @@ func TestMain(m *testing.M) {
CTX = Instance.WithAuthorization(ctx, integration.UserTypeIAMOwner)
SystemClient = integration.SystemClient()
OrgClient = Instance.Client.OrgV2
var err error
dbPool, err = pgxpool.New(context.Background(), ConnString)
dbConfig, err := pgxpool.ParseConfig(ConnString)
if err != nil {
panic(err)
}
dbConfig.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
orgState, err := conn.LoadType(ctx, "zitadel.organization_state")
if err != nil {
return err
}
conn.TypeMap().RegisterType(orgState)
return nil
}
dbPool, err = pgxpool.NewWithConfig(context.Background(), dbConfig)
if err != nil {
panic(err)
}

View File

@@ -0,0 +1,157 @@
//go:build integration
package events_test
import (
"fmt"
"testing"
"time"
"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/zitadel/backend/v3/storage/database"
"github.com/zitadel/zitadel/backend/v3/storage/database/repository"
"github.com/zitadel/zitadel/internal/integration"
"github.com/zitadel/zitadel/pkg/grpc/org/v2"
)
// const ConnString = "host=localhost port=5432 user=zitadel dbname=zitadel sslmode=disable"
// var (
// dbPool *pgxpool.Pool
// CTX context.Context
// Organization *integration.Organization
// SystemClient system.SystemServiceClient
// )
// var pool database.Pool
// func TestMain(m *testing.M) {
// os.Exit(func() int {
// ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
// defer cancel()
// Organization = integration.NewOrganization(ctx)
// CTX = Organization.WithAuthorization(ctx, integration.UserTypeIAMOwner)
// SystemClient = integration.SystemClient()
// var err error
// dbPool, err = pgxpool.New(context.Background(), ConnString)
// if err != nil {
// panic(err)
// }
// pool = postgres.PGxPool(dbPool)
// return m.Run()
// }())
// }
func TestServer_TestOrganizationAddReduces(t *testing.T) {
orgName := gofakeit.Name()
// beforeCreate := time.Now()
_, err := OrgClient.AddOrganization(CTX, &org.AddOrganizationRequest{
Name: orgName,
})
require.NoError(t, err)
// afterCreate := time.Now()
orgRepo := repository.OrgRepository(pool)
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(t, err)
fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization)
retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
organization, err := orgRepo.Get(CTX,
orgRepo.NameCondition(database.TextOperationEqual, orgName),
)
require.NoError(ttt, err)
fmt.Printf("@@ >>>>>>>>>>>>>>>>>>>>>>>>>>>> organization = %+v\n", organization)
// // event instance.added
// require.Equal(ttt, instanceName, organization.Name)
// // event instance.default.org.set
// require.NotNil(t, organization.DefaultOrgID)
// // event instance.iam.project.set
// require.NotNil(t, organization.IAMProjectID)
// // event instance.iam.console.set
// require.NotNil(t, organization.ConsoleAppID)
// // event instance.default.language.set
// require.NotNil(t, organization.DefaultLanguage)
// // event instance.added
// assert.WithinRange(t, organization.CreatedAt, beforeCreate, afterCreate)
// // event instance.added
// assert.WithinRange(t, organization.UpdatedAt, beforeCreate, afterCreate)
// require.Nil(t, organization.DeletedAt)
}, retryDuration, tick)
}
// func TestServer_TestOrganizationUpdateNameReduces(t *testing.T) {
// instanceName := gofakeit.Name()
// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{
// OrganizationName: instanceName,
// Owner: &system.CreateOrganizationRequest_Machine_{
// Machine: &system.CreateOrganizationRequest_Machine{
// UserName: "owner",
// Name: "owner",
// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{},
// },
// },
// })
// require.NoError(t, err)
// instanceName += "new"
// _, err = SystemClient.UpdateOrganization(CTX, &system.UpdateOrganizationRequest{
// OrganizationId: res.OrganizationId,
// OrganizationName: instanceName,
// })
// require.NoError(t, err)
// instanceRepo := repository.OrganizationRepository(pool)
// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
// assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
// instance, err := instanceRepo.Get(CTX,
// instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
// )
// require.NoError(ttt, err)
// // event instance.changed
// require.Equal(ttt, instanceName, instance.Name)
// }, retryDuration, tick)
// }
// func TestServer_TestOrganizationDeleteReduces(t *testing.T) {
// instanceName := gofakeit.Name()
// res, err := SystemClient.CreateOrganization(CTX, &system.CreateOrganizationRequest{
// OrganizationName: instanceName,
// Owner: &system.CreateOrganizationRequest_Machine_{
// Machine: &system.CreateOrganizationRequest_Machine{
// UserName: "owner",
// Name: "owner",
// PersonalAccessToken: &system.CreateOrganizationRequest_PersonalAccessToken{},
// },
// },
// })
// require.NoError(t, err)
// _, err = SystemClient.RemoveOrganization(CTX, &system.RemoveOrganizationRequest{
// OrganizationId: res.OrganizationId,
// })
// require.NoError(t, err)
// instanceRepo := repository.OrganizationRepository(pool)
// retryDuration, tick := integration.WaitForAndTickWithMaxDuration(CTX, time.Minute)
// assert.EventuallyWithT(t, func(ttt *assert.CollectT) {
// instance, err := instanceRepo.Get(CTX,
// instanceRepo.NameCondition(database.TextOperationEqual, instanceName),
// )
// // event instance.removed
// require.Nil(t, instance)
// require.NoError(ttt, err)
// }, retryDuration, tick)
// }

View File

@@ -2,8 +2,11 @@ package repository
import (
"context"
"errors"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/zitadel/backend/v3/domain"
"github.com/zitadel/zitadel/backend/v3/storage/database"
)
@@ -12,11 +15,13 @@ import (
// repository
// -------------------------------------------------------------
var _ domain.OrganizationRepository = (*org)(nil)
type org struct {
repository
}
func OrgRepository(client database.QueryExecutor) domain.OrgRepository {
func OrgRepository(client database.QueryExecutor) domain.OrganizationRepository {
return &org{
repository: repository{
client: client,
@@ -24,116 +29,208 @@ func OrgRepository(client database.QueryExecutor) domain.OrgRepository {
}
}
// Create implements [domain.OrgRepository].
func (o *org) Create(ctx context.Context, org *domain.Org) error {
org.CreatedAt = time.Now()
org.UpdatedAt = org.CreatedAt
return nil
const queryOrganizationStmt = `SELECT id, name, instance_id, state, created_at, updated_at, deleted_at` +
` FROM zitadel.organizations`
// Get implements [domain.OrganizationRepository].
func (o *org) Get(ctx context.Context, opts ...database.Condition) (*domain.Organization, error) {
builder := database.StatementBuilder{}
builder.WriteString(queryOrganizationStmt)
// return only non deleted isntances
opts = append(opts, database.IsNull(o.DeletedAtColumn()))
andCondition := database.And(opts...)
o.writeCondition(&builder, andCondition)
rows, err := o.client.Query(ctx, builder.String(), builder.Args()...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanOrganization(rows)
}
// Delete implements [domain.OrgRepository].
func (o *org) Delete(ctx context.Context, condition database.Condition) error {
return nil
// List implements [domain.OrganizationRepository].
func (o *org) List(ctx context.Context, opts ...database.Condition) ([]domain.Organization, error) {
builder := database.StatementBuilder{}
builder.WriteString(queryOrganizationStmt)
// return only non deleted isntances
opts = append(opts, database.IsNull(o.DeletedAtColumn()))
andCondition := database.And(opts...)
o.writeCondition(&builder, andCondition)
rows, err := o.client.Query(ctx, builder.String(), builder.Args()...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanOrganizations(rows)
}
// Get implements [domain.OrgRepository].
func (o *org) Get(ctx context.Context, opts ...database.QueryOption) (*domain.Org, error) {
panic("unimplemented")
const createOrganizationStmt = `INSERT INTO zitadel.organizations (id, name, instance_id, state)` +
` VALUES ($1, $2, $3, $4)` +
` RETURNING created_at, updated_at`
// Create implements [domain.OrganizationRepository].
func (o *org) Create(ctx context.Context, organization *domain.Organization) error {
builder := database.StatementBuilder{}
builder.AppendArgs(organization.ID, organization.Name, organization.InstanceID, organization.State)
builder.WriteString(createOrganizationStmt)
err := o.client.QueryRow(ctx, builder.String(), builder.Args()...).Scan(&organization.CreatedAt, &organization.UpdatedAt)
if err != nil {
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) {
// constraint violation
if pgErr.Code == "23514" {
if pgErr.ConstraintName == "organizations_name_check" {
return errors.New("instnace name not provided")
}
if pgErr.ConstraintName == "organizations_id_check" {
return errors.New("instnace id not provided")
}
}
// duplicate
if pgErr.Code == "23505" {
if pgErr.ConstraintName == "organizations_pkey" {
return errors.New("instnace id already exists")
}
}
}
}
return err
}
// List implements [domain.OrgRepository].
func (o *org) List(ctx context.Context, opts ...database.QueryOption) ([]*domain.Org, error) {
panic("unimplemented")
// Update implements [domain.OrganizationRepository].
func (o org) Update(ctx context.Context, condition database.Condition, changes ...database.Change) (int64, error) {
builder := database.StatementBuilder{}
builder.WriteString(`UPDATE zitadel.organizations SET `)
database.Changes(changes).Write(&builder)
o.writeCondition(&builder, condition)
stmt := builder.String()
rowsAffected, err := o.client.Exec(ctx, stmt, builder.Args()...)
return rowsAffected, err
}
// Update implements [domain.OrgRepository].
func (o *org) Update(ctx context.Context, condition database.Condition, changes ...database.Change) error {
panic("unimplemented")
}
// Delete implements [domain.OrganizationRepository].
func (o org) Delete(ctx context.Context, condition database.Condition) error {
if condition == nil {
return errors.New("Delete must contain a condition") // (otherwise ALL organizations will be deleted)
}
builder := database.StatementBuilder{}
builder.WriteString(`UPDATE zitadel.organizations SET deleted_at = $1`)
builder.AppendArgs(time.Now())
func (o *org) Member() domain.MemberRepository {
return &orgMember{o}
}
func (o *org) Domain() domain.DomainRepository {
return &orgDomain{o}
o.writeCondition(&builder, condition)
_, err := o.client.Exec(ctx, builder.String(), builder.Args()...)
return err
}
// -------------------------------------------------------------
// changes
// -------------------------------------------------------------
// SetName implements [domain.orgChanges].
func (o *org) SetName(name string) database.Change {
return database.NewChange(o.NameColumn(), name)
}
// SetState implements [domain.orgChanges].
func (o *org) SetState(state domain.OrgState) database.Change {
return database.NewChange(o.StateColumn(), state)
// SetName implements [domain.organizationChanges].
func (i org) SetName(name string) database.Change {
return database.NewChange(i.NameColumn(), name)
}
// -------------------------------------------------------------
// conditions
// -------------------------------------------------------------
// IDCondition implements [domain.orgConditions].
func (o *org) IDCondition(orgID string) database.Condition {
return database.NewTextCondition(o.IDColumn(), database.TextOperationEqual, orgID)
// IDCondition implements [domain.organizationConditions].
func (o org) IDCondition(id string) database.Condition {
return database.NewTextCondition(o.IDColumn(), database.TextOperationEqual, id)
}
// InstanceIDCondition implements [domain.orgConditions].
func (o *org) InstanceIDCondition(instanceID string) database.Condition {
return database.NewTextCondition(o.InstanceIDColumn(), database.TextOperationEqual, instanceID)
}
// NameCondition implements [domain.orgConditions].
func (o *org) NameCondition(op database.TextOperation, name string) database.Condition {
// NameCondition implements [domain.organizationConditions].
func (o org) NameCondition(op database.TextOperation, name string) database.Condition {
return database.NewTextCondition(o.NameColumn(), op, name)
}
// StateCondition implements [domain.orgConditions].
func (o *org) StateCondition(op database.NumberOperation, state domain.OrgState) database.Condition {
return database.NewNumberCondition(o.StateColumn(), op, state)
}
// -------------------------------------------------------------
// columns
// -------------------------------------------------------------
// CreatedAtColumn implements [domain.orgColumns].
func (o *org) CreatedAtColumn() database.Column {
return database.NewColumn("created_at")
}
// DeletedAtColumn implements [domain.orgColumns].
func (o *org) DeletedAtColumn() database.Column {
return database.NewColumn("deleted_at")
}
// IDColumn implements [domain.orgColumns].
func (o *org) IDColumn() database.Column {
// IDColumn implements [domain.organizationColumns].
func (org) IDColumn() database.Column {
return database.NewColumn("id")
}
// InstanceIDColumn implements [domain.orgColumns].
func (o *org) InstanceIDColumn() database.Column {
return database.NewColumn("instance_id")
}
// NameColumn implements [domain.orgColumns].
func (o *org) NameColumn() database.Column {
// NameColumn implements [domain.organizationColumns].
func (org) NameColumn() database.Column {
return database.NewColumn("name")
}
// StateColumn implements [domain.orgColumns].
func (o *org) StateColumn() database.Column {
// InstanceIDColumn implements [domain.organizationColumns].
func (org) InstanceIDColumn() database.Column {
return database.NewColumn("instance_id")
}
// StateColumn implements [domain.organizationColumns].
func (org) StateColumn() database.Column {
return database.NewColumn("state")
}
// UpdatedAtColumn implements [domain.orgColumns].
func (o *org) UpdatedAtColumn() database.Column {
// CreatedAtColumn implements [domain.organizationColumns].
func (org) CreatedAtColumn() database.Column {
return database.NewColumn("created_at")
}
// UpdatedAtColumn implements [domain.organizationColumns].
func (org) UpdatedAtColumn() database.Column {
return database.NewColumn("updated_at")
}
var _ domain.OrgRepository = (*org)(nil)
// DeletedAtColumn implements [domain.organizationColumns].
func (org) DeletedAtColumn() database.Column {
return database.NewColumn("deleted_at")
}
func (o *org) writeCondition(
builder *database.StatementBuilder,
condition database.Condition,
) {
if condition == nil {
return
}
builder.WriteString(" WHERE ")
condition.Write(builder)
}
func scanOrganization(rows database.Rows) (*domain.Organization, error) {
organization, err := pgx.CollectOneRow[domain.Organization](rows, pgx.RowToStructByNameLax[domain.Organization])
if err != nil {
// if no results returned, this is not a error
// it just means the organization was not found
// the caller should check if the returned organization is nil
if err.Error() == "no rows in result set" {
return nil, nil
}
return nil, err
}
return &organization, nil
}
func scanOrganizations(rows database.Rows) ([]domain.Organization, error) {
organizations, err := pgx.CollectRows[domain.Organization](rows, pgx.RowToStructByNameLax[domain.Organization])
if err != nil {
// if no results returned, this is not a error
// it just means the organization was not found
// the caller should check if the returned organization is nil
if err.Error() == "no rows in result set" {
return nil, nil
}
return nil, err
}
return organizations, nil
}

View File

@@ -87,6 +87,22 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) {
return err
}
}
// Checken & Egg problem:
// conn.Exec(ctx, "CREATE SCHEMA IF NOT EXISTS zitadel")
// conn.Exec(ctx, `
// CREATE TYPE zitadel.organization_state AS ENUM (
// 'ACTIVE',
// 'INACTIVE'
// )`,
// )
// orgState, err := conn.LoadType(ctx, "zitadel.organization_state")
// if err != nil {
// return err
// }
// conn.TypeMap().RegisterType(orgState)
return nil
}
}

View File

@@ -0,0 +1,207 @@
package projection
import (
"context"
repoDomain "github.com/zitadel/zitadel/backend/v3/domain"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/org"
"github.com/zitadel/zitadel/internal/zerrors"
)
const (
OrgRelationProjectionTable = "zitadel.organizations"
)
type orgRelationalProjection struct{}
func (*orgRelationalProjection) Name() string {
return OrgRelationProjectionTable
}
func newOrgRelationalProjection(ctx context.Context, config handler.Config) *handler.Handler {
return handler.NewHandler(ctx, &config, new(orgRelationalProjection))
}
// Init implements [handler.initializer]
func (p *orgRelationalProjection) Init() *old_handler.Check {
return handler.NewTableCheck(
handler.NewTable([]*handler.InitColumn{
handler.NewColumn(OrgColumnID, handler.ColumnTypeText),
handler.NewColumn(OrgColumnName, handler.ColumnTypeText),
handler.NewColumn(OrgColumnInstanceID, handler.ColumnTypeText),
handler.NewColumn(State, handler.ColumnTypeEnum),
handler.NewColumn(CreatedAt, handler.ColumnTypeTimestamp),
handler.NewColumn(UpdatedAt, handler.ColumnTypeTimestamp),
handler.NewColumn(DeletedAt, handler.ColumnTypeTimestamp),
},
handler.NewPrimaryKey(OrgColumnInstanceID, OrgColumnID),
// handler.WithIndex(handler.NewIndex("domain", []string{OrgColumnDomain})),
handler.WithIndex(handler.NewIndex("name", []string{OrgColumnName})),
),
)
}
func (p *orgRelationalProjection) Reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: org.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: org.OrgAddedEventType,
Reduce: p.reduceOrgRelationalAdded,
},
{
Event: org.OrgChangedEventType,
Reduce: p.reduceOrgRelationalChanged,
},
{
Event: org.OrgDeactivatedEventType,
Reduce: p.reduceOrgRelationalDeactivated,
},
{
Event: org.OrgReactivatedEventType,
Reduce: p.reduceOrgRelationalReactivated,
},
{
Event: org.OrgRemovedEventType,
Reduce: p.reduceOrgRelationalRemoved,
},
// TODO
// {
// Event: org.OrgDomainPrimarySetEventType,
// Reduce: p.reducePrimaryDomainSetRelational,
// },
},
},
{
Aggregate: instance.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: instance.InstanceRemovedEventType,
Reduce: reduceInstanceRemovedHelper(OrgColumnInstanceID),
},
},
},
}
}
func (p *orgRelationalProjection) reduceOrgRelationalAdded(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*org.OrgAddedEvent)
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-uYq5R", "reduce.wrong.event.type %s", org.OrgAddedEventType)
}
// need to convert old state (int) to new state (enum)
state := repoDomain.State[domain.OrgStateActive-1]
return handler.NewCreateStatement(
e,
[]handler.Column{
handler.NewCol(OrgColumnID, e.Aggregate().ID),
handler.NewCol(OrgColumnName, e.Name),
handler.NewCol(OrgColumnInstanceID, e.Aggregate().InstanceID),
handler.NewCol(State, state),
handler.NewCol(CreatedAt, e.CreationDate()),
handler.NewCol(UpdatedAt, e.CreationDate()),
},
), nil
}
func (p *orgRelationalProjection) reduceOrgRelationalChanged(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*org.OrgChangedEvent)
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-Bg9om", "reduce.wrong.event.type %s", org.OrgChangedEventType)
}
if e.Name == "" {
return handler.NewNoOpStatement(e), nil
}
return handler.NewUpdateStatement(
e,
[]handler.Column{
handler.NewCol(OrgColumnName, e.Name),
handler.NewCol(UpdatedAt, e.CreationDate()),
},
[]handler.Condition{
handler.NewCond(OrgColumnID, e.Aggregate().ID),
handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID),
},
), nil
}
func (p *orgRelationalProjection) reduceOrgRelationalDeactivated(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*org.OrgDeactivatedEvent)
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-BApK5", "reduce.wrong.event.type %s", org.OrgDeactivatedEventType)
}
return handler.NewUpdateStatement(
e,
[]handler.Column{
handler.NewCol(State, domain.OrgStateInactive),
handler.NewCol(UpdatedAt, e.CreationDate()),
},
[]handler.Condition{
handler.NewCond(OrgColumnID, e.Aggregate().ID),
handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID),
},
), nil
}
func (p *orgRelationalProjection) reduceOrgRelationalReactivated(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*org.OrgReactivatedEvent)
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-o38DE", "reduce.wrong.event.type %s", org.OrgReactivatedEventType)
}
return handler.NewUpdateStatement(
e,
[]handler.Column{
handler.NewCol(State, domain.OrgStateActive),
handler.NewCol(UpdatedAt, e.CreationDate()),
},
[]handler.Condition{
handler.NewCond(OrgColumnID, e.Aggregate().ID),
handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID),
},
), nil
}
// TODO
// func (p *orgRelationalProjection) reducePrimaryDomainSetRelational(event eventstore.Event) (*handler.Statement, error) {
// e, ok := event.(*org.DomainPrimarySetEvent)
// if !ok {
// return nil, zerrors.ThrowInvalidArgumentf(nil, "HANDL-3Tbkt", "reduce.wrong.event.type %s", org.OrgDomainPrimarySetEventType)
// }
// return handler.NewUpdateStatement(
// e,
// []handler.Column{
// handler.NewCol(OrgColumnChangeDate, e.CreationDate()),
// handler.NewCol(OrgColumnSequence, e.Sequence()),
// handler.NewCol(OrgColumnDomain, e.Domain),
// },
// []handler.Condition{
// handler.NewCond(OrgColumnID, e.Aggregate().ID),
// handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID),
// },
// ), nil
// }
func (p *orgRelationalProjection) reduceOrgRelationalRemoved(event eventstore.Event) (*handler.Statement, error) {
e, ok := event.(*org.OrgRemovedEvent)
if !ok {
return nil, zerrors.ThrowInvalidArgumentf(nil, "PROJE-DGm9g", "reduce.wrong.event.type %s", org.OrgRemovedEventType)
}
return handler.NewUpdateStatement(
e,
[]handler.Column{
handler.NewCol(DeletedAt, e.CreationDate()),
},
[]handler.Condition{
handler.NewCond(OrgColumnID, e.Aggregate().ID),
handler.NewCond(OrgColumnInstanceID, e.Aggregate().InstanceID),
},
), nil
}

View File

@@ -56,6 +56,7 @@ var (
UserAuthMethodProjection *handler.Handler
InstanceProjection *handler.Handler
InstanceRelationalProjection *handler.Handler
OrganizationRelationalProjection *handler.Handler
SecretGeneratorProjection *handler.Handler
SMTPConfigProjection *handler.Handler
SMSConfigProjection *handler.Handler
@@ -151,7 +152,8 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
UserMetadataProjection = newUserMetadataProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_metadata"]))
UserAuthMethodProjection = newUserAuthMethodProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_auth_method"]))
InstanceProjection = newInstanceProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances"]))
InstanceRelationalProjection = newInstanceRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances_relational"]))
InstanceRelationalProjection = newInstanceRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["organizations_relational"]))
OrganizationRelationalProjection = newOrgRelationalProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instances_relational"]))
SecretGeneratorProjection = newSecretGeneratorProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["secret_generators"]))
SMTPConfigProjection = newSMTPConfigProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["smtp_configs"]))
SMSConfigProjection = newSMSConfigProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["sms_config"]))
@@ -307,6 +309,7 @@ func newProjectionsList() {
UserAuthMethodProjection,
InstanceProjection,
InstanceRelationalProjection,
OrganizationRelationalProjection,
SecretGeneratorProjection,
SMTPConfigProjection,
SMSConfigProjection,

View File

@@ -1,6 +1,7 @@
package projection
const (
State = "state"
CreatedAt = "created_at"
UpdatedAt = "updated_at"
DeletedAt = "deleted_at"