mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-24 03:47:09 +00:00
perf(actionsv2): execution target router (#10564)
# Which Problems Are Solved
The event execution system currently uses a projection handler that
subscribes to and processes all events for all instances. This creates a
high static cost because the system over-fetches event data, handling
many events that are not needed by most instances. This inefficiency is
also reflected in high "rows returned" metrics in the database.
# How the Problems Are Solved
Eliminate the use of a project handler. Instead, events for which
"execution targets" are defined, are directly pushed to the queue by the
eventstore. A Router is populated in the Instance object in the authz
middleware.
- By joining the execution targets to the instance, no additional
queries are needed anymore.
- As part of the instance object, execution targets are now cached as
well.
- Events are queued within the same transaction, giving transactional
guarantees on delivery.
- Uses the "insert many fast` variant of River. Multiple jobs are queued
in a single round-trip to the database.
- Fix compatibility with PostgreSQL 15
# Additional Changes
- The signing key was stored as plain-text in the river job payload in
the DB. This violated our [Secrets
Storage](https://zitadel.com/docs/concepts/architecture/secrets#secrets-storage)
principle. This change removed the field and only uses the encrypted
version of the signing key.
- Fixed the target ordering from descending to ascending.
- Some minor linter warnings on the use of `io.WriteString()`.
# Additional Context
- Introduced in https://github.com/zitadel/zitadel/pull/9249
- Closes https://github.com/zitadel/zitadel/issues/10553
- Closes https://github.com/zitadel/zitadel/issues/9832
- Closes https://github.com/zitadel/zitadel/issues/10372
- Closes https://github.com/zitadel/zitadel/issues/10492
---------
Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com>
(cherry picked from commit a9ebc06c77)
This commit is contained in:
@@ -15,48 +15,10 @@ import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/structpb"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/domain"
|
||||
"github.com/zitadel/zitadel/internal/execution"
|
||||
target_domain "github.com/zitadel/zitadel/internal/execution/target"
|
||||
)
|
||||
|
||||
var _ execution.Target = &mockExecutionTarget{}
|
||||
|
||||
type mockExecutionTarget struct {
|
||||
InstanceID string
|
||||
ExecutionID string
|
||||
TargetID string
|
||||
TargetType domain.TargetType
|
||||
Endpoint string
|
||||
Timeout time.Duration
|
||||
InterruptOnError bool
|
||||
SigningKey string
|
||||
}
|
||||
|
||||
func (e *mockExecutionTarget) SetEndpoint(endpoint string) {
|
||||
e.Endpoint = endpoint
|
||||
}
|
||||
func (e *mockExecutionTarget) IsInterruptOnError() bool {
|
||||
return e.InterruptOnError
|
||||
}
|
||||
func (e *mockExecutionTarget) GetEndpoint() string {
|
||||
return e.Endpoint
|
||||
}
|
||||
func (e *mockExecutionTarget) GetTargetType() domain.TargetType {
|
||||
return e.TargetType
|
||||
}
|
||||
func (e *mockExecutionTarget) GetTimeout() time.Duration {
|
||||
return e.Timeout
|
||||
}
|
||||
func (e *mockExecutionTarget) GetTargetID() string {
|
||||
return e.TargetID
|
||||
}
|
||||
func (e *mockExecutionTarget) GetExecutionID() string {
|
||||
return e.ExecutionID
|
||||
}
|
||||
func (e *mockExecutionTarget) GetSigningKey() string {
|
||||
return e.SigningKey
|
||||
}
|
||||
|
||||
func newMockContentRequest(content string) proto.Message {
|
||||
return &structpb.Struct{
|
||||
Fields: map[string]*structpb.Value{
|
||||
@@ -92,7 +54,7 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
|
||||
executionTargets []execution.Target
|
||||
executionTargets []target_domain.Target
|
||||
targets []target
|
||||
fullMethod string
|
||||
req interface{}
|
||||
@@ -123,7 +85,7 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{},
|
||||
executionTargets: []target_domain.Target{},
|
||||
req: newMockContentRequest("request"),
|
||||
},
|
||||
res{
|
||||
@@ -135,12 +97,11 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
},
|
||||
@@ -157,14 +118,12 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -186,15 +145,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -217,15 +174,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Second,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -247,15 +202,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Second,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -272,15 +225,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -302,14 +253,12 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeAsync,
|
||||
TargetType: target_domain.TargetTypeAsync,
|
||||
Timeout: time.Second,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -331,14 +280,12 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeAsync,
|
||||
TargetType: target_domain.TargetTypeAsync,
|
||||
Timeout: time.Minute,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -360,15 +307,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeWebhook,
|
||||
TargetType: target_domain.TargetTypeWebhook,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -389,15 +334,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeWebhook,
|
||||
TargetType: target_domain.TargetTypeWebhook,
|
||||
Timeout: time.Second,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -419,15 +362,13 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeWebhook,
|
||||
TargetType: target_domain.TargetTypeWebhook,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -449,33 +390,27 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target1",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target2",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target3",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
|
||||
@@ -510,33 +445,27 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target1",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target2",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Second,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target3",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Second,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -577,8 +506,7 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
target.respBody,
|
||||
)
|
||||
|
||||
et := tt.args.executionTargets[i].(*mockExecutionTarget)
|
||||
et.SetEndpoint(url)
|
||||
tt.args.executionTargets[i].Endpoint = url
|
||||
closeFuncs[i] = closeF
|
||||
}
|
||||
|
||||
@@ -587,6 +515,7 @@ func Test_executeTargetsForGRPCFullMethod_request(t *testing.T) {
|
||||
tt.args.executionTargets,
|
||||
tt.args.fullMethod,
|
||||
tt.args.req,
|
||||
nil,
|
||||
)
|
||||
|
||||
if tt.res.wantErr {
|
||||
@@ -640,7 +569,7 @@ func testServerCall(
|
||||
http.Error(w, "error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := io.WriteString(w, string(resp)); err != nil {
|
||||
if _, err := w.Write(resp); err != nil {
|
||||
http.Error(w, "error", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
@@ -661,7 +590,7 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
|
||||
executionTargets []execution.Target
|
||||
executionTargets []target_domain.Target
|
||||
targets []target
|
||||
fullMethod string
|
||||
req interface{}
|
||||
@@ -694,7 +623,7 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{},
|
||||
executionTargets: []target_domain.Target{},
|
||||
req: newMockContentRequest("request"),
|
||||
resp: newMockContentRequest("response"),
|
||||
},
|
||||
@@ -707,15 +636,13 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "request./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -738,15 +665,13 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
args{
|
||||
ctx: context.Background(),
|
||||
fullMethod: "/service/method",
|
||||
executionTargets: []execution.Target{
|
||||
&mockExecutionTarget{
|
||||
InstanceID: "instance",
|
||||
executionTargets: []target_domain.Target{
|
||||
{
|
||||
ExecutionID: "response./zitadel.session.v2.SessionService/SetSession",
|
||||
TargetID: "target",
|
||||
TargetType: domain.TargetTypeCall,
|
||||
TargetType: target_domain.TargetTypeCall,
|
||||
Timeout: time.Minute,
|
||||
InterruptOnError: true,
|
||||
SigningKey: "signingkey",
|
||||
},
|
||||
},
|
||||
targets: []target{
|
||||
@@ -776,8 +701,7 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
target.respBody,
|
||||
)
|
||||
|
||||
et := tt.args.executionTargets[i].(*mockExecutionTarget)
|
||||
et.SetEndpoint(url)
|
||||
tt.args.executionTargets[i].Endpoint = url
|
||||
closeFuncs[i] = closeF
|
||||
}
|
||||
|
||||
@@ -787,6 +711,7 @@ func Test_executeTargetsForGRPCFullMethod_response(t *testing.T) {
|
||||
tt.args.fullMethod,
|
||||
tt.args.req,
|
||||
tt.args.resp,
|
||||
nil,
|
||||
)
|
||||
|
||||
if tt.res.wantErr {
|
||||
|
||||
Reference in New Issue
Block a user