fix: remove routines

This commit is contained in:
adlerhurst
2020-10-19 13:58:59 +02:00
parent 35ce026651
commit 370597a0e8

View File

@@ -6,7 +6,6 @@ import (
"errors" "errors"
"regexp" "regexp"
"strconv" "strconv"
"sync"
"github.com/caos/logging" "github.com/caos/logging"
caos_errs "github.com/caos/zitadel/internal/errors" caos_errs "github.com/caos/zitadel/internal/errors"
@@ -113,54 +112,42 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
logging.Log("SQL-3to5p").WithError(err).Warn("prepare failed") logging.Log("SQL-3to5p").WithError(err).Warn("prepare failed")
return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed") return caos_errs.ThrowInternal(err, "SQL-OdXRE", "prepare failed")
} }
wg := sync.WaitGroup{}
errs := make(chan error, len(events))
for _, event := range events { for _, event := range events {
wg.Add(1) previousSequence := Sequence(event.PreviousSequence)
go func(event *repository.Event) { if event.PreviousEvent != nil {
defer wg.Done() if event.PreviousEvent.AggregateType != event.AggregateType || event.PreviousEvent.AggregateID != event.AggregateID {
previousSequence := Sequence(event.PreviousSequence) return caos_errs.ThrowPreconditionFailed(nil, "SQL-J55uR", "aggregate of linked events unequal")
if event.PreviousEvent != nil {
if event.PreviousEvent.AggregateType != event.AggregateType || event.PreviousEvent.AggregateID != event.AggregateID {
errs <- caos_errs.ThrowPreconditionFailed(nil, "SQL-J55uR", "aggregate of linked events unequal")
return
}
previousSequence = Sequence(event.PreviousEvent.Sequence)
} }
err = stmt.QueryRowContext(ctx, previousSequence = Sequence(event.PreviousEvent.Sequence)
event.Type, }
event.AggregateType, err = stmt.QueryRowContext(ctx,
event.AggregateID, event.Type,
event.Version, event.AggregateType,
&sql.NullTime{ event.AggregateID,
Time: event.CreationDate, event.Version,
Valid: !event.CreationDate.IsZero(), &sql.NullTime{
}, Time: event.CreationDate,
Data(event.Data), Valid: !event.CreationDate.IsZero(),
event.EditorUser, },
event.EditorService, Data(event.Data),
event.ResourceOwner, event.EditorUser,
previousSequence, event.EditorService,
event.CheckPreviousSequence, event.ResourceOwner,
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate) previousSequence,
event.CheckPreviousSequence,
).Scan(&event.ID, &event.Sequence, &previousSequence, &event.CreationDate)
event.PreviousSequence = uint64(previousSequence) event.PreviousSequence = uint64(previousSequence)
if err != nil { if err != nil {
logging.LogWithFields("SQL-IP3js", logging.LogWithFields("SQL-IP3js",
"aggregate", event.AggregateType, "aggregate", event.AggregateType,
"aggregateId", event.AggregateID, "aggregateId", event.AggregateID,
"aggregateType", event.AggregateType, "aggregateType", event.AggregateType,
"eventType", event.Type).WithError(err).Info("query failed") "eventType", event.Type).WithError(err).Info("query failed")
errs <- caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event") return caos_errs.ThrowInternal(err, "SQL-SBP37", "unable to create event")
} }
}(event)
}
wg.Wait()
close(errs)
for err := range errs {
return err
} }
return nil return nil