remove pubsub in favor of trigger

This commit is contained in:
adlerhurst 2024-09-02 09:28:15 +02:00
parent 56c8d0a70f
commit e430687a28
8 changed files with 86 additions and 79 deletions

View File

@ -121,6 +121,7 @@ func CreateGatewayWithPrefix(
client_middleware.DefaultTracingClient(),
client_middleware.UnaryActivityClientInterceptor(),
),
// grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
}
connection, err := dial(ctx, port, opts)
if err != nil {
@ -148,6 +149,7 @@ func CreateGateway(
client_middleware.DefaultTracingClient(),
client_middleware.UnaryActivityClientInterceptor(),
),
// grpc.WithDefaultCallOptions(grpc.WaitForReady(true)),
})
if err != nil {
return nil, err

View File

@ -323,7 +323,7 @@ func (c *Commands) addOrgWithIDAndMember(ctx context.Context, name, userID, reso
return nil, err
}
events = append(events, orgMemberEvent)
pushedEvents, err := c.eventstore.Push(ctx, events...)
pushedEvents, err := c.eventstore.PushWithAwaitTriggers(ctx, events...)
if err != nil {
return nil, err
}

View File

@ -0,0 +1 @@
package eventstore

View File

@ -114,7 +114,43 @@ retry:
if err != nil {
return mappedEvents, err
}
es.notify(mappedEvents)
es.notify(context.WithoutCancel(ctx), mappedEvents)
return mappedEvents, nil
}
func (es *Eventstore) PushWithAwaitTriggers(ctx context.Context, cmds ...Command) ([]Event, error) {
if es.PushTimeout > 0 {
var cancel func()
ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
defer cancel()
}
var (
events []Event
err error
)
// Retry when there is a collision of the sequence as part of the primary key.
// "duplicate key value violates unique constraint \"events2_pkey\" (SQLSTATE 23505)"
// https://github.com/zitadel/zitadel/issues/7202
retry:
for i := 0; i <= es.maxRetries; i++ {
events, err = es.pusher.Push(ctx, cmds...)
var pgErr *pgconn.PgError
if !errors.As(err, &pgErr) || pgErr.ConstraintName != "events2_pkey" || pgErr.SQLState() != "23505" {
break retry
}
logging.WithError(err).Info("eventstore push retry")
}
if err != nil {
return nil, err
}
mappedEvents, err := es.mapEvents(events)
if err != nil {
return mappedEvents, err
}
<-es.notify(ctx, mappedEvents)
return mappedEvents, nil
}

View File

@ -1,9 +1,11 @@
package handler
import (
"context"
"database/sql"
)
type Executer interface {
Exec(string, ...interface{}) (sql.Result, error)
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
}

View File

