From 7613dfa6ec3cd8e7fa90b5f48094706a192d8727 Mon Sep 17 00:00:00 2001 From: Stefan Benz <46600784+stebenz@users.noreply.github.com> Date: Wed, 26 Mar 2025 10:35:16 +0100 Subject: [PATCH] feat: execute actions v2 on events (#9249) --- cmd/defaults.yaml | 20 +- cmd/mirror/projections.go | 2 +- cmd/setup/config.go | 2 + cmd/setup/setup.go | 1 - cmd/start/config.go | 2 + cmd/start/start.go | 27 +- .../integration_test/execution_target_test.go | 296 ++++++++--- .../v2beta/integration_test/execution_test.go | 89 ++-- .../v2beta/integration_test/query_test.go | 20 +- internal/command/action_v2_execution_test.go | 1 + internal/command/command.go | 15 +- internal/execution/ctx.go | 19 + internal/execution/gen_mock.go | 4 + internal/execution/handlers.go | 156 ++++++ internal/execution/handlers_test.go | 487 ++++++++++++++++++ internal/execution/mock/queries.mock.go | 72 +++ internal/execution/mock/queue.mock.go | 61 +++ internal/execution/projections.go | 36 ++ internal/execution/target_test.go | 85 +++ internal/execution/worker.go | 90 ++++ internal/execution/worker_test.go | 288 +++++++++++ internal/integration/action.go | 102 ++++ internal/integration/client.go | 2 +- .../handlers/notification_worker.go | 17 +- internal/notification/projections.go | 7 +- internal/repository/execution/queue.go | 71 +++ 26 files changed, 1811 insertions(+), 161 deletions(-) create mode 100644 internal/execution/ctx.go create mode 100644 internal/execution/gen_mock.go create mode 100644 internal/execution/handlers.go create mode 100644 internal/execution/handlers_test.go create mode 100644 internal/execution/mock/queries.mock.go create mode 100644 internal/execution/mock/queue.mock.go create mode 100644 internal/execution/projections.go create mode 100644 internal/execution/target_test.go create mode 100644 internal/execution/worker.go create mode 100644 internal/execution/worker_test.go create mode 100644 internal/integration/action.go create mode 100644 internal/repository/execution/queue.go diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index b4f490eefb..f85033069a 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -421,13 +421,25 @@ Notifications: # The amount of workers processing the notification request events. # If set to 0, no notification request events will be handled. This can be useful when running in # multi binary / pod setup and allowing only certain executables to process the events. - Workers: 1 # ZITADEL_NOTIFIACATIONS_WORKERS + Workers: 1 # ZITADEL_NOTIFICATIONS_WORKERS # The maximum duration a job can do it's work before it is considered as failed. - TransactionDuration: 10s # ZITADEL_NOTIFIACATIONS_TRANSACTIONDURATION + TransactionDuration: 10s # ZITADEL_NOTIFICATIONS_TRANSACTIONDURATION # Automatically cancel the notification after the amount of failed attempts - MaxAttempts: 3 # ZITADEL_NOTIFIACATIONS_MAXATTEMPTS + MaxAttempts: 3 # ZITADEL_NOTIFICATIONS_MAXATTEMPTS # Automatically cancel the notification if it cannot be handled within a specific time - MaxTtl: 5m # ZITADEL_NOTIFIACATIONS_MAXTTL + MaxTtl: 5m # ZITADEL_NOTIFICATIONS_MAXTTL + +Executions: + # The amount of workers processing the execution request events. + # If set to 0, no execution request events will be handled. This can be useful when running in + # multi binary / pod setup and allowing only certain executables to process the events. + Workers: 1 # ZITADEL_EXECUTIONS_WORKERS + # The maximum duration a job can do it's work before it is considered as failed. + # This maximum duration is prioritized in case that the sum of the target's timeouts is higher, + # to limit the runtime of a singular execution. + TransactionDuration: 10s # ZITADEL_EXECUTIONS_TRANSACTIONDURATION + # Automatically cancel the notification if it cannot be handled within a specific time + MaxTtl: 5m # ZITADEL_EXECUTIONS_MAXTTL Auth: # See Projections.BulkLimit diff --git a/cmd/mirror/projections.go b/cmd/mirror/projections.go index d149da7413..14b93b52c8 100644 --- a/cmd/mirror/projections.go +++ b/cmd/mirror/projections.go @@ -223,7 +223,6 @@ func projections( keys.SMS, keys.OIDC, config.OIDC.DefaultBackChannelLogoutLifetime, - client, nil, ) @@ -305,6 +304,7 @@ func execProjections(ctx context.Context, instances <-chan string, failedInstanc failedInstances <- instance continue } + logging.WithFields("instance", instance).Info("projections done") } wg.Done() diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 7abbeb9821..1c5e03cca3 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -22,6 +22,7 @@ import ( "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/execution" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/notification/handlers" "github.com/zitadel/zitadel/internal/query/projection" @@ -43,6 +44,7 @@ type Config struct { Machine *id.Config Projections projection.Config Notifications handlers.WorkerConfig + Executions execution.WorkerConfig Eventstore *eventstore.Config InitProjections InitProjections diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 9d02e84df9..6d5a9357cf 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -495,7 +495,6 @@ func startCommandsQueries( keys.SMS, keys.OIDC, config.OIDC.DefaultBackChannelLogoutLifetime, - dbClient, q, ) diff --git a/cmd/start/config.go b/cmd/start/config.go index cab39b6c85..589086b801 100644 --- a/cmd/start/config.go +++ b/cmd/start/config.go @@ -27,6 +27,7 @@ import ( "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/execution" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/logstore" "github.com/zitadel/zitadel/internal/notification/handlers" @@ -56,6 +57,7 @@ type Config struct { Profiler profiler.Config Projections projection.Config Notifications handlers.WorkerConfig + Executions execution.WorkerConfig Auth auth_es.Config Admin admin_es.Config UserAgentCookie *middleware.UserAgentCookieConfig diff --git a/cmd/start/start.go b/cmd/start/start.go index cb53afc896..84dda13c54 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -81,13 +81,14 @@ import ( "github.com/zitadel/zitadel/internal/eventstore" old_es "github.com/zitadel/zitadel/internal/eventstore/repository/sql" new_es "github.com/zitadel/zitadel/internal/eventstore/v3" + "github.com/zitadel/zitadel/internal/execution" "github.com/zitadel/zitadel/internal/i18n" "github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/integration/sink" "github.com/zitadel/zitadel/internal/logstore" "github.com/zitadel/zitadel/internal/logstore/emitters/access" - "github.com/zitadel/zitadel/internal/logstore/emitters/execution" - "github.com/zitadel/zitadel/internal/logstore/emitters/stdout" + emit_execution "github.com/zitadel/zitadel/internal/logstore/emitters/execution" + emit_stdout "github.com/zitadel/zitadel/internal/logstore/emitters/stdout" "github.com/zitadel/zitadel/internal/logstore/record" "github.com/zitadel/zitadel/internal/net" "github.com/zitadel/zitadel/internal/notification" @@ -256,11 +257,12 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server defer closeSink() clock := clockpkg.New() - actionsExecutionStdoutEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, &logstore.EmitterConfig{Enabled: config.LogStore.Execution.Stdout.Enabled}, stdout.NewStdoutEmitter[*record.ExecutionLog]()) + actionsExecutionStdoutEmitter, err := logstore.NewEmitter(ctx, clock, &logstore.EmitterConfig{Enabled: config.LogStore.Execution.Stdout.Enabled}, emit_stdout.NewStdoutEmitter[*record.ExecutionLog]()) if err != nil { return err } - actionsExecutionDBEmitter, err := logstore.NewEmitter[*record.ExecutionLog](ctx, clock, config.Quotas.Execution, execution.NewDatabaseLogStorage(dbClient, commands, queries)) + + actionsExecutionDBEmitter, err := logstore.NewEmitter(ctx, clock, config.Quotas.Execution, emit_execution.NewDatabaseLogStorage(dbClient, commands, queries)) if err != nil { return err } @@ -296,11 +298,20 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server keys.SMS, keys.OIDC, config.OIDC.DefaultBackChannelLogoutLifetime, - dbClient, q, ) notification.Start(ctx) + execution.Register( + ctx, + config.Projections.Customizations["executions"], + config.Executions, + queries, + eventstoreClient.EventTypes(), + q, + ) + execution.Start(ctx) + if err = q.Start(ctx); err != nil { return err } @@ -391,16 +402,16 @@ func startAPIs( return nil, err } - accessStdoutEmitter, err := logstore.NewEmitter[*record.AccessLog](ctx, clock, &logstore.EmitterConfig{Enabled: config.LogStore.Access.Stdout.Enabled}, stdout.NewStdoutEmitter[*record.AccessLog]()) + accessStdoutEmitter, err := logstore.NewEmitter(ctx, clock, &logstore.EmitterConfig{Enabled: config.LogStore.Access.Stdout.Enabled}, emit_stdout.NewStdoutEmitter[*record.AccessLog]()) if err != nil { return nil, err } - accessDBEmitter, err := logstore.NewEmitter[*record.AccessLog](ctx, clock, &config.Quotas.Access.EmitterConfig, access.NewDatabaseLogStorage(dbClient, commands, queries)) + accessDBEmitter, err := logstore.NewEmitter(ctx, clock, &config.Quotas.Access.EmitterConfig, access.NewDatabaseLogStorage(dbClient, commands, queries)) if err != nil { return nil, err } - accessSvc := logstore.New[*record.AccessLog](queries, accessDBEmitter, accessStdoutEmitter) + accessSvc := logstore.New(queries, accessDBEmitter, accessStdoutEmitter) exhaustedCookieHandler := http_util.NewCookieHandler( http_util.WithUnsecure(), http_util.WithNonHttpOnly(), diff --git a/internal/api/grpc/action/v2beta/integration_test/execution_target_test.go b/internal/api/grpc/action/v2beta/integration_test/execution_target_test.go index 25bf571918..6e3ab76fac 100644 --- a/internal/api/grpc/action/v2beta/integration_test/execution_target_test.go +++ b/internal/api/grpc/action/v2beta/integration_test/execution_target_test.go @@ -5,12 +5,8 @@ package action_test import ( "context" "encoding/base64" - "encoding/json" - "io" "net/http" - "net/http/httptest" "net/url" - "reflect" "strings" "testing" "time" @@ -59,7 +55,7 @@ func TestServer_ExecutionTarget(t *testing.T) { tests := []struct { name string ctx context.Context - dep func(context.Context, *action.GetTargetRequest, *action.GetTargetResponse) (func(), error) + dep func(context.Context, *action.GetTargetRequest, *action.GetTargetResponse) (closeF func(), calledF func() bool) clean func(context.Context) req *action.GetTargetRequest want *action.GetTargetResponse @@ -68,7 +64,7 @@ func TestServer_ExecutionTarget(t *testing.T) { { name: "GetTarget, request and response, ok", ctx: isolatedIAMOwnerCTX, - dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), error) { + dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), func() bool) { orgID := instance.DefaultOrg.Id projectID := "" @@ -84,7 +80,7 @@ func TestServer_ExecutionTarget(t *testing.T) { wantRequest := &middleware.ContextInfoRequest{FullMethod: fullMethod, InstanceID: instance.ID(), OrgID: orgID, ProjectID: projectID, UserID: userID, Request: request} changedRequest := &action.GetTargetRequest{Id: targetCreated.GetId()} // replace original request with different targetID - urlRequest, closeRequest := testServerCall(wantRequest, 0, http.StatusOK, changedRequest) + urlRequest, closeRequest, calledRequest, _ := integration.TestServerCall(wantRequest, 0, http.StatusOK, changedRequest) targetRequest := waitForTarget(ctx, t, instance, urlRequest, domain.TargetTypeCall, false) @@ -102,7 +98,7 @@ func TestServer_ExecutionTarget(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), Endpoint: targetCreatedURL, SigningKey: targetCreated.GetSigningKey(), }, @@ -118,7 +114,7 @@ func TestServer_ExecutionTarget(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), Endpoint: targetCreatedURL, SigningKey: targetCreated.GetSigningKey(), } @@ -146,14 +142,22 @@ func TestServer_ExecutionTarget(t *testing.T) { Response: expectedResponse, } // after request with different targetID, return changed response - targetResponseURL, closeResponse := testServerCall(wantResponse, 0, http.StatusOK, changedResponse) + targetResponseURL, closeResponse, calledResponse, _ := integration.TestServerCall(wantResponse, 0, http.StatusOK, changedResponse) targetResponse := waitForTarget(ctx, t, instance, targetResponseURL, domain.TargetTypeCall, false) waitForExecutionOnCondition(ctx, t, instance, conditionResponseFullMethod(fullMethod), executionTargetsSingleTarget(targetResponse.GetId())) return func() { - closeRequest() - closeResponse() - }, nil + closeRequest() + closeResponse() + }, func() bool { + if calledRequest() != 1 { + return false + } + if calledResponse() != 1 { + return false + } + return true + } }, clean: func(ctx context.Context) { instance.DeleteExecution(ctx, t, conditionRequestFullMethod(fullMethod)) @@ -171,22 +175,24 @@ func TestServer_ExecutionTarget(t *testing.T) { { name: "GetTarget, request, interrupt", ctx: isolatedIAMOwnerCTX, - dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), error) { + dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), func() bool) { orgID := instance.DefaultOrg.Id projectID := "" userID := instance.Users.Get(integration.UserTypeIAMOwner).ID // request received by target wantRequest := &middleware.ContextInfoRequest{FullMethod: fullMethod, InstanceID: instance.ID(), OrgID: orgID, ProjectID: projectID, UserID: userID, Request: request} - urlRequest, closeRequest := testServerCall(wantRequest, 0, http.StatusInternalServerError, &action.GetTargetRequest{Id: "notchanged"}) + urlRequest, closeRequest, calledRequest, _ := integration.TestServerCall(wantRequest, 0, http.StatusInternalServerError, &action.GetTargetRequest{Id: "notchanged"}) targetRequest := waitForTarget(ctx, t, instance, urlRequest, domain.TargetTypeCall, true) waitForExecutionOnCondition(ctx, t, instance, conditionRequestFullMethod(fullMethod), executionTargetsSingleTarget(targetRequest.GetId())) // GetTarget with used target request.Id = targetRequest.GetId() return func() { - closeRequest() - }, nil + closeRequest() + }, func() bool { + return calledRequest() == 1 + } }, clean: func(ctx context.Context) { instance.DeleteExecution(ctx, t, conditionRequestFullMethod(fullMethod)) @@ -197,7 +203,7 @@ func TestServer_ExecutionTarget(t *testing.T) { { name: "GetTarget, response, interrupt", ctx: isolatedIAMOwnerCTX, - dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), error) { + dep: func(ctx context.Context, request *action.GetTargetRequest, response *action.GetTargetResponse) (func(), func() bool) { orgID := instance.DefaultOrg.Id projectID := "" userID := instance.Users.Get(integration.UserTypeIAMOwner).ID @@ -223,7 +229,7 @@ func TestServer_ExecutionTarget(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), Endpoint: targetCreatedURL, SigningKey: targetCreated.GetSigningKey(), }, @@ -251,13 +257,15 @@ func TestServer_ExecutionTarget(t *testing.T) { Response: expectedResponse, } // after request with different targetID, return changed response - targetResponseURL, closeResponse := testServerCall(wantResponse, 0, http.StatusInternalServerError, changedResponse) + targetResponseURL, closeResponse, calledResponse, _ := integration.TestServerCall(wantResponse, 0, http.StatusInternalServerError, changedResponse) targetResponse := waitForTarget(ctx, t, instance, targetResponseURL, domain.TargetTypeCall, true) waitForExecutionOnCondition(ctx, t, instance, conditionResponseFullMethod(fullMethod), executionTargetsSingleTarget(targetResponse.GetId())) return func() { - closeResponse() - }, nil + closeResponse() + }, func() bool { + return calledResponse() == 1 + } }, clean: func(ctx context.Context) { instance.DeleteExecution(ctx, t, conditionResponseFullMethod(fullMethod)) @@ -268,12 +276,10 @@ func TestServer_ExecutionTarget(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if tt.dep != nil { - close, err := tt.dep(tt.ctx, tt.req, tt.want) - require.NoError(t, err) - defer close() - } - retryDuration, tick := integration.WaitForAndTickWithMaxDuration(isolatedIAMOwnerCTX, time.Minute) + closeF, calledF := tt.dep(tt.ctx, tt.req, tt.want) + defer closeF() + + retryDuration, tick := integration.WaitForAndTickWithMaxDuration(tt.ctx, time.Minute) require.EventuallyWithT(t, func(ttt *assert.CollectT) { got, err := instance.Client.ActionV2beta.GetTarget(tt.ctx, tt.req) if tt.wantErr { @@ -288,6 +294,182 @@ func TestServer_ExecutionTarget(t *testing.T) { if tt.clean != nil { tt.clean(tt.ctx) } + require.True(t, calledF()) + }) + } +} + +func TestServer_ExecutionTarget_Event(t *testing.T) { + instance := integration.NewInstance(CTX) + ensureFeatureEnabled(t, instance) + isolatedIAMOwnerCTX := instance.WithAuthorization(CTX, integration.UserTypeIAMOwner) + + event := "session.added" + urlRequest, closeF, calledF, resetF := integration.TestServerCall(nil, 0, http.StatusOK, nil) + defer closeF() + + targetResponse := waitForTarget(isolatedIAMOwnerCTX, t, instance, urlRequest, domain.TargetTypeWebhook, true) + waitForExecutionOnCondition(isolatedIAMOwnerCTX, t, instance, conditionEvent(event), executionTargetsSingleTarget(targetResponse.GetId())) + + tests := []struct { + name string + ctx context.Context + eventCount int + expectedCalls int + clean func(context.Context) + wantErr bool + }{ + { + name: "event, 1 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 1, + expectedCalls: 1, + }, + { + name: "event, 5 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 5, + expectedCalls: 5, + }, + { + name: "event, 50 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 50, + expectedCalls: 50, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // reset the count of the target + resetF() + + for i := 0; i < tt.eventCount; i++ { + _, err := instance.Client.SessionV2.CreateSession(tt.ctx, &session.CreateSessionRequest{}) + require.NoError(t, err) + } + + // wait for called target + retryDuration, tick := integration.WaitForAndTickWithMaxDuration(tt.ctx, time.Minute) + require.EventuallyWithT(t, func(ttt *assert.CollectT) { + assert.True(ttt, calledF() == tt.expectedCalls) + }, retryDuration, tick, "timeout waiting for expected execution result") + }) + } +} + +func TestServer_ExecutionTarget_Event_LongerThanTargetTimeout(t *testing.T) { + instance := integration.NewInstance(CTX) + ensureFeatureEnabled(t, instance) + isolatedIAMOwnerCTX := instance.WithAuthorization(CTX, integration.UserTypeIAMOwner) + + event := "session.added" + // call takes longer than timeout of target + urlRequest, closeF, calledF, resetF := integration.TestServerCall(nil, 5*time.Second, http.StatusOK, nil) + defer closeF() + + targetResponse := waitForTarget(isolatedIAMOwnerCTX, t, instance, urlRequest, domain.TargetTypeWebhook, true) + waitForExecutionOnCondition(isolatedIAMOwnerCTX, t, instance, conditionEvent(event), executionTargetsSingleTarget(targetResponse.GetId())) + + tests := []struct { + name string + ctx context.Context + eventCount int + expectedCalls int + clean func(context.Context) + wantErr bool + }{ + { + name: "event, 1 session.added, error logs", + ctx: isolatedIAMOwnerCTX, + eventCount: 1, + expectedCalls: 1, + }, + { + name: "event, 5 session.added, error logs", + ctx: isolatedIAMOwnerCTX, + eventCount: 5, + expectedCalls: 5, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // reset the count of the target + resetF() + + for i := 0; i < tt.eventCount; i++ { + _, err := instance.Client.SessionV2.CreateSession(tt.ctx, &session.CreateSessionRequest{}) + require.NoError(t, err) + } + + // wait for called target + retryDuration, tick := integration.WaitForAndTickWithMaxDuration(tt.ctx, time.Minute) + require.EventuallyWithT(t, func(ttt *assert.CollectT) { + assert.True(ttt, calledF() == tt.expectedCalls) + }, retryDuration, tick, "timeout waiting for expected execution result") + }) + } +} + +func TestServer_ExecutionTarget_Event_LongerThanTransactionTimeout(t *testing.T) { + instance := integration.NewInstance(CTX) + ensureFeatureEnabled(t, instance) + isolatedIAMOwnerCTX := instance.WithAuthorization(CTX, integration.UserTypeIAMOwner) + + event := "session.added" + urlRequest, closeF, calledF, resetF := integration.TestServerCall(nil, 1*time.Second, http.StatusOK, nil) + defer closeF() + + targetResponse := waitForTarget(isolatedIAMOwnerCTX, t, instance, urlRequest, domain.TargetTypeWebhook, true) + waitForExecutionOnCondition(isolatedIAMOwnerCTX, t, instance, conditionEvent(event), executionTargetsSingleTarget(targetResponse.GetId())) + + tests := []struct { + name string + ctx context.Context + eventCount int + expectedCalls int + clean func(context.Context) + wantErr bool + }{ + { + name: "event, 1 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 1, + expectedCalls: 1, + }, + { + name: "event, 5 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 5, + expectedCalls: 5, + }, + { + name: "event, 5 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 5, + expectedCalls: 5, + }, + { + name: "event, 20 session.added, ok", + ctx: isolatedIAMOwnerCTX, + eventCount: 20, + expectedCalls: 20, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // reset the count of the target + resetF() + + for i := 0; i < tt.eventCount; i++ { + _, err := instance.Client.SessionV2.CreateSession(tt.ctx, &session.CreateSessionRequest{}) + require.NoError(t, err) + } + + // wait for called target + retryDuration, tick := integration.WaitForAndTickWithMaxDuration(tt.ctx, time.Minute) + require.EventuallyWithT(t, func(ttt *assert.CollectT) { + assert.True(ttt, calledF() == tt.expectedCalls) + }, retryDuration, tick, "timeout waiting for expected execution result") }) } } @@ -383,50 +565,16 @@ func conditionResponseFullMethod(fullMethod string) *action.Condition { } } -func testServerCall( - reqBody interface{}, - sleep time.Duration, - statusCode int, - respBody interface{}, -) (string, func()) { - handler := func(w http.ResponseWriter, r *http.Request) { - data, err := json.Marshal(reqBody) - if err != nil { - http.Error(w, "error, marshall: "+err.Error(), http.StatusInternalServerError) - return - } - - sentBody, err := io.ReadAll(r.Body) - if err != nil { - http.Error(w, "error, read body: "+err.Error(), http.StatusInternalServerError) - return - } - if !reflect.DeepEqual(data, sentBody) { - http.Error(w, "error, equal:\n"+string(data)+"\nsent:\n"+string(sentBody), http.StatusInternalServerError) - return - } - if statusCode != http.StatusOK { - http.Error(w, "error, statusCode", statusCode) - return - } - - time.Sleep(sleep) - - w.Header().Set("Content-Type", "application/json") - resp, err := json.Marshal(respBody) - if err != nil { - http.Error(w, "error", http.StatusInternalServerError) - return - } - if _, err := io.WriteString(w, string(resp)); err != nil { - http.Error(w, "error", http.StatusInternalServerError) - return - } +func conditionEvent(event string) *action.Condition { + return &action.Condition{ + ConditionType: &action.Condition_Event{ + Event: &action.EventExecution{ + Condition: &action.EventExecution_Event{ + Event: event, + }, + }, + }, } - - server := httptest.NewServer(http.HandlerFunc(handler)) - - return server.URL, server.Close } func conditionFunction(function string) *action.Condition { @@ -634,7 +782,7 @@ func expectPreUserinfoExecution(ctx context.Context, t *testing.T, instance *int } expectedContextInfo := contextInfoForUserOIDC(instance, "function/preuserinfo", userResp, userEmail, userPhone) - targetURL, closeF := testServerCall(expectedContextInfo, 0, http.StatusOK, response) + targetURL, closeF, _, _ := integration.TestServerCall(expectedContextInfo, 0, http.StatusOK, response) targetResp := waitForTarget(ctx, t, instance, targetURL, domain.TargetTypeCall, true) waitForExecutionOnCondition(ctx, t, instance, conditionFunction("preuserinfo"), executionTargetsSingleTarget(targetResp.GetId())) @@ -940,7 +1088,7 @@ func expectPreAccessTokenExecution(ctx context.Context, t *testing.T, instance * } expectedContextInfo := contextInfoForUserOIDC(instance, "function/preaccesstoken", userResp, userEmail, userPhone) - targetURL, closeF := testServerCall(expectedContextInfo, 0, http.StatusOK, response) + targetURL, closeF, _, _ := integration.TestServerCall(expectedContextInfo, 0, http.StatusOK, response) targetResp := waitForTarget(ctx, t, instance, targetURL, domain.TargetTypeCall, true) waitForExecutionOnCondition(ctx, t, instance, conditionFunction("preaccesstoken"), executionTargetsSingleTarget(targetResp.GetId())) @@ -1106,7 +1254,7 @@ func expectPreSAMLResponseExecution(ctx context.Context, t *testing.T, instance } expectedContextInfo := contextInfoForUserSAML(instance, "function/presamlresponse", userResp, userEmail, userPhone) - targetURL, closeF := testServerCall(expectedContextInfo, 0, http.StatusOK, response) + targetURL, closeF, _, _ := integration.TestServerCall(expectedContextInfo, 0, http.StatusOK, response) targetResp := waitForTarget(ctx, t, instance, targetURL, domain.TargetTypeCall, true) waitForExecutionOnCondition(ctx, t, instance, conditionFunction("presamlresponse"), executionTargetsSingleTarget(targetResp.GetId())) diff --git a/internal/api/grpc/action/v2beta/integration_test/execution_test.go b/internal/api/grpc/action/v2beta/integration_test/execution_test.go index 0897c90215..3af419d97b 100644 --- a/internal/api/grpc/action/v2beta/integration_test/execution_test.go +++ b/internal/api/grpc/action/v2beta/integration_test/execution_test.go @@ -484,27 +484,23 @@ func TestServer_SetExecution_Event(t *testing.T) { }, wantErr: true, }, - /* - //TODO event existing check - - { - name: "event, not existing", - ctx: isolatedIAMOwnerCTX, - req: &action.SetExecutionRequest{ - Condition: &action.Condition{ - ConditionType: &action.Condition_Event{ - Event: &action.EventExecution{ - Condition: &action.EventExecution_Event{ - Event: "xxx", - }, + { + name: "event, not existing", + ctx: isolatedIAMOwnerCTX, + req: &action.SetExecutionRequest{ + Condition: &action.Condition{ + ConditionType: &action.Condition_Event{ + Event: &action.EventExecution{ + Condition: &action.EventExecution_Event{ + Event: "user.human.notexisting", }, }, }, - Targets: []string{targetResp.GetId()}, }, - wantErr: true, + Targets: executionTargetsSingleTarget(targetResp.GetId()), }, - */ + wantErr: true, + }, { name: "event, ok", ctx: isolatedIAMOwnerCTX, @@ -513,7 +509,7 @@ func TestServer_SetExecution_Event(t *testing.T) { ConditionType: &action.Condition_Event{ Event: &action.EventExecution{ Condition: &action.EventExecution_Event{ - Event: "xxx", + Event: "user.human.added", }, }, }, @@ -522,36 +518,49 @@ func TestServer_SetExecution_Event(t *testing.T) { }, wantSetDate: true, }, - /* - // TODO: - - { - name: "group, not existing", - ctx: isolatedIAMOwnerCTX, - req: &action.SetExecutionRequest{ - Condition: &action.Condition{ - ConditionType: &action.Condition_Event{ - Event: &action.EventExecution{ - Condition: &action.EventExecution_Group{ - Group: "xxx", - }, - }, - }, - }, - Targets: []string{targetResp.GetId()}, - }, - wantErr: true, - }, - */ { - name: "group, ok", + name: "group, not existing", ctx: isolatedIAMOwnerCTX, req: &action.SetExecutionRequest{ Condition: &action.Condition{ ConditionType: &action.Condition_Event{ Event: &action.EventExecution{ Condition: &action.EventExecution_Group{ - Group: "xxx", + Group: "user.notexisting", + }, + }, + }, + }, + Targets: executionTargetsSingleTarget(targetResp.GetId()), + }, + wantErr: true, + }, + { + name: "group, level 1, ok", + ctx: isolatedIAMOwnerCTX, + req: &action.SetExecutionRequest{ + Condition: &action.Condition{ + ConditionType: &action.Condition_Event{ + Event: &action.EventExecution{ + Condition: &action.EventExecution_Group{ + Group: "user", + }, + }, + }, + }, + Targets: executionTargetsSingleTarget(targetResp.GetId()), + }, + wantSetDate: true, + }, + { + name: "group, level 2, ok", + ctx: isolatedIAMOwnerCTX, + req: &action.SetExecutionRequest{ + Condition: &action.Condition{ + ConditionType: &action.Condition_Event{ + Event: &action.EventExecution{ + Condition: &action.EventExecution_Group{ + Group: "user.human", }, }, }, diff --git a/internal/api/grpc/action/v2beta/integration_test/query_test.go b/internal/api/grpc/action/v2beta/integration_test/query_test.go index c4ac9f93fe..c5159d39da 100644 --- a/internal/api/grpc/action/v2beta/integration_test/query_test.go +++ b/internal/api/grpc/action/v2beta/integration_test/query_test.go @@ -72,7 +72,7 @@ func TestServer_GetTarget(t *testing.T) { TargetType: &action.Target_RestWebhook{ RestWebhook: &action.RESTWebhook{}, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -99,7 +99,7 @@ func TestServer_GetTarget(t *testing.T) { TargetType: &action.Target_RestAsync{ RestAsync: &action.RESTAsync{}, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -128,7 +128,7 @@ func TestServer_GetTarget(t *testing.T) { InterruptOnError: true, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -157,7 +157,7 @@ func TestServer_GetTarget(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -186,7 +186,7 @@ func TestServer_GetTarget(t *testing.T) { InterruptOnError: true, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -293,7 +293,7 @@ func TestServer_ListTargets(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -333,7 +333,7 @@ func TestServer_ListTargets(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, @@ -388,7 +388,7 @@ func TestServer_ListTargets(t *testing.T) { TargetType: &action.Target_RestAsync{ RestAsync: &action.RESTAsync{}, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, { Endpoint: "https://example.com", @@ -397,7 +397,7 @@ func TestServer_ListTargets(t *testing.T) { InterruptOnError: true, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, { Endpoint: "https://example.com", @@ -406,7 +406,7 @@ func TestServer_ListTargets(t *testing.T) { InterruptOnError: false, }, }, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), }, }, }, diff --git a/internal/command/action_v2_execution_test.go b/internal/command/action_v2_execution_test.go index 6833125a0a..d41ea3f2d5 100644 --- a/internal/command/action_v2_execution_test.go +++ b/internal/command/action_v2_execution_test.go @@ -20,6 +20,7 @@ func existsMock(exists bool) func(method string) bool { return exists } } + func TestCommands_SetExecutionRequest(t *testing.T) { type fields struct { eventstore func(t *testing.T) *eventstore.Eventstore diff --git a/internal/command/command.go b/internal/command/command.go index 486e044a41..b0e67ad52e 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -9,7 +9,9 @@ import ( "fmt" "math/big" "net/http" + "slices" "strconv" + "strings" "sync" "time" @@ -177,10 +179,15 @@ func StartCommands( defaultSecretGenerators: defaultSecretGenerators, samlCertificateAndKeyGenerator: samlCertificateAndKeyGenerator(defaults.KeyConfig.CertificateSize, defaults.KeyConfig.CertificateLifetime), webKeyGenerator: crypto.GenerateEncryptedWebKey, - // always true for now until we can check with an eventlist - EventExisting: func(event string) bool { return true }, - // always true for now until we can check with an eventlist - EventGroupExisting: func(group string) bool { return true }, + EventExisting: func(value string) bool { + return slices.Contains(es.EventTypes(), value) + }, + EventGroupExisting: func(group string) bool { + return slices.ContainsFunc(es.EventTypes(), func(value string) bool { + return strings.HasPrefix(value, group) + }, + ) + }, GrpcServiceExisting: func(service string) bool { return false }, GrpcMethodExisting: func(method string) bool { return false }, ActionFunctionExisting: domain.ActionFunctionExists(), diff --git a/internal/execution/ctx.go b/internal/execution/ctx.go new file mode 100644 index 0000000000..9e6bac3e30 --- /dev/null +++ b/internal/execution/ctx.go @@ -0,0 +1,19 @@ +package execution + +import ( + "context" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ExecutionUserID = "EXECUTION" + +func HandlerContext(event *eventstore.Aggregate) context.Context { + ctx := authz.WithInstanceID(context.Background(), event.InstanceID) + return authz.SetCtxData(ctx, authz.CtxData{UserID: ExecutionUserID, OrgID: event.ResourceOwner}) +} + +func ContextWithExecuter(ctx context.Context, aggregate *eventstore.Aggregate) context.Context { + return authz.SetCtxData(ctx, authz.CtxData{UserID: ExecutionUserID, OrgID: aggregate.ResourceOwner}) +} diff --git a/internal/execution/gen_mock.go b/internal/execution/gen_mock.go new file mode 100644 index 0000000000..93eebfbb02 --- /dev/null +++ b/internal/execution/gen_mock.go @@ -0,0 +1,4 @@ +package execution + +//go:generate mockgen -package mock -destination ./mock/queries.mock.go github.com/zitadel/zitadel/internal/execution Queries +//go:generate mockgen -package mock -destination ./mock/queue.mock.go github.com/zitadel/zitadel/internal/execution Queue diff --git a/internal/execution/handlers.go b/internal/execution/handlers.go new file mode 100644 index 0000000000..7ffb4cc6ff --- /dev/null +++ b/internal/execution/handlers.go @@ -0,0 +1,156 @@ +package execution + +import ( + "context" + "encoding/json" + "slices" + "strings" + + "github.com/riverqueue/river" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler/v2" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/queue" + exec_repo "github.com/zitadel/zitadel/internal/repository/execution" +) + +const ( + HandlerTable = "projections.execution_handler" +) + +type Queue interface { + Insert(ctx context.Context, args river.JobArgs, opts ...queue.InsertOpt) error +} + +type Queries interface { + TargetsByExecutionID(ctx context.Context, ids []string) (execution []*query.ExecutionTarget, err error) + InstanceByID(ctx context.Context, id string) (instance authz.Instance, err error) +} + +type eventHandler struct { + eventTypes []string + aggregateTypeFromEventType func(typ eventstore.EventType) eventstore.AggregateType + query Queries + queue Queue +} + +func NewEventHandler( + ctx context.Context, + config handler.Config, + eventTypes []string, + aggregateTypeFromEventType func(typ eventstore.EventType) eventstore.AggregateType, + query Queries, + queue Queue, +) *handler.Handler { + return handler.NewHandler(ctx, &config, &eventHandler{ + eventTypes: eventTypes, + aggregateTypeFromEventType: aggregateTypeFromEventType, + query: query, + queue: queue, + }) +} + +func (u *eventHandler) Name() string { + return HandlerTable +} + +func (u *eventHandler) Reducers() []handler.AggregateReducer { + aggList := make(map[eventstore.AggregateType][]eventstore.EventType) + for _, eventType := range u.eventTypes { + aggType := u.aggregateTypeFromEventType(eventstore.EventType(eventType)) + aggEventTypes := aggList[aggType] + if !slices.Contains(aggEventTypes, eventstore.EventType(eventType)) { + aggList[aggType] = append(aggList[aggType], eventstore.EventType(eventType)) + } + } + + aggReducers := make([]handler.AggregateReducer, 0, len(aggList)) + for aggType, aggEventTypes := range aggList { + eventReducers := make([]handler.EventReducer, len(aggEventTypes)) + for j, eventType := range aggEventTypes { + eventReducers[j] = handler.EventReducer{ + Event: eventType, + Reduce: u.reduce, + } + } + aggReducers = append(aggReducers, handler.AggregateReducer{ + Aggregate: aggType, + EventReducers: eventReducers, + }) + } + return aggReducers +} + +func groupsFromEventType(s string) []string { + parts := strings.Split(s, ".") + groups := make([]string, len(parts)) + for i := range parts { + groups[i] = strings.Join(parts[:i+1], ".") + if i < len(parts)-1 { + groups[i] += ".*" + } + } + slices.Reverse(groups) + return groups +} + +func idsForEventType(eventType string) []string { + ids := make([]string, 0) + for _, group := range groupsFromEventType(eventType) { + ids = append(ids, + exec_repo.ID(domain.ExecutionTypeEvent, group), + ) + } + return append(ids, + exec_repo.IDAll(domain.ExecutionTypeEvent), + ) +} + +func (u *eventHandler) reduce(e eventstore.Event) (*handler.Statement, error) { + ctx := HandlerContext(e.Aggregate()) + + targets, err := u.query.TargetsByExecutionID(ctx, idsForEventType(string(e.Type()))) + if err != nil { + return nil, err + } + + // no execution from worker necessary + if len(targets) == 0 { + return handler.NewNoOpStatement(e), nil + } + + return handler.NewStatement(e, func(ex handler.Executer, projectionName string) error { + ctx := HandlerContext(e.Aggregate()) + req, err := NewRequest(e, targets) + if err != nil { + return err + } + return u.queue.Insert(ctx, + req, + queue.WithQueueName(exec_repo.QueueName), + ) + }), nil +} + +func NewRequest(e eventstore.Event, targets []*query.ExecutionTarget) (*exec_repo.Request, error) { + targetsData, err := json.Marshal(targets) + if err != nil { + return nil, err + } + eventData, err := json.Marshal(e) + if err != nil { + return nil, err + } + return &exec_repo.Request{ + Aggregate: e.Aggregate(), + Sequence: e.Sequence(), + EventType: e.Type(), + CreatedAt: e.CreatedAt(), + UserID: e.Creator(), + EventData: eventData, + TargetsData: targetsData, + }, nil +} diff --git a/internal/execution/handlers_test.go b/internal/execution/handlers_test.go new file mode 100644 index 0000000000..de220abcc0 --- /dev/null +++ b/internal/execution/handlers_test.go @@ -0,0 +1,487 @@ +package execution + +import ( + "database/sql" + "encoding/json" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/repository" + "github.com/zitadel/zitadel/internal/execution/mock" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/repository/action" + execution_rp "github.com/zitadel/zitadel/internal/repository/execution" + "github.com/zitadel/zitadel/internal/repository/session" + "github.com/zitadel/zitadel/internal/repository/user" + "github.com/zitadel/zitadel/internal/zerrors" +) + +func Test_EventExecution(t *testing.T) { + type args struct { + event eventstore.Event + targets []*query.ExecutionTarget + } + type res struct { + targets []Target + contextInfo *execution_rp.ContextInfoEvent + wantErr bool + } + tests := []struct { + name string + args args + res res + }{ + { + "session added, ok", + args{ + event: &eventstore.BaseEvent{ + Agg: &eventstore.Aggregate{ + ID: "aggID", + Type: session.AggregateType, + ResourceOwner: "resourceOwner", + InstanceID: "instanceID", + Version: session.AggregateVersion, + }, + EventType: session.AddedType, + Seq: 1, + Creation: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC), + User: userID, + Data: []byte(`{"ID":"","Seq":1,"Pos":0,"Creation":"2024-01-01T01:01:01.000000001Z"}`), + }, + targets: []*query.ExecutionTarget{{ + InstanceID: instanceID, + ExecutionID: "executionID", + TargetID: "targetID", + TargetType: domain.TargetTypeWebhook, + Endpoint: "endpoint", + Timeout: time.Minute, + InterruptOnError: true, + SigningKey: "key", + }}, + }, + res{ + targets: []Target{ + &query.ExecutionTarget{ + InstanceID: instanceID, + ExecutionID: "executionID", + TargetID: "targetID", + TargetType: domain.TargetTypeWebhook, + Endpoint: "endpoint", + Timeout: time.Minute, + InterruptOnError: true, + SigningKey: "key", + }, + }, + contextInfo: &execution_rp.ContextInfoEvent{ + AggregateID: "aggID", + AggregateType: "session", + ResourceOwner: "resourceOwner", + InstanceID: "instanceID", + Version: "v1", + Sequence: 1, + EventType: "session.added", + CreatedAt: time.Date(2024, 1, 1, 1, 1, 1, 1, time.UTC).Format(time.RFC3339Nano), + UserID: userID, + EventPayload: []byte(`{"ID":"","Seq":1,"Pos":0,"Creation":"2024-01-01T01:01:01.000000001Z"}`), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + request, err := NewRequest(tt.args.event, tt.args.targets) + if tt.res.wantErr { + assert.Error(t, err) + assert.Nil(t, request) + return + } + assert.NoError(t, err) + targets, err := TargetsFromRequest(request) + assert.NoError(t, err) + assert.Equal(t, tt.res.targets, targets) + assert.Equal(t, tt.res.contextInfo, execution_rp.ContextInfoFromRequest(request)) + }) + } +} + +func Test_groupsFromEventType(t *testing.T) { + type args struct { + eventType eventstore.EventType + } + type res struct { + groups []string + } + tests := []struct { + name string + args args + res res + }{ + { + "user human mfa init skipped, ok", + args{ + eventType: user.HumanMFAInitSkippedType, + }, + res{ + groups: []string{ + "user.human.mfa.init.skipped", + "user.human.mfa.init.*", + "user.human.mfa.*", + "user.human.*", + "user.*", + }, + }, + }, + { + "session added, ok", + args{ + eventType: session.AddedType, + }, + res{ + groups: []string{ + "session.added", + "session.*", + }, + }, + }, + { + "user added, ok", + args{ + eventType: user.HumanAddedType, + }, + res{ + groups: []string{ + "user.human.added", + "user.human.*", + "user.*", + }, + }, + }, + { + "execution set, ok", + args{ + eventType: execution_rp.SetEventV2Type, + }, + res{ + groups: []string{ + "execution.v2.set", + "execution.v2.*", + "execution.*", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.res.groups, groupsFromEventType(string(tt.args.eventType))) + }) + } +} + +func Test_idsForEventType(t *testing.T) { + type args struct { + eventType eventstore.EventType + } + type res struct { + groups []string + } + tests := []struct { + name string + args args + res res + }{ + { + "session added, ok", + args{ + eventType: session.AddedType, + }, + res{ + groups: []string{ + "event/session.added", + "event/session.*", + "event", + }, + }, + }, + { + "user added, ok", + args{ + eventType: user.HumanAddedType, + }, + res{ + groups: []string{ + "event/user.human.added", + "event/user.human.*", + "event/user.*", + "event", + }, + }, + }, + { + "execution set, ok", + args{ + eventType: execution_rp.SetEventV2Type, + }, + res{ + groups: []string{ + "event/execution.v2.set", + "event/execution.v2.*", + "event/execution.*", + "event", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.res.groups, idsForEventType(string(tt.args.eventType))) + }) + } +} + +func TestActionProjection_reduces(t *testing.T) { + tests := []struct { + name string + test func(*gomock.Controller, *mock.MockQueries, *mock.MockQueue) (fields, args, want) + }{ + { + name: "reduce, action, error", + test: func(ctrl *gomock.Controller, queries *mock.MockQueries, q *mock.MockQueue) (f fields, a args, w want) { + queries.EXPECT().TargetsByExecutionID(gomock.Any(), gomock.Any()).Return(nil, zerrors.ThrowInternal(nil, "QUERY-37ardr0pki", "Errors.Query.CloseRows")) + return fields{ + queries: queries, + queue: q, + }, args{ + event: &action.AddedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(&repository.Event{ + InstanceID: instanceID, + AggregateID: eventID, + ResourceOwner: sql.NullString{String: orgID}, + CreationDate: time.Now().UTC(), + Typ: action.AddedEventType, + Data: []byte(eventData), + EditorUser: userID, + Seq: 1, + AggregateType: action.AggregateType, + Version: action.AggregateVersion, + }), + Name: "name", + Script: "name(){}", + Timeout: 3 * time.Second, + AllowedToFail: true, + }, + mapper: action.AddedEventMapper, + }, want{ + err: func(tt assert.TestingT, err error, i ...interface{}) bool { + return errors.Is(err, zerrors.ThrowInternal(nil, "QUERY-37ardr0pki", "Errors.Query.CloseRows")) + }, + } + }, + }, + + { + name: "reduce, action, none", + test: func(ctrl *gomock.Controller, queries *mock.MockQueries, q *mock.MockQueue) (f fields, a args, w want) { + queries.EXPECT().TargetsByExecutionID(gomock.Any(), gomock.Any()).Return([]*query.ExecutionTarget{}, nil) + return fields{ + queries: queries, + queue: q, + }, args{ + event: &action.AddedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(&repository.Event{ + InstanceID: instanceID, + AggregateID: eventID, + ResourceOwner: sql.NullString{String: orgID}, + CreationDate: time.Now().UTC(), + Typ: action.AddedEventType, + Data: []byte(eventData), + EditorUser: userID, + Seq: 1, + AggregateType: action.AggregateType, + Version: action.AggregateVersion, + }), + Name: "name", + Script: "name(){}", + Timeout: 3 * time.Second, + AllowedToFail: true, + }, + mapper: action.AddedEventMapper, + }, want{ + noOperation: true, + } + }, + }, + { + name: "reduce, action, single", + test: func(ctrl *gomock.Controller, queries *mock.MockQueries, q *mock.MockQueue) (f fields, a args, w want) { + targets := mockTargets(1) + queries.EXPECT().TargetsByExecutionID(gomock.Any(), gomock.Any()).Return(targets, nil) + createdAt := time.Now().UTC() + q.EXPECT().Insert( + gomock.Any(), + &execution_rp.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + Type: action.AggregateType, + Version: action.AggregateVersion, + ID: eventID, + ResourceOwner: orgID, + }, + Sequence: 1, + CreatedAt: createdAt, + EventType: action.AddedEventType, + UserID: userID, + EventData: []byte(eventData), + TargetsData: mockTargetsToBytes(targets), + }, + gomock.Any(), + ).Return(nil) + return fields{ + queries: queries, + queue: q, + }, args{ + event: &action.AddedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(&repository.Event{ + InstanceID: instanceID, + AggregateID: eventID, + ResourceOwner: sql.NullString{String: orgID}, + CreationDate: createdAt, + Typ: action.AddedEventType, + Data: []byte(eventData), + EditorUser: userID, + Seq: 1, + AggregateType: action.AggregateType, + Version: action.AggregateVersion, + }), + Name: "name", + Script: "name(){}", + Timeout: 3 * time.Second, + AllowedToFail: true, + }, + mapper: action.AddedEventMapper, + }, w + }, + }, + { + name: "reduce, action, multiple", + test: func(ctrl *gomock.Controller, queries *mock.MockQueries, q *mock.MockQueue) (f fields, a args, w want) { + targets := mockTargets(3) + queries.EXPECT().TargetsByExecutionID(gomock.Any(), gomock.Any()).Return(targets, nil) + createdAt := time.Now().UTC() + q.EXPECT().Insert( + gomock.Any(), + &execution_rp.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + Type: action.AggregateType, + Version: action.AggregateVersion, + ID: eventID, + ResourceOwner: orgID, + }, + Sequence: 1, + CreatedAt: createdAt, + EventType: action.AddedEventType, + UserID: userID, + EventData: []byte(eventData), + TargetsData: mockTargetsToBytes(targets), + }, + gomock.Any(), + ).Return(nil) + return fields{ + queries: queries, + queue: q, + }, args{ + event: &action.AddedEvent{ + BaseEvent: *eventstore.BaseEventFromRepo(&repository.Event{ + InstanceID: instanceID, + AggregateID: eventID, + ResourceOwner: sql.NullString{String: orgID}, + CreationDate: createdAt, + Typ: action.AddedEventType, + Data: []byte(eventData), + EditorUser: userID, + Seq: 1, + AggregateType: action.AggregateType, + Version: action.AggregateVersion, + }), + Name: "name", + Script: "name(){}", + Timeout: 3 * time.Second, + AllowedToFail: true, + }, + mapper: action.AddedEventMapper, + }, w + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + queries := mock.NewMockQueries(ctrl) + queue := mock.NewMockQueue(ctrl) + f, a, w := tt.test(ctrl, queries, queue) + + event, err := a.mapper(a.event) + assert.NoError(t, err) + + stmt, err := newEventExecutionsHandler(queries, f).reduce(event) + if w.err != nil { + w.err(t, err) + return + } + assert.NoError(t, err) + + if w.noOperation { + assert.Nil(t, stmt.Execute) + return + } + err = stmt.Execute(nil, "") + if w.stmtErr != nil { + w.stmtErr(t, err) + return + } + assert.NoError(t, err) + }) + } +} + +func mockTarget() *query.ExecutionTarget { + return &query.ExecutionTarget{ + InstanceID: "instanceID", + ExecutionID: "executionID", + TargetID: "targetID", + TargetType: domain.TargetTypeWebhook, + Endpoint: "endpoint", + Timeout: time.Minute, + InterruptOnError: true, + SigningKey: "key", + } +} + +func mockTargets(count int) []*query.ExecutionTarget { + var targets []*query.ExecutionTarget + if count > 0 { + targets = make([]*query.ExecutionTarget, count) + for i := range targets { + targets[i] = mockTarget() + } + } + return targets +} + +func mockTargetsToBytes(targets []*query.ExecutionTarget) []byte { + data, _ := json.Marshal(targets) + return data +} + +func newEventExecutionsHandler(queries *mock.MockQueries, f fields) *eventHandler { + return &eventHandler{ + queue: f.queue, + query: queries, + } +} diff --git a/internal/execution/mock/queries.mock.go b/internal/execution/mock/queries.mock.go new file mode 100644 index 0000000000..ab7cf38a32 --- /dev/null +++ b/internal/execution/mock/queries.mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/zitadel/zitadel/internal/execution (interfaces: Queries) +// +// Generated by this command: +// +// mockgen -package mock -destination ./mock/queries.mock.go github.com/zitadel/zitadel/internal/execution Queries +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + authz "github.com/zitadel/zitadel/internal/api/authz" + query "github.com/zitadel/zitadel/internal/query" + gomock "go.uber.org/mock/gomock" +) + +// MockQueries is a mock of Queries interface. +type MockQueries struct { + ctrl *gomock.Controller + recorder *MockQueriesMockRecorder +} + +// MockQueriesMockRecorder is the mock recorder for MockQueries. +type MockQueriesMockRecorder struct { + mock *MockQueries +} + +// NewMockQueries creates a new mock instance. +func NewMockQueries(ctrl *gomock.Controller) *MockQueries { + mock := &MockQueries{ctrl: ctrl} + mock.recorder = &MockQueriesMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueries) EXPECT() *MockQueriesMockRecorder { + return m.recorder +} + +// InstanceByID mocks base method. +func (m *MockQueries) InstanceByID(arg0 context.Context, arg1 string) (authz.Instance, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstanceByID", arg0, arg1) + ret0, _ := ret[0].(authz.Instance) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// InstanceByID indicates an expected call of InstanceByID. +func (mr *MockQueriesMockRecorder) InstanceByID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstanceByID", reflect.TypeOf((*MockQueries)(nil).InstanceByID), arg0, arg1) +} + +// TargetsByExecutionID mocks base method. +func (m *MockQueries) TargetsByExecutionID(arg0 context.Context, arg1 []string) ([]*query.ExecutionTarget, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TargetsByExecutionID", arg0, arg1) + ret0, _ := ret[0].([]*query.ExecutionTarget) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TargetsByExecutionID indicates an expected call of TargetsByExecutionID. +func (mr *MockQueriesMockRecorder) TargetsByExecutionID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TargetsByExecutionID", reflect.TypeOf((*MockQueries)(nil).TargetsByExecutionID), arg0, arg1) +} diff --git a/internal/execution/mock/queue.mock.go b/internal/execution/mock/queue.mock.go new file mode 100644 index 0000000000..c0e8d5fc7b --- /dev/null +++ b/internal/execution/mock/queue.mock.go @@ -0,0 +1,61 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/zitadel/zitadel/internal/execution (interfaces: Queue) +// +// Generated by this command: +// +// mockgen -package mock -destination ./mock/queue.mock.go github.com/zitadel/zitadel/internal/execution Queue +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + river "github.com/riverqueue/river" + queue "github.com/zitadel/zitadel/internal/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockQueue is a mock of Queue interface. +type MockQueue struct { + ctrl *gomock.Controller + recorder *MockQueueMockRecorder +} + +// MockQueueMockRecorder is the mock recorder for MockQueue. +type MockQueueMockRecorder struct { + mock *MockQueue +} + +// NewMockQueue creates a new mock instance. +func NewMockQueue(ctrl *gomock.Controller) *MockQueue { + mock := &MockQueue{ctrl: ctrl} + mock.recorder = &MockQueueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueue) EXPECT() *MockQueueMockRecorder { + return m.recorder +} + +// Insert mocks base method. +func (m *MockQueue) Insert(arg0 context.Context, arg1 river.JobArgs, arg2 ...queue.InsertOpt) error { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Insert", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Insert indicates an expected call of Insert. +func (mr *MockQueueMockRecorder) Insert(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockQueue)(nil).Insert), varargs...) +} diff --git a/internal/execution/projections.go b/internal/execution/projections.go new file mode 100644 index 0000000000..d16d7c6fca --- /dev/null +++ b/internal/execution/projections.go @@ -0,0 +1,36 @@ +package execution + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler/v2" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/queue" +) + +var ( + projections []*handler.Handler +) + +func Register( + ctx context.Context, + executionsCustomConfig projection.CustomConfig, + workerConfig WorkerConfig, + queries *query.Queries, + eventTypes []string, + queue *queue.Queue, +) { + queue.ShouldStart() + projections = []*handler.Handler{ + NewEventHandler(ctx, projection.ApplyCustomConfig(executionsCustomConfig), eventTypes, eventstore.AggregateTypeFromEventType, queries, queue), + } + queue.AddWorkers(NewWorker(workerConfig)) +} + +func Start(ctx context.Context) { + for _, projection := range projections { + projection.Start(ctx) + } +} diff --git a/internal/execution/target_test.go b/internal/execution/target_test.go new file mode 100644 index 0000000000..8df480219d --- /dev/null +++ b/internal/execution/target_test.go @@ -0,0 +1,85 @@ +package execution + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "reflect" + "time" +) + +type testServer struct { + server *httptest.Server + called bool +} + +func (s *testServer) URL() string { + return s.server.URL +} + +func (s *testServer) Close() { + s.server.Close() +} + +func (s *testServer) Called() bool { + return s.called +} + +func testServerCall( + reqBody interface{}, + sleep time.Duration, + statusCode int, + respBody interface{}, +) (string, func(), func() bool) { + server := &testServer{ + called: false, + } + + handler := func(w http.ResponseWriter, r *http.Request) { + server.called = true + if reqBody != nil { + data, err := json.Marshal(reqBody) + if err != nil { + http.Error(w, "error, marshall: "+err.Error(), http.StatusInternalServerError) + return + } + sentBody, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "error, read body: "+err.Error(), http.StatusInternalServerError) + return + } + if !reflect.DeepEqual(data, sentBody) { + http.Error(w, "error, equal:\n"+string(data)+"\nsent:\n"+string(sentBody), http.StatusInternalServerError) + return + } + } + if statusCode != http.StatusOK { + http.Error(w, "error, statusCode", statusCode) + return + } + + time.Sleep(sleep) + + if respBody != nil { + w.Header().Set("Content-Type", "application/json") + resp, err := json.Marshal(respBody) + if err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + if _, err := w.Write(resp); err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + } else { + if _, err := io.WriteString(w, "finished successfully"); err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + } + } + + server.server = httptest.NewServer(http.HandlerFunc(handler)) + return server.URL(), server.Close, server.Called +} diff --git a/internal/execution/worker.go b/internal/execution/worker.go new file mode 100644 index 0000000000..105fa7d46e --- /dev/null +++ b/internal/execution/worker.go @@ -0,0 +1,90 @@ +package execution + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/riverqueue/river" + + "github.com/zitadel/zitadel/internal/query" + exec_repo "github.com/zitadel/zitadel/internal/repository/execution" +) + +type Worker struct { + river.WorkerDefaults[*exec_repo.Request] + + config WorkerConfig + now nowFunc +} + +// Timeout implements the Timeout-function of [river.Worker]. +// Maximum time a job can run before the context gets cancelled. +// The time can be shorter than the sum of target timeouts, this is expected behavior to not block the request indefinitely. +func (w *Worker) Timeout(*river.Job[*exec_repo.Request]) time.Duration { + return w.config.TransactionDuration +} + +// Work implements [river.Worker]. +func (w *Worker) Work(ctx context.Context, job *river.Job[*exec_repo.Request]) error { + ctx = ContextWithExecuter(ctx, job.Args.Aggregate) + + // if the event is too old, we can directly return as it will be removed anyway + if job.CreatedAt.Add(w.config.MaxTtl).Before(w.now()) { + return river.JobCancel(errors.New("event is too old")) + } + + targets, err := TargetsFromRequest(job.Args) + if err != nil { + // If we are not able to get the targets from the request, we can cancel the job, as we have nothing to call + return river.JobCancel(fmt.Errorf("unable to unmarshal targets because %w", err)) + } + + _, err = CallTargets(ctx, targets, exec_repo.ContextInfoFromRequest(job.Args)) + if err != nil { + // If there is an error returned from the targets, it means that the execution was interrupted + return river.JobCancel(fmt.Errorf("interruption during call of targets because %w", err)) + } + return nil +} + +// nowFunc makes [time.Now] mockable +type nowFunc func() time.Time + +type WorkerConfig struct { + Workers uint8 + TransactionDuration time.Duration + MaxTtl time.Duration +} + +func NewWorker( + config WorkerConfig, +) *Worker { + return &Worker{ + config: config, + now: time.Now, + } +} + +var _ river.Worker[*exec_repo.Request] = (*Worker)(nil) + +func (w *Worker) Register(workers *river.Workers, queues map[string]river.QueueConfig) { + river.AddWorker(workers, w) + queues[exec_repo.QueueName] = river.QueueConfig{ + MaxWorkers: int(w.config.Workers), + } +} + +func TargetsFromRequest(e *exec_repo.Request) ([]Target, error) { + var execTargets []*query.ExecutionTarget + if err := json.Unmarshal(e.TargetsData, &execTargets); err != nil { + return nil, err + } + targets := make([]Target, len(execTargets)) + for i, target := range execTargets { + targets[i] = target + } + return targets, nil +} diff --git a/internal/execution/worker_test.go b/internal/execution/worker_test.go new file mode 100644 index 0000000000..32f7879477 --- /dev/null +++ b/internal/execution/worker_test.go @@ -0,0 +1,288 @@ +package execution + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "testing" + "time" + + "github.com/riverqueue/river" + "github.com/riverqueue/river/rivertype" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/execution/mock" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/repository/action" + exec_repo "github.com/zitadel/zitadel/internal/repository/execution" + "github.com/zitadel/zitadel/internal/repository/user" + "github.com/zitadel/zitadel/internal/zerrors" +) + +type fields struct { + queries *mock.MockQueries + queue *mock.MockQueue +} +type fieldsWorker struct { + now nowFunc +} +type args struct { + event eventstore.Event + mapper func(event eventstore.Event) (eventstore.Event, error) +} +type argsWorker struct { + job *river.Job[*exec_repo.Request] +} +type want struct { + noOperation bool + err assert.ErrorAssertionFunc + stmtErr assert.ErrorAssertionFunc +} +type wantWorker struct { + targets []*query.ExecutionTarget + sendStatusCode int + err assert.ErrorAssertionFunc +} + +func newExecutionWorker(f fieldsWorker) *Worker { + return &Worker{ + config: WorkerConfig{ + Workers: 1, + TransactionDuration: 5 * time.Second, + MaxTtl: 5 * time.Minute, + }, + now: f.now, + } +} + +const ( + userID = "user1" + orgID = "orgID" + instanceID = "instanceID" + eventID = "eventID" + eventData = `{"name":"name","script":"name(){}","timeout":3000000000,"allowedToFail":true}` +) + +func Test_handleEventExecution(t *testing.T) { + testNow := time.Now + tests := []struct { + name string + test func() (fieldsWorker, argsWorker, wantWorker) + }{ + { + "max TTL", + func() (fieldsWorker, argsWorker, wantWorker) { + return fieldsWorker{ + now: testNow, + }, + argsWorker{ + job: &river.Job[*exec_repo.Request]{ + JobRow: &rivertype.JobRow{ + CreatedAt: time.Now().Add(-1 * time.Hour), + }, + Args: &exec_repo.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + ID: eventID, + ResourceOwner: instanceID, + }, + Sequence: 1, + CreatedAt: time.Now().Add(-1 * time.Hour), + EventType: user.HumanInviteCodeAddedType, + UserID: userID, + EventData: []byte(eventData), + }, + }, + }, + wantWorker{ + targets: mockTargets(1), + sendStatusCode: http.StatusOK, + err: func(tt assert.TestingT, err error, i ...interface{}) bool { + return errors.Is(err, new(river.JobCancelError)) + }, + } + }, + }, + { + "none", + func() (fieldsWorker, argsWorker, wantWorker) { + return fieldsWorker{ + now: testNow, + }, + argsWorker{ + job: &river.Job[*exec_repo.Request]{ + JobRow: &rivertype.JobRow{ + CreatedAt: time.Now(), + }, + Args: &exec_repo.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + ID: eventID, + ResourceOwner: instanceID, + }, + Sequence: 1, + CreatedAt: time.Now(), + EventType: user.HumanInviteCodeAddedType, + UserID: userID, + EventData: []byte(eventData), + }, + }, + }, + wantWorker{ + targets: mockTargets(0), + sendStatusCode: http.StatusOK, + err: nil, + } + }, + }, + { + "single", + func() (fieldsWorker, argsWorker, wantWorker) { + return fieldsWorker{ + now: testNow, + }, + argsWorker{ + job: &river.Job[*exec_repo.Request]{ + JobRow: &rivertype.JobRow{ + CreatedAt: time.Now(), + }, + Args: &exec_repo.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + Type: action.AggregateType, + Version: action.AggregateVersion, + ID: eventID, + ResourceOwner: orgID, + }, + Sequence: 1, + CreatedAt: time.Now().UTC(), + EventType: action.AddedEventType, + UserID: userID, + EventData: []byte(eventData), + }, + }, + }, + wantWorker{ + targets: mockTargets(1), + sendStatusCode: http.StatusOK, + err: nil, + } + }, + }, + { + "single, failed 400", + func() (fieldsWorker, argsWorker, wantWorker) { + return fieldsWorker{ + now: testNow, + }, + argsWorker{ + job: &river.Job[*exec_repo.Request]{ + JobRow: &rivertype.JobRow{ + CreatedAt: time.Now(), + }, + Args: &exec_repo.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + Type: action.AggregateType, + Version: action.AggregateVersion, + ID: eventID, + ResourceOwner: orgID, + }, + Sequence: 1, + CreatedAt: time.Now().UTC(), + EventType: action.AddedEventType, + UserID: userID, + EventData: []byte(eventData), + }, + }, + }, + wantWorker{ + targets: mockTargets(1), + sendStatusCode: http.StatusBadRequest, + err: func(tt assert.TestingT, err error, i ...interface{}) bool { + return errors.Is(err, zerrors.ThrowPreconditionFailed(nil, "EXEC-dra6yamk98", "Errors.Execution.Failed")) + }, + } + }, + }, + { + "multiple", + func() (fieldsWorker, argsWorker, wantWorker) { + return fieldsWorker{ + now: testNow, + }, + argsWorker{ + job: &river.Job[*exec_repo.Request]{ + JobRow: &rivertype.JobRow{ + CreatedAt: time.Now(), + }, + Args: &exec_repo.Request{ + Aggregate: &eventstore.Aggregate{ + InstanceID: instanceID, + Type: action.AggregateType, + Version: action.AggregateVersion, + ID: eventID, + ResourceOwner: orgID, + }, + Sequence: 1, + CreatedAt: time.Now().UTC(), + EventType: action.AddedEventType, + UserID: userID, + EventData: []byte(eventData), + }, + }, + }, + wantWorker{ + targets: mockTargets(3), + sendStatusCode: http.StatusOK, + err: nil, + } + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f, a, w := tt.test() + + closeFuncs := make([]func(), len(w.targets)) + calledFuncs := make([]func() bool, len(w.targets)) + for i := range w.targets { + url, closeF, calledF := testServerCall( + exec_repo.ContextInfoFromRequest(a.job.Args), + time.Second, + w.sendStatusCode, + nil, + ) + w.targets[i].Endpoint = url + closeFuncs[i] = closeF + calledFuncs[i] = calledF + } + + data, err := json.Marshal(w.targets) + require.NoError(t, err) + a.job.Args.TargetsData = data + + err = newExecutionWorker(f).Work( + authz.WithInstanceID(context.Background(), instanceID), + a.job, + ) + + if w.err != nil { + assert.Error(t, err) + return + } + assert.NoError(t, err) + + for _, closeF := range closeFuncs { + closeF() + } + for _, calledF := range calledFuncs { + assert.True(t, calledF()) + } + }) + } +} diff --git a/internal/integration/action.go b/internal/integration/action.go new file mode 100644 index 0000000000..b8f69c5788 --- /dev/null +++ b/internal/integration/action.go @@ -0,0 +1,102 @@ +package integration + +import ( + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "reflect" + "sync" + "time" +) + +type server struct { + server *httptest.Server + mu sync.Mutex + called int +} + +func (s *server) URL() string { + return s.server.URL +} + +func (s *server) Close() { + s.server.Close() +} + +func (s *server) Called() int { + s.mu.Lock() + called := s.called + s.mu.Unlock() + return called +} + +func (s *server) Increase() { + s.mu.Lock() + s.called++ + s.mu.Unlock() +} + +func (s *server) ResetCalled() { + s.mu.Lock() + s.called = 0 + s.mu.Unlock() +} + +func TestServerCall( + reqBody interface{}, + sleep time.Duration, + statusCode int, + respBody interface{}, +) (url string, closeF func(), calledF func() int, resetCalledF func()) { + server := &server{ + called: 0, + } + + handler := func(w http.ResponseWriter, r *http.Request) { + server.Increase() + if reqBody != nil { + data, err := json.Marshal(reqBody) + if err != nil { + http.Error(w, "error, marshall: "+err.Error(), http.StatusInternalServerError) + return + } + sentBody, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "error, read body: "+err.Error(), http.StatusInternalServerError) + return + } + if !reflect.DeepEqual(data, sentBody) { + http.Error(w, "error, equal:\n"+string(data)+"\nsent:\n"+string(sentBody), http.StatusInternalServerError) + return + } + } + if statusCode != http.StatusOK { + http.Error(w, "error, statusCode", statusCode) + return + } + + time.Sleep(sleep) + + if respBody != nil { + w.Header().Set("Content-Type", "application/json") + resp, err := json.Marshal(respBody) + if err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + if _, err := io.Writer.Write(w, resp); err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + } else { + if _, err := io.WriteString(w, "finished successfully"); err != nil { + http.Error(w, "error", http.StatusInternalServerError) + return + } + } + } + + server.server = httptest.NewServer(http.HandlerFunc(handler)) + return server.URL(), server.Close, server.Called, server.ResetCalled +} diff --git a/internal/integration/client.go b/internal/integration/client.go index 287004605a..5bec39e72c 100644 --- a/internal/integration/client.go +++ b/internal/integration/client.go @@ -695,7 +695,7 @@ func (i *Instance) CreateTarget(ctx context.Context, t *testing.T, name, endpoin req := &action.CreateTargetRequest{ Name: name, Endpoint: endpoint, - Timeout: durationpb.New(10 * time.Second), + Timeout: durationpb.New(5 * time.Second), } switch ty { case domain.TargetTypeWebhook: diff --git a/internal/notification/handlers/notification_worker.go b/internal/notification/handlers/notification_worker.go index e2f1d58153..fa082bc345 100644 --- a/internal/notification/handlers/notification_worker.go +++ b/internal/notification/handlers/notification_worker.go @@ -13,14 +13,12 @@ import ( "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/crypto" - "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/domain" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/notification/channels" "github.com/zitadel/zitadel/internal/notification/senders" "github.com/zitadel/zitadel/internal/notification/types" "github.com/zitadel/zitadel/internal/query" - "github.com/zitadel/zitadel/internal/queue" "github.com/zitadel/zitadel/internal/repository/notification" ) @@ -34,13 +32,13 @@ type NotificationWorker struct { commands Commands queries *NotificationQueries - es *eventstore.Eventstore - client *database.DB channels types.ChannelChains config WorkerConfig now nowFunc } +// Timeout implements the Timeout-function of [river.Worker]. +// Maximum time a job can run before the context gets cancelled. func (w *NotificationWorker) Timeout(*river.Job[*notification.Request]) time.Duration { return w.config.TransactionDuration } @@ -106,24 +104,15 @@ func NewNotificationWorker( config WorkerConfig, commands Commands, queries *NotificationQueries, - es *eventstore.Eventstore, - client *database.DB, channels types.ChannelChains, - queue *queue.Queue, ) *NotificationWorker { - w := &NotificationWorker{ + return &NotificationWorker{ config: config, commands: commands, queries: queries, - es: es, - client: client, channels: channels, now: time.Now, } - if !config.LegacyEnabled { - queue.AddWorkers(w) - } - return w } var _ river.Worker[*notification.Request] = (*NotificationWorker)(nil) diff --git a/internal/notification/projections.go b/internal/notification/projections.go index 6a0296f3bf..a2d4d4140e 100644 --- a/internal/notification/projections.go +++ b/internal/notification/projections.go @@ -6,7 +6,6 @@ import ( "github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/crypto" - "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore/handler/v2" "github.com/zitadel/zitadel/internal/notification/handlers" @@ -18,7 +17,6 @@ import ( var ( projections []*handler.Handler - worker *handlers.NotificationWorker ) func Register( @@ -35,7 +33,6 @@ func Register( otpEmailTmpl, fileSystemPath string, userEncryption, smtpEncryption, smsEncryption, keysEncryptionAlg crypto.EncryptionAlgorithm, tokenLifetime time.Duration, - client *database.DB, queue *queue.Queue, ) { if !notificationWorkerConfig.LegacyEnabled { @@ -59,7 +56,9 @@ func Register( if telemetryCfg.Enabled { projections = append(projections, handlers.NewTelemetryPusher(ctx, telemetryCfg, projection.ApplyCustomConfig(telemetryHandlerCustomConfig), commands, q, c)) } - worker = handlers.NewNotificationWorker(notificationWorkerConfig, commands, q, es, client, c, queue) + if !notificationWorkerConfig.LegacyEnabled { + queue.AddWorkers(handlers.NewNotificationWorker(notificationWorkerConfig, commands, q, c)) + } } func Start(ctx context.Context) { diff --git a/internal/repository/execution/queue.go b/internal/repository/execution/queue.go new file mode 100644 index 0000000000..28f8edbf31 --- /dev/null +++ b/internal/repository/execution/queue.go @@ -0,0 +1,71 @@ +package execution + +import ( + "encoding/json" + "time" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +const ( + QueueName = "execution" +) + +type Request struct { + Aggregate *eventstore.Aggregate `json:"aggregate"` + Sequence uint64 `json:"sequence"` + EventType eventstore.EventType `json:"eventType"` + CreatedAt time.Time `json:"createdAt"` + UserID string `json:"userID"` + EventData []byte `json:"eventData"` + TargetsData []byte `json:"targetsData"` +} + +func (e *Request) Kind() string { + return "execution_request" +} + +func ContextInfoFromRequest(e *Request) *ContextInfoEvent { + return &ContextInfoEvent{ + AggregateID: e.Aggregate.ID, + AggregateType: string(e.Aggregate.Type), + ResourceOwner: e.Aggregate.ResourceOwner, + InstanceID: e.Aggregate.InstanceID, + Version: string(e.Aggregate.Version), + Sequence: e.Sequence, + EventType: string(e.EventType), + CreatedAt: e.CreatedAt.Format(time.RFC3339Nano), + UserID: e.UserID, + EventPayload: e.EventData, + } +} + +type ContextInfoEvent struct { + AggregateID string `json:"aggregateID,omitempty"` + AggregateType string `json:"aggregateType,omitempty"` + ResourceOwner string `json:"resourceOwner,omitempty"` + InstanceID string `json:"instanceID,omitempty"` + Version string `json:"version,omitempty"` + Sequence uint64 `json:"sequence,omitempty"` + EventType string `json:"event_type,omitempty"` + CreatedAt string `json:"created_at,omitempty"` + UserID string `json:"userID,omitempty"` + EventPayload []byte `json:"event_payload,omitempty"` +} + +func (c *ContextInfoEvent) GetHTTPRequestBody() []byte { + data, err := json.Marshal(c) + if err != nil { + return nil + } + return data +} + +func (c *ContextInfoEvent) SetHTTPResponseBody(resp []byte) error { + // response is irrelevant and will not be unmarshaled + return nil +} + +func (c *ContextInfoEvent) GetContent() any { + return c.EventPayload +}