mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:37:32 +00:00
perf: improve scalability of session api (#9635)
This pull request improves the scalability of the session API by enhancing middleware tracing and refining SQL query behavior for user authentication methods. # Which Problems Are Solved - Eventstore subscriptions locked each other during they wrote the events to the event channels of the subscribers in push. - `ListUserAuthMethodTypesRequired` query used `Bitmap heap scan` to join the tables needed. - The auth and oidc package triggered projections often when data were read. - The session API triggered the user projection each time a user was searched to write the user check command. # How the Problems Are Solved - the `sync.Mutex` was replaced with `sync.RWMutex` to allow parallel read of the map - The query was refactored to use index scans only - if the data should already be up-to-date `shouldTriggerBulk` is set to false - as the user should already exist for some time the trigger was removed. # Additional Changes - refactoring of `tracing#Span.End` calls # Additional Context - part of https://github.com/zitadel/zitadel/issues/9239 --------- Co-authored-by: Tim Möhlmann <tim+github@zitadel.com>
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package eventstore
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/zitadel/logging"
|
||||
@@ -8,7 +9,7 @@ import (
|
||||
|
||||
var (
|
||||
subscriptions = map[AggregateType][]*Subscription{}
|
||||
subsMutext sync.Mutex
|
||||
subsMutex sync.RWMutex
|
||||
)
|
||||
|
||||
type Subscription struct {
|
||||
@@ -27,8 +28,8 @@ func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Su
|
||||
types: types,
|
||||
}
|
||||
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
subsMutex.Lock()
|
||||
defer subsMutex.Unlock()
|
||||
|
||||
for _, aggregate := range aggregates {
|
||||
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
|
||||
@@ -45,8 +46,8 @@ func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventT
|
||||
types: types,
|
||||
}
|
||||
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
subsMutex.Lock()
|
||||
defer subsMutex.Unlock()
|
||||
|
||||
for aggregate := range types {
|
||||
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
|
||||
@@ -56,8 +57,8 @@ func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventT
|
||||
}
|
||||
|
||||
func (es *Eventstore) notify(events []Event) {
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
subsMutex.RLock()
|
||||
defer subsMutex.RUnlock()
|
||||
for _, event := range events {
|
||||
subs, ok := subscriptions[event.Aggregate().Type]
|
||||
if !ok {
|
||||
@@ -71,14 +72,11 @@ func (es *Eventstore) notify(events []Event) {
|
||||
continue
|
||||
}
|
||||
//subscription for certain events
|
||||
for _, eventType := range eventTypes {
|
||||
if event.Type() == eventType {
|
||||
select {
|
||||
case sub.Events <- event:
|
||||
default:
|
||||
logging.Debug("unable to push event")
|
||||
}
|
||||
break
|
||||
if slices.Contains(eventTypes, event.Type()) {
|
||||
select {
|
||||
case sub.Events <- event:
|
||||
default:
|
||||
logging.Debug("unable to push event")
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -86,8 +84,8 @@ func (es *Eventstore) notify(events []Event) {
|
||||
}
|
||||
|
||||
func (s *Subscription) Unsubscribe() {
|
||||
subsMutext.Lock()
|
||||
defer subsMutext.Unlock()
|
||||
subsMutex.Lock()
|
||||
defer subsMutex.Unlock()
|
||||
for aggregate := range s.types {
|
||||
subs, ok := subscriptions[aggregate]
|
||||
if !ok {
|
||||
|
@@ -47,7 +47,7 @@ func (es *Eventstore) FillFields(ctx context.Context, events ...eventstore.FillF
|
||||
// Search implements the [eventstore.Search] method
|
||||
func (es *Eventstore) Search(ctx context.Context, conditions ...map[eventstore.FieldType]any) (result []*eventstore.SearchResult, err error) {
|
||||
ctx, span := tracing.NewSpan(ctx)
|
||||
defer span.EndWithError(err)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
|
||||
var builder strings.Builder
|
||||
args := buildSearchStatement(ctx, &builder, conditions...)
|
||||
|
Reference in New Issue
Block a user