@ -269,52 +269,18 @@ func randomizeStart(min, maxSeconds float64) time.Duration {
}
func (h *Handler) subscribe(ctx context.Context) {
queue := make(chan eventstore.Event, 100)
subscription := eventstore.SubscribeEventTypes(queue, h.eventTypes)
trigger := func(ctx context.Context, position decimal.Decimal) error {
h.log().Debug("triggered by subscription")
_, err := h.Trigger(ctx, WithMaxPosition(position))
return err
}
subscription := eventstore.SubscribeEventTypes(trigger, h.eventTypes)
for {
select {
case <-ctx.Done():
subscription.Unsubscribe()
h.log().Debug("shutdown")
return
case event := <-queue:
events := checkAdditionalEvents(queue, event)
solvedInstances := make([]string, 0, len(events))
queueCtx := call.WithTimestamp(ctx)
for _, e := range events {
if instanceSolved(solvedInstances, e.Aggregate().InstanceID) {
continue
}
queueCtx = authz.WithInstanceID(queueCtx, e.Aggregate().InstanceID)
_, err := h.Trigger(queueCtx)
h.log().OnError(err).Debug("trigger of queued event failed")
if err == nil {
solvedInstances = append(solvedInstances, e.Aggregate().InstanceID)
}
}
}
}
}
func instanceSolved(solvedInstances []string, instanceID string) bool {
for _, solvedInstance := range solvedInstances {
if solvedInstance == instanceID {
return true
}
}
return false
}
func checkAdditionalEvents(eventQueue chan eventstore.Event, event eventstore.Event) []eventstore.Event {
events := make([]eventstore.Event, 1)
events[0] = event
for {
wait := time.NewTimer(1 * time.Millisecond)
select {
case event := <-eventQueue:
events = append(events, event)
case <-wait.C:
return events
}
}
}
@ -466,6 +432,10 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
}
}()
if h.ProjectionName() == "projections.instance_domains" {
h.log().Debug("gugus")
}
txCtx := ctx
if h.txDuration > 0 {
var cancel, cancelTx func()

View File

@ -1,8 +1,11 @@
package eventstore
import (
"context"
"slices"
"sync"
"github.com/shopspring/decimal"
"github.com/zitadel/logging"
)
@ -12,37 +15,18 @@ var (
)
type Subscription struct {
Events chan Event
types map[AggregateType][]EventType
Trigger SubscriptionTrigger
types map[AggregateType][]EventType
}
// SubscribeAggregates subscribes for all events on the given aggregates
func SubscribeAggregates(eventQueue chan Event, aggregates ...AggregateType) *Subscription {
types := make(map[AggregateType][]EventType, len(aggregates))
for _, aggregate := range aggregates {
types[aggregate] = nil
}
sub := &Subscription{
Events: eventQueue,
types: types,
}
subsMutext.Lock()
defer subsMutext.Unlock()
for _, aggregate := range aggregates {
subscriptions[aggregate] = append(subscriptions[aggregate], sub)
}
return sub
}
type SubscriptionTrigger func(ctx context.Context, position decimal.Decimal) error
// SubscribeEventTypes subscribes for the given event types
// if no event types are provided the subscription is for all events of the aggregate
func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventType) *Subscription {
func SubscribeEventTypes(trigger SubscriptionTrigger, types map[AggregateType][]EventType) *Subscription {
sub := &Subscription{
Events: eventQueue,
types: types,
Trigger: trigger,
types: types,
}
subsMutext.Lock()
@ -55,9 +39,10 @@ func SubscribeEventTypes(eventQueue chan Event, types map[AggregateType][]EventT
return sub
}
func (es *Eventstore) notify(events []Event) {
func (es *Eventstore) notify(ctx context.Context, events []Event) <-chan bool {
subsMutext.Lock()
defer subsMutext.Unlock()
var toNotify []*Subscription
for _, event := range events {
subs, ok := subscriptions[event.Aggregate().Type]
if !ok {
@ -67,22 +52,37 @@ func (es *Eventstore) notify(events []Event) {
eventTypes := sub.types[event.Aggregate().Type]
//subscription for all events
if len(eventTypes) == 0 {
sub.Events <- event
toNotify = append(toNotify, sub)
continue
}
//subscription for certain events
for _, eventType := range eventTypes {
if event.Type() == eventType {
select {
case sub.Events <- event:
default:
logging.Debug("unable to push event")
}
toNotify = append(toNotify, sub)
break
}
}
}
}
var wg sync.WaitGroup
for _, sub := range slices.Compact(toNotify) {
wg.Add(1)
go func() {
defer wg.Done()
err := sub.Trigger(ctx, events[len(events)-1].Position())
if err != nil {
logging.WithError(err).Error("failed to trigger subscription")
}
}()
}
lock := make(chan bool, 1)
go func() {
wg.Wait()
lock <- true
close(lock)
}()
return lock
}
func (s *Subscription) Unsubscribe() {
@ -101,8 +101,4 @@ func (s *Subscription) Unsubscribe() {
}
}
}
_, ok := <-s.Events
if ok {
close(s.Events)
}
}

View File

@ -51,7 +51,7 @@ export function createHuman(username: string, org: Org, accessToken: string): Pr
.then((res) => {
check(res, {
'create user is status ok': (r) => r.status === 201,
}) || reject(`unable to create user(username: ${username}) status: ${res.status} body: ${res.body}`);
}) || reject(`unable to create user(username: ${username}) status: ${res.status} body: ${res.body} duration: ${res.timings.duration}`);
createHumanTrend.add(res.timings.duration);
const user = http.get(url(`/v2beta/users/${res.json('userId')!}`), {