perf(milestones): refactor (#8788)
Some checks are pending
ZITADEL CI/CD / core (push) Waiting to run
ZITADEL CI/CD / console (push) Waiting to run
ZITADEL CI/CD / version (push) Waiting to run
ZITADEL CI/CD / compile (push) Blocked by required conditions
ZITADEL CI/CD / core-unit-test (push) Blocked by required conditions
ZITADEL CI/CD / core-integration-test (push) Blocked by required conditions
ZITADEL CI/CD / lint (push) Blocked by required conditions
ZITADEL CI/CD / container (push) Blocked by required conditions
ZITADEL CI/CD / e2e (push) Blocked by required conditions
ZITADEL CI/CD / release (push) Blocked by required conditions
Code Scanning / CodeQL-Build (go) (push) Waiting to run
Code Scanning / CodeQL-Build (javascript) (push) Waiting to run

# Which Problems Are Solved

Milestones used existing events from a number of aggregates. OIDC
session is one of them. We noticed in load-tests that the reduction of
the oidc_session.added event into the milestone projection is a costly
business with payload based conditionals. A milestone is reached once,
but even then we remain subscribed to the OIDC events. This requires the
projections.current_states to be updated continuously.


# How the Problems Are Solved

The milestone creation is refactored to use dedicated events instead.
The command side decides when a milestone is reached and creates the
reached event once for each milestone when required.

# Additional Changes

In order to prevent reached milestones being created twice, a migration
script is provided. When the old `projections.milestones` table exist,
the state is read from there and `v2` milestone aggregate events are
created, with the original reached and pushed dates.

# Additional Context

- Closes https://github.com/zitadel/zitadel/issues/8800
This commit is contained in:
Tim Möhlmann
2024-10-28 09:29:34 +01:00
committed by GitHub
parent 54f1c0bc50
commit 32bad3feb3
46 changed files with 1612 additions and 756 deletions

View File

@@ -163,6 +163,7 @@ func projections(
}
commands, err := command.StartCommands(
es,
config.Caches,
config.SystemDefaults,
config.InternalAuthZ.RolePermissionMappings,
staticStorage,

View File

@@ -65,6 +65,7 @@ func (mig *FirstInstance) Execute(ctx context.Context, _ eventstore.Event) error
}
cmd, err := command.StartCommands(mig.es,
nil,
mig.defaults,
mig.zitadelRoles,
nil,

118
cmd/setup/36.go Normal file
View File

@@ -0,0 +1,118 @@
package setup
import (
"context"
_ "embed"
"errors"
"fmt"
"slices"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/milestone"
)
var (
//go:embed 36.sql
getProjectedMilestones string
)
type FillV2Milestones struct {
dbClient *database.DB
eventstore *eventstore.Eventstore
}
type instanceMilestone struct {
Type milestone.Type
Reached time.Time
Pushed *time.Time
}
func (mig *FillV2Milestones) Execute(ctx context.Context, _ eventstore.Event) error {
im, err := mig.getProjectedMilestones(ctx)
if err != nil {
return err
}
return mig.pushEventsByInstance(ctx, im)
}
func (mig *FillV2Milestones) getProjectedMilestones(ctx context.Context) (map[string][]instanceMilestone, error) {
type row struct {
InstanceID string
Type milestone.Type
Reached time.Time
Pushed *time.Time
}
rows, _ := mig.dbClient.Pool.Query(ctx, getProjectedMilestones)
scanned, err := pgx.CollectRows(rows, pgx.RowToStructByPos[row])
var pgError *pgconn.PgError
// catch ERROR: relation "projections.milestones" does not exist
if errors.As(err, &pgError) && pgError.SQLState() == "42P01" {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("milestones get: %w", err)
}
milestoneMap := make(map[string][]instanceMilestone)
for _, s := range scanned {
milestoneMap[s.InstanceID] = append(milestoneMap[s.InstanceID], instanceMilestone{
Type: s.Type,
Reached: s.Reached,
Pushed: s.Pushed,
})
}
return milestoneMap, nil
}
// pushEventsByInstance creates the v2 milestone events by instance.
// This prevents we will try to push 6*N(instance) events in one push.
func (mig *FillV2Milestones) pushEventsByInstance(ctx context.Context, milestoneMap map[string][]instanceMilestone) error {
// keep a deterministic order by instance ID.
order := make([]string, 0, len(milestoneMap))
for k := range milestoneMap {
order = append(order, k)
}
slices.Sort(order)
for _, instanceID := range order {
logging.WithFields("instance_id", instanceID, "migration", mig.String()).Info("filter existing milestone events")
// because each Push runs in a separate TX, we need to make sure that events
// from a partially executed migration are pushed again.
model := command.NewMilestonesReachedWriteModel(instanceID)
if err := mig.eventstore.FilterToQueryReducer(ctx, model); err != nil {
return fmt.Errorf("milestones filter: %w", err)
}
if model.InstanceCreated {
logging.WithFields("instance_id", instanceID, "migration", mig.String()).Info("milestone events already migrated")
continue // This instance was migrated, skip
}
logging.WithFields("instance_id", instanceID, "migration", mig.String()).Info("push milestone events")
aggregate := milestone.NewInstanceAggregate(instanceID)
cmds := make([]eventstore.Command, 0, len(milestoneMap[instanceID])*2)
for _, m := range milestoneMap[instanceID] {
cmds = append(cmds, milestone.NewReachedEventWithDate(ctx, aggregate, m.Type, &m.Reached))
if m.Pushed != nil {
cmds = append(cmds, milestone.NewPushedEventWithDate(ctx, aggregate, m.Type, nil, "", m.Pushed))
}
}
if _, err := mig.eventstore.Push(ctx, cmds...); err != nil {
return fmt.Errorf("milestones push: %w", err)
}
}
return nil
}
func (mig *FillV2Milestones) String() string {
return "36_fill_v2_milestones"
}

4
cmd/setup/36.sql Normal file
View File

@@ -0,0 +1,4 @@
SELECT instance_id, type, reached_date, last_pushed_date
FROM projections.milestones
WHERE reached_date IS NOT NULL
ORDER BY instance_id, reached_date;

View File

@@ -122,6 +122,7 @@ type Steps struct {
s33SMSConfigs3TwilioAddVerifyServiceSid *SMSConfigs3TwilioAddVerifyServiceSid
s34AddCacheSchema *AddCacheSchema
s35AddPositionToIndexEsWm *AddPositionToIndexEsWm
s36FillV2Milestones *FillV2Milestones
}
func MustNewSteps(v *viper.Viper) *Steps {

View File

@@ -33,6 +33,7 @@ func (mig *externalConfigChange) Check(lastRun map[string]interface{}) bool {
func (mig *externalConfigChange) Execute(ctx context.Context, _ eventstore.Event) error {
cmd, err := command.StartCommands(
mig.es,
nil,
mig.defaults,
nil,
nil,

View File

@@ -165,6 +165,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s33SMSConfigs3TwilioAddVerifyServiceSid = &SMSConfigs3TwilioAddVerifyServiceSid{dbClient: esPusherDBClient}
steps.s34AddCacheSchema = &AddCacheSchema{dbClient: queryDBClient}
steps.s35AddPositionToIndexEsWm = &AddPositionToIndexEsWm{dbClient: esPusherDBClient}
steps.s36FillV2Milestones = &FillV2Milestones{dbClient: queryDBClient, eventstore: eventstoreClient}
err = projection.Create(ctx, projectionDBClient, eventstoreClient, config.Projections, nil, nil, nil)
logging.OnError(err).Fatal("unable to start projections")
@@ -209,6 +210,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
steps.s30FillFieldsForOrgDomainVerified,
steps.s34AddCacheSchema,
steps.s35AddPositionToIndexEsWm,
steps.s36FillV2Milestones,
} {
mustExecuteMigration(ctx, eventstoreClient, step, "migration failed")
}
@@ -390,6 +392,7 @@ func initProjections(
}
commands, err := command.StartCommands(
eventstoreClient,
config.Caches,
config.SystemDefaults,
config.InternalAuthZ.RolePermissionMappings,
staticStorage,

View File

@@ -224,6 +224,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
}
commands, err := command.StartCommands(
eventstoreClient,
config.Caches,
config.SystemDefaults,
config.InternalAuthZ.RolePermissionMappings,
storage,