2024-03-14 10:56:23 +01:00
|
|
|
package projection
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
|
2024-05-04 11:55:57 +02:00
|
|
|
"github.com/zitadel/zitadel/internal/domain"
|
2024-03-14 10:56:23 +01:00
|
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
|
|
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
|
|
|
|
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
|
|
|
|
exec "github.com/zitadel/zitadel/internal/repository/execution"
|
|
|
|
"github.com/zitadel/zitadel/internal/repository/instance"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2024-05-04 11:55:57 +02:00
|
|
|
ExecutionTable = "projections.executions1"
|
|
|
|
ExecutionIDCol = "id"
|
|
|
|
ExecutionCreationDateCol = "creation_date"
|
|
|
|
ExecutionChangeDateCol = "change_date"
|
|
|
|
ExecutionInstanceIDCol = "instance_id"
|
|
|
|
ExecutionSequenceCol = "sequence"
|
|
|
|
|
|
|
|
ExecutionTargetSuffix = "targets"
|
|
|
|
ExecutionTargetExecutionIDCol = "execution_id"
|
|
|
|
ExecutionTargetInstanceIDCol = "instance_id"
|
|
|
|
ExecutionTargetPositionCol = "position"
|
|
|
|
ExecutionTargetTargetIDCol = "target_id"
|
|
|
|
ExecutionTargetIncludeCol = "include"
|
2024-03-14 10:56:23 +01:00
|
|
|
)
|
|
|
|
|
|
|
|
type executionProjection struct{}
|
|
|
|
|
|
|
|
func newExecutionProjection(ctx context.Context, config handler.Config) *handler.Handler {
|
|
|
|
return handler.NewHandler(ctx, &config, new(executionProjection))
|
|
|
|
}
|
|
|
|
|
|
|
|
func (*executionProjection) Name() string {
|
|
|
|
return ExecutionTable
|
|
|
|
}
|
|
|
|
|
|
|
|
func (*executionProjection) Init() *old_handler.Check {
|
2024-05-04 11:55:57 +02:00
|
|
|
return handler.NewMultiTableCheck(
|
2024-03-14 10:56:23 +01:00
|
|
|
handler.NewTable([]*handler.InitColumn{
|
|
|
|
handler.NewColumn(ExecutionIDCol, handler.ColumnTypeText),
|
|
|
|
handler.NewColumn(ExecutionCreationDateCol, handler.ColumnTypeTimestamp),
|
|
|
|
handler.NewColumn(ExecutionChangeDateCol, handler.ColumnTypeTimestamp),
|
|
|
|
handler.NewColumn(ExecutionSequenceCol, handler.ColumnTypeInt64),
|
2024-05-04 11:55:57 +02:00
|
|
|
handler.NewColumn(ExecutionInstanceIDCol, handler.ColumnTypeText),
|
2024-03-14 10:56:23 +01:00
|
|
|
},
|
|
|
|
handler.NewPrimaryKey(ExecutionInstanceIDCol, ExecutionIDCol),
|
|
|
|
),
|
2024-05-04 11:55:57 +02:00
|
|
|
handler.NewSuffixedTable([]*handler.InitColumn{
|
|
|
|
handler.NewColumn(ExecutionTargetInstanceIDCol, handler.ColumnTypeText),
|
|
|
|
handler.NewColumn(ExecutionTargetExecutionIDCol, handler.ColumnTypeText),
|
|
|
|
handler.NewColumn(ExecutionTargetPositionCol, handler.ColumnTypeInt64),
|
|
|
|
handler.NewColumn(ExecutionTargetIncludeCol, handler.ColumnTypeText, handler.Nullable()),
|
|
|
|
handler.NewColumn(ExecutionTargetTargetIDCol, handler.ColumnTypeText, handler.Nullable()),
|
|
|
|
},
|
|
|
|
handler.NewPrimaryKey(ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol, ExecutionTargetPositionCol),
|
|
|
|
ExecutionTargetSuffix,
|
|
|
|
handler.WithForeignKey(handler.NewForeignKey("execution", []string{ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol}, []string{ExecutionInstanceIDCol, ExecutionIDCol})),
|
|
|
|
handler.WithIndex(handler.NewIndex("execution", []string{ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol})),
|
|
|
|
),
|
2024-03-14 10:56:23 +01:00
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *executionProjection) Reducers() []handler.AggregateReducer {
|
|
|
|
return []handler.AggregateReducer{
|
|
|
|
{
|
|
|
|
Aggregate: exec.AggregateType,
|
|
|
|
EventReducers: []handler.EventReducer{
|
|
|
|
{
|
2024-05-04 11:55:57 +02:00
|
|
|
Event: exec.SetEventV2Type,
|
2024-03-14 10:56:23 +01:00
|
|
|
Reduce: p.reduceExecutionSet,
|
|
|
|
},
|
|
|
|
{
|
|
|
|
Event: exec.RemovedEventType,
|
|
|
|
Reduce: p.reduceExecutionRemoved,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
Aggregate: instance.AggregateType,
|
|
|
|
EventReducers: []handler.EventReducer{
|
|
|
|
{
|
|
|
|
Event: instance.InstanceRemovedEventType,
|
|
|
|
Reduce: reduceInstanceRemovedHelper(ExecutionInstanceIDCol),
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *executionProjection) reduceExecutionSet(event eventstore.Event) (*handler.Statement, error) {
|
2024-05-04 11:55:57 +02:00
|
|
|
e, err := assertEvent[*exec.SetEventV2](event)
|
2024-03-14 10:56:23 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-05-04 11:55:57 +02:00
|
|
|
|
|
|
|
stmts := []func(eventstore.Event) handler.Exec{
|
|
|
|
handler.AddUpsertStatement(
|
|
|
|
[]handler.Column{
|
|
|
|
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
|
|
|
|
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
|
|
|
|
},
|
|
|
|
[]handler.Column{
|
|
|
|
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
|
|
|
|
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
|
|
|
|
handler.NewCol(ExecutionCreationDateCol, handler.OnlySetValueOnInsert(ExecutionTable, e.CreationDate())),
|
|
|
|
handler.NewCol(ExecutionChangeDateCol, e.CreationDate()),
|
|
|
|
handler.NewCol(ExecutionSequenceCol, e.Sequence()),
|
|
|
|
},
|
|
|
|
),
|
|
|
|
// cleanup execution targets to re-insert them
|
|
|
|
handler.AddDeleteStatement(
|
|
|
|
[]handler.Condition{
|
|
|
|
handler.NewCond(ExecutionTargetInstanceIDCol, e.Aggregate().InstanceID),
|
|
|
|
handler.NewCond(ExecutionTargetExecutionIDCol, e.Aggregate().ID),
|
|
|
|
},
|
|
|
|
handler.WithTableSuffix(ExecutionTargetSuffix),
|
|
|
|
),
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(e.Targets) > 0 {
|
|
|
|
for i, target := range e.Targets {
|
|
|
|
var targetStr, includeStr string
|
|
|
|
switch target.Type {
|
|
|
|
case domain.ExecutionTargetTypeTarget:
|
|
|
|
targetStr = target.Target
|
|
|
|
case domain.ExecutionTargetTypeInclude:
|
|
|
|
includeStr = target.Target
|
|
|
|
case domain.ExecutionTargetTypeUnspecified:
|
|
|
|
continue
|
|
|
|
default:
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
stmts = append(stmts,
|
|
|
|
handler.AddCreateStatement(
|
|
|
|
[]handler.Column{
|
|
|
|
handler.NewCol(ExecutionTargetInstanceIDCol, e.Aggregate().InstanceID),
|
|
|
|
handler.NewCol(ExecutionTargetExecutionIDCol, e.Aggregate().ID),
|
|
|
|
handler.NewCol(ExecutionTargetPositionCol, i+1),
|
|
|
|
handler.NewCol(ExecutionTargetIncludeCol, includeStr),
|
|
|
|
handler.NewCol(ExecutionTargetTargetIDCol, targetStr),
|
|
|
|
},
|
|
|
|
handler.WithTableSuffix(ExecutionTargetSuffix),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
}
|
2024-03-14 10:56:23 +01:00
|
|
|
}
|
2024-05-04 11:55:57 +02:00
|
|
|
|
|
|
|
return handler.NewMultiStatement(e, stmts...), nil
|
2024-03-14 10:56:23 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *executionProjection) reduceExecutionRemoved(event eventstore.Event) (*handler.Statement, error) {
|
|
|
|
e, err := assertEvent[*exec.RemovedEvent](event)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2024-05-04 11:55:57 +02:00
|
|
|
|
|
|
|
return handler.NewDeleteStatement(e,
|
2024-03-14 10:56:23 +01:00
|
|
|
[]handler.Condition{
|
|
|
|
handler.NewCond(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
|
|
|
|
handler.NewCond(ExecutionIDCol, e.Aggregate().ID),
|
|
|
|
},
|
|
|
|
), nil
|
|
|
|
}
|