reduce milestone pushed

This commit is contained in:
Elio Bischof
2023-06-28 11:35:22 +02:00
parent 51a9a54cfd
commit a14871ce46
13 changed files with 262 additions and 353 deletions

View File

@@ -52,7 +52,7 @@ type StatementHandler struct {
bulkLimit uint64
subscribe bool
reduceScheduledPseudoEvent bool
}
func NewStatementHandler(
@@ -61,37 +61,40 @@ func NewStatementHandler(
) StatementHandler {
aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers))
reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers))
subscribe := true
reduceScheduledPseudoEvent := false
for _, aggReducer := range config.Reducers {
aggregateTypes = append(aggregateTypes, aggReducer.Aggregate)
if aggReducer.Aggregate == pseudo.AggregateType {
subscribe = false
reduceScheduledPseudoEvent = true
if len(config.Reducers) != 1 ||
len(aggReducer.EventRedusers) != 1 ||
aggReducer.EventRedusers[0].Event != pseudo.ScheduledEventType {
panic("if a pseudo.AggregateType is reduced, exactly one event reducer for pseudo.ScheduledEventType is supported and no other aggregate can be reduced")
}
}
for _, eventReducer := range aggReducer.EventRedusers {
if eventReducer.Event == pseudo.TimestampEventType {
subscribe = false
}
reduces[eventReducer.Event] = eventReducer.Reduce
}
}
h := StatementHandler{
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
aggregates: aggregateTypes,
reduces: reduces,
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName),
initCheck: config.InitCheck,
initialized: make(chan bool),
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
aggregates: aggregateTypes,
reduces: reduces,
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client.DB, config.LockTable, config.ProjectionName),
initCheck: config.InitCheck,
initialized: make(chan bool),
reduceScheduledPseudoEvent: reduceScheduledPseudoEvent,
}
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized, subscribe)
h.ProjectionHandler = handler.NewProjectionHandler(ctx, config.ProjectionHandlerConfig, h.reduce, h.Update, h.SearchQuery, h.Lock, h.Unlock, h.initialized, reduceScheduledPseudoEvent)
return h
}
@@ -99,10 +102,9 @@ func NewStatementHandler(
func (h *StatementHandler) Start() {
h.initialized <- true
close(h.initialized)
if !h.subscribe {
return
if h.reduceScheduledPseudoEvent {
h.Subscribe(h.aggregates...)
}
h.Subscribe(h.aggregates...)
}
func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string) (*eventstore.SearchQueryBuilder, uint64, error) {
@@ -111,7 +113,12 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string
return nil, 0, err
}
queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(h.bulkLimit).AllowTimeTravel()
bulkLimit := h.bulkLimit
if h.reduceScheduledPseudoEvent {
bulkLimit = 1
}
queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(bulkLimit).AllowTimeTravel()
for _, aggregateType := range h.aggregates {
for _, instanceID := range instanceIDs {
@@ -124,13 +131,18 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string
}
queryBuilder.
AddQuery().
AggregateTypes(aggregateType).
SequenceGreater(seq).
InstanceID(instanceID)
if !h.reduceScheduledPseudoEvent {
queryBuilder.
AddQuery().
AggregateTypes(aggregateType)
}
}
}
return queryBuilder, h.bulkLimit, nil
return queryBuilder, bulkLimit, nil
}
// Update implements handler.Update

View File

@@ -51,19 +51,20 @@ type NowFunc func() time.Time
type ProjectionHandler struct {
Handler
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
handleActiveInstances time.Duration
nowFunc NowFunc
ProjectionName string
reduce Reduce
update Update
searchQuery SearchQuery
triggerProjection *time.Timer
lock Lock
unlock Unlock
requeueAfter time.Duration
retryFailedAfter time.Duration
retries int
concurrentInstances int
handleActiveInstances time.Duration
nowFunc NowFunc
reduceScheduledPseudoEvent bool
}
func NewProjectionHandler(
@@ -75,32 +76,33 @@ func NewProjectionHandler(
lock Lock,
unlock Unlock,
initialized <-chan bool,
subscribe bool,
reduceScheduledPseudoEvent bool,
) *ProjectionHandler {
concurrentInstances := int(config.ConcurrentInstances)
if concurrentInstances < 1 {
concurrentInstances = 1
}
h := &ProjectionHandler{
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
handleActiveInstances: config.HandleActiveInstances,
nowFunc: time.Now,
Handler: NewHandler(config.HandlerConfig),
ProjectionName: config.ProjectionName,
reduce: reduce,
update: update,
searchQuery: query,
lock: lock,
unlock: unlock,
requeueAfter: config.RequeueEvery,
triggerProjection: time.NewTimer(0), // first trigger is instant on startup
retryFailedAfter: config.RetryFailedAfter,
retries: int(config.Retries),
concurrentInstances: concurrentInstances,
handleActiveInstances: config.HandleActiveInstances,
nowFunc: time.Now,
reduceScheduledPseudoEvent: reduceScheduledPseudoEvent,
}
go func() {
<-initialized
if subscribe {
if h.reduceScheduledPseudoEvent {
go h.subscribe(ctx)
}
go h.schedule(ctx)
@@ -116,9 +118,6 @@ func (h *ProjectionHandler) Trigger(ctx context.Context, instances ...string) er
if len(instances) > 0 {
ids = instances
}
if h.searchQuery == nil {
return h.processTimestamp(ctx, ids...)
}
return h.processEvents(ctx, ids...)
}
@@ -141,11 +140,6 @@ func (h *ProjectionHandler) processEvents(ctx context.Context, ids ...string) er
}
}
func (h *ProjectionHandler) processTimestamp(ctx context.Context, instances ...string) error {
_, err := h.Process(ctx, pseudo.NewTimestampEvent(h.nowFunc(), instances...))
return err
}
// Process handles multiple events by reducing them to statements and updating the projection
func (h *ProjectionHandler) Process(ctx context.Context, events ...eventstore.Event) (index int, err error) {
if len(events) == 0 {
@@ -182,6 +176,9 @@ func (h *ProjectionHandler) FetchEvents(ctx context.Context, instances ...string
if err != nil {
return nil, false, err
}
if h.reduceScheduledPseudoEvent {
events[0] = pseudo.NewScheduledEvent(ctx, time.Now(), instances...)
}
return events, int(eventsLimit) == len(events), err
}