mirror of
https://github.com/zitadel/zitadel.git
synced 2025-01-09 10:23:40 +00:00
1ac1492fd3
* fix: handle multiple statements for a single event in projections * export func type * fix test * Update internal/eventstore/handler/crdb/statement.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/eventstore/handler/crdb/statement.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * change to pointers * add error test case Co-authored-by: Silvan <silvan.reusser@gmail.com>
269 lines
7.2 KiB
Go
269 lines
7.2 KiB
Go
package crdb
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"os"
|
|
|
|
"github.com/caos/logging"
|
|
"github.com/caos/zitadel/internal/errors"
|
|
"github.com/caos/zitadel/internal/eventstore"
|
|
"github.com/caos/zitadel/internal/eventstore/handler"
|
|
"github.com/caos/zitadel/internal/id"
|
|
)
|
|
|
|
var (
|
|
errSeqNotUpdated = errors.ThrowInternal(nil, "CRDB-79GWt", "current sequence not updated")
|
|
)
|
|
|
|
type StatementHandlerConfig struct {
|
|
handler.ProjectionHandlerConfig
|
|
|
|
Client *sql.DB
|
|
SequenceTable string
|
|
LockTable string
|
|
FailedEventsTable string
|
|
MaxFailureCount uint
|
|
BulkLimit uint64
|
|
|
|
Reducers []handler.AggregateReducer
|
|
}
|
|
|
|
type StatementHandler struct {
|
|
*handler.ProjectionHandler
|
|
|
|
client *sql.DB
|
|
sequenceTable string
|
|
currentSequenceStmt string
|
|
updateSequencesBaseStmt string
|
|
maxFailureCount uint
|
|
failureCountStmt string
|
|
setFailureCountStmt string
|
|
lockStmt string
|
|
|
|
aggregates []eventstore.AggregateType
|
|
reduces map[eventstore.EventType]handler.Reduce
|
|
|
|
workerName string
|
|
bulkLimit uint64
|
|
}
|
|
|
|
func NewStatementHandler(
|
|
ctx context.Context,
|
|
config StatementHandlerConfig,
|
|
) StatementHandler {
|
|
workerName, err := os.Hostname()
|
|
if err != nil || workerName == "" {
|
|
workerName, err = id.SonyFlakeGenerator.Next()
|
|
logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID")
|
|
}
|
|
|
|
aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers))
|
|
reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers))
|
|
for _, aggReducer := range config.Reducers {
|
|
aggregateTypes = append(aggregateTypes, aggReducer.Aggregate)
|
|
for _, eventReducer := range aggReducer.EventRedusers {
|
|
reduces[eventReducer.Event] = eventReducer.Reduce
|
|
}
|
|
}
|
|
|
|
h := StatementHandler{
|
|
ProjectionHandler: handler.NewProjectionHandler(config.ProjectionHandlerConfig),
|
|
client: config.Client,
|
|
sequenceTable: config.SequenceTable,
|
|
maxFailureCount: config.MaxFailureCount,
|
|
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
|
|
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
|
|
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
|
|
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
|
|
lockStmt: fmt.Sprintf(lockStmtFormat, config.LockTable),
|
|
aggregates: aggregateTypes,
|
|
reduces: reduces,
|
|
workerName: workerName,
|
|
bulkLimit: config.BulkLimit,
|
|
}
|
|
|
|
go h.ProjectionHandler.Process(
|
|
ctx,
|
|
h.reduce,
|
|
h.Update,
|
|
h.Lock,
|
|
h.Unlock,
|
|
h.SearchQuery,
|
|
)
|
|
|
|
h.ProjectionHandler.Handler.Subscribe(h.aggregates...)
|
|
|
|
return h
|
|
}
|
|
|
|
func (h *StatementHandler) SearchQuery() (*eventstore.SearchQueryBuilder, uint64, error) {
|
|
sequences, err := h.currentSequences(h.client.Query)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(h.bulkLimit)
|
|
for _, aggregateType := range h.aggregates {
|
|
queryBuilder.
|
|
AddQuery().
|
|
AggregateTypes(aggregateType).
|
|
SequenceGreater(sequences[aggregateType])
|
|
}
|
|
|
|
return queryBuilder, h.bulkLimit, nil
|
|
}
|
|
|
|
//Update implements handler.Update
|
|
func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statement, reduce handler.Reduce) (unexecutedStmts []*handler.Statement, err error) {
|
|
tx, err := h.client.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return stmts, errors.ThrowInternal(err, "CRDB-e89Gq", "begin failed")
|
|
}
|
|
|
|
sequences, err := h.currentSequences(tx.Query)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return stmts, err
|
|
}
|
|
|
|
//checks for events between create statement and current sequence
|
|
// because there could be events between current sequence and a creation event
|
|
// and we cannot check via stmt.PreviousSequence
|
|
if stmts[0].PreviousSequence == 0 {
|
|
previousStmts, err := h.fetchPreviousStmts(ctx, stmts[0].Sequence, sequences, reduce)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return stmts, err
|
|
}
|
|
stmts = append(previousStmts, stmts...)
|
|
}
|
|
|
|
lastSuccessfulIdx := h.executeStmts(tx, stmts, sequences)
|
|
|
|
if lastSuccessfulIdx >= 0 {
|
|
err = h.updateCurrentSequences(tx, sequences)
|
|
if err != nil {
|
|
tx.Rollback()
|
|
return stmts, err
|
|
}
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
return stmts, err
|
|
}
|
|
|
|
if lastSuccessfulIdx == -1 {
|
|
return stmts, handler.ErrSomeStmtsFailed
|
|
}
|
|
|
|
unexecutedStmts = make([]*handler.Statement, len(stmts)-(lastSuccessfulIdx+1))
|
|
copy(unexecutedStmts, stmts[lastSuccessfulIdx+1:])
|
|
stmts = nil
|
|
|
|
if len(unexecutedStmts) > 0 {
|
|
return unexecutedStmts, handler.ErrSomeStmtsFailed
|
|
}
|
|
|
|
return unexecutedStmts, nil
|
|
}
|
|
|
|
func (h *StatementHandler) fetchPreviousStmts(
|
|
ctx context.Context,
|
|
stmtSeq uint64,
|
|
sequences currentSequences,
|
|
reduce handler.Reduce,
|
|
) (previousStmts []*handler.Statement, err error) {
|
|
|
|
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent)
|
|
queriesAdded := false
|
|
for _, aggregateType := range h.aggregates {
|
|
if stmtSeq <= sequences[aggregateType] {
|
|
continue
|
|
}
|
|
|
|
query.
|
|
AddQuery().
|
|
AggregateTypes(aggregateType).
|
|
SequenceGreater(sequences[aggregateType]).
|
|
SequenceLess(stmtSeq)
|
|
|
|
queriesAdded = true
|
|
}
|
|
|
|
if !queriesAdded {
|
|
return nil, nil
|
|
}
|
|
|
|
events, err := h.Eventstore.FilterEvents(ctx, query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, event := range events {
|
|
stmt, err := reduce(event)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
previousStmts = append(previousStmts, stmt)
|
|
}
|
|
return previousStmts, nil
|
|
}
|
|
|
|
func (h *StatementHandler) executeStmts(
|
|
tx *sql.Tx,
|
|
stmts []*handler.Statement,
|
|
sequences currentSequences,
|
|
) int {
|
|
|
|
lastSuccessfulIdx := -1
|
|
for i, stmt := range stmts {
|
|
if stmt.Sequence <= sequences[stmt.AggregateType] {
|
|
continue
|
|
}
|
|
if stmt.PreviousSequence > 0 && stmt.PreviousSequence != sequences[stmt.AggregateType] {
|
|
logging.LogWithFields("CRDB-jJBJn", "projection", h.ProjectionName, "aggregateType", stmt.AggregateType, "seq", stmt.Sequence, "prevSeq", stmt.PreviousSequence, "currentSeq", sequences[stmt.AggregateType]).Warn("sequences do not match")
|
|
break
|
|
}
|
|
err := h.executeStmt(tx, stmt)
|
|
if err == nil {
|
|
sequences[stmt.AggregateType], lastSuccessfulIdx = stmt.Sequence, i
|
|
continue
|
|
}
|
|
|
|
shouldContinue := h.handleFailedStmt(tx, stmt, err)
|
|
if !shouldContinue {
|
|
break
|
|
}
|
|
|
|
sequences[stmt.AggregateType], lastSuccessfulIdx = stmt.Sequence, i
|
|
}
|
|
return lastSuccessfulIdx
|
|
}
|
|
|
|
//executeStmt handles sql statements
|
|
//an error is returned if the statement could not be inserted properly
|
|
func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt *handler.Statement) error {
|
|
if stmt.IsNoop() {
|
|
return nil
|
|
}
|
|
_, err := tx.Exec("SAVEPOINT push_stmt")
|
|
if err != nil {
|
|
return errors.ThrowInternal(err, "CRDB-i1wp6", "unable to create savepoint")
|
|
}
|
|
err = stmt.Execute(tx, h.ProjectionName)
|
|
if err != nil {
|
|
_, rollbackErr := tx.Exec("ROLLBACK TO SAVEPOINT push_stmt")
|
|
if rollbackErr != nil {
|
|
return errors.ThrowInternal(rollbackErr, "CRDB-zzp3P", "rollback to savepoint failed")
|
|
}
|
|
return errors.ThrowInternal(err, "CRDB-oRkaN", "unable execute stmt")
|
|
}
|
|
_, err = tx.Exec("RELEASE push_stmt")
|
|
if err != nil {
|
|
return errors.ThrowInternal(err, "CRDB-qWgwT", "unable to release savepoint")
|
|
}
|
|
return nil
|
|
}
|