mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 01:37:31 +00:00
feat: actions (#2377)
* feat(actions): begin api * feat(actions): begin api * api and projections * fix: handle multiple statements for a single event in projections * export func type * fix test * update to new reduce interface * flows in login * feat: jwt idp * feat: command side * feat: add tests * actions and flows * fill idp views with jwt idps and return apis * add jwtEndpoint to jwt idp * begin jwt request handling * add feature * merge * merge * handle jwt idp * cleanup * bug fixes * autoregister * get token from specific header name * fix: proto * fixes * i18n * begin tests * fix and log http proxy * remove docker cache * fixes * usergrants in actions api * tests adn cleanup * cleanup * fix add user grant * set login context * i18n Co-authored-by: fabi <fabienne.gerschwiler@gmail.com>
This commit is contained in:
104
internal/query/action.go
Normal file
104
internal/query/action.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/squirrel"
|
||||
"github.com/caos/zitadel/internal/domain"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
)
|
||||
|
||||
var actionsQuery = squirrel.StatementBuilder.Select("creation_date", "change_date", "resource_owner", "sequence", "id", "action_state", "name", "script", "timeout", "allowed_to_fail").
|
||||
From("zitadel.projections.actions").PlaceholderFormat(squirrel.Dollar)
|
||||
|
||||
func (q *Queries) GetAction(ctx context.Context, id string, orgID string) (*Action, error) {
|
||||
idQuery, _ := newActionIDSearchQuery(id)
|
||||
actions, err := q.SearchActions(ctx, &ActionSearchQueries{Queries: []SearchQuery{idQuery}})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(actions) != 1 {
|
||||
return nil, errors.ThrowNotFound(nil, "QUERY-dft2g", "Errors.Action.NotFound")
|
||||
}
|
||||
return actions[0], err
|
||||
}
|
||||
|
||||
func (q *Queries) SearchActions(ctx context.Context, query *ActionSearchQueries) ([]*Action, error) {
|
||||
stmt, args, err := query.ToQuery(actionsQuery).ToSql()
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInvalidArgument(err, "QUERY-wQ3by", "Errors.orgs.invalid.request")
|
||||
}
|
||||
|
||||
rows, err := q.client.QueryContext(ctx, stmt, args...)
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-M6mYN", "Errors.orgs.internal")
|
||||
}
|
||||
|
||||
actions := []*Action{}
|
||||
for rows.Next() {
|
||||
org := new(Action)
|
||||
rows.Scan(
|
||||
&org.CreationDate,
|
||||
&org.ChangeDate,
|
||||
&org.ResourceOwner,
|
||||
&org.Sequence,
|
||||
&org.ID,
|
||||
&org.State,
|
||||
&org.Name,
|
||||
&org.Script,
|
||||
&org.Timeout,
|
||||
&org.AllowedToFail,
|
||||
)
|
||||
actions = append(actions, org)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-pA0Wj", "Errors.actions.internal")
|
||||
}
|
||||
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
type Action struct {
|
||||
ID string `col:"id"`
|
||||
CreationDate time.Time `col:"creation_date"`
|
||||
ChangeDate time.Time `col:"change_date"`
|
||||
ResourceOwner string `col:"resource_owner"`
|
||||
State domain.ActionState `col:"action_state"`
|
||||
Sequence uint64 `col:"sequence"`
|
||||
|
||||
Name string `col:"name"`
|
||||
Script string `col:"script"`
|
||||
Timeout time.Duration `col:"-"`
|
||||
AllowedToFail bool `col:"-"`
|
||||
}
|
||||
|
||||
type ActionSearchQueries struct {
|
||||
SearchRequest
|
||||
Queries []SearchQuery
|
||||
}
|
||||
|
||||
func (q *ActionSearchQueries) ToQuery(query squirrel.SelectBuilder) squirrel.SelectBuilder {
|
||||
query = q.SearchRequest.ToQuery(query)
|
||||
for _, q := range q.Queries {
|
||||
query = q.ToQuery(query)
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
func NewActionResourceOwnerQuery(id string) (SearchQuery, error) {
|
||||
return NewTextQuery("resource_owner", id, TextEquals)
|
||||
}
|
||||
|
||||
func NewActionNameSearchQuery(method TextComparison, value string) (SearchQuery, error) {
|
||||
return NewTextQuery("name", value, method)
|
||||
}
|
||||
|
||||
func NewActionStateSearchQuery(value domain.ActionState) (SearchQuery, error) {
|
||||
return NewIntQuery("state", int(value), IntEquals)
|
||||
}
|
||||
|
||||
func newActionIDSearchQuery(id string) (SearchQuery, error) {
|
||||
return NewTextQuery("id", id, TextEquals)
|
||||
}
|
173
internal/query/action_flow.go
Normal file
173
internal/query/action_flow.go
Normal file
@@ -0,0 +1,173 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/squirrel"
|
||||
|
||||
"github.com/caos/zitadel/internal/domain"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
)
|
||||
|
||||
func (q *Queries) GetActionsByFlowAndTriggerType(ctx context.Context, flowType domain.FlowType, triggerType domain.TriggerType) ([]*Action, error) {
|
||||
flowTypeQuery, _ := NewTriggerActionFlowTypeSearchQuery(flowType)
|
||||
triggerTypeQuery, _ := NewTriggerActionTriggerTypeSearchQuery(triggerType)
|
||||
return q.SearchActionsFromFlow(ctx, &TriggerActionSearchQueries{Queries: []SearchQuery{flowTypeQuery, triggerTypeQuery}})
|
||||
}
|
||||
|
||||
var triggerActionsQuery = squirrel.StatementBuilder.Select("creation_date", "change_date", "resource_owner", "sequence", "action_id", "name", "script", "trigger_type", "trigger_sequence").
|
||||
From("zitadel.projections.flows_actions_triggers").PlaceholderFormat(squirrel.Dollar)
|
||||
|
||||
func (q *Queries) SearchActionsFromFlow(ctx context.Context, query *TriggerActionSearchQueries) ([]*Action, error) {
|
||||
stmt, args, err := query.ToQuery(triggerActionsQuery).OrderBy("flow_type", "trigger_type", "trigger_sequence").ToSql()
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInvalidArgument(err, "QUERY-wQ3by", "Errors.orgs.invalid.request")
|
||||
}
|
||||
|
||||
rows, err := q.client.QueryContext(ctx, stmt, args...)
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-M6mYN", "Errors.orgs.internal")
|
||||
}
|
||||
|
||||
actions := []*Action{}
|
||||
for rows.Next() {
|
||||
action := new(Action)
|
||||
var triggerType domain.TriggerType
|
||||
var triggerSequence int
|
||||
rows.Scan(
|
||||
&action.CreationDate,
|
||||
&action.ChangeDate,
|
||||
&action.ResourceOwner,
|
||||
&action.Sequence,
|
||||
//&action.State, //TODO: state in next release
|
||||
&action.ID,
|
||||
&action.Name,
|
||||
&action.Script,
|
||||
&triggerType,
|
||||
&triggerSequence,
|
||||
)
|
||||
actions = append(actions, action)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-pA0Wj", "Errors.actions.internal")
|
||||
}
|
||||
|
||||
return actions, nil
|
||||
}
|
||||
|
||||
func (q *Queries) GetFlow(ctx context.Context, flowType domain.FlowType) (*Flow, error) {
|
||||
flowTypeQuery, _ := NewTriggerActionFlowTypeSearchQuery(flowType)
|
||||
return q.SearchFlow(ctx, &TriggerActionSearchQueries{Queries: []SearchQuery{flowTypeQuery}})
|
||||
}
|
||||
|
||||
func (q *Queries) SearchFlow(ctx context.Context, query *TriggerActionSearchQueries) (*Flow, error) {
|
||||
stmt, args, err := query.ToQuery(triggerActionsQuery.OrderBy("flow_type", "trigger_type", "trigger_sequence")).ToSql()
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInvalidArgument(err, "QUERY-wQ3by", "Errors.orgs.invalid.request")
|
||||
}
|
||||
|
||||
rows, err := q.client.QueryContext(ctx, stmt, args...)
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-M6mYN", "Errors.orgs.internal")
|
||||
}
|
||||
|
||||
flow := &Flow{
|
||||
TriggerActions: make(map[domain.TriggerType][]*Action),
|
||||
}
|
||||
for rows.Next() {
|
||||
action := new(Action)
|
||||
var triggerType domain.TriggerType
|
||||
var triggerSequence int
|
||||
rows.Scan(
|
||||
&action.CreationDate,
|
||||
&action.ChangeDate,
|
||||
&action.ResourceOwner,
|
||||
&action.Sequence,
|
||||
//&action.State, //TODO: state in next release
|
||||
&action.ID,
|
||||
&action.Name,
|
||||
&action.Script,
|
||||
&triggerType,
|
||||
&triggerSequence,
|
||||
)
|
||||
|
||||
flow.TriggerActions[triggerType] = append(flow.TriggerActions[triggerType], action)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-pA0Wj", "Errors.actions.internal")
|
||||
}
|
||||
|
||||
return flow, nil
|
||||
}
|
||||
|
||||
func (q *Queries) GetFlowTypesOfActionID(ctx context.Context, actionID string) ([]domain.FlowType, error) {
|
||||
actionIDQuery, _ := NewTriggerActionActionIDSearchQuery(actionID)
|
||||
query := &TriggerActionSearchQueries{Queries: []SearchQuery{actionIDQuery}}
|
||||
stmt, args, err := query.ToQuery(
|
||||
squirrel.StatementBuilder.
|
||||
Select("flow_type").
|
||||
From("zitadel.projections.flows_actions_triggers").
|
||||
PlaceholderFormat(squirrel.Dollar)).ToSql()
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInvalidArgument(err, "QUERY-wQ3by", "Errors.orgs.invalid.request")
|
||||
}
|
||||
|
||||
rows, err := q.client.QueryContext(ctx, stmt, args...)
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-M6mYN", "Errors.orgs.internal")
|
||||
}
|
||||
flowTypes := make([]domain.FlowType, 0)
|
||||
for rows.Next() {
|
||||
var flow_type domain.FlowType
|
||||
rows.Scan(
|
||||
&flow_type,
|
||||
)
|
||||
|
||||
flowTypes = append(flowTypes, flow_type)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.ThrowInternal(err, "QUERY-pA0Wj", "Errors.actions.internal")
|
||||
}
|
||||
|
||||
return flowTypes, nil
|
||||
}
|
||||
|
||||
type Flow struct {
|
||||
ID string `col:"id"`
|
||||
CreationDate time.Time `col:"creation_date"`
|
||||
ChangeDate time.Time `col:"change_date"`
|
||||
ResourceOwner string `col:"resource_owner"`
|
||||
Sequence uint64 `col:"sequence"`
|
||||
Type domain.FlowType `col:"flow_type"`
|
||||
|
||||
TriggerActions map[domain.TriggerType][]*Action
|
||||
}
|
||||
|
||||
type TriggerActionSearchQueries struct {
|
||||
SearchRequest
|
||||
Queries []SearchQuery
|
||||
}
|
||||
|
||||
func (q *TriggerActionSearchQueries) ToQuery(query squirrel.SelectBuilder) squirrel.SelectBuilder {
|
||||
query = q.SearchRequest.ToQuery(query)
|
||||
for _, q := range q.Queries {
|
||||
query = q.ToQuery(query)
|
||||
}
|
||||
return query
|
||||
}
|
||||
|
||||
func NewTriggerActionTriggerTypeSearchQuery(value domain.TriggerType) (SearchQuery, error) {
|
||||
return NewIntQuery("trigger_type", int(value), IntEquals)
|
||||
}
|
||||
|
||||
func NewTriggerActionFlowTypeSearchQuery(value domain.FlowType) (SearchQuery, error) {
|
||||
return NewIntQuery("flow_type", int(value), IntEquals)
|
||||
}
|
||||
|
||||
func NewTriggerActionActionIDSearchQuery(actionID string) (SearchQuery, error) {
|
||||
return NewTextQuery("action_id", actionID, TextEquals)
|
||||
}
|
174
internal/query/projection/action.go
Normal file
174
internal/query/projection/action.go
Normal file
@@ -0,0 +1,174 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/domain"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
|
||||
"github.com/caos/zitadel/internal/repository/action"
|
||||
)
|
||||
|
||||
type ActionProjection struct {
|
||||
crdb.StatementHandler
|
||||
}
|
||||
|
||||
func NewActionProjection(ctx context.Context, config crdb.StatementHandlerConfig) *ActionProjection {
|
||||
p := &ActionProjection{}
|
||||
config.ProjectionName = "projections.actions"
|
||||
config.Reducers = p.reducers()
|
||||
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *ActionProjection) reducers() []handler.AggregateReducer {
|
||||
return []handler.AggregateReducer{
|
||||
{
|
||||
Aggregate: action.AggregateType,
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: action.AddedEventType,
|
||||
Reduce: p.reduceActionAdded,
|
||||
},
|
||||
{
|
||||
Event: action.ChangedEventType,
|
||||
Reduce: p.reduceActionChanged,
|
||||
},
|
||||
{
|
||||
Event: action.DeactivatedEventType,
|
||||
Reduce: p.reduceActionDeactivated,
|
||||
},
|
||||
{
|
||||
Event: action.ReactivatedEventType,
|
||||
Reduce: p.reduceActionReactivated,
|
||||
},
|
||||
{
|
||||
Event: action.RemovedEventType,
|
||||
Reduce: p.reduceActionRemoved,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
actionIDCol = "id"
|
||||
actionCreationDateCol = "creation_date"
|
||||
actionChangeDateCol = "change_date"
|
||||
actionResourceOwnerCol = "resource_owner"
|
||||
actionStateCol = "action_state"
|
||||
actionSequenceCol = "sequence"
|
||||
actionNameCol = "name"
|
||||
actionScriptCol = "script"
|
||||
actionTimeoutCol = "timeout"
|
||||
actionAllowedToFailCol = "allowed_to_fail"
|
||||
)
|
||||
|
||||
func (p *ActionProjection) reduceActionAdded(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.AddedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-zWCk3", "seq", event.Sequence, "expectedType", action.AddedEventType).Error("was not an event")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-uYq4r", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewCreateStatement(
|
||||
e,
|
||||
[]handler.Column{
|
||||
handler.NewCol(actionIDCol, e.Aggregate().ID),
|
||||
handler.NewCol(actionCreationDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionResourceOwnerCol, e.Aggregate().ResourceOwner),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
handler.NewCol(actionNameCol, e.Name),
|
||||
handler.NewCol(actionScriptCol, e.Script),
|
||||
handler.NewCol(actionTimeoutCol, e.Timeout),
|
||||
handler.NewCol(actionAllowedToFailCol, e.AllowedToFail),
|
||||
handler.NewCol(actionStateCol, domain.ActionStateActive),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *ActionProjection) reduceActionChanged(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.ChangedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-q4oq8", "seq", event.Sequence, "expected", action.ChangedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-Bg8oM", "reduce.wrong.event.type")
|
||||
}
|
||||
values := []handler.Column{
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
}
|
||||
if e.Name != nil {
|
||||
values = append(values, handler.NewCol(actionNameCol, *e.Name))
|
||||
}
|
||||
if e.Script != nil {
|
||||
values = append(values, handler.NewCol(actionScriptCol, *e.Script))
|
||||
}
|
||||
if e.Timeout != nil {
|
||||
values = append(values, handler.NewCol(actionTimeoutCol, *e.Timeout))
|
||||
}
|
||||
if e.AllowedToFail != nil {
|
||||
values = append(values, handler.NewCol(actionAllowedToFailCol, *e.AllowedToFail))
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
e,
|
||||
values,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *ActionProjection) reduceActionDeactivated(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.DeactivatedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-1gwdc", "seq", event.Sequence, "expectedType", action.DeactivatedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-BApK4", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
e,
|
||||
[]handler.Column{
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
handler.NewCol(actionStateCol, domain.ActionStateInactive),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *ActionProjection) reduceActionReactivated(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.ReactivatedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-Vjwiy", "seq", event.Sequence, "expectedType", action.ReactivatedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-o37De", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
e,
|
||||
[]handler.Column{
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
handler.NewCol(actionStateCol, domain.ActionStateActive),
|
||||
},
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *ActionProjection) reduceActionRemoved(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.RemovedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-79OhB", "seq", event.Sequence, "expectedType", action.RemovedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-4TbKT", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewDeleteStatement(
|
||||
e,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
), nil
|
||||
}
|
184
internal/query/projection/flow/flow.go
Normal file
184
internal/query/projection/flow/flow.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package flow
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
|
||||
"github.com/caos/zitadel/internal/repository/action"
|
||||
"github.com/caos/zitadel/internal/repository/org"
|
||||
)
|
||||
|
||||
type FlowProjection struct {
|
||||
crdb.StatementHandler
|
||||
}
|
||||
|
||||
func NewFlowProjection(ctx context.Context, config crdb.StatementHandlerConfig) *FlowProjection {
|
||||
p := &FlowProjection{}
|
||||
config.ProjectionName = "projections.flows"
|
||||
config.Reducers = p.reducers()
|
||||
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *FlowProjection) reducers() []handler.AggregateReducer {
|
||||
return []handler.AggregateReducer{
|
||||
{
|
||||
Aggregate: org.AggregateType,
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: org.TriggerActionsSetEventType,
|
||||
Reduce: p.reduceTriggerActionsSetEventType,
|
||||
},
|
||||
{
|
||||
Event: org.FlowClearedEventType,
|
||||
Reduce: p.reduceFlowClearedEventType,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Aggregate: action.AggregateType,
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: action.AddedEventType,
|
||||
Reduce: p.reduceFlowActionAdded,
|
||||
},
|
||||
{
|
||||
Event: action.ChangedEventType,
|
||||
Reduce: p.reduceFlowActionChanged,
|
||||
},
|
||||
{
|
||||
Event: action.RemovedEventType,
|
||||
Reduce: p.reduceFlowActionRemoved,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
triggerTableSuffix = "triggers"
|
||||
flowTypeCol = "flow_type"
|
||||
flowTriggerTypeCol = "trigger_type"
|
||||
flowResourceOwnerCol = "resource_owner"
|
||||
flowActionTriggerSequenceCol = "trigger_sequence"
|
||||
flowActionIDCol = "action_id"
|
||||
|
||||
actionTableSuffix = "actions"
|
||||
actionIDCol = "id"
|
||||
actionCreationDateCol = "creation_date"
|
||||
actionChangeDateCol = "change_date"
|
||||
actionResourceOwnerCol = "resource_owner"
|
||||
actionSequenceCol = "sequence"
|
||||
actionNameCol = "name"
|
||||
actionScriptCol = "script"
|
||||
)
|
||||
|
||||
func (p *FlowProjection) reduceTriggerActionsSetEventType(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*org.TriggerActionsSetEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-zWCk3", "seq", event.Sequence, "expectedType", action.AddedEventType).Error("was not an trigger actions set event")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-uYq4r", "reduce.wrong.event.type")
|
||||
}
|
||||
stmts := make([]func(reader eventstore.EventReader) crdb.Exec, len(e.ActionIDs)+1)
|
||||
stmts[0] = crdb.AddDeleteStatement(
|
||||
[]handler.Condition{
|
||||
handler.NewCond(flowTypeCol, e.FlowType),
|
||||
handler.NewCond(flowTriggerTypeCol, e.TriggerType),
|
||||
},
|
||||
crdb.WithTableSuffix(triggerTableSuffix),
|
||||
)
|
||||
for i, id := range e.ActionIDs {
|
||||
stmts[i+1] = crdb.AddCreateStatement(
|
||||
[]handler.Column{
|
||||
handler.NewCol(flowResourceOwnerCol, e.Aggregate().ResourceOwner),
|
||||
handler.NewCol(flowTypeCol, e.FlowType),
|
||||
handler.NewCol(flowTriggerTypeCol, e.TriggerType),
|
||||
handler.NewCol(flowActionIDCol, id),
|
||||
handler.NewCol(flowActionTriggerSequenceCol, i),
|
||||
},
|
||||
crdb.WithTableSuffix(triggerTableSuffix),
|
||||
)
|
||||
}
|
||||
return crdb.NewMultiStatement(e, stmts...), nil
|
||||
}
|
||||
|
||||
func (p *FlowProjection) reduceFlowClearedEventType(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*org.FlowClearedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-zWCk3", "seq", event.Sequence, "expectedType", action.AddedEventType).Error("was not an trigger actions set event")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-uYq4r", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewDeleteStatement(
|
||||
e,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(flowTypeCol, e.FlowType),
|
||||
},
|
||||
crdb.WithTableSuffix(triggerTableSuffix),
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *FlowProjection) reduceFlowActionAdded(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.AddedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-zWCk3", "seq", event.Sequence, "expectedType", action.AddedEventType).Error("was not an flow action added event")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-uYq4r", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewCreateStatement(
|
||||
e,
|
||||
[]handler.Column{
|
||||
handler.NewCol(actionIDCol, e.Aggregate().ID),
|
||||
handler.NewCol(actionCreationDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionResourceOwnerCol, e.Aggregate().ResourceOwner),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
handler.NewCol(actionNameCol, e.Name),
|
||||
handler.NewCol(actionScriptCol, e.Script),
|
||||
},
|
||||
crdb.WithTableSuffix(actionTableSuffix),
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *FlowProjection) reduceFlowActionChanged(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.ChangedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-q4oq8", "seq", event.Sequence, "expected", action.ChangedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-Bg8oM", "reduce.wrong.event.type")
|
||||
}
|
||||
values := []handler.Column{
|
||||
handler.NewCol(actionChangeDateCol, e.CreationDate()),
|
||||
handler.NewCol(actionSequenceCol, e.Sequence()),
|
||||
}
|
||||
if e.Name != nil {
|
||||
values = append(values, handler.NewCol(actionNameCol, *e.Name))
|
||||
}
|
||||
if e.Script != nil {
|
||||
values = append(values, handler.NewCol(actionScriptCol, *e.Script))
|
||||
}
|
||||
return crdb.NewUpdateStatement(
|
||||
e,
|
||||
values,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
crdb.WithTableSuffix(actionTableSuffix),
|
||||
), nil
|
||||
}
|
||||
|
||||
func (p *FlowProjection) reduceFlowActionRemoved(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*action.RemovedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-79OhB", "seq", event.Sequence, "expectedType", action.RemovedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-4TbKT", "reduce.wrong.event.type")
|
||||
}
|
||||
return crdb.NewDeleteStatement(
|
||||
e,
|
||||
[]handler.Condition{
|
||||
handler.NewCond(actionIDCol, e.Aggregate().ID),
|
||||
},
|
||||
crdb.WithTableSuffix(actionTableSuffix),
|
||||
), nil
|
||||
}
|
@@ -6,7 +6,7 @@ import (
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
|
||||
"github.com/caos/zitadel/internal/query/projection/org/owner"
|
||||
"github.com/caos/zitadel/internal/query/projection/flow"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -37,9 +37,12 @@ func Start(ctx context.Context, es *eventstore.Eventstore, config Config) error
|
||||
BulkLimit: config.BulkLimit,
|
||||
}
|
||||
|
||||
NewOrgProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["orgs"]))
|
||||
NewProjectProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["projects"]))
|
||||
owner.NewOrgOwnerProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["org_owners"]))
|
||||
// turned off for this release
|
||||
//NewOrgProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["orgs"]))
|
||||
//NewProjectProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["projects"]))
|
||||
//owner.NewOrgOwnerProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["org_owners"]))
|
||||
NewActionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["actions"]))
|
||||
flow.NewFlowProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["flows"]))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -2,6 +2,7 @@ package query
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
sd "github.com/caos/zitadel/internal/config/systemdefaults"
|
||||
"github.com/caos/zitadel/internal/config/types"
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
iam_model "github.com/caos/zitadel/internal/iam/model"
|
||||
"github.com/caos/zitadel/internal/id"
|
||||
"github.com/caos/zitadel/internal/query/projection"
|
||||
"github.com/caos/zitadel/internal/repository/action"
|
||||
iam_repo "github.com/caos/zitadel/internal/repository/iam"
|
||||
"github.com/caos/zitadel/internal/repository/org"
|
||||
"github.com/caos/zitadel/internal/repository/project"
|
||||
@@ -22,6 +24,8 @@ type Queries struct {
|
||||
eventstore *eventstore.Eventstore
|
||||
idGenerator id.Generator
|
||||
secretCrypto crypto.Crypto
|
||||
|
||||
client *sql.DB
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -29,26 +33,32 @@ type Config struct {
|
||||
}
|
||||
|
||||
func StartQueries(ctx context.Context, es *eventstore.Eventstore, projections projection.Config, defaults sd.SystemDefaults) (repo *Queries, err error) {
|
||||
sqlClient, err := projections.CRDB.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repo = &Queries{
|
||||
iamID: defaults.IamID,
|
||||
eventstore: es,
|
||||
idGenerator: id.SonyFlakeGenerator,
|
||||
client: sqlClient,
|
||||
}
|
||||
iam_repo.RegisterEventMappers(repo.eventstore)
|
||||
usr_repo.RegisterEventMappers(repo.eventstore)
|
||||
org.RegisterEventMappers(repo.eventstore)
|
||||
project.RegisterEventMappers(repo.eventstore)
|
||||
action.RegisterEventMappers(repo.eventstore)
|
||||
|
||||
repo.secretCrypto, err = crypto.NewAESCrypto(defaults.IDPConfigVerificationKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// turned off for this release
|
||||
// err = projection.Start(ctx, es, projections)
|
||||
// if err != nil {
|
||||
// return nil, err
|
||||
// }
|
||||
err = projection.Start(ctx, es, projections)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return repo, nil
|
||||
}
|
||||
|
149
internal/query/search_query.go
Normal file
149
internal/query/search_query.go
Normal file
@@ -0,0 +1,149 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
)
|
||||
|
||||
type SearchRequest struct {
|
||||
Offset uint64
|
||||
Limit uint64
|
||||
SortingColumn string
|
||||
Asc bool
|
||||
}
|
||||
|
||||
func (req *SearchRequest) ToQuery(query sq.SelectBuilder) sq.SelectBuilder {
|
||||
if req.Offset > 0 {
|
||||
query = query.Offset(req.Offset)
|
||||
}
|
||||
if req.Limit > 0 {
|
||||
query = query.Limit(req.Limit)
|
||||
}
|
||||
|
||||
if req.SortingColumn != "" {
|
||||
clause := "LOWER(?)"
|
||||
if !req.Asc {
|
||||
clause += " DESC"
|
||||
}
|
||||
query.OrderByClause(clause, req.SortingColumn)
|
||||
}
|
||||
|
||||
return query
|
||||
}
|
||||
|
||||
const sqlPlaceholder = "?"
|
||||
|
||||
type SearchQuery interface {
|
||||
ToQuery(sq.SelectBuilder) sq.SelectBuilder
|
||||
}
|
||||
|
||||
type TextQuery struct {
|
||||
Column string
|
||||
Text string
|
||||
Compare TextComparison
|
||||
}
|
||||
|
||||
func NewTextQuery(column, value string, compare TextComparison) (*TextQuery, error) {
|
||||
if compare < 0 || compare >= textMax {
|
||||
return nil, errors.New("invalid compare")
|
||||
}
|
||||
if column == "" {
|
||||
return nil, errors.New("missing column")
|
||||
}
|
||||
return &TextQuery{
|
||||
Column: column,
|
||||
Text: value,
|
||||
Compare: compare,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *TextQuery) ToQuery(query sq.SelectBuilder) sq.SelectBuilder {
|
||||
query = query.Where(q.comp())
|
||||
return query
|
||||
}
|
||||
|
||||
func (s *TextQuery) comp() map[string]interface{} {
|
||||
switch s.Compare {
|
||||
case TextEquals:
|
||||
return sq.Eq{s.Column: s.Text}
|
||||
case TextEqualsIgnore:
|
||||
return sq.Eq{"LOWER(" + s.Column + ")": strings.ToLower(s.Text)}
|
||||
case TextStartsWith:
|
||||
return sq.Like{s.Column: s.Text + sqlPlaceholder}
|
||||
case TextStartsWithIgnore:
|
||||
return sq.Like{"LOWER(" + s.Column + ")": strings.ToLower(s.Text) + sqlPlaceholder}
|
||||
case TextEndsWith:
|
||||
return sq.Like{s.Column: sqlPlaceholder + s.Text}
|
||||
case TextEndsWithIgnore:
|
||||
return sq.Like{"LOWER(" + s.Column + ")": sqlPlaceholder + strings.ToLower(s.Text)}
|
||||
case TextContains:
|
||||
return sq.Like{s.Column: sqlPlaceholder + s.Text + sqlPlaceholder}
|
||||
case TextContainsIgnore:
|
||||
return sq.Like{"LOWER(" + s.Column + ")": sqlPlaceholder + strings.ToLower(s.Text) + sqlPlaceholder}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type TextComparison int
|
||||
|
||||
const (
|
||||
TextEquals TextComparison = iota
|
||||
TextEqualsIgnore
|
||||
TextStartsWith
|
||||
TextStartsWithIgnore
|
||||
TextEndsWith
|
||||
TextEndsWithIgnore
|
||||
TextContains
|
||||
TextContainsIgnore
|
||||
|
||||
textMax
|
||||
)
|
||||
|
||||
type IntQuery struct {
|
||||
Column string
|
||||
Int int
|
||||
Compare IntComparison
|
||||
}
|
||||
|
||||
func NewIntQuery(column string, value int, compare IntComparison) (*IntQuery, error) {
|
||||
if compare < 0 || compare >= intMax {
|
||||
return nil, errors.New("invalid compare")
|
||||
}
|
||||
if column == "" {
|
||||
return nil, errors.New("missing column")
|
||||
}
|
||||
return &IntQuery{
|
||||
Column: column,
|
||||
Int: value,
|
||||
Compare: compare,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *IntQuery) ToQuery(query sq.SelectBuilder) sq.SelectBuilder {
|
||||
query = query.Where(q.comp())
|
||||
return query
|
||||
}
|
||||
|
||||
func (s *IntQuery) comp() sq.Sqlizer {
|
||||
switch s.Compare {
|
||||
case IntEquals:
|
||||
return sq.Eq{s.Column: s.Int}
|
||||
case IntGreater:
|
||||
return sq.Gt{s.Column: s.Int}
|
||||
case IntLess:
|
||||
return sq.Lt{s.Column: s.Int}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type IntComparison int
|
||||
|
||||
const (
|
||||
IntEquals IntComparison = iota
|
||||
IntGreater
|
||||
IntLess
|
||||
|
||||
intMax
|
||||
)
|
Reference in New Issue
Block a user