diff --git a/internal/eventstore/read_model.go b/internal/eventstore/read_model.go index 10c84358e5..d2c755cc3a 100644 --- a/internal/eventstore/read_model.go +++ b/internal/eventstore/read_model.go @@ -13,6 +13,7 @@ type ReadModel struct { Events []Event `json:"-"` ResourceOwner string `json:"-"` InstanceID string `json:"-"` + Position float64 `json:"-"` } // AppendEvents adds all the events to the read model. @@ -43,6 +44,7 @@ func (rm *ReadModel) Reduce() error { } rm.ChangeDate = rm.Events[len(rm.Events)-1].CreatedAt() rm.ProcessedSequence = rm.Events[len(rm.Events)-1].Sequence() + rm.Position = rm.Events[len(rm.Events)-1].Position() // all events processed and not needed anymore rm.Events = rm.Events[0:0] return nil diff --git a/internal/query/access_token.go b/internal/query/access_token.go index 40d9b1700a..a777a6afc7 100644 --- a/internal/query/access_token.go +++ b/internal/query/access_token.go @@ -17,7 +17,7 @@ import ( ) type OIDCSessionAccessTokenReadModel struct { - eventstore.WriteModel + eventstore.ReadModel UserID string SessionID string @@ -39,7 +39,7 @@ type OIDCSessionAccessTokenReadModel struct { func newOIDCSessionAccessTokenReadModel(id string) *OIDCSessionAccessTokenReadModel { return &OIDCSessionAccessTokenReadModel{ - WriteModel: eventstore.WriteModel{ + ReadModel: eventstore.ReadModel{ AggregateID: id, }, } @@ -57,13 +57,11 @@ func (wm *OIDCSessionAccessTokenReadModel) Reduce() error { wm.reduceTokenRevoked(event) } } - return wm.WriteModel.Reduce() + return wm.ReadModel.Reduce() } func (wm *OIDCSessionAccessTokenReadModel) Query() *eventstore.SearchQueryBuilder { return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - AwaitOpenTransactions(). - AllowTimeTravel(). AddQuery(). AggregateTypes(oidcsession.AggregateType). AggregateIDs(wm.AggregateID). @@ -120,7 +118,7 @@ func (q *Queries) ActiveAccessTokenByToken(ctx context.Context, token string) (m if !model.AccessTokenExpiration.After(time.Now()) { return nil, zerrors.ThrowPermissionDenied(nil, "QUERY-SAF3rf", "Errors.OIDCSession.Token.Expired") } - if err = q.checkSessionNotTerminatedAfter(ctx, model.SessionID, model.UserID, model.AccessTokenCreation, model.UserAgent.GetFingerprintID()); err != nil { + if err = q.checkSessionNotTerminatedAfter(ctx, model.SessionID, model.UserID, model.Position, model.UserAgent.GetFingerprintID()); err != nil { return nil, err } return model, nil @@ -142,13 +140,13 @@ func (q *Queries) accessTokenByOIDCSessionAndTokenID(ctx context.Context, oidcSe // checkSessionNotTerminatedAfter checks if a [session.TerminateType] event (or user events leading to a session termination) // occurred after a certain time and will return an error if so. -func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, creation time.Time, fingerprintID string) (err error) { +func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, userID string, position float64, fingerprintID string) (err error) { ctx, span := tracing.NewSpan(ctx) defer func() { span.EndWithError(err) }() model := &sessionTerminatedModel{ sessionID: sessionID, - creation: creation, + position: position, userID: userID, fingerPrintID: fingerprintID, } @@ -164,7 +162,7 @@ func (q *Queries) checkSessionNotTerminatedAfter(ctx context.Context, sessionID, } type sessionTerminatedModel struct { - creation time.Time + position float64 sessionID string userID string fingerPrintID string @@ -184,8 +182,7 @@ func (s *sessionTerminatedModel) AppendEvents(events ...eventstore.Event) { func (s *sessionTerminatedModel) Query() *eventstore.SearchQueryBuilder { query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). - AwaitOpenTransactions(). - CreationDateAfter(s.creation). + PositionAfter(s.position). AddQuery(). AggregateTypes(session.AggregateType). AggregateIDs(s.sessionID).