Files
zitadel/internal/query/execution.go
Tim Möhlmann 2727fa719d perf(actionsv2): execution target router (#10564)
# Which Problems Are Solved

The event execution system currently uses a projection handler that
subscribes to and processes all events for all instances. This creates a
high static cost because the system over-fetches event data, handling
many events that are not needed by most instances. This inefficiency is
also reflected in high "rows returned" metrics in the database.

# How the Problems Are Solved

Eliminate the use of a projection handler. Instead, events for which
"execution targets" are defined, are directly pushed to the queue by the
eventstore. A Router is populated in the Instance object in the authz
middleware.

- By joining the execution targets to the instance, no additional
queries are needed anymore.
- As part of the instance object, execution targets are now cached as
well.
- Events are queued within the same transaction, giving transactional
guarantees on delivery.
- Uses the "insert many fast" variant of River. Multiple jobs are queued
in a single round-trip to the database.
- Fix compatibility with PostgreSQL 15

# Additional Changes

- The signing key was stored as plain-text in the river job payload in
the DB. This violated our [Secrets
Storage](https://zitadel.com/docs/concepts/architecture/secrets#secrets-storage)
principle. This change removed the field and only uses the encrypted
version of the signing key.
- Fixed the target ordering from descending to ascending.
- Some minor linter warnings on the use of `io.WriteString()`.

# Additional Context

- Introduced in https://github.com/zitadel/zitadel/pull/9249
- Closes https://github.com/zitadel/zitadel/issues/10553
- Closes https://github.com/zitadel/zitadel/issues/9832
- Closes https://github.com/zitadel/zitadel/issues/10372
- Closes https://github.com/zitadel/zitadel/issues/10492

---------

Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
(cherry picked from commit a9ebc06c77)
2025-09-01 08:16:52 +02:00

288 lines
8.3 KiB
Go

package query
import (
"cmp"
"context"
"database/sql"
_ "embed"
"encoding/json"
"errors"
"slices"
sq "github.com/Masterminds/squirrel"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/query/projection"
exec "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/zerrors"
)
// Table and column definitions for the executions projection.
var (
	// executionTable is the executions projection table together with the
	// name of its instance ID column.
	executionTable = table{
		name:          projection.ExecutionTable,
		instanceIDCol: projection.ExecutionInstanceIDCol,
	}
	// ExecutionColumnID is the ID column of executionTable.
	ExecutionColumnID = Column{
		name:  projection.ExecutionIDCol,
		table: executionTable,
	}
	// ExecutionColumnCreationDate is the creation date column of executionTable.
	ExecutionColumnCreationDate = Column{
		name:  projection.ExecutionCreationDateCol,
		table: executionTable,
	}
	// ExecutionColumnChangeDate is the change date column of executionTable.
	ExecutionColumnChangeDate = Column{
		name:  projection.ExecutionChangeDateCol,
		table: executionTable,
	}
	// ExecutionColumnInstanceID is the instance ID column of executionTable.
	ExecutionColumnInstanceID = Column{
		name:  projection.ExecutionInstanceIDCol,
		table: executionTable,
	}
)

// Table and column definitions for the targets belonging to an execution.
var (
	// executionTargetsTable is named after the executions table, suffixed
	// with "_" and projection.ExecutionTargetSuffix.
	executionTargetsTable = table{
		name:          projection.ExecutionTable + "_" + projection.ExecutionTargetSuffix,
		instanceIDCol: projection.ExecutionTargetInstanceIDCol,
	}
	// executionTargetsTableAlias is executionTargetsTable under the alias
	// "execution_targets"; the columns below are qualified with it.
	executionTargetsTableAlias = executionTargetsTable.setAlias("execution_targets")
	// ExecutionTargetsColumnInstanceID is the instance ID column of the aliased targets table.
	ExecutionTargetsColumnInstanceID = Column{
		name:  projection.ExecutionTargetInstanceIDCol,
		table: executionTargetsTableAlias,
	}
	// ExecutionTargetsColumnExecutionID is the execution ID column of the aliased targets table.
	ExecutionTargetsColumnExecutionID = Column{
		name:  projection.ExecutionTargetExecutionIDCol,
		table: executionTargetsTableAlias,
	}
	// executionTargetsListCol is the "targets" column of the aliased targets
	// table. The scan functions in this file read it as a JSON encoded list.
	executionTargetsListCol = Column{
		name:  "targets",
		table: executionTargetsTableAlias,
	}
)
var (
//go:embed execution_targets.sql
executionTargetsQuery string
)
// Executions is the result of an execution search: the embedded
// SearchResponse plus the list of executions that were found.
type Executions struct {
	SearchResponse
	// Executions are the scanned rows, in the order they were returned.
	Executions []*Execution
}
// SetState stores s as the State of the embedded SearchResponse.
func (e *Executions) SetState(s *State) {
	e.SearchResponse.State = s
}
// Execution is a single execution as read from the projection: the embedded
// object details plus the list of targets attached to it.
type Execution struct {
	domain.ObjectDetails
	// Targets are the targets and includes of the execution.
	Targets []*exec.Target
}
// ExecutionSearchQueries bundles the embedded SearchRequest with the
// individual search queries that are applied when searching executions.
type ExecutionSearchQueries struct {
	SearchRequest
	// Queries are applied to the statement one after another by toQuery.
	Queries []SearchQuery
}
// toQuery applies the embedded SearchRequest to query first and then every
// entry of Queries in slice order, returning the resulting builder.
func (q *ExecutionSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
	builder := q.SearchRequest.toQuery(query)
	for i := range q.Queries {
		builder = q.Queries[i].toQuery(builder)
	}
	return builder
}
// SearchExecutions lists the executions of the instance found in ctx,
// narrowed down by the passed search queries.
func (q *Queries) SearchExecutions(ctx context.Context, queries *ExecutionSearchQueries) (executions *Executions, err error) {
	// Results are always restricted to the instance of the current context.
	instanceFilter := sq.Eq{
		ExecutionColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID(),
	}
	stmt, scan := prepareExecutionsQuery()
	where := combineToWhereStmt(stmt, queries.toQuery, instanceFilter)
	return genericRowsQueryWithState(ctx, q.client, executionTable, where, scan)
}
// GetExecutionByID reads the execution with the given id from the instance
// found in ctx. The row is decoded by scanExecution, which reports a
// not-found error when the scan yields sql.ErrNoRows.
func (q *Queries) GetExecutionByID(ctx context.Context, id string) (execution *Execution, err error) {
	filter := sq.Eq{
		ExecutionColumnID.identifier():         id,
		ExecutionColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID(),
	}
	stmt, scan := prepareExecutionQuery()
	stmt = stmt.Where(filter)
	return genericRowQuery(ctx, q.client, stmt, scan)
}
// NewExecutionInIDsSearchQuery builds a search query on the execution ID
// column from the passed list of values, using NewInTextQuery.
func NewExecutionInIDsSearchQuery(values []string) (SearchQuery, error) {
	idColumn := ExecutionColumnID
	return NewInTextQuery(idColumn, values)
}
// NewExecutionTypeSearchQuery builds a search query on the execution ID
// column that compares it against t.String() using TextStartsWith.
func NewExecutionTypeSearchQuery(t domain.ExecutionType) (SearchQuery, error) {
	idPrefix := t.String()
	return NewTextQuery(ExecutionColumnID, idPrefix, TextStartsWith)
}
// NewTargetSearchQuery builds a list-contains query on the targets column
// for an entry of type target with the given value. An encoding error from
// targetItemJSONB is returned as is.
func NewTargetSearchQuery(target string) (SearchQuery, error) {
	encoded, encodeErr := targetItemJSONB(domain.ExecutionTargetTypeTarget, target)
	if encodeErr != nil {
		return nil, encodeErr
	}
	return NewListContains(executionTargetsListCol, encoded)
}
// NewIncludeSearchQuery builds a list-contains query on the targets column
// for an entry of type include with the given value. An encoding error from
// targetItemJSONB is returned as is.
func NewIncludeSearchQuery(include string) (SearchQuery, error) {
	encoded, encodeErr := targetItemJSONB(domain.ExecutionTargetTypeInclude, include)
	if encodeErr != nil {
		return nil, encodeErr
	}
	return NewListContains(executionTargetsListCol, encoded)
}
// targetItemJSONB encodes targetItem as a JSON array holding one
// executionTarget, so it can be compared against the targets column.
//
// For ExecutionTargetTypeTarget the value is placed in the "target" field,
// for ExecutionTargetTypeInclude in the "include" field. Any other type,
// including ExecutionTargetTypeUnspecified, yields nil data and a nil error.
func targetItemJSONB(t domain.ExecutionTargetType, targetItem string) ([]byte, error) {
	item := new(executionTarget)
	switch t {
	case domain.ExecutionTargetTypeInclude:
		item.Include = targetItem
	case domain.ExecutionTargetTypeTarget:
		item.Target = targetItem
	case domain.ExecutionTargetTypeUnspecified:
		fallthrough
	default:
		return nil, nil
	}
	return json.Marshal([]*executionTarget{item})
}
// prepareExecutionQuery returns the select statement for a single execution
// together with the matching row scanner.
//
// The selected columns are, in order: instance ID, ID, creation date, change
// date and the targets list; scanExecution relies on that order.
func prepareExecutionQuery() (sq.SelectBuilder, func(row *sql.Row) (*Execution, error)) {
	builder := sq.Select(
		ExecutionColumnInstanceID.identifier(),
		ExecutionColumnID.identifier(),
		ExecutionColumnCreationDate.identifier(),
		ExecutionColumnChangeDate.identifier(),
		executionTargetsListCol.identifier(),
	).From(executionTable.identifier())

	// The embedded targets sub-select is joined on instance ID and execution ID.
	targetsJoin := "(" + executionTargetsQuery + ") AS " + executionTargetsTableAlias.alias + " ON " +
		ExecutionTargetsColumnInstanceID.identifier() + " = " + ExecutionColumnInstanceID.identifier() + " AND " +
		ExecutionTargetsColumnExecutionID.identifier() + " = " + ExecutionColumnID.identifier()

	return builder.Join(targetsJoin).PlaceholderFormat(sq.Dollar), scanExecution
}
// prepareExecutionsQuery returns the select statement for listing executions
// together with the matching rows scanner.
//
// The selected columns are, in order: instance ID, ID, creation date, change
// date, the targets list and the count column; scanExecutions relies on that
// order.
func prepareExecutionsQuery() (sq.SelectBuilder, func(rows *sql.Rows) (*Executions, error)) {
	builder := sq.Select(
		ExecutionColumnInstanceID.identifier(),
		ExecutionColumnID.identifier(),
		ExecutionColumnCreationDate.identifier(),
		ExecutionColumnChangeDate.identifier(),
		executionTargetsListCol.identifier(),
		countColumn.identifier(),
	).From(executionTable.identifier())

	// The embedded targets sub-select is joined on instance ID and execution ID.
	targetsJoin := "(" + executionTargetsQuery + ") AS " + executionTargetsTableAlias.alias + " ON " +
		ExecutionTargetsColumnInstanceID.identifier() + " = " + ExecutionColumnInstanceID.identifier() + " AND " +
		ExecutionTargetsColumnExecutionID.identifier() + " = " + ExecutionColumnID.identifier()

	return builder.Join(targetsJoin).PlaceholderFormat(sq.Dollar), scanExecutions
}
// executionTarget is the JSON representation of one entry in the targets
// list of an execution. Empty fields are omitted when encoding.
type executionTarget struct {
	// Position is used to order the entries of an execution.
	Position int `json:"position,omitempty"`
	// Include is set when the entry is of type include.
	Include string `json:"include,omitempty"`
	// Target is set when the entry is of type target.
	Target string `json:"target,omitempty"`
}
// scanExecution decodes the single row produced by prepareExecutionQuery
// into an Execution.
//
// The columns are read in the order instance ID, ID, creation date, change
// date and targets. The targets column is decoded by
// executionTargetsUnmarshal, so single lookups and list queries share the
// same conversion and the same ordering by position.
//
// It returns a not-found error when the scan yields sql.ErrNoRows and an
// internal error for every other scan failure. Errors from decoding the
// targets are returned unchanged.
func scanExecution(row *sql.Row) (*Execution, error) {
	execution := new(Execution)
	var targets []byte

	err := row.Scan(
		&execution.ResourceOwner,
		&execution.ID,
		&execution.CreationDate,
		&execution.EventDate,
		&targets,
	)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return nil, zerrors.ThrowNotFound(err, "QUERY-qzn1xycesh", "Errors.Execution.NotFound")
		}
		return nil, zerrors.ThrowInternal(err, "QUERY-f8sjvm4tb8", "Errors.Internal")
	}

	execution.Targets, err = executionTargetsUnmarshal(targets)
	if err != nil {
		return nil, err
	}
	return execution, nil
}
// executionTargetsUnmarshal decodes the JSON list stored in the targets
// column into execution targets, ordered ascending by position.
//
// Entries with equal (or missing) positions keep the order they had in data.
// An entry with a non-empty include becomes a target of type include;
// otherwise an entry with a non-empty target becomes a target of type target.
// JSON null elements and entries that carry neither value are skipped, so the
// returned slice never contains nil pointers.
//
// The returned slice is non-nil on success. A JSON decoding error is returned
// unchanged.
func executionTargetsUnmarshal(data []byte) ([]*exec.Target, error) {
	var items []*executionTarget
	if err := json.Unmarshal(data, &items); err != nil {
		return nil, err
	}

	// A JSON null element decodes to a nil pointer; remove those first so
	// neither the comparator nor the conversion below dereferences nil.
	items = slices.DeleteFunc(items, func(item *executionTarget) bool {
		return item == nil
	})
	slices.SortStableFunc(items, func(a, b *executionTarget) int {
		return cmp.Compare(a.Position, b.Position)
	})

	targets := make([]*exec.Target, 0, len(items))
	for _, item := range items {
		switch {
		case item.Include != "":
			// include wins when both fields are set
			targets = append(targets, &exec.Target{Type: domain.ExecutionTargetTypeInclude, Target: item.Include})
		case item.Target != "":
			targets = append(targets, &exec.Target{Type: domain.ExecutionTargetTypeTarget, Target: item.Target})
		}
	}
	return targets, nil
}
// scanExecutions decodes all rows produced by prepareExecutionsQuery into an
// Executions result.
//
// Each row is read in the order instance ID, ID, creation date, change date,
// targets and count. The targets column is decoded by
// executionTargetsUnmarshal. The count of the last scanned row is used as the
// Count of the response; it stays zero when there are no rows.
//
// Scan failures are returned as internal errors (not-found for
// sql.ErrNoRows), as are errors from closing rows and errors that ended the
// iteration early. Errors from decoding the targets are returned unchanged.
func scanExecutions(rows *sql.Rows) (*Executions, error) {
	executions := make([]*Execution, 0)
	var count uint64
	for rows.Next() {
		execution := new(Execution)
		var targets []byte
		err := rows.Scan(
			&execution.ResourceOwner,
			&execution.ID,
			&execution.CreationDate,
			&execution.EventDate,
			&targets,
			&count,
		)
		if err != nil {
			if errors.Is(err, sql.ErrNoRows) {
				return nil, zerrors.ThrowNotFound(err, "QUERY-tbrmno85vp", "Errors.Execution.NotFound")
			}
			return nil, zerrors.ThrowInternal(err, "QUERY-tyw2ydsj84", "Errors.Internal")
		}

		execution.Targets, err = executionTargetsUnmarshal(targets)
		if err != nil {
			return nil, err
		}
		executions = append(executions, execution)
	}

	if err := rows.Close(); err != nil {
		return nil, zerrors.ThrowInternal(err, "QUERY-yhka3fs3mw", "Errors.Query.CloseRows")
	}
	// rows.Next returns false both at the end of the result set and when the
	// iteration failed; only rows.Err tells the two apart. Without this check
	// a failure midway would be returned as a successful, truncated list.
	if err := rows.Err(); err != nil {
		return nil, zerrors.ThrowInternal(err, "QUERY-p4wz8nkq2c", "Errors.Internal")
	}

	return &Executions{
		Executions: executions,
		SearchResponse: SearchResponse{
			Count: count,
		},
	}, nil
}