mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 01:57:32 +00:00
fix: reset the call timestamp after a bulk trigger (#6080)
* reproduce #5808 Add an integration test that imports and gets N amount of human users. - With N set to 1-10 the operation seems to succeed always - With N set to 100 the operation seems to fail between 1 and 7 times. * fix merge issue * fix: reset the call timestamp after a bulk trigger With the use of `AS OF SYSTEM TIME` in queries, there was a change for the query package not finding the latest projection verson after a bulk trigger. If events where processed in the bulk trigger, the resulting row timestamp would be after the call start timestamp. This sometimes resulted in consistency issues when Set and Get API methods are called in short succession. For example a Import and Get user could sometimes result in a Not Found error. Although the issue was reported for the Management API user import, it is likely this bug contributed to the flaky integration and e2e tests. Fixes #5808 * trigger bulk action in GetSession * don't use the new context in handler schedule * disable reproduction test --------- Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
@@ -6,9 +6,11 @@ import (
|
||||
"runtime/debug"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/api/call"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/repository/pseudo"
|
||||
)
|
||||
@@ -110,27 +112,51 @@ func NewProjectionHandler(
|
||||
return h
|
||||
}
|
||||
|
||||
// Trigger handles all events for the provided instances (or current instance from context if non specified)
|
||||
// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit
|
||||
func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) error {
|
||||
ids := []string{authz.GetInstance(ctx).InstanceID()}
|
||||
if len(instances) > 0 {
|
||||
ids = instances
|
||||
func triggerInstances(ctx context.Context, instances []string) []string {
|
||||
if len(instances) == 0 {
|
||||
instances = append(instances, authz.GetInstance(ctx).InstanceID())
|
||||
}
|
||||
return instances
|
||||
}
|
||||
|
||||
// Trigger handles all events for the provided instances (or current instance from context if non specified)
|
||||
// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit.
|
||||
// If a bulk action was executed, the call timestamp in context will be reset for subsequent queries.
|
||||
// The returned context is never nil. It is either the original context or an updated context.
|
||||
//
|
||||
// If Trigger encounters an error, it is only logged. If the error is important for the caller,
|
||||
// use TriggerErr instead.
|
||||
func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) context.Context {
|
||||
instances = triggerInstances(ctx, instances)
|
||||
ctx, err := h.TriggerErr(ctx, instances...)
|
||||
logging.OnError(err).WithFields(logrus.Fields{
|
||||
"projection": h.ProjectionName,
|
||||
"instanceIDs": instances,
|
||||
}).Error("trigger failed")
|
||||
return ctx
|
||||
}
|
||||
|
||||
// TriggerErr handles all events for the provided instances (or current instance from context if non specified)
|
||||
// by calling FetchEvents and Process until the amount of events is smaller than the BulkLimit.
|
||||
// If a bulk action was executed, the call timestamp in context will be reset for subsequent queries.
|
||||
// The returned context is never nil. It is either the original context or an updated context.
|
||||
func (h *ProjectionHandler) TriggerErr(ctx context.Context, instances ...string) (context.Context, error) {
|
||||
instances = triggerInstances(ctx, instances)
|
||||
for {
|
||||
events, hasLimitExceeded, err := h.FetchEvents(ctx, ids...)
|
||||
events, hasLimitExceeded, err := h.FetchEvents(ctx, instances...)
|
||||
if err != nil {
|
||||
return err
|
||||
return ctx, err
|
||||
}
|
||||
if len(events) == 0 {
|
||||
return nil
|
||||
return ctx, nil
|
||||
}
|
||||
_, err = h.Process(ctx, events...)
|
||||
ctx = call.ResetTimestamp(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
return ctx, err
|
||||
}
|
||||
if !hasLimitExceeded {
|
||||
return nil
|
||||
return ctx, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -274,7 +300,7 @@ func (h *ProjectionHandler) schedule(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
go h.cancelOnErr(lockInstanceCtx, errs, cancelInstanceLock)
|
||||
err = h.Trigger(lockInstanceCtx, instances...)
|
||||
_, err = h.TriggerErr(lockInstanceCtx, instances...)
|
||||
if err != nil {
|
||||
logging.WithFields("projection", h.ProjectionName, "instanceIDs", instances).WithError(err).Error("trigger failed")
|
||||
failed = true
|
||||
|
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/api/call"
|
||||
"github.com/zitadel/zitadel/internal/api/service"
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
@@ -29,6 +30,47 @@ var (
|
||||
)
|
||||
|
||||
func TestProjectionHandler_Trigger(t *testing.T) {
|
||||
const pause = time.Millisecond
|
||||
|
||||
startCtx := call.WithTimestamp(context.Background())
|
||||
start := call.FromContext(startCtx)
|
||||
|
||||
h := &ProjectionHandler{
|
||||
Handler: Handler{
|
||||
Eventstore: eventstore.NewEventstore(eventstore.TestConfig(
|
||||
es_repo_mock.NewRepo(t).ExpectFilterEvents(
|
||||
&repository.Event{
|
||||
ID: "id",
|
||||
Sequence: 1,
|
||||
PreviousAggregateSequence: 0,
|
||||
CreationDate: time.Now(),
|
||||
Type: "test.added",
|
||||
Version: "v1",
|
||||
AggregateID: "testid",
|
||||
AggregateType: "testAgg",
|
||||
},
|
||||
),
|
||||
)),
|
||||
},
|
||||
ProjectionName: "test",
|
||||
reduce: testReduce(newTestStatement("testAgg", 1, 0)),
|
||||
update: testUpdate(t, 1, 0, nil),
|
||||
searchQuery: testQuery(
|
||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
AddQuery().
|
||||
AggregateTypes("test").
|
||||
Builder(),
|
||||
5, nil,
|
||||
),
|
||||
}
|
||||
|
||||
time.Sleep(pause)
|
||||
endCtx := h.Trigger(startCtx)
|
||||
// check if the new context has a call timestamp that's later than start+pause.
|
||||
assert.WithinRange(t, call.FromContext(endCtx), start.Add(pause), start.Add(pause+time.Second))
|
||||
}
|
||||
|
||||
func TestProjectionHandler_TriggerErr(t *testing.T) {
|
||||
type fields struct {
|
||||
reduce Reduce
|
||||
update Update
|
||||
@@ -223,7 +265,8 @@ func TestProjectionHandler_Trigger(t *testing.T) {
|
||||
searchQuery: tt.fields.query,
|
||||
}
|
||||
|
||||
err := h.Trigger(tt.args.ctx, tt.args.instances...)
|
||||
// context timestamp is checked in [TestProjectionHandler_Trigger]
|
||||
_, err := h.TriggerErr(tt.args.ctx, tt.args.instances...)
|
||||
if !tt.want.isErr(err) {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user