perf(actionsv2): execution target router (#10564)

# Which Problems Are Solved

The event execution system currently uses a projection handler that
subscribes to and processes all events for all instances. This creates a
high static cost because the system over-fetches event data, handling
many events that are not needed by most instances. This inefficiency is
also reflected in high "rows returned" metrics in the database.

# How the Problems Are Solved

Eliminate the use of a projection handler. Instead, events for which
"execution targets" are defined are pushed directly to the queue by the
eventstore. A Router is populated in the Instance object in the authz
middleware.

- By joining the execution targets to the instance, no additional
queries are needed anymore.
- As part of the instance object, execution targets are now cached as
well.
- Events are queued within the same transaction, giving transactional
guarantees on delivery.
- Uses the "insert many fast" variant of River. Multiple jobs are queued
in a single round-trip to the database.
- Fix compatibility with PostgreSQL 15

# Additional Changes

- The signing key was stored as plain-text in the river job payload in
the DB. This violated our [Secrets
Storage](https://zitadel.com/docs/concepts/architecture/secrets#secrets-storage)
principle. This change removed the field and only uses the encrypted
version of the signing key.
- Fixed the target ordering from descending to ascending.
- Fixed some minor linter warnings on the use of `io.WriteString()`.

# Additional Context

- Introduced in https://github.com/zitadel/zitadel/pull/9249
- Closes https://github.com/zitadel/zitadel/issues/10553
- Closes https://github.com/zitadel/zitadel/issues/9832
- Closes https://github.com/zitadel/zitadel/issues/10372
- Closes https://github.com/zitadel/zitadel/issues/10492

---------

Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
(cherry picked from commit a9ebc06c77)
This commit is contained in:
Tim Möhlmann
2025-09-01 08:21:10 +03:00
committed by Livio Spring
parent d0d8e904c4
commit 2727fa719d
76 changed files with 1316 additions and 1815 deletions

View File

@@ -15,10 +15,12 @@ import (
"github.com/zitadel/logging"
"golang.org/x/text/language"
"github.com/zitadel/zitadel/cmd/build"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
target_domain "github.com/zitadel/zitadel/internal/execution/target"
"github.com/zitadel/zitadel/internal/feature"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
@@ -216,7 +218,7 @@ func (q *Queries) InstanceByHost(ctx context.Context, instanceHost, publicHost s
publicDomain := strings.Split(publicHost, ":")[0] // remove possible port
instance, ok := q.caches.instance.Get(ctx, instanceIndexByHost, instanceDomain)
if ok {
if ok && instance.ZitadelVersion == build.Version() {
return instance, instance.checkDomain(instanceDomain, publicDomain)
}
instance, scan := scanAuthzInstance()
@@ -239,14 +241,16 @@ func (q *Queries) InstanceByID(ctx context.Context, id string) (_ authz.Instance
}()
instance, ok := q.caches.instance.Get(ctx, instanceIndexByID, id)
if ok {
if ok && instance.ZitadelVersion == build.Version() {
return instance, nil
}
instance, scan := scanAuthzInstance()
err = q.client.QueryRowContext(ctx, scan, instanceByIDQuery, id)
logging.OnError(err).WithField("instance_id", id).Warn("instance by ID")
if err == nil {
instance.ZitadelVersion = build.Version()
q.caches.instance.Set(ctx, instance)
}
return instance, err
@@ -460,19 +464,21 @@ func prepareInstanceDomainQuery() (sq.SelectBuilder, func(*sql.Rows) (*Instance,
}
// authzInstance is the instance representation produced by scanAuthzInstance
// and stored in the instance cache by the instance lookup queries.
// NOTE(review): the JSON tags are presumably used when the cache serializes
// entries; confirm against the cache implementation.
type authzInstance struct {
ID string `json:"id,omitempty"`
IAMProjectID string `json:"iam_project_id,omitempty"`
ConsoleID string `json:"console_id,omitempty"`
ConsoleAppID string `json:"console_app_id,omitempty"`
DefaultLang language.Tag `json:"default_lang,omitempty"`
DefaultOrgID string `json:"default_org_id,omitempty"`
CSP csp `json:"csp,omitempty"`
Impersonation bool `json:"impersonation,omitempty"`
IsBlocked *bool `json:"is_blocked,omitempty"`
LogRetention *time.Duration `json:"log_retention,omitempty"`
Feature feature.Features `json:"feature,omitempty"`
ExternalDomains database.TextArray[string] `json:"external_domains,omitempty"`
TrustedDomains database.TextArray[string] `json:"trusted_domains,omitempty"`
ID string `json:"id,omitempty"`
IAMProjectID string `json:"iam_project_id,omitempty"`
ConsoleID string `json:"console_id,omitempty"`
ConsoleAppID string `json:"console_app_id,omitempty"`
DefaultLang language.Tag `json:"default_lang,omitempty"`
DefaultOrgID string `json:"default_org_id,omitempty"`
CSP csp `json:"csp,omitempty"`
Impersonation bool `json:"impersonation,omitempty"`
IsBlocked *bool `json:"is_blocked,omitempty"`
LogRetention *time.Duration `json:"log_retention,omitempty"`
Feature feature.Features `json:"feature,omitempty"`
ExternalDomains database.TextArray[string] `json:"external_domains,omitempty"`
TrustedDomains database.TextArray[string] `json:"trusted_domains,omitempty"`
// ExecutionTargets is the router built with target_domain.NewRouter from the
// targets scanned together with the instance row.
ExecutionTargets target_domain.Router `json:"execution_targets,omitzero"`
// ZitadelVersion is set to build.Version() before the instance is cached;
// cached entries whose version differs from the running build are not used.
ZitadelVersion string `json:"zitadel_version,omitempty"`
}
type csp struct {
@@ -527,6 +533,10 @@ func (i *authzInstance) Features() feature.Features {
return i.Feature
}
// ExecutionRouter returns the execution target router held in the
// instance's ExecutionTargets field.
func (i *authzInstance) ExecutionRouter() target_domain.Router {
return i.ExecutionTargets
}
var errPublicDomain = "public domain %q not trusted"
func (i *authzInstance) checkDomain(instanceDomain, publicDomain string) error {
@@ -562,6 +572,7 @@ func scanAuthzInstance() (*authzInstance, func(row *sql.Row) error) {
auditLogRetention database.NullDuration
block sql.NullBool
features []byte
executionTargetsBytes []byte
)
err := row.Scan(
&instance.ID,
@@ -578,6 +589,7 @@ func scanAuthzInstance() (*authzInstance, func(row *sql.Row) error) {
&features,
&instance.ExternalDomains,
&instance.TrustedDomains,
&executionTargetsBytes,
)
if errors.Is(err, sql.ErrNoRows) {
return zerrors.ThrowNotFound(nil, "QUERY-1kIjX", "Errors.IAM.NotFound")
@@ -600,6 +612,13 @@ func scanAuthzInstance() (*authzInstance, func(row *sql.Row) error) {
if err = json.Unmarshal(features, &instance.Feature); err != nil {
return zerrors.ThrowInternal(err, "QUERY-Po8ki", "Errors.Internal")
}
if len(executionTargetsBytes) > 0 {
var targets []target_domain.Target
if err := json.Unmarshal(executionTargetsBytes, &targets); err != nil {
return zerrors.ThrowInternal(err, "QUERY-aeKa2", "Errors.Internal")
}
instance.ExecutionTargets = target_domain.NewRouter(targets)
}
return nil
}
}
@@ -616,6 +635,8 @@ func (c *Caches) registerInstanceInvalidation() {
invalidate = cacheInvalidationFunc(c.instance, instanceIndexByID, getResourceOwner)
projection.LimitsProjection.RegisterCacheInvalidation(invalidate)
projection.RestrictionsProjection.RegisterCacheInvalidation(invalidate)
projection.ExecutionProjection.RegisterCacheInvalidation(invalidate)
projection.TargetProjection.RegisterCacheInvalidation(invalidate)
// System feature update should invalidate all instances, so Truncate the cache.
projection.SystemFeatureProjection.RegisterCacheInvalidation(func(ctx context.Context, _ []*eventstore.Aggregate) {