This commit is contained in:
adlerhurst
2020-11-27 11:30:56 +01:00
parent 3bd4d3a8e3
commit 9487e8bdeb
20 changed files with 424 additions and 183 deletions

View File

@@ -50,10 +50,12 @@ func (s *Spooler) Start() {
}
}(i)
}
for _, handler := range s.handlers {
handler := &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore}
s.queue <- handler
}
go func() {
for _, handler := range s.handlers {
handler := &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore}
s.queue <- handler
}
}()
}
func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) {

View File

@@ -170,7 +170,8 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
"aggregate", event.AggregateType,
"aggregateId", event.AggregateID,
"aggregateType", event.AggregateType,
"eventType", event.Type).WithError(err).Info("query failed")
"eventType", event.Type).WithError(err).Info("query failed",
"seq", event.PreviousSequence)
return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event")
}
}