mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-06 16:22:13 +00:00
# Which Problems Are Solved
The event execution system currently uses a projection handler that
subscribes to and processes all events for all instances. This creates a
high static cost because the system over-fetches event data, handling
many events that are not needed by most instances. This inefficiency is
also reflected in high "rows returned" metrics in the database.
# How the Problems Are Solved
Eliminate the use of a projection handler. Instead, events for which
"execution targets" are defined are pushed directly to the queue by the
eventstore. A Router is populated in the Instance object in the authz
middleware.
- By joining the execution targets to the instance, no additional
queries are needed anymore.
- As part of the instance object, execution targets are now cached as
well.
- Events are queued within the same transaction, giving transactional
guarantees on delivery.
- Uses the "insert many fast" variant of River. Multiple jobs are queued
in a single round-trip to the database.
- Fix compatibility with PostgreSQL 15
# Additional Changes
- The signing key was stored as plain-text in the river job payload in
the DB. This violated our [Secrets
Storage](https://zitadel.com/docs/concepts/architecture/secrets#secrets-storage)
principle. This change removes the field and uses only the encrypted
version of the signing key.
- Fixed the target ordering from descending to ascending.
- Fixed some minor linter warnings on the use of `io.WriteString()`.
# Additional Context
- Introduced in https://github.com/zitadel/zitadel/pull/9249
- Closes https://github.com/zitadel/zitadel/issues/10553
- Closes https://github.com/zitadel/zitadel/issues/9832
- Closes https://github.com/zitadel/zitadel/issues/10372
- Closes https://github.com/zitadel/zitadel/issues/10492
---------
Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
(cherry picked from commit a9ebc06c77)
127 lines
4.1 KiB
Go
127 lines
4.1 KiB
Go
package action
|
|
|
|
import (
|
|
"context"
|
|
|
|
"connectrpc.com/connect"
|
|
"github.com/muhlemmer/gu"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
|
|
"github.com/zitadel/zitadel/internal/api/authz"
|
|
"github.com/zitadel/zitadel/internal/command"
|
|
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
|
|
target_domain "github.com/zitadel/zitadel/internal/execution/target"
|
|
action "github.com/zitadel/zitadel/pkg/grpc/action/v2beta"
|
|
)
|
|
|
|
func (s *Server) CreateTarget(ctx context.Context, req *connect.Request[action.CreateTargetRequest]) (*connect.Response[action.CreateTargetResponse], error) {
|
|
add := createTargetToCommand(req.Msg)
|
|
instanceID := authz.GetInstance(ctx).InstanceID()
|
|
createdAt, err := s.command.AddTarget(ctx, add, instanceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var creationDate *timestamppb.Timestamp
|
|
if !createdAt.IsZero() {
|
|
creationDate = timestamppb.New(createdAt)
|
|
}
|
|
return connect.NewResponse(&action.CreateTargetResponse{
|
|
Id: add.AggregateID,
|
|
CreationDate: creationDate,
|
|
SigningKey: add.SigningKey,
|
|
}), nil
|
|
}
|
|
|
|
func (s *Server) UpdateTarget(ctx context.Context, req *connect.Request[action.UpdateTargetRequest]) (*connect.Response[action.UpdateTargetResponse], error) {
|
|
instanceID := authz.GetInstance(ctx).InstanceID()
|
|
update := updateTargetToCommand(req.Msg)
|
|
changedAt, err := s.command.ChangeTarget(ctx, update, instanceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var changeDate *timestamppb.Timestamp
|
|
if !changedAt.IsZero() {
|
|
changeDate = timestamppb.New(changedAt)
|
|
}
|
|
return connect.NewResponse(&action.UpdateTargetResponse{
|
|
ChangeDate: changeDate,
|
|
SigningKey: update.SigningKey,
|
|
}), nil
|
|
}
|
|
|
|
func (s *Server) DeleteTarget(ctx context.Context, req *connect.Request[action.DeleteTargetRequest]) (*connect.Response[action.DeleteTargetResponse], error) {
|
|
instanceID := authz.GetInstance(ctx).InstanceID()
|
|
deletedAt, err := s.command.DeleteTarget(ctx, req.Msg.GetId(), instanceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var deletionDate *timestamppb.Timestamp
|
|
if !deletedAt.IsZero() {
|
|
deletionDate = timestamppb.New(deletedAt)
|
|
}
|
|
return connect.NewResponse(&action.DeleteTargetResponse{
|
|
DeletionDate: deletionDate,
|
|
}), nil
|
|
}
|
|
|
|
func createTargetToCommand(req *action.CreateTargetRequest) *command.AddTarget {
|
|
var (
|
|
targetType target_domain.TargetType
|
|
interruptOnError bool
|
|
)
|
|
switch t := req.GetTargetType().(type) {
|
|
case *action.CreateTargetRequest_RestWebhook:
|
|
targetType = target_domain.TargetTypeWebhook
|
|
interruptOnError = t.RestWebhook.InterruptOnError
|
|
case *action.CreateTargetRequest_RestCall:
|
|
targetType = target_domain.TargetTypeCall
|
|
interruptOnError = t.RestCall.InterruptOnError
|
|
case *action.CreateTargetRequest_RestAsync:
|
|
targetType = target_domain.TargetTypeAsync
|
|
}
|
|
return &command.AddTarget{
|
|
Name: req.GetName(),
|
|
TargetType: targetType,
|
|
Endpoint: req.GetEndpoint(),
|
|
Timeout: req.GetTimeout().AsDuration(),
|
|
InterruptOnError: interruptOnError,
|
|
}
|
|
}
|
|
|
|
func updateTargetToCommand(req *action.UpdateTargetRequest) *command.ChangeTarget {
|
|
expirationSigningKey := false
|
|
// TODO handle expiration, currently only immediate expiration is supported
|
|
if req.GetExpirationSigningKey() != nil {
|
|
expirationSigningKey = true
|
|
}
|
|
|
|
if req == nil {
|
|
return nil
|
|
}
|
|
target := &command.ChangeTarget{
|
|
ObjectRoot: models.ObjectRoot{
|
|
AggregateID: req.GetId(),
|
|
},
|
|
Name: req.Name,
|
|
Endpoint: req.Endpoint,
|
|
ExpirationSigningKey: expirationSigningKey,
|
|
}
|
|
if req.TargetType != nil {
|
|
switch t := req.GetTargetType().(type) {
|
|
case *action.UpdateTargetRequest_RestWebhook:
|
|
target.TargetType = gu.Ptr(target_domain.TargetTypeWebhook)
|
|
target.InterruptOnError = gu.Ptr(t.RestWebhook.InterruptOnError)
|
|
case *action.UpdateTargetRequest_RestCall:
|
|
target.TargetType = gu.Ptr(target_domain.TargetTypeCall)
|
|
target.InterruptOnError = gu.Ptr(t.RestCall.InterruptOnError)
|
|
case *action.UpdateTargetRequest_RestAsync:
|
|
target.TargetType = gu.Ptr(target_domain.TargetTypeAsync)
|
|
target.InterruptOnError = gu.Ptr(false)
|
|
}
|
|
}
|
|
if req.Timeout != nil {
|
|
target.Timeout = gu.Ptr(req.GetTimeout().AsDuration())
|
|
}
|
|
return target
|
|
}
|