mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-10 17:23:39 +00:00
dab5d9e756
# Which Problems Are Solved

If many events are written to the same aggregate id, it can happen that zitadel [starts to retry the push transaction](48ffc902cc/internal/eventstore/eventstore.go#L101) because [the locking behaviour](48ffc902cc/internal/eventstore/v3/sequence.go#L25) during push computes the wrong sequence: newly committed events are not visible to the transaction, and these events impact the current sequence.

In cases with high command traffic on a single aggregate id this can have a severe impact on the general performance of zitadel, because many connections of the `eventstore pusher` database pool are blocked by each other.

# How the Problems Are Solved

To improve the performance, this locking mechanism was removed and the business logic of push was moved to SQL functions, which reduce network traffic and can be analyzed by the database before the actual push. For clients of the eventstore framework nothing changed.

# Additional Changes

- after a connection is established, the newly added database types are prefetched
- `eventstore.BaseEvent` now returns the correct revision of the event

# Additional Context

- part of https://github.com/zitadel/zitadel/issues/8931

---------

Co-authored-by: Tim Möhlmann <tim+github@zitadel.com> Co-authored-by: Livio Spring <livio.a@gmail.com> Co-authored-by: Max Peintner <max@caos.ch> Co-authored-by: Elio Bischof <elio@zitadel.com> Co-authored-by: Stefan Benz <46600784+stebenz@users.noreply.github.com> Co-authored-by: Miguel Cabrerizo <30386061+doncicuto@users.noreply.github.com> Co-authored-by: Joakim Lodén <Loddan@users.noreply.github.com> Co-authored-by: Yxnt <Yxnt@users.noreply.github.com> Co-authored-by: Stefan Benz <stefan@caos.ch> Co-authored-by: Harsha Reddy <harsha.reddy@klaviyo.com> Co-authored-by: Zach H <zhirschtritt@gmail.com>
136 lines
3.1 KiB
Go
136 lines
3.1 KiB
Go
package eventstore
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/zitadel/logging"
|
|
|
|
"github.com/zitadel/zitadel/internal/api/authz"
|
|
"github.com/zitadel/zitadel/internal/api/service"
|
|
)
|
|
|
|
var (
|
|
_ Event = (*BaseEvent)(nil)
|
|
)
|
|
|
|
// BaseEvent represents the minimum metadata of an event
|
|
type BaseEvent struct {
|
|
ID string
|
|
EventType EventType `json:"-"`
|
|
|
|
Agg *Aggregate
|
|
|
|
Seq uint64
|
|
Pos float64
|
|
Creation time.Time
|
|
previousAggregateSequence uint64
|
|
previousAggregateTypeSequence uint64
|
|
|
|
//User who created the event
|
|
User string `json:"-"`
|
|
//Service which created the event
|
|
Service string `json:"-"`
|
|
Data []byte `json:"-"`
|
|
}
|
|
|
|
// Position implements Event.
|
|
func (e *BaseEvent) Position() float64 {
|
|
return e.Pos
|
|
}
|
|
|
|
// EditorService implements Command
|
|
func (e *BaseEvent) EditorService() string {
|
|
return e.Service
|
|
}
|
|
|
|
// EditorUser implements Command
|
|
func (e *BaseEvent) EditorUser() string {
|
|
return e.User
|
|
}
|
|
|
|
// Creator implements action
|
|
func (e *BaseEvent) Creator() string {
|
|
return e.EditorUser()
|
|
}
|
|
|
|
// Type implements action
|
|
func (e *BaseEvent) Type() EventType {
|
|
return e.EventType
|
|
}
|
|
|
|
// Sequence is an upcounting unique number of the event
|
|
func (e *BaseEvent) Sequence() uint64 {
|
|
return e.Seq
|
|
}
|
|
|
|
// CreationDate is the the time, the event is inserted into the eventstore
|
|
func (e *BaseEvent) CreationDate() time.Time {
|
|
return e.Creation
|
|
}
|
|
|
|
// CreatedAt implements Event
|
|
func (e *BaseEvent) CreatedAt() time.Time {
|
|
return e.CreationDate()
|
|
}
|
|
|
|
// Aggregate implements action
|
|
func (e *BaseEvent) Aggregate() *Aggregate {
|
|
return e.Agg
|
|
}
|
|
|
|
// Data returns the payload of the event. It represent the changed fields by the event
|
|
func (e *BaseEvent) DataAsBytes() []byte {
|
|
return e.Data
|
|
}
|
|
|
|
// Revision implements action
|
|
func (e *BaseEvent) Revision() uint16 {
|
|
revision, err := strconv.ParseUint(strings.TrimPrefix(string(e.Agg.Version), "v"), 10, 16)
|
|
logging.OnError(err).Debug("failed to parse event revision")
|
|
return uint16(revision)
|
|
}
|
|
|
|
// Unmarshal implements Event
|
|
func (e *BaseEvent) Unmarshal(ptr any) error {
|
|
if len(e.Data) == 0 {
|
|
return nil
|
|
}
|
|
return json.Unmarshal(e.Data, ptr)
|
|
}
|
|
|
|
// defaultService is the Service value BaseEventFromRepo assigns to the events it builds.
const defaultService = "zitadel"
|
|
|
|
// BaseEventFromRepo maps a stored event to a BaseEvent
|
|
func BaseEventFromRepo(event Event) *BaseEvent {
|
|
return &BaseEvent{
|
|
Agg: event.Aggregate(),
|
|
EventType: event.Type(),
|
|
Creation: event.CreatedAt(),
|
|
Seq: event.Sequence(),
|
|
Service: defaultService,
|
|
User: event.Creator(),
|
|
Data: event.DataAsBytes(),
|
|
Pos: event.Position(),
|
|
}
|
|
}
|
|
|
|
// NewBaseEventForPush is the constructor for event's which will be pushed into the eventstore
|
|
// the resource owner of the aggregate is only used if it's the first event of this aggregate type
|
|
// afterwards the resource owner of the first previous events is taken
|
|
func NewBaseEventForPush(ctx context.Context, aggregate *Aggregate, typ EventType) *BaseEvent {
|
|
return &BaseEvent{
|
|
Agg: aggregate,
|
|
User: authz.GetCtxData(ctx).UserID,
|
|
Service: service.FromContext(ctx),
|
|
EventType: typ,
|
|
}
|
|
}
|
|
|
|
func (*BaseEvent) Fields() []*FieldOperation {
|
|
return nil
|
|
}
|