feat(storage): read only transactions for queries (#6415)

* fix: tests

* bastle wie en grosse

* fix(database): scan as callback

* fix tests

* fix merge failures

* remove as of system time

* refactor: remove unused test

* refacotr: remove unused lines
This commit is contained in:
Silvan
2023-08-22 12:49:22 +02:00
committed by GitHub
parent a9fb2a6e5c
commit 99e1c654a3
128 changed files with 1355 additions and 897 deletions

View File

@@ -36,13 +36,14 @@ type StatementHandler struct {
*handler.ProjectionHandler
Locker
client *database.DB
sequenceTable string
currentSequenceStmt string
updateSequencesBaseStmt string
maxFailureCount uint
failureCountStmt string
setFailureCountStmt string
client *database.DB
sequenceTable string
currentSequenceStmt string
currentSequenceWithoutLockStmt string
updateSequencesBaseStmt string
maxFailureCount uint
failureCountStmt string
setFailureCountStmt string
aggregates []eventstore.AggregateType
reduces map[eventstore.EventType]handler.Reduce
@@ -77,20 +78,21 @@ func NewStatementHandler(
}
h := StatementHandler{
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
aggregates: aggregateTypes,
reduces: reduces,
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName),
initCheck: config.InitCheck,
initialized: make(chan bool),
reduceScheduledPseudoEvent: reduceScheduledPseudoEvent,
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
currentSequenceWithoutLockStmt: fmt.Sprintf(currentSequenceStmtWithoutLockFormat, config.SequenceTable),
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
aggregates: aggregateTypes,
reduces: reduces,
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName),
initCheck: config.InitCheck,
initialized: make(chan bool),
reduceScheduledPseudoEvent: reduceScheduledPseudoEvent,
}
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.searchQuery, h.Lock, h.Unlock, h.initialized, reduceScheduledPseudoEvent)
@@ -114,7 +116,7 @@ func (h *StatementHandler) searchQuery(ctx context.Context, instanceIDs []string
}
func (h *StatementHandler) dbSearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) {
sequences, err := h.currentSequences(ctx, h.client.QueryContext, instanceIDs)
sequences, err := h.currentSequences(ctx, false, h.client.QueryContext, instanceIDs)
if err != nil {
return nil, 0, err
}
@@ -140,6 +142,26 @@ func (h *StatementHandler) dbSearchQuery(ctx context.Context, instanceIDs []stri
return queryBuilder, h.bulkLimit, nil
}
type transaction struct {
*sql.Tx
}
func (t *transaction) QueryContext(ctx context.Context, scan func(rows *sql.Rows) error, query string, args ...any) error {
rows, err := t.Tx.QueryContext(ctx, query, args...)
if err != nil {
return err
}
defer func() {
closeErr := rows.Close()
logging.OnError(closeErr).Info("rows.Close failed")
}()
if err = scan(rows); err != nil {
return err
}
return rows.Err()
}
// Update implements handler.Update
func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statement, reduce handler.Reduce) (index int, err error) {
if len(stmts) == 0 {
@@ -154,7 +176,7 @@ func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statemen
return -1, errors.ThrowInternal(err, "CRDB-e89Gq", "begin failed")
}
sequences, err := h.currentSequences(ctx, tx.QueryContext, instanceIDs)
sequences, err := h.currentSequences(ctx, true, (&transaction{Tx: tx}).QueryContext, instanceIDs)
if err != nil {
tx.Rollback()
return -1, err