From c0e45b63d8dee7366716410ff3756870606a188d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20M=C3=B6hlmann?= Date: Fri, 7 Jul 2023 11:15:05 +0300 Subject: [PATCH] fix: reset the call timestamp after a bulk trigger (#6080) * reproduce #5808 Add an integration test that imports and gets N amount of human users. - With N set to 1-10 the operation seems to succeed always - With N set to 100 the operation seems to fail between 1 and 7 times. * fix merge issue * fix: reset the call timestamp after a bulk trigger With the use of `AS OF SYSTEM TIME` in queries, there was a change for the query package not finding the latest projection verson after a bulk trigger. If events where processed in the bulk trigger, the resulting row timestamp would be after the call start timestamp. This sometimes resulted in consistency issues when Set and Get API methods are called in short succession. For example a Import and Get user could sometimes result in a Not Found error. Although the issue was reported for the Management API user import, it is likely this bug contributed to the flaky integration and e2e tests. Fixes #5808 * trigger bulk action in GetSession * don't use the new context in handler schedule * disable reproduction test --------- Co-authored-by: Livio Spring --- internal/api/call/duration.go | 10 ++- .../grpc/management/user_integration_test.go | 87 +++++++++++++++++++ internal/api/grpc/session/v2/session.go | 2 +- .../session/v2/session_integration_test.go | 31 ++----- .../eventstore/handler/handler_projection.go | 50 ++++++++--- .../handler/handler_projection_test.go | 45 +++++++++- internal/query/app.go | 2 +- internal/query/authn_key.go | 2 +- internal/query/domain_policy.go | 2 +- internal/query/idp.go | 2 +- internal/query/idp_template.go | 4 +- internal/query/instance.go | 2 +- internal/query/lockout_policy.go | 2 +- internal/query/login_policy.go | 2 +- internal/query/notification_policy.go | 6 +- internal/query/org.go | 2 +- internal/query/org_metadata.go | 4 +- internal/query/password_age_policy.go | 4 +- internal/query/password_complexity_policy.go | 4 +- internal/query/privacy_policy.go | 4 +- internal/query/project.go | 2 +- internal/query/project_grant.go | 2 +- internal/query/project_role.go | 2 +- internal/query/session.go | 6 +- internal/query/user.go | 16 ++-- internal/query/user_grant.go | 8 +- internal/query/user_metadata.go | 4 +- internal/query/user_personal_access_token.go | 2 +- 28 files changed, 227 insertions(+), 82 deletions(-) create mode 100644 internal/api/grpc/management/user_integration_test.go diff --git a/internal/api/call/duration.go b/internal/api/call/duration.go index 24573aa367..b3334de95b 100644 --- a/internal/api/call/duration.go +++ b/internal/api/call/duration.go @@ -9,12 +9,18 @@ type durationKey struct{} var key *durationKey = (*durationKey)(nil) -// WithTimestamp sets [time.Now()] adds the call field to the context -// if it's not already set +// WithTimestamp sets [time.Now()] to the call field in the context +// if it's not already set. func WithTimestamp(parent context.Context) context.Context { if parent.Value(key) != nil { return parent } + return ResetTimestamp(parent) +} + +// ResetTimestamp sets [time.Now()] to the call field in the context, +// overwriting any previously set call timestamp. +func ResetTimestamp(parent context.Context) context.Context { return context.WithValue(parent, key, time.Now()) } diff --git a/internal/api/grpc/management/user_integration_test.go b/internal/api/grpc/management/user_integration_test.go new file mode 100644 index 0000000000..c1682a5ccc --- /dev/null +++ b/internal/api/grpc/management/user_integration_test.go @@ -0,0 +1,87 @@ +//go:build integration + +package management_test + +import ( + "context" + "os" + "testing" + "time" + + "github.com/zitadel/zitadel/internal/integration" + "github.com/zitadel/zitadel/pkg/grpc/management" +) + +var ( + CTX context.Context + Tester *integration.Tester + Client management.ManagementServiceClient +) + +func TestMain(m *testing.M) { + os.Exit(func() int { + ctx, errCtx, cancel := integration.Contexts(3 * time.Minute) + defer cancel() + + Tester = integration.NewTester(ctx) + defer Tester.Done() + + CTX, _ = Tester.WithAuthorization(ctx, integration.OrgOwner), errCtx + Client = Tester.Client.Mgmt + return m.Run() + }()) +} + +// TestImport_and_Get reproduces https://github.com/zitadel/zitadel/issues/5808 +// which led to consistency issues due the call timestamp not being +// updated after a bulk Trigger. +// This test Imports a user and directly tries to Get it, 100 times in a loop. +// When the bug still existed, some (between 1 to 7 out of 100) +// Get calls would return a Not Found error. + +/* Test disabled because it breaks the pipeline. +func TestImport_and_Get(t *testing.T) { + const N = 100 + var misses int + + for i := 0; i < N; i++ { + firstName := strconv.Itoa(i) + t.Run(firstName, func(t *testing.T) { + // create unique names. + lastName := strconv.FormatInt(time.Now().Unix(), 10) + userName := strings.Join([]string{firstName, lastName}, "_") + email := strings.Join([]string{userName, "zitadel.com"}, "@") + + res, err := Client.ImportHumanUser(CTX, &management.ImportHumanUserRequest{ + UserName: userName, + Profile: &management.ImportHumanUserRequest_Profile{ + FirstName: firstName, + LastName: lastName, + PreferredLanguage: language.Afrikaans.String(), + Gender: user.Gender_GENDER_DIVERSE, + }, + Email: &management.ImportHumanUserRequest_Email{ + Email: email, + IsEmailVerified: true, + }, + }) + require.NoError(t, err) + + _, err = Client.GetUserByID(CTX, &management.GetUserByIDRequest{Id: res.GetUserId()}) + + if s, ok := status.FromError(err); ok { + if s == nil { + return + } + if s.Code() == codes.NotFound { + t.Log(s) + misses++ + return + } + } + require.NoError(t, err) // catch and fail on any other error + }) + } + assert.Zerof(t, misses, "Not Found errors %d out of %d", misses, N) +} +*/ diff --git a/internal/api/grpc/session/v2/session.go b/internal/api/grpc/session/v2/session.go index 8ac29a4880..d92d873d7b 100644 --- a/internal/api/grpc/session/v2/session.go +++ b/internal/api/grpc/session/v2/session.go @@ -16,7 +16,7 @@ import ( ) func (s *Server) GetSession(ctx context.Context, req *session.GetSessionRequest) (*session.GetSessionResponse, error) { - res, err := s.query.SessionByID(ctx, req.GetSessionId(), req.GetSessionToken()) + res, err := s.query.SessionByID(ctx, true, req.GetSessionId(), req.GetSessionToken()) if err != nil { return nil, err } diff --git a/internal/api/grpc/session/v2/session_integration_test.go b/internal/api/grpc/session/v2/session_integration_test.go index 82833d2983..fa29183536 100644 --- a/internal/api/grpc/session/v2/session_integration_test.go +++ b/internal/api/grpc/session/v2/session_integration_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/zitadel/zitadel/internal/integration" object "github.com/zitadel/zitadel/pkg/grpc/object/v2alpha" @@ -43,31 +41,16 @@ func TestMain(m *testing.M) { }()) } -func verifyCurrentSession(t testing.TB, id, token string, sequence uint64, window time.Duration, metadata map[string][]byte, factors ...wantFactor) (s *session.Session) { +func verifyCurrentSession(t testing.TB, id, token string, sequence uint64, window time.Duration, metadata map[string][]byte, factors ...wantFactor) *session.Session { require.NotEmpty(t, id) require.NotEmpty(t, token) -retry: - for { - resp, err := Client.GetSession(CTX, &session.GetSessionRequest{ - SessionId: id, - SessionToken: &token, - }) - if err == nil { - s = resp.GetSession() - break retry - } - if code := status.Convert(err).Code(); code == codes.NotFound || code == codes.PermissionDenied { - select { - case <-CTX.Done(): - t.Fatal(CTX.Err(), err) - case <-time.After(time.Second): - t.Log("retrying GetSession") - continue - } - } - require.NoError(t, err) - } + resp, err := Client.GetSession(CTX, &session.GetSessionRequest{ + SessionId: id, + SessionToken: &token, + }) + require.NoError(t, err) + s := resp.GetSession() assert.Equal(t, id, s.GetId()) assert.WithinRange(t, s.GetCreationDate().AsTime(), time.Now().Add(-window), time.Now().Add(window)) diff --git a/internal/eventstore/handler/handler_projection.go b/internal/eventstore/handler/handler_projection.go index c8c3096905..baab2d3a61 100644 --- a/internal/eventstore/handler/handler_projection.go +++ b/internal/eventstore/handler/handler_projection.go @@ -6,9 +6,11 @@ import ( "runtime/debug" "time" + "github.com/sirupsen/logrus" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/repository/pseudo" ) @@ -110,27 +112,51 @@ func NewProjectionHandler( return h } -// Trigger handles all events for the provided instances (or current instance from context if non specified) -// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit -func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) error { - ids := []string{authz.GetInstance(ctx).InstanceID()} - if len(instances) > 0 { - ids = instances +func triggerInstances(ctx context.Context, instances []string) []string { + if len(instances) == 0 { + instances = append(instances, authz.GetInstance(ctx).InstanceID()) } + return instances +} + +// Trigger handles all events for the provided instances (or current instance from context if non specified) +// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit. +// If a bulk action was executed, the call timestamp in context will be reset for subsequent queries. +// The returned context is never nil. It is either the original context or an updated context. +// +// If Trigger encounters an error, it is only logged. If the error is important for the caller, +// use TriggerErr instead. +func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) context.Context { + instances = triggerInstances(ctx, instances) + ctx, err := h.TriggerErr(ctx, instances...) + logging.OnError(err).WithFields(logrus.Fields{ + "projection": h.ProjectionName, + "instanceIDs": instances, + }).Error("trigger failed") + return ctx +} + +// TriggerErr handles all events for the provided instances (or current instance from context if non specified) +// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit. +// If a bulk action was executed, the call timestamp in context will be reset for subsequent queries. +// The returned context is never nil. It is either the original context or an updated context. +func (h *ProjectionHandler) TriggerErr(ctx context.Context, instances ...string) (context.Context, error) { + instances = triggerInstances(ctx, instances) for { - events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...) + events, hasLimitExceeded, err := h.FetchEvents(ctx, instances...) if err != nil { - return err + return ctx, err } if len(events) == 0 { - return nil + return ctx, nil } _, err = h.Process(ctx, events...) + ctx = call.ResetTimestamp(ctx) if err != nil { - return err + return ctx, err } if !hasLimitExceeded { - return nil + return ctx, nil } } } @@ -274,7 +300,7 @@ func (h *ProjectionHandler) schedule(ctx context.Context) { continue } go h.cancelOnErr(lockInstanceCtx, errs, cancelInstanceLock) - err = h.Trigger(lockInstanceCtx, instances...) + _, err = h.TriggerErr(lockInstanceCtx, instances...) if err != nil { logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed") failed = true diff --git a/internal/eventstore/handler/handler_projection_test.go b/internal/eventstore/handler/handler_projection_test.go index 349799e46a..3ee791a2d4 100644 --- a/internal/eventstore/handler/handler_projection_test.go +++ b/internal/eventstore/handler/handler_projection_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/zitadel/zitadel/internal/api/authz" + "github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/api/service" "github.com/zitadel/zitadel/internal/database" "github.com/zitadel/zitadel/internal/eventstore" @@ -29,6 +30,47 @@ var ( ) func TestProjectionHandler_Trigger(t *testing.T) { + const pause = time.Millisecond + + startCtx := call.WithTimestamp(context.Background()) + start := call.FromContext(startCtx) + + h := &ProjectionHandler{ + Handler: Handler{ + Eventstore: eventstore.NewEventstore(eventstore.TestConfig( + es_repo_mock.NewRepo(t).ExpectFilterEvents( + &repository.Event{ + ID: "id", + Sequence: 1, + PreviousAggregateSequence: 0, + CreationDate: time.Now(), + Type: "test.added", + Version: "v1", + AggregateID: "testid", + AggregateType: "testAgg", + }, + ), + )), + }, + ProjectionName: "test", + reduce: testReduce(newTestStatement("testAgg", 1, 0)), + update: testUpdate(t, 1, 0, nil), + searchQuery: testQuery( + eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes("test"). + Builder(), + 5, nil, + ), + } + + time.Sleep(pause) + endCtx := h.Trigger(startCtx) + // check if the new context has a call timestamp that's later than start+pause. + assert.WithinRange(t, call.FromContext(endCtx), start.Add(pause), start.Add(pause+time.Second)) +} + +func TestProjectionHandler_TriggerErr(t *testing.T) { type fields struct { reduce Reduce update Update @@ -223,7 +265,8 @@ func TestProjectionHandler_Trigger(t *testing.T) { searchQuery: tt.fields.query, } - err := h.Trigger(tt.args.ctx, tt.args.instances...) + // context timestamp is checked in [TestProjectionHandler_Trigger] + _, err := h.TriggerErr(tt.args.ctx, tt.args.instances...) if !tt.want.isErr(err) { t.Errorf("unexpected error %v", err) } diff --git a/internal/query/app.go b/internal/query/app.go index 6b40f712e3..577f6c1814 100644 --- a/internal/query/app.go +++ b/internal/query/app.go @@ -253,7 +253,7 @@ func (q *Queries) AppByProjectAndAppID(ctx context.Context, shouldTriggerBulk bo defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.AppProjection.Trigger(ctx) + ctx = projection.AppProjection.Trigger(ctx) } stmt, scan := prepareAppQuery(ctx, q.client) diff --git a/internal/query/authn_key.go b/internal/query/authn_key.go index 3d9341c947..87140b2165 100644 --- a/internal/query/authn_key.go +++ b/internal/query/authn_key.go @@ -191,7 +191,7 @@ func (q *Queries) GetAuthNKeyByID(ctx context.Context, shouldTriggerBulk bool, i defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.AuthNKeyProjection.Trigger(ctx) + ctx = projection.AuthNKeyProjection.Trigger(ctx) } query, scan := prepareAuthNKeyQuery(ctx, q.client) diff --git a/internal/query/domain_policy.go b/internal/query/domain_policy.go index f9f5f94b75..2f7a596cd1 100644 --- a/internal/query/domain_policy.go +++ b/internal/query/domain_policy.go @@ -91,7 +91,7 @@ func (q *Queries) DomainPolicyByOrg(ctx context.Context, shouldTriggerBulk bool, defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.DomainPolicyProjection.Trigger(ctx) + ctx = projection.DomainPolicyProjection.Trigger(ctx) } eq := sq.And{ sq.Eq{DomainPolicyColInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()}, diff --git a/internal/query/idp.go b/internal/query/idp.go index e483755c81..35c0b7b5d7 100644 --- a/internal/query/idp.go +++ b/internal/query/idp.go @@ -193,7 +193,7 @@ func (q *Queries) IDPByIDAndResourceOwner(ctx context.Context, shouldTriggerBulk defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.IDPProjection.Trigger(ctx) + ctx = projection.IDPProjection.Trigger(ctx) } eq := sq.Eq{ diff --git a/internal/query/idp_template.go b/internal/query/idp_template.go index 2639691a74..b83efe156e 100644 --- a/internal/query/idp_template.go +++ b/internal/query/idp_template.go @@ -7,7 +7,6 @@ import ( "time" sq "github.com/Masterminds/squirrel" - "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/call" @@ -612,8 +611,7 @@ func (q *Queries) IDPTemplateByID(ctx context.Context, shouldTriggerBulk bool, i defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - err := projection.IDPTemplateProjection.Trigger(ctx) - logging.OnError(err).WithField("projection", idpTemplateTable.identifier()).Warn("could not trigger projection for query") + ctx = projection.IDPTemplateProjection.Trigger(ctx) } eq := sq.Eq{ diff --git a/internal/query/instance.go b/internal/query/instance.go index 2b4cfb1850..9040478af4 100644 --- a/internal/query/instance.go +++ b/internal/query/instance.go @@ -182,7 +182,7 @@ func (q *Queries) Instance(ctx context.Context, shouldTriggerBulk bool) (_ *Inst defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.InstanceProjection.Trigger(ctx) + ctx = projection.InstanceProjection.Trigger(ctx) } stmt, scan := prepareInstanceDomainQuery(ctx, q.client, authz.GetInstance(ctx).RequestedDomain()) diff --git a/internal/query/lockout_policy.go b/internal/query/lockout_policy.go index 84b0e27eed..ff9dfc663a 100644 --- a/internal/query/lockout_policy.go +++ b/internal/query/lockout_policy.go @@ -86,7 +86,7 @@ func (q *Queries) LockoutPolicyByOrg(ctx context.Context, shouldTriggerBulk bool defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.LockoutPolicyProjection.Trigger(ctx) + ctx = projection.LockoutPolicyProjection.Trigger(ctx) } eq := sq.Eq{ LockoutColInstanceID.identifier(): authz.GetInstance(ctx).InstanceID(), diff --git a/internal/query/login_policy.go b/internal/query/login_policy.go index d575a26d33..4862aa765b 100644 --- a/internal/query/login_policy.go +++ b/internal/query/login_policy.go @@ -166,7 +166,7 @@ func (q *Queries) LoginPolicyByID(ctx context.Context, shouldTriggerBulk bool, o defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.LoginPolicyProjection.Trigger(ctx) + ctx = projection.LoginPolicyProjection.Trigger(ctx) } eq := sq.Eq{LoginPolicyColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()} if !withOwnerRemoved { diff --git a/internal/query/notification_policy.go b/internal/query/notification_policy.go index e15e7c84f7..64873e1ca9 100644 --- a/internal/query/notification_policy.go +++ b/internal/query/notification_policy.go @@ -81,7 +81,8 @@ func (q *Queries) NotificationPolicyByOrg(ctx context.Context, shouldTriggerBulk defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - if err := projection.NotificationPolicyProjection.Trigger(ctx); err != nil { + ctx, err = projection.NotificationPolicyProjection.TriggerErr(ctx) + if err != nil { return nil, err } } @@ -112,7 +113,8 @@ func (q *Queries) DefaultNotificationPolicy(ctx context.Context, shouldTriggerBu defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - if err := projection.NotificationPolicyProjection.Trigger(ctx); err != nil { + ctx, err = projection.NotificationPolicyProjection.TriggerErr(ctx) + if err != nil { return nil, err } } diff --git a/internal/query/org.go b/internal/query/org.go index d5c90f08f8..40bcbe4005 100644 --- a/internal/query/org.go +++ b/internal/query/org.go @@ -94,7 +94,7 @@ func (q *Queries) OrgByID(ctx context.Context, shouldTriggerBulk bool, id string defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.OrgProjection.Trigger(ctx) + ctx = projection.OrgProjection.Trigger(ctx) } stmt, scan := prepareOrgQuery(ctx, q.client) diff --git a/internal/query/org_metadata.go b/internal/query/org_metadata.go index c22bd6a71b..5ce36369ed 100644 --- a/internal/query/org_metadata.go +++ b/internal/query/org_metadata.go @@ -82,7 +82,7 @@ func (q *Queries) GetOrgMetadataByKey(ctx context.Context, shouldTriggerBulk boo defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.OrgMetadataProjection.Trigger(ctx) + ctx = projection.OrgMetadataProjection.Trigger(ctx) } query, scan := prepareOrgMetadataQuery(ctx, q.client) @@ -111,7 +111,7 @@ func (q *Queries) SearchOrgMetadata(ctx context.Context, shouldTriggerBulk bool, defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.OrgMetadataProjection.Trigger(ctx) + ctx = projection.OrgMetadataProjection.Trigger(ctx) } eq := sq.Eq{ OrgMetadataOrgIDCol.identifier(): orgID, diff --git a/internal/query/password_age_policy.go b/internal/query/password_age_policy.go index 2de71235af..fda59e88e0 100644 --- a/internal/query/password_age_policy.go +++ b/internal/query/password_age_policy.go @@ -86,7 +86,7 @@ func (q *Queries) PasswordAgePolicyByOrg(ctx context.Context, shouldTriggerBulk defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PasswordAgeProjection.Trigger(ctx) + ctx = projection.PasswordAgeProjection.Trigger(ctx) } eq := sq.Eq{PasswordAgeColInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()} if !withOwnerRemoved { @@ -116,7 +116,7 @@ func (q *Queries) DefaultPasswordAgePolicy(ctx context.Context, shouldTriggerBul defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PasswordAgeProjection.Trigger(ctx) + ctx = projection.PasswordAgeProjection.Trigger(ctx) } stmt, scan := preparePasswordAgePolicyQuery(ctx, q.client) diff --git a/internal/query/password_complexity_policy.go b/internal/query/password_complexity_policy.go index 4877f0159c..e5f15f1810 100644 --- a/internal/query/password_complexity_policy.go +++ b/internal/query/password_complexity_policy.go @@ -38,7 +38,7 @@ func (q *Queries) PasswordComplexityPolicyByOrg(ctx context.Context, shouldTrigg defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PasswordComplexityProjection.Trigger(ctx) + ctx = projection.PasswordComplexityProjection.Trigger(ctx) } eq := sq.Eq{PasswordComplexityColInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()} if !withOwnerRemoved { @@ -68,7 +68,7 @@ func (q *Queries) DefaultPasswordComplexityPolicy(ctx context.Context, shouldTri defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PasswordComplexityProjection.Trigger(ctx) + ctx = projection.PasswordComplexityProjection.Trigger(ctx) } stmt, scan := preparePasswordComplexityPolicyQuery(ctx, q.client) diff --git a/internal/query/privacy_policy.go b/internal/query/privacy_policy.go index 2874c6d22a..e207c3e9b2 100644 --- a/internal/query/privacy_policy.go +++ b/internal/query/privacy_policy.go @@ -96,7 +96,7 @@ func (q *Queries) PrivacyPolicyByOrg(ctx context.Context, shouldTriggerBulk bool defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PrivacyPolicyProjection.Trigger(ctx) + ctx = projection.PrivacyPolicyProjection.Trigger(ctx) } eq := sq.Eq{PrivacyColInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()} if !withOwnerRemoved { @@ -125,7 +125,7 @@ func (q *Queries) DefaultPrivacyPolicy(ctx context.Context, shouldTriggerBulk bo defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PrivacyPolicyProjection.Trigger(ctx) + ctx = projection.PrivacyPolicyProjection.Trigger(ctx) } stmt, scan := preparePrivacyPolicyQuery(ctx, q.client) diff --git a/internal/query/project.go b/internal/query/project.go index e7df58cf54..065a3e5c5b 100644 --- a/internal/query/project.go +++ b/internal/query/project.go @@ -105,7 +105,7 @@ func (q *Queries) ProjectByID(ctx context.Context, shouldTriggerBulk bool, id st defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.ProjectProjection.Trigger(ctx) + ctx = projection.ProjectProjection.Trigger(ctx) } stmt, scan := prepareProjectQuery(ctx, q.client) diff --git a/internal/query/project_grant.go b/internal/query/project_grant.go index bbf2657360..fee8ae8369 100644 --- a/internal/query/project_grant.go +++ b/internal/query/project_grant.go @@ -116,7 +116,7 @@ func (q *Queries) ProjectGrantByID(ctx context.Context, shouldTriggerBulk bool, defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.ProjectGrantProjection.Trigger(ctx) + ctx = projection.ProjectGrantProjection.Trigger(ctx) } stmt, scan := prepareProjectGrantQuery(ctx, q.client) diff --git a/internal/query/project_role.go b/internal/query/project_role.go index 03ebbf60bd..7365b12471 100644 --- a/internal/query/project_role.go +++ b/internal/query/project_role.go @@ -88,7 +88,7 @@ func (q *Queries) SearchProjectRoles(ctx context.Context, shouldTriggerBulk bool defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.ProjectRoleProjection.Trigger(ctx) + ctx = projection.ProjectRoleProjection.Trigger(ctx) } eq := sq.Eq{ProjectRoleColumnInstanceID.identifier(): authz.GetInstance(ctx).InstanceID()} diff --git a/internal/query/session.go b/internal/query/session.go index e9706410ac..76ccefba60 100644 --- a/internal/query/session.go +++ b/internal/query/session.go @@ -141,10 +141,14 @@ var ( } ) -func (q *Queries) SessionByID(ctx context.Context, id, sessionToken string) (_ *Session, err error) { +func (q *Queries) SessionByID(ctx context.Context, shouldTriggerBulk bool, id, sessionToken string) (_ *Session, err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() + if shouldTriggerBulk { + ctx = projection.SessionProjection.Trigger(ctx) + } + query, scan := prepareSessionQuery(ctx, q.client) stmt, args, err := query.Where( sq.Eq{ diff --git a/internal/query/user.go b/internal/query/user.go index 1550a4678b..46a683e677 100644 --- a/internal/query/user.go +++ b/internal/query/user.go @@ -338,8 +338,8 @@ func (q *Queries) GetUserByID(ctx context.Context, shouldTriggerBulk bool, userI defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.UserProjection.Trigger(ctx) - projection.LoginNameProjection.Trigger(ctx) + ctx = projection.UserProjection.Trigger(ctx) + ctx = projection.LoginNameProjection.Trigger(ctx) } query, scan := prepareUserQuery(ctx, q.client) @@ -367,8 +367,8 @@ func (q *Queries) GetUser(ctx context.Context, shouldTriggerBulk bool, withOwner defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.UserProjection.Trigger(ctx) - projection.LoginNameProjection.Trigger(ctx) + ctx = projection.UserProjection.Trigger(ctx) + ctx = projection.LoginNameProjection.Trigger(ctx) } query, scan := prepareUserQuery(ctx, q.client) @@ -467,8 +467,8 @@ func (q *Queries) GetNotifyUserByID(ctx context.Context, shouldTriggered bool, u defer func() { span.EndWithError(err) }() if shouldTriggered { - projection.UserProjection.Trigger(ctx) - projection.LoginNameProjection.Trigger(ctx) + ctx = projection.UserProjection.Trigger(ctx) + ctx = projection.LoginNameProjection.Trigger(ctx) } query, scan := prepareNotifyUserQuery(ctx, q.client) @@ -496,8 +496,8 @@ func (q *Queries) GetNotifyUser(ctx context.Context, shouldTriggered bool, withO defer func() { span.EndWithError(err) }() if shouldTriggered { - projection.UserProjection.Trigger(ctx) - projection.LoginNameProjection.Trigger(ctx) + ctx = projection.UserProjection.Trigger(ctx) + ctx = projection.LoginNameProjection.Trigger(ctx) } query, scan := prepareNotifyUserQuery(ctx, q.client) diff --git a/internal/query/user_grant.go b/internal/query/user_grant.go index ceb8652e4d..6d0eaa95bc 100644 --- a/internal/query/user_grant.go +++ b/internal/query/user_grant.go @@ -8,8 +8,6 @@ import ( sq "github.com/Masterminds/squirrel" - "github.com/zitadel/logging" - "github.com/zitadel/zitadel/internal/api/authz" "github.com/zitadel/zitadel/internal/api/call" "github.com/zitadel/zitadel/internal/database" @@ -235,7 +233,7 @@ func (q *Queries) UserGrant(ctx context.Context, shouldTriggerBulk bool, withOwn defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.UserGrantProjection.Trigger(ctx) + ctx = projection.UserGrantProjection.Trigger(ctx) } query, scan := prepareUserGrantQuery(ctx, q.client) @@ -260,9 +258,7 @@ func (q *Queries) UserGrants(ctx context.Context, queries *UserGrantsQueries, sh defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - logging.OnError( - projection.UserGrantProjection.Trigger(ctx), - ).Debug("unable to trigger") + ctx = projection.UserGrantProjection.Trigger(ctx) } query, scan := prepareUserGrantsQuery(ctx, q.client) diff --git a/internal/query/user_metadata.go b/internal/query/user_metadata.go index b7c4c27360..d0a9202b8b 100644 --- a/internal/query/user_metadata.go +++ b/internal/query/user_metadata.go @@ -82,7 +82,7 @@ func (q *Queries) GetUserMetadataByKey(ctx context.Context, shouldTriggerBulk bo defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.UserMetadataProjection.Trigger(ctx) + ctx = projection.UserMetadataProjection.Trigger(ctx) } query, scan := prepareUserMetadataQuery(ctx, q.client) @@ -111,7 +111,7 @@ func (q *Queries) SearchUserMetadata(ctx context.Context, shouldTriggerBulk bool defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.UserMetadataProjection.Trigger(ctx) + ctx = projection.UserMetadataProjection.Trigger(ctx) } query, scan := prepareUserMetadataListQuery(ctx, q.client) diff --git a/internal/query/user_personal_access_token.go b/internal/query/user_personal_access_token.go index 4afab7ec89..5558eed3c2 100644 --- a/internal/query/user_personal_access_token.go +++ b/internal/query/user_personal_access_token.go @@ -90,7 +90,7 @@ func (q *Queries) PersonalAccessTokenByID(ctx context.Context, shouldTriggerBulk defer func() { span.EndWithError(err) }() if shouldTriggerBulk { - projection.PersonalAccessTokenProjection.Trigger(ctx) + ctx = projection.PersonalAccessTokenProjection.Trigger(ctx) } query, scan := preparePersonalAccessTokenQuery(ctx, q.client)