mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-07 22:07:40 +00:00
fix: handle context when locking for trigger (#7006)
This commit is contained in:
parent
79130b238b
commit
e57076430b
@ -257,24 +257,36 @@ func (h *Handler) Trigger(ctx context.Context, opts ...triggerOpt) (_ context.Co
|
|||||||
// lockInstances tries to lock the instance.
|
// lockInstances tries to lock the instance.
|
||||||
// If the instance is already locked from another process no cancel function is returned
|
// If the instance is already locked from another process no cancel function is returned
|
||||||
// the instance can be skipped then
|
// the instance can be skipped then
|
||||||
// If the instance is locked, an unlock deferable function is returned
|
// If the instance is locked, an unlock deferrable function is returned
|
||||||
func (h *Handler) lockInstance(ctx context.Context, config *triggerConfig) func() {
|
func (h *Handler) lockInstance(ctx context.Context, config *triggerConfig) func() {
|
||||||
instanceID := authz.GetInstance(ctx).InstanceID()
|
instanceID := authz.GetInstance(ctx).InstanceID()
|
||||||
|
|
||||||
// Check that the instance has a mutex to lock
|
// Check that the instance has a lock
|
||||||
instanceMu, _ := h.triggeredInstancesSync.LoadOrStore(instanceID, new(sync.Mutex))
|
instanceLock, _ := h.triggeredInstancesSync.LoadOrStore(instanceID, make(chan bool, 1))
|
||||||
unlock := func() {
|
|
||||||
instanceMu.(*sync.Mutex).Unlock()
|
// in case we don't want to wait for a running trigger / lock (e.g. spooler),
|
||||||
}
|
// we can directly return if we cannot lock
|
||||||
if !instanceMu.(*sync.Mutex).TryLock() {
|
if !config.awaitRunning {
|
||||||
instanceMu.(*sync.Mutex).Lock()
|
select {
|
||||||
if config.awaitRunning {
|
case instanceLock.(chan bool) <- true:
|
||||||
return unlock
|
return func() {
|
||||||
|
<-instanceLock.(chan bool)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
defer unlock()
|
}
|
||||||
|
|
||||||
|
// in case we want to wait for a running trigger / lock (e.g. query),
|
||||||
|
// we try to lock as long as the context is not cancelled
|
||||||
|
select {
|
||||||
|
case instanceLock.(chan bool) <- true:
|
||||||
|
return func() {
|
||||||
|
<-instanceLock.(chan bool)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return unlock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) {
|
func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (additionalIteration bool, err error) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user