zitadel/internal/query/projection/projection.go
Livio Amstutz ed80a8bb1e
feat: actions (#2377)
* feat(actions): begin api

* feat(actions): begin api

* api and projections

* fix: handle multiple statements for a single event in projections

* export func type

* fix test

* update to new reduce interface

* flows in login

* feat: jwt idp

* feat: command side

* feat: add tests

* actions and flows

* fill idp views with jwt idps and return apis

* add jwtEndpoint to jwt idp

* begin jwt request handling

* add feature

* merge

* merge

* handle jwt idp

* cleanup

* bug fixes

* autoregister

* get token from specific header name

* fix: proto

* fixes

* i18n

* begin tests

* fix and log http proxy

* remove docker cache

* fixes

* usergrants in actions api

* tests adn cleanup

* cleanup

* fix add user grant

* set login context

* i18n

Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
2021-09-27 13:43:49 +02:00

65 lines
2.1 KiB
Go

package projection
import (
"context"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
"github.com/caos/zitadel/internal/query/projection/flow"
)
const (
currentSeqTable = "projections.current_sequences"
locksTable = "projections.locks"
failedEventsTable = "projections.failed_events"
)
func Start(ctx context.Context, es *eventstore.Eventstore, config Config) error {
sqlClient, err := config.CRDB.Start()
if err != nil {
return err
}
projectionConfig := crdb.StatementHandlerConfig{
ProjectionHandlerConfig: handler.ProjectionHandlerConfig{
HandlerConfig: handler.HandlerConfig{
Eventstore: es,
},
RequeueEvery: config.RequeueEvery.Duration,
RetryFailedAfter: config.RetryFailedAfter.Duration,
},
Client: sqlClient,
SequenceTable: currentSeqTable,
LockTable: locksTable,
FailedEventsTable: failedEventsTable,
MaxFailureCount: config.MaxFailureCount,
BulkLimit: config.BulkLimit,
}
// turned off for this release
//NewOrgProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["orgs"]))
//NewProjectProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["projects"]))
//owner.NewOrgOwnerProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["org_owners"]))
NewActionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["actions"]))
flow.NewFlowProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["flows"]))
return nil
}
func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomConfig) crdb.StatementHandlerConfig {
if customConfig.BulkLimit != nil {
config.BulkLimit = *customConfig.BulkLimit
}
if customConfig.MaxFailureCount != nil {
config.MaxFailureCount = *customConfig.MaxFailureCount
}
if customConfig.RequeueEvery != nil {
config.RequeueEvery = customConfig.RequeueEvery.Duration
}
if customConfig.RetryFailedAfter != nil {
config.RetryFailedAfter = customConfig.RetryFailedAfter.Duration
}
return config
}