diff --git a/internal/eventstore/handler/v2/handler.go b/internal/eventstore/handler/v2/handler.go index cfa21b05ab..afe2778627 100644 --- a/internal/eventstore/handler/v2/handler.go +++ b/internal/eventstore/handler/v2/handler.go @@ -257,24 +257,36 @@ func (h *Handler) Trigger(ctx context.Context, opts ...triggerOpt) (_ context.Co // lockInstances tries to lock the instance. // If the instance is already locked from another process no cancel function is returned // the instance can be skipped then -// If the instance is locked, an unlock deferable function is returned +// If the instance is locked, an unlock deferrable function is returned func (h *Handler) lockInstance(ctx context.Context, config *triggerConfig) func() { instanceID := authz.GetInstance(ctx).InstanceID() - // Check that the instance has a mutex to lock - instanceMu, _ := h.triggeredInstancesSync.LoadOrStore(instanceID, new(sync.Mutex)) - unlock := func() { - instanceMu.(*sync.Mutex).Unlock() - } - if !instanceMu.(*sync.Mutex).TryLock() { - instanceMu.(*sync.Mutex).Lock() - if config.awaitRunning { - return unlock + // Check that the instance has a lock + instanceLock, _ := h.triggeredInstancesSync.LoadOrStore(instanceID, make(chan bool, 1)) + + // in case we don't want to wait for a running trigger / lock (e.g. spooler), + // we can directly return if we cannot lock + if !config.awaitRunning { + select { + case instanceLock.(chan bool) <- true: + return func() { + <-instanceLock.(chan bool) + } + default: + return nil } - defer unlock() + } + + // in case we want to wait for a running trigger / lock (e.g. query), + // we try to lock as long as the context is not cancelled + select { + case instanceLock.(chan bool) <- true: + return func() { + <-instanceLock.(chan bool) + } + case <-ctx.Done(): return nil } - return unlock } func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) {