mirror of https://github.com/zitadel/zitadel.git, synced 2025-05-16 19:48:20 +00:00
fix(eventstore): improve pagination of handler filter (#6968)

* fix(setup): add filter_offset to `projections.current_states`
* fix(eventstore): allow offset in query
* fix(handler): offset for already processed events

(cherry picked from commit e3d1ca4d586f615854c05184c32314fbe67e128e)
This commit is contained in:
parent 54e9e1f33d
commit 7cfb0e715a
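The commit message is terse, so a brief illustration of the problem it addresses: the projection handler pages through events by "position", and several events can share one position (for example when they were written in the same transaction), so resuming with a plain "position greater than the last position" filter either skips or re-reads the events at the boundary. The sketch below shows the (position, offset) cursor idea in isolation; the types, field names, and helper are invented for illustration and are not part of the zitadel codebase.

package main

import "fmt"

// Event is an illustrative stand-in for a stored event: several events can
// share the same position when they were written together.
type Event struct {
	Position float64
	Payload  string
}

// Cursor remembers the last handled position plus how many events with that
// exact position were already processed (the "filter offset" of this commit).
type Cursor struct {
	Position float64
	Offset   int
}

// nextPage emulates a query "position >= cursor.Position OFFSET cursor.Offset":
// it re-reads the boundary position but skips the events already handled there.
func nextPage(all []Event, c Cursor, limit int) []Event {
	page := make([]Event, 0, limit)
	skipped := 0
	for _, e := range all {
		if e.Position < c.Position {
			continue
		}
		if e.Position == c.Position && skipped < c.Offset {
			skipped++
			continue
		}
		page = append(page, e)
		if len(page) == limit {
			break
		}
	}
	return page
}

func main() {
	events := []Event{{1, "a"}, {2, "b"}, {2, "c"}, {2, "d"}, {3, "e"}}
	// The previous run handled everything up to position 2, two events of which
	// sit exactly at position 2.
	fmt.Println(nextPage(events, Cursor{Position: 2, Offset: 2}, 2)) // [{2 d} {3 e}]
}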
cmd/setup/17.go (new file, 26 lines)
@@ -0,0 +1,26 @@
+package setup
+
+import (
+    "context"
+    _ "embed"
+
+    "github.com/zitadel/zitadel/internal/database"
+)
+
+var (
+    //go:embed 17.sql
+    addOffsetField string
+)
+
+type AddOffsetToCurrentStates struct {
+    dbClient *database.DB
+}
+
+func (mig *AddOffsetToCurrentStates) Execute(ctx context.Context) error {
+    _, err := mig.dbClient.ExecContext(ctx, addOffsetField)
+    return err
+}
+
+func (mig *AddOffsetToCurrentStates) String() string {
+    return "17_add_offset_col_to_current_states"
+}
cmd/setup/17.sql (new file, 1 line)
@@ -0,0 +1 @@
+ALTER TABLE projections.current_states ADD filter_offset INTEGER;
@@ -61,19 +61,20 @@ func MustNewConfig(v *viper.Viper) *Config {
 	}
 }
 
 type Steps struct {
 	s1ProjectionTable               *ProjectionTable
 	s2AssetsTable                   *AssetTable
 	FirstInstance                   *FirstInstance
 	s5LastFailed                    *LastFailed
 	s6OwnerRemoveColumns            *OwnerRemoveColumns
 	s7LogstoreTables                *LogstoreTables
 	s8AuthTokens                    *AuthTokenIndexes
 	CorrectCreationDate             *CorrectCreationDate
 	s12AddOTPColumns                *AddOTPColumns
 	s13FixQuotaProjection           *FixQuotaConstraints
 	s14NewEventsTable               *NewEventsTable
 	s15CurrentStates                *CurrentProjectionState
 	s16UniqueConstraintsLower       *UniqueConstraintToLower
+	s17AddOffsetToUniqueConstraints *AddOffsetToCurrentStates
 }
 
 type encryptionKeyConfig struct {
@@ -101,6 +101,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
 	steps.s14NewEventsTable = &NewEventsTable{dbClient: esPusherDBClient}
 	steps.s15CurrentStates = &CurrentProjectionState{dbClient: zitadelDBClient}
 	steps.s16UniqueConstraintsLower = &UniqueConstraintToLower{dbClient: zitadelDBClient}
+	steps.s17AddOffsetToUniqueConstraints = &AddOffsetToCurrentStates{dbClient: zitadelDBClient}
 
 	err = projection.Create(ctx, zitadelDBClient, eventstoreClient, config.Projections, nil, nil, nil)
 	logging.OnError(err).Fatal("unable to start projections")
@@ -143,6 +144,8 @@ func Setup(config *Config, steps *Steps, masterKey string) {
 	logging.WithFields("name", steps.s15CurrentStates.String()).OnError(err).Fatal("migration failed")
 	err = migration.Migrate(ctx, eventstoreClient, steps.s16UniqueConstraintsLower)
 	logging.WithFields("name", steps.s16UniqueConstraintsLower.String()).OnError(err).Fatal("migration failed")
+	err = migration.Migrate(ctx, eventstoreClient, steps.s17AddOffsetToUniqueConstraints)
+	logging.WithFields("name", steps.s17AddOffsetToUniqueConstraints.String()).OnError(err).Fatal("migration failed")
 
 	for _, repeatableStep := range repeatableSteps {
 		err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
@@ -70,12 +70,9 @@ func transactionFilter(filter FilterToQueryReducer, commands []eventstore.Comman
 	if err != nil {
 		return nil, err
 	}
-	for _, command := range commands {
-		event := command.(eventstore.Event)
-		if !query.Matches(event, len(events)) {
-			continue
-		}
-		events = append(events, event)
+	matches := query.Matches(commands...)
+	for _, command := range matches {
+		events = append(events, command.(eventstore.Event))
 	}
 	return events, nil
 }
@@ -81,7 +81,7 @@ type Aggregate struct {
 // AggregateType is the object name
 type AggregateType string
 
-func isAggreagteTypes(a *Aggregate, types ...AggregateType) bool {
+func isAggregateTypes(a *Aggregate, types ...AggregateType) bool {
 	for _, typ := range types {
 		if a.Type == typ {
 			return true
@@ -97,9 +97,9 @@ func GenericEventMapper[T any, PT BaseEventSetter[T]](event Event) (Event, error
 	return e, nil
 }
 
-func isEventTypes(event Event, types ...EventType) bool {
+func isEventTypes(command Command, types ...EventType) bool {
 	for _, typ := range types {
-		if event.Type() == typ {
+		if command.Type() == typ {
 			return true
 		}
 	}
@@ -47,7 +47,7 @@ func failureFromStatement(statement *Statement, err error) *failure {
 	}
 }
 
-func (h *Handler) handleFailedStmt(tx *sql.Tx, currentState *state, f *failure) (shouldContinue bool) {
+func (h *Handler) handleFailedStmt(tx *sql.Tx, f *failure) (shouldContinue bool) {
 	failureCount, err := h.failureCount(tx, f)
 	if err != nil {
 		h.logFailure(f).WithError(err).Warn("unable to get failure count")
@@ -331,7 +331,11 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
 
 	var statements []*Statement
 	statements, additionalIteration, err = h.generateStatements(ctx, tx, currentState)
-	if err != nil || len(statements) == 0 {
+	if err != nil {
+		return additionalIteration, err
+	}
+	if len(statements) == 0 {
+		err = h.setState(tx, currentState)
 		return additionalIteration, err
 	}
 
@@ -341,6 +345,7 @@ func (h *Handler) processEvents(ctx context.Context, config *triggerConfig) (add
 	}
 
 	currentState.position = statements[lastProcessedIndex].Position
+	currentState.offset = statements[lastProcessedIndex].offset
 	currentState.aggregateID = statements[lastProcessedIndex].AggregateID
 	currentState.aggregateType = statements[lastProcessedIndex].AggregateType
 	currentState.sequence = statements[lastProcessedIndex].Sequence
@@ -365,37 +370,44 @@ func (h *Handler) generateStatements(ctx context.Context, tx *sql.Tx, currentSta
 		return nil, false, err
 	}
 	eventAmount := len(events)
-	events = skipPreviouslyReduced(events, currentState)
 
-	if len(events) == 0 {
-		h.updateLastUpdated(ctx, tx, currentState)
-		return nil, false, nil
-	}
-
 	statements, err := h.eventsToStatements(tx, events, currentState)
-	if len(statements) == 0 {
+	if err != nil || len(statements) == 0 {
 		return nil, false, err
 	}
 
+	idx := skipPreviouslyReduced(statements, currentState)
+	if idx+1 == len(statements) {
+		currentState.position = statements[len(statements)-1].Position
+		currentState.offset = statements[len(statements)-1].offset
+		currentState.aggregateID = statements[len(statements)-1].AggregateID
+		currentState.aggregateType = statements[len(statements)-1].AggregateType
+		currentState.sequence = statements[len(statements)-1].Sequence
+		currentState.eventTimestamp = statements[len(statements)-1].CreationDate
+
+		return nil, false, nil
+	}
+	statements = statements[idx+1:]
+
 	additionalIteration = eventAmount == int(h.bulkLimit)
 	if len(statements) < len(events) {
-		// retry imediatly if statements failed
+		// retry immediately if statements failed
 		additionalIteration = true
 	}
 
 	return statements, additionalIteration, nil
 }
 
-func skipPreviouslyReduced(events []eventstore.Event, currentState *state) []eventstore.Event {
-	for i, event := range events {
-		if event.Position() == currentState.position &&
-			event.Aggregate().ID == currentState.aggregateID &&
-			event.Aggregate().Type == currentState.aggregateType &&
-			event.Sequence() == currentState.sequence {
-			return events[i+1:]
+func skipPreviouslyReduced(statements []*Statement, currentState *state) int {
+	for i, statement := range statements {
+		if statement.Position == currentState.position &&
+			statement.AggregateID == currentState.aggregateID &&
+			statement.AggregateType == currentState.aggregateType &&
+			statement.Sequence == currentState.sequence {
+			return i
 		}
 	}
-	return events
+	return -1
 }
 
 func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, currentState *state, statements []*Statement) (lastProcessedIndex int, err error) {
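In the rewritten generateStatements above, skipPreviouslyReduced now operates on the reduced statements and returns the index of the statement the stored state points at (or -1), so the handler can both trim the already-processed prefix and detect the "whole batch was already handled" case. A small standalone illustration of that contract; the statement and state shapes here are simplified assumptions, not the project's types.

package main

import "fmt"

type statement struct {
	Position    float64
	AggregateID string
	Sequence    uint64
}

type state struct {
	position    float64
	aggregateID string
	sequence    uint64
}

// skipPreviouslyReduced mirrors the idea of the patched helper: it returns the
// index of the statement the current state points at, or -1 if none matches.
func skipPreviouslyReduced(statements []statement, cur state) int {
	for i, s := range statements {
		if s.Position == cur.position && s.AggregateID == cur.aggregateID && s.Sequence == cur.sequence {
			return i
		}
	}
	return -1
}

func main() {
	stmts := []statement{
		{Position: 10, AggregateID: "A", Sequence: 1},
		{Position: 10, AggregateID: "B", Sequence: 2},
		{Position: 11, AggregateID: "C", Sequence: 3},
	}
	cur := state{position: 10, aggregateID: "B", sequence: 2}

	idx := skipPreviouslyReduced(stmts, cur)
	if idx+1 == len(stmts) {
		fmt.Println("batch already processed, only advance the stored state")
		return
	}
	fmt.Println("continue with:", stmts[idx+1:]) // continue with: [{11 C 3}]
}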
@@ -434,7 +446,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, currentState
 	if err = statement.Execute(tx, h.projection.Name()); err != nil {
 		h.log().WithError(err).Error("statement execution failed")
 
-		shouldContinue = h.handleFailedStmt(tx, currentState, failureFromStatement(statement, err))
+		shouldContinue = h.handleFailedStmt(tx, failureFromStatement(statement, err))
 		if shouldContinue {
 			return nil
 		}
@@ -454,7 +466,11 @@ func (h *Handler) eventQuery(currentState *state) *eventstore.SearchQueryBuilder
 		InstanceID(currentState.instanceID)
 
 	if currentState.position > 0 {
+		// decrease position by 10 because builder.PositionAfter filters for position > and we need position >=
 		builder = builder.PositionAfter(math.Float64frombits(math.Float64bits(currentState.position) - 10))
+		if currentState.offset > 0 {
+			builder = builder.Offset(currentState.offset)
+		}
 	}
 
 	for aggregateType, eventTypes := range h.eventTypes {
|
|||||||
return builder
|
return builder
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProjectionName returns the name of the unlying projection.
|
// ProjectionName returns the name of the underlying projection.
|
||||||
func (h *Handler) ProjectionName() string {
|
func (h *Handler) ProjectionName() string {
|
||||||
return h.projection.Name()
|
return h.projection.Name()
|
||||||
}
|
}
|
||||||
|
@@ -19,6 +19,7 @@ type state struct {
 	aggregateType  eventstore.AggregateType
 	aggregateID    string
 	sequence       uint64
+	offset         uint16
 }
 
 var (
@@ -30,8 +31,6 @@ var (
 	updateStateStmt string
 	//go:embed state_lock.sql
 	lockStateStmt string
-	//go:embed state_set_last_run.sql
-	updateStateLastRunStmt string
 
 	errJustUpdated = errors.New("projection was just updated")
 )
@@ -47,6 +46,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
 		sequence  = new(sql.NullInt64)
 		timestamp = new(sql.NullTime)
 		position  = new(sql.NullFloat64)
+		offset    = new(sql.NullInt16)
 	)
 
 	stateQuery := currentStateStmt
@@ -61,6 +61,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
 		sequence,
 		timestamp,
 		position,
+		offset,
 	)
 	if errors.Is(err, sql.ErrNoRows) {
 		err = h.lockState(tx, currentState.instanceID)
@@ -75,6 +76,7 @@ func (h *Handler) currentState(ctx context.Context, tx *sql.Tx, config *triggerC
 	currentState.sequence = uint64(sequence.Int64)
 	currentState.eventTimestamp = timestamp.Time
 	currentState.position = position.Float64
+	currentState.offset = uint16(offset.Int16)
 	return currentState, nil
 }
 
@@ -87,6 +89,7 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error {
 		updatedState.sequence,
 		updatedState.eventTimestamp,
 		updatedState.position,
+		updatedState.offset,
 	)
 	if err != nil {
 		h.log().WithError(err).Debug("unable to update state")
@@ -99,11 +102,6 @@ func (h *Handler) setState(tx *sql.Tx, updatedState *state) error {
 	return nil
 }
 
-func (h *Handler) updateLastUpdated(ctx context.Context, tx *sql.Tx, updatedState *state) {
-	_, err := tx.ExecContext(ctx, updateStateLastRunStmt, h.projection.Name(), updatedState.instanceID)
-	h.log().OnError(err).Debug("unable to update last updated")
-}
-
 func (h *Handler) lockState(tx *sql.Tx, instanceID string) error {
 	res, err := tx.Exec(lockStateStmt,
 		h.projection.Name(),
@@ -4,6 +4,7 @@ SELECT
 	, "sequence"
 	, event_date
 	, "position"
+	, filter_offset
 FROM
 	projections.current_states
 WHERE
@@ -4,6 +4,7 @@ SELECT
 	, "sequence"
 	, event_date
 	, "position"
+	, filter_offset
 FROM
 	projections.current_states
 WHERE
@@ -7,6 +7,7 @@ INSERT INTO projections.current_states (
 	, event_date
 	, "position"
 	, last_updated
+	, filter_offset
 ) VALUES (
 	$1
 	, $2
@@ -16,6 +17,7 @@ INSERT INTO projections.current_states (
 	, $6
 	, $7
 	, now()
+	, $8
 ) ON CONFLICT (
 	projection_name
 	, instance_id
@@ -26,4 +28,5 @@ INSERT INTO projections.current_states (
 	, event_date = $6
 	, "position" = $7
 	, last_updated = statement_timestamp()
+	, filter_offset = $8
 ;
@@ -1,2 +0,0 @@
-UPDATE projections.current_states SET last_updated = now() WHERE projection_name = $1 AND instance_id = $2;
-
@@ -217,6 +217,7 @@ func TestHandler_updateLastUpdated(t *testing.T) {
 					uint64(42),
 					mock.AnyType[time.Time]{},
 					float64(42),
+					uint16(0),
 				),
 				mock.WithExecRowsAffected(1),
 			),
@@ -388,7 +389,7 @@ func TestHandler_currentState(t *testing.T) {
 					"projection",
 				),
 				mock.WithQueryResult(
-					[]string{"aggregate_id", "aggregate_type", "event_sequence", "event_date", "position"},
+					[]string{"aggregate_id", "aggregate_type", "event_sequence", "event_date", "position", "offset"},
 					[][]driver.Value{
 						{
 							"aggregate id",
@@ -396,6 +397,7 @@ func TestHandler_currentState(t *testing.T) {
 							int64(42),
 							testTime,
 							float64(42),
+							uint16(10),
 						},
 					},
 				),
@@ -413,6 +415,7 @@ func TestHandler_currentState(t *testing.T) {
 				aggregateType: "aggregate type",
 				aggregateID:   "aggregate id",
 				sequence:      42,
+				offset:        10,
 			},
 		},
 	},
@@ -17,15 +17,26 @@ import (
 
 func (h *Handler) eventsToStatements(tx *sql.Tx, events []eventstore.Event, currentState *state) (statements []*Statement, err error) {
 	statements = make([]*Statement, 0, len(events))
+
+	previousPosition := currentState.position
+	offset := currentState.offset
 	for _, event := range events {
 		statement, err := h.reduce(event)
 		if err != nil {
 			h.logEvent(event).WithError(err).Error("reduce failed")
-			if shouldContinue := h.handleFailedStmt(tx, currentState, failureFromEvent(event, err)); shouldContinue {
+			if shouldContinue := h.handleFailedStmt(tx, failureFromEvent(event, err)); shouldContinue {
 				continue
 			}
 			return statements, err
 		}
+		offset++
+		if previousPosition != event.Position() {
+			// offset is 1 because we want to skip this event
+			offset = 1
+		}
+		statement.offset = offset
+		statement.Position = event.Position()
+		previousPosition = event.Position()
 		statements = append(statements, statement)
 	}
 	return statements, nil
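The eventsToStatements change above is what makes the stored offset meaningful: statements carry a running offset within a single position, and the counter restarts at 1 whenever a new position begins. A self-contained sketch of that counting rule, with simplified types that are not the project's:

package main

import "fmt"

type event struct {
	position float64
}

type stmt struct {
	position float64
	offset   uint16
}

// assignOffsets numbers events within the same position, restarting the
// counter at 1 whenever the position changes, like the patched eventsToStatements.
func assignOffsets(events []event, prevPosition float64, prevOffset uint16) []stmt {
	stmts := make([]stmt, 0, len(events))
	offset := prevOffset
	for _, e := range events {
		offset++
		if prevPosition != e.position {
			offset = 1
		}
		stmts = append(stmts, stmt{position: e.position, offset: offset})
		prevPosition = e.position
	}
	return stmts
}

func main() {
	events := []event{{position: 7}, {position: 7}, {position: 8}, {position: 8}, {position: 8}}
	// Resuming at position 7 with two events already counted there.
	fmt.Println(assignOffsets(events, 7, 2))
	// [{7 3} {7 4} {8 1} {8 2} {8 3}]
}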
@@ -54,6 +65,8 @@ type Statement struct {
 	CreationDate time.Time
 	InstanceID   string
 
+	offset uint16
+
 	Execute Exec
 }
 
@@ -17,6 +17,7 @@ type SearchQuery struct {
 	AllowTimeTravel       bool
 	AwaitOpenTransactions bool
 	Limit                 uint64
+	Offset                uint16
 	Desc                  bool
 
 	InstanceID *Filter
@@ -121,12 +122,12 @@ func QueryFromBuilder(builder *eventstore.SearchQueryBuilder) (*SearchQuery, err
 	query := &SearchQuery{
 		Columns:               builder.GetColumns(),
 		Limit:                 builder.GetLimit(),
+		Offset:                builder.GetOffset(),
 		Desc:                  builder.GetDesc(),
 		Tx:                    builder.GetTx(),
 		AllowTimeTravel:       builder.GetAllowTimeTravel(),
 		AwaitOpenTransactions: builder.GetAwaitOpenTransactions(),
-		// Queries: make([]*Filter, 0, 7),
 		SubQueries: make([][]*Filter, len(builder.GetQueries())),
 	}
 
 	for _, f := range []func(builder *eventstore.SearchQueryBuilder, query *SearchQuery) *Filter{
@@ -89,6 +89,11 @@ func query(ctx context.Context, criteria querier, searchQuery *eventstore.Search
 		query += " LIMIT ?"
 	}
 
+	if q.Offset > 0 {
+		values = append(values, q.Offset)
+		query += " OFFSET ?"
+	}
+
 	query = criteria.placeholder(query)
 
 	var contextQuerier interface {
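The repository-level change above simply appends an OFFSET placeholder when the builder carries one, so the database skips the already-handled rows at the boundary position instead of the handler re-filtering them in memory. A tiny generic sketch of this kind of conditional query assembly; the table and column names in the example are illustrative, not taken from the project:

package main

import "fmt"

// appendPagination mimics the pattern used above: only add LIMIT/OFFSET
// clauses (and their bind values) when the caller actually set them.
func appendPagination(query string, values []any, limit uint64, offset uint16) (string, []any) {
	if limit > 0 {
		values = append(values, limit)
		query += " LIMIT ?"
	}
	if offset > 0 {
		values = append(values, offset)
		query += " OFFSET ?"
	}
	return query, values
}

func main() {
	q, vals := appendPagination("SELECT payload FROM events WHERE position >= ?", []any{42.0}, 200, 3)
	fmt.Println(q)    // SELECT payload FROM events WHERE position >= ? LIMIT ? OFFSET ?
	fmt.Println(vals) // [42 200 3]
}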
@@ -14,6 +14,7 @@ import (
 type SearchQueryBuilder struct {
 	columns       Columns
 	limit         uint64
+	offset        uint16
 	desc          bool
 	resourceOwner string
 	instanceID    *string
@@ -37,6 +38,10 @@ func (b *SearchQueryBuilder) GetLimit() uint64 {
 	return b.limit
 }
 
+func (b *SearchQueryBuilder) GetOffset() uint16 {
+	return b.offset
+}
+
 func (b *SearchQueryBuilder) GetDesc() bool {
 	return b.desc
 }
@@ -149,25 +154,46 @@ func NewSearchQueryBuilder(columns Columns) *SearchQueryBuilder {
 	}
 }
 
-func (builder *SearchQueryBuilder) Matches(event Event, existingLen int) (matches bool) {
-	if builder.limit > 0 && uint64(existingLen) >= builder.limit {
+func (builder *SearchQueryBuilder) Matches(commands ...Command) []Command {
+	matches := make([]Command, 0, len(commands))
+	for i, command := range commands {
+		if builder.limit > 0 && builder.limit <= uint64(len(matches)) {
+			break
+		}
+		if builder.offset > 0 && uint16(i) < builder.offset {
+			continue
+		}
+
+		if builder.matchCommand(command) {
+			matches = append(matches, command)
+		}
+	}
+
+	return matches
+}
+
+type sequencer interface {
+	Sequence() uint64
+}
+
+func (builder *SearchQueryBuilder) matchCommand(command Command) bool {
+	if builder.resourceOwner != "" && command.Aggregate().ResourceOwner != builder.resourceOwner {
 		return false
 	}
-	if builder.resourceOwner != "" && event.Aggregate().ResourceOwner != builder.resourceOwner {
+	if command.Aggregate().InstanceID != "" && builder.instanceID != nil && *builder.instanceID != "" && command.Aggregate().InstanceID != *builder.instanceID {
 		return false
 	}
-	if event.Aggregate().InstanceID != "" && builder.instanceID != nil && *builder.instanceID != "" && event.Aggregate().InstanceID != *builder.instanceID {
-		return false
-	}
-	if builder.eventSequenceGreater > 0 && event.Sequence() <= builder.eventSequenceGreater {
-		return false
+	if seq, ok := command.(sequencer); ok {
+		if builder.eventSequenceGreater > 0 && seq.Sequence() <= builder.eventSequenceGreater {
+			return false
+		}
 	}
 
 	if len(builder.queries) == 0 {
 		return true
 	}
 	for _, query := range builder.queries {
-		if query.matches(event) {
+		if query.matches(command) {
 			return true
 		}
 	}
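After the refactor above, Matches filters a whole batch of commands in one call and applies the builder's offset and limit to the surviving set, which is what the transactionFilter change earlier in this diff relies on. A hedged usage sketch against the eventstore package as it looks after this commit; the function below and its parameter values are invented for illustration.

package example

import "github.com/zitadel/zitadel/internal/eventstore"

// filterForInstance keeps only the commands of one instance, skips the first
// `skip` input commands, and caps the number of matches, using the Matches
// signature introduced in this commit.
func filterForInstance(instanceID string, skip uint16, max uint64, commands ...eventstore.Command) []eventstore.Command {
	builder := eventstore.
		NewSearchQueryBuilder(eventstore.ColumnsEvent).
		InstanceID(instanceID).
		Offset(skip).
		Limit(max)
	return builder.Matches(commands...)
}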
@@ -186,6 +212,12 @@ func (builder *SearchQueryBuilder) Limit(limit uint64) *SearchQueryBuilder {
 	return builder
 }
 
+// Limit defines how many events are returned maximally.
+func (builder *SearchQueryBuilder) Offset(offset uint16) *SearchQueryBuilder {
+	builder.offset = offset
+	return builder
+}
+
 // ResourceOwner defines the resource owner (org) of the events
 func (builder *SearchQueryBuilder) ResourceOwner(resourceOwner string) *SearchQueryBuilder {
 	builder.resourceOwner = resourceOwner
@@ -317,14 +349,14 @@ func (query *SearchQuery) Builder() *SearchQueryBuilder {
 	return query.builder
 }
 
-func (query *SearchQuery) matches(event Event) bool {
-	if ok := isAggreagteTypes(event.Aggregate(), query.aggregateTypes...); len(query.aggregateTypes) > 0 && !ok {
+func (query *SearchQuery) matches(command Command) bool {
+	if ok := isAggregateTypes(command.Aggregate(), query.aggregateTypes...); len(query.aggregateTypes) > 0 && !ok {
 		return false
 	}
-	if ok := isAggregateIDs(event.Aggregate(), query.aggregateIDs...); len(query.aggregateIDs) > 0 && !ok {
+	if ok := isAggregateIDs(command.Aggregate(), query.aggregateIDs...); len(query.aggregateIDs) > 0 && !ok {
 		return false
 	}
-	if ok := isEventTypes(event, query.eventTypes...); len(query.eventTypes) > 0 && !ok {
+	if ok := isEventTypes(command, query.eventTypes...); len(query.eventTypes) > 0 && !ok {
 		return false
 	}
 	return true
@@ -251,20 +251,22 @@ func assertQuery(t *testing.T, i int, want, got *SearchQuery) {
 
 func TestSearchQuery_matches(t *testing.T) {
 	type args struct {
-		event Event
+		event Command
 	}
 	tests := []struct {
 		name  string
 		query *SearchQuery
-		event Event
+		event Command
 		want  bool
 	}{
 		{
 			name:  "wrong aggregate type",
 			query: NewSearchQueryBuilder(ColumnsEvent).AddQuery().AggregateTypes("searched"),
-			event: &BaseEvent{
-				Agg: &Aggregate{
-					Type: "found",
+			event: &matcherCommand{
+				BaseEvent{
+					Agg: &Aggregate{
+						Type: "found",
+					},
 				},
 			},
 			want: false,
@@ -272,9 +274,11 @@ func TestSearchQuery_matches(t *testing.T) {
 		{
 			name:  "wrong aggregate id",
 			query: NewSearchQueryBuilder(ColumnsEvent).AddQuery().AggregateIDs("1", "10", "100"),
-			event: &BaseEvent{
-				Agg: &Aggregate{
-					ID: "2",
+			event: &matcherCommand{
+				BaseEvent{
+					Agg: &Aggregate{
+						ID: "2",
+					},
 				},
 			},
 			want: false,
@@ -282,9 +286,11 @@ func TestSearchQuery_matches(t *testing.T) {
 		{
 			name:  "wrong event type",
 			query: NewSearchQueryBuilder(ColumnsEvent).AddQuery().EventTypes("event.searched.type"),
-			event: &BaseEvent{
-				EventType: "event.actual.type",
-				Agg:       &Aggregate{},
+			event: &matcherCommand{
+				BaseEvent{
+					EventType: "event.actual.type",
+					Agg:       &Aggregate{},
+				},
 			},
 			want: false,
 		},
@@ -295,26 +301,30 @@ func TestSearchQuery_matches(t *testing.T) {
 				AggregateIDs("2").
 				AggregateTypes("actual").
 				EventTypes("event.actual.type"),
-			event: &BaseEvent{
-				Seq: 55,
-				Agg: &Aggregate{
-					ID:   "2",
-					Type: "actual",
+			event: &matcherCommand{
+				BaseEvent{
+					Seq: 55,
+					Agg: &Aggregate{
+						ID:   "2",
+						Type: "actual",
+					},
+					EventType: "event.actual.type",
 				},
-				EventType: "event.actual.type",
 			},
 			want: true,
 		},
 		{
 			name:  "matching empty query",
 			query: NewSearchQueryBuilder(ColumnsEvent).AddQuery(),
-			event: &BaseEvent{
-				Seq: 55,
-				Agg: &Aggregate{
-					ID:   "2",
-					Type: "actual",
+			event: &matcherCommand{
+				BaseEvent{
+					Seq: 55,
+					Agg: &Aggregate{
+						ID:   "2",
+						Type: "actual",
+					},
+					EventType: "event.actual.type",
 				},
-				EventType: "event.actual.type",
 			},
 			want: true,
 		},
@@ -334,76 +344,128 @@ func TestSearchQuery_matches(t *testing.T) {
 	}
 }
 
+type matcherCommand struct {
+	BaseEvent
+}
+
+func (matcherCommand) Payload() any { return nil }
+
+func (matcherCommand) UniqueConstraints() []*UniqueConstraint { return nil }
+
 func TestSearchQueryBuilder_Matches(t *testing.T) {
 	type args struct {
-		event       Event
-		existingLen int
-	}
-	tests := []struct {
-		name    string
-		builder *SearchQueryBuilder
-		args    args
-		want    bool
-	}{
-		{
-			name:    "sequence too high",
-			builder: NewSearchQueryBuilder(ColumnsEvent).SequenceGreater(60),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						InstanceID: "instance",
-					},
-					Seq: 60,
-				},
-			},
-			want: false,
-		},
-		{
-			name:    "limit exeeded",
-			builder: NewSearchQueryBuilder(ColumnsEvent).Limit(100),
-			args: args{
-				event:       &BaseEvent{},
-				existingLen: 100,
-			},
-			want: false,
-		},
-		{
-			name:    "wrong resource owner",
-			builder: NewSearchQueryBuilder(ColumnsEvent).ResourceOwner("query"),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						ResourceOwner: "ro",
-					},
-				},
-				existingLen: 0,
-			},
-			want: false,
-		},
-		{
-			name:    "wrong instance",
-			builder: NewSearchQueryBuilder(ColumnsEvent).InstanceID("instance"),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						InstanceID: "different instance",
-					},
-				},
-				existingLen: 0,
-			},
-			want: false,
-		},
-		{
-			name:    "query failed",
-			builder: NewSearchQueryBuilder(ColumnsEvent).SequenceGreater(1000),
-			args: args{
-				event: &BaseEvent{
-					Seq: 999,
-					Agg: &Aggregate{},
-				},
-				existingLen: 0,
-			},
-			want: false,
+		commands []Command
+	}
+	tests := []struct {
+		name      string
+		builder   *SearchQueryBuilder
+		args      args
+		wantedLen int
+	}{
+		{
+			name: "sequence too high",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				SequenceGreater(60),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 60,
+						},
+					},
+				},
+			},
+			wantedLen: 0,
+		},
+		{
+			name: "limit exeeded",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				Limit(2),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance",
+							},
+							Seq: 1001,
+						},
+					},
+				},
+			},
+			wantedLen: 2,
+		},
+		{
+			name: "wrong resource owner",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				ResourceOwner("query"),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+							},
+						},
+					},
+				},
+			},
+			wantedLen: 0,
+		},
+		{
+			name: "wrong instance",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				InstanceID("instance"),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "different instance",
+							},
+						},
+					},
+				},
+			},
+			wantedLen: 0,
+		},
+		{
+			name: "query failed",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				SequenceGreater(1000),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Seq: 999,
+							Agg: &Aggregate{},
+						},
+					},
+				},
+			},
+			wantedLen: 0,
 		},
 		{
 			name: "matching",
@@ -413,65 +475,242 @@ func TestSearchQueryBuilder_Matches(t *testing.T) {
 				InstanceID("instance").
 				SequenceGreater(1000),
 			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						ResourceOwner: "ro",
-						InstanceID:    "instance",
-					},
-					Seq: 1001,
-				},
-				existingLen: 999,
-			},
-			want: true,
-		},
-		{
-			name:    "matching builder resourceOwner and Instance",
-			builder: NewSearchQueryBuilder(ColumnsEvent),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						ResourceOwner: "ro",
-						InstanceID:    "instance",
-					},
-					Seq: 1001,
-				},
-				existingLen: 999,
-			},
-			want: true,
-		},
-		{
-			name:    "matching builder resourceOwner only",
-			builder: NewSearchQueryBuilder(ColumnsEvent),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						ResourceOwner: "ro",
-					},
-					Seq: 1001,
-				},
-				existingLen: 999,
-			},
-			want: true,
-		},
-		{
-			name:    "matching builder instanceID only",
-			builder: NewSearchQueryBuilder(ColumnsEvent),
-			args: args{
-				event: &BaseEvent{
-					Agg: &Aggregate{
-						InstanceID: "instance",
-					},
-					Seq: 1001,
-				},
-				existingLen: 999,
-			},
-			want: true,
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance",
+							},
+							Seq: 1001,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "matching builder resourceOwner and Instance",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				ResourceOwner("ro").
+				InstanceID("instance"),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro2",
+								InstanceID:    "instance2",
+							},
+							Seq: 1002,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro2",
+								InstanceID:    "instance",
+							},
+							Seq: 1003,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+								InstanceID:    "instance2",
+							},
+							Seq: 1004,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "matching builder resourceOwner only",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				ResourceOwner("ro"),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								ResourceOwner: "ro2",
+							},
+							Seq: 1001,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "matching builder instanceID only",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				InstanceID("instance"),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance2",
+							},
+							Seq: 1001,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "offset too high",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				Offset(2),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1001,
+						},
+					},
+				},
+			},
+			wantedLen: 0,
+		},
+		{
+			name: "offset",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				Offset(1),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1002,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "offset and limit",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				Offset(1).
+				Limit(1),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1002,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+							},
+							Seq: 1002,
+						},
+					},
+				},
+			},
+			wantedLen: 1,
+		},
+		{
+			name: "sub query",
+			builder: NewSearchQueryBuilder(ColumnsEvent).
+				AddQuery().
+				AggregateTypes("test").
+				Builder(),
+			args: args{
+				commands: []Command{
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+								Type:       "test",
+							},
+							Seq: 1001,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+								Type:       "test",
+							},
+							Seq: 1002,
+						},
+					},
+					&matcherCommand{
+						BaseEvent{
+							Agg: &Aggregate{
+								InstanceID: "instance",
+								Type:       "test2",
+							},
+							Seq: 1003,
+						},
+					},
+				},
+			},
+			wantedLen: 2,
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			if got := tt.builder.Matches(tt.args.event, tt.args.existingLen); got != tt.want {
-				t.Errorf("SearchQueryBuilder.Matches() = %v, want %v", got, tt.want)
+			if got := tt.builder.Matches(tt.args.commands...); len(got) != tt.wantedLen {
+				t.Errorf("SearchQueryBuilder.Matches() = %v, wantted len %v", got, tt.wantedLen)
 			}
 		})
 	}