feat: query side for executions and targets for actions v2 (#7524)

* feat: add projections and query side to executions and targets

* feat: add list and get endpoints for targets

* feat: add integration tests for query endpoints target and execution

* fix: linting

* fix: linting

* fix: review changes, renames and corrections

* fix: review changes, renames and corrections

* fix: review changes, renames and corrections

* fix: review changes, renames and corrections

* fix: review changes, renames and corrections

* fix: review changes, renames and corrections

* fix: remove position from list details
This commit is contained in:
Stefan Benz
2024-03-14 10:56:23 +01:00
committed by GitHub
parent 5d2cfc06d5
commit fb3c6f791b
30 changed files with 3266 additions and 326 deletions

View File

@@ -99,7 +99,7 @@ func TestActionProjection_reduces(t *testing.T) {
args: args{
event: getEvent(
testEvent(
action.ChangedEventType,
action.DeactivatedEventType,
action.AggregateType,
[]byte(`{}`),
),
@@ -131,7 +131,7 @@ func TestActionProjection_reduces(t *testing.T) {
args: args{
event: getEvent(
testEvent(
action.ChangedEventType,
action.ReactivatedEventType,
action.AggregateType,
[]byte(`{}`),
),
@@ -163,7 +163,7 @@ func TestActionProjection_reduces(t *testing.T) {
args: args{
event: getEvent(
testEvent(
action.ChangedEventType,
action.RemovedEventType,
action.AggregateType,
[]byte(`{}`),
),

View File

@@ -0,0 +1,109 @@
package projection
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
exec "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/repository/instance"
)
const (
ExecutionTable = "projections.executions"
ExecutionIDCol = "id"
ExecutionCreationDateCol = "creation_date"
ExecutionChangeDateCol = "change_date"
ExecutionResourceOwnerCol = "resource_owner"
ExecutionInstanceIDCol = "instance_id"
ExecutionSequenceCol = "sequence"
ExecutionTargetsCol = "targets"
ExecutionIncludesCol = "includes"
)
type executionProjection struct{}
func newExecutionProjection(ctx context.Context, config handler.Config) *handler.Handler {
return handler.NewHandler(ctx, &config, new(executionProjection))
}
func (*executionProjection) Name() string {
return ExecutionTable
}
func (*executionProjection) Init() *old_handler.Check {
return handler.NewTableCheck(
handler.NewTable([]*handler.InitColumn{
handler.NewColumn(ExecutionIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionCreationDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(ExecutionChangeDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(ExecutionResourceOwnerCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionInstanceIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionSequenceCol, handler.ColumnTypeInt64),
handler.NewColumn(ExecutionTargetsCol, handler.ColumnTypeTextArray, handler.Nullable()),
handler.NewColumn(ExecutionIncludesCol, handler.ColumnTypeTextArray, handler.Nullable()),
},
handler.NewPrimaryKey(ExecutionInstanceIDCol, ExecutionIDCol),
),
)
}
func (p *executionProjection) Reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: exec.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: exec.SetEventType,
Reduce: p.reduceExecutionSet,
},
{
Event: exec.RemovedEventType,
Reduce: p.reduceExecutionRemoved,
},
},
},
{
Aggregate: instance.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: instance.InstanceRemovedEventType,
Reduce: reduceInstanceRemovedHelper(ExecutionInstanceIDCol),
},
},
},
}
}
func (p *executionProjection) reduceExecutionSet(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*exec.SetEvent](event)
if err != nil {
return nil, err
}
columns := []handler.Column{
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
handler.NewCol(ExecutionResourceOwnerCol, e.Aggregate().ResourceOwner),
handler.NewCol(ExecutionCreationDateCol, handler.OnlySetValueOnInsert(ExecutionTable, e.CreationDate())),
handler.NewCol(ExecutionChangeDateCol, e.CreationDate()),
handler.NewCol(ExecutionSequenceCol, e.Sequence()),
handler.NewCol(ExecutionTargetsCol, e.Targets),
handler.NewCol(ExecutionIncludesCol, e.Includes),
}
return handler.NewUpsertStatement(e, columns[0:2], columns), nil
}
func (p *executionProjection) reduceExecutionRemoved(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*exec.RemovedEvent](event)
if err != nil {
return nil, err
}
return handler.NewDeleteStatement(
e,
[]handler.Condition{
handler.NewCond(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCond(ExecutionIDCol, e.Aggregate().ID),
},
), nil
}

View File

@@ -0,0 +1,129 @@
package projection
import (
"testing"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
exec "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/zerrors"
)
func TestExecutionProjection_reduces(t *testing.T) {
type args struct {
event func(t *testing.T) eventstore.Event
}
tests := []struct {
name string
args args
reduce func(event eventstore.Event) (*handler.Statement, error)
want wantReduce
}{
{
name: "reduceExecutionSet",
args: args{
event: getEvent(
testEvent(
exec.SetEventType,
exec.AggregateType,
[]byte(`{"targets": ["target"], "includes": ["include"]}`),
),
eventstore.GenericEventMapper[exec.SetEvent],
),
},
reduce: (&executionProjection{}).reduceExecutionSet,
want: wantReduce{
aggregateType: eventstore.AggregateType("execution"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.executions (instance_id, id, resource_owner, creation_date, change_date, sequence, targets, includes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (instance_id, id) DO UPDATE SET (resource_owner, creation_date, change_date, sequence, targets, includes) = (EXCLUDED.resource_owner, projections.executions.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.targets, EXCLUDED.includes)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
"ro-id",
anyArg{},
anyArg{},
uint64(15),
[]string{"target"},
[]string{"include"},
},
},
},
},
},
},
{
name: "reduceExecutionRemoved",
args: args{
event: getEvent(
testEvent(
exec.RemovedEventType,
exec.AggregateType,
[]byte(`{}`),
),
eventstore.GenericEventMapper[exec.RemovedEvent],
),
},
reduce: (&executionProjection{}).reduceExecutionRemoved,
want: wantReduce{
aggregateType: eventstore.AggregateType("execution"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.executions WHERE (instance_id = $1) AND (id = $2)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
},
},
},
},
},
},
{
name: "reduceInstanceRemoved",
args: args{
event: getEvent(
testEvent(
instance.InstanceRemovedEventType,
instance.AggregateType,
nil,
),
instance.InstanceRemovedEventMapper,
),
},
reduce: reduceInstanceRemovedHelper(ExecutionInstanceIDCol),
want: wantReduce{
aggregateType: eventstore.AggregateType("instance"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.executions WHERE (instance_id = $1)",
expectedArgs: []interface{}{
"agg-id",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
event := baseEvent(t)
got, err := tt.reduce(event)
if ok := zerrors.IsErrorInvalidArgument(err); !ok {
t.Errorf("no wrong event mapping: %v, got: %v", err, got)
}
event = tt.args.event(t)
got, err = tt.reduce(event)
assertReduce(t, got, err, ExecutionTable, tt.want)
})
}
}

View File

@@ -74,6 +74,8 @@ var (
RestrictionsProjection *handler.Handler
SystemFeatureProjection *handler.Handler
InstanceFeatureProjection *handler.Handler
TargetProjection *handler.Handler
ExecutionProjection *handler.Handler
)
type projection interface {
@@ -152,6 +154,8 @@ func Create(ctx context.Context, sqlClient *database.DB, es handler.EventStore,
RestrictionsProjection = newRestrictionsProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["restrictions"]))
SystemFeatureProjection = newSystemFeatureProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["system_features"]))
InstanceFeatureProjection = newInstanceFeatureProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["instance_features"]))
TargetProjection = newTargetProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["targets"]))
ExecutionProjection = newExecutionProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["executions"]))
newProjectionsList()
return nil
}
@@ -263,5 +267,7 @@ func newProjectionsList() {
RestrictionsProjection,
SystemFeatureProjection,
InstanceFeatureProjection,
ExecutionProjection,
TargetProjection,
}
}

View File

@@ -0,0 +1,165 @@
package projection
import (
"context"
"github.com/zitadel/zitadel/internal/eventstore"
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/target"
)
const (
TargetTable = "projections.targets"
TargetIDCol = "id"
TargetCreationDateCol = "creation_date"
TargetChangeDateCol = "change_date"
TargetResourceOwnerCol = "resource_owner"
TargetInstanceIDCol = "instance_id"
TargetSequenceCol = "sequence"
TargetNameCol = "name"
TargetTargetType = "target_type"
TargetURLCol = "url"
TargetTimeoutCol = "timeout"
TargetAsyncCol = "async"
TargetInterruptOnErrorCol = "interrupt_on_error"
)
type targetProjection struct{}
func newTargetProjection(ctx context.Context, config handler.Config) *handler.Handler {
return handler.NewHandler(ctx, &config, new(targetProjection))
}
func (*targetProjection) Name() string {
return TargetTable
}
func (*targetProjection) Init() *old_handler.Check {
return handler.NewTableCheck(
handler.NewTable([]*handler.InitColumn{
handler.NewColumn(TargetIDCol, handler.ColumnTypeText),
handler.NewColumn(TargetCreationDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(TargetChangeDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(TargetResourceOwnerCol, handler.ColumnTypeText),
handler.NewColumn(TargetInstanceIDCol, handler.ColumnTypeText),
handler.NewColumn(TargetTargetType, handler.ColumnTypeEnum),
handler.NewColumn(TargetSequenceCol, handler.ColumnTypeInt64),
handler.NewColumn(TargetNameCol, handler.ColumnTypeText),
handler.NewColumn(TargetURLCol, handler.ColumnTypeText, handler.Default("")),
handler.NewColumn(TargetTimeoutCol, handler.ColumnTypeInt64, handler.Default(0)),
handler.NewColumn(TargetAsyncCol, handler.ColumnTypeBool, handler.Default(false)),
handler.NewColumn(TargetInterruptOnErrorCol, handler.ColumnTypeBool, handler.Default(false)),
},
handler.NewPrimaryKey(TargetInstanceIDCol, TargetIDCol),
),
)
}
func (p *targetProjection) Reducers() []handler.AggregateReducer {
return []handler.AggregateReducer{
{
Aggregate: target.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: target.AddedEventType,
Reduce: p.reduceTargetAdded,
},
{
Event: target.ChangedEventType,
Reduce: p.reduceTargetChanged,
},
{
Event: target.RemovedEventType,
Reduce: p.reduceTargetRemoved,
},
},
},
{
Aggregate: instance.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: instance.InstanceRemovedEventType,
Reduce: reduceInstanceRemovedHelper(TargetInstanceIDCol),
},
},
},
}
}
func (p *targetProjection) reduceTargetAdded(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*target.AddedEvent](event)
if err != nil {
return nil, err
}
return handler.NewCreateStatement(
e,
[]handler.Column{
handler.NewCol(TargetInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(TargetResourceOwnerCol, e.Aggregate().ResourceOwner),
handler.NewCol(TargetIDCol, e.Aggregate().ID),
handler.NewCol(TargetCreationDateCol, e.CreationDate()),
handler.NewCol(TargetChangeDateCol, e.CreationDate()),
handler.NewCol(TargetSequenceCol, e.Sequence()),
handler.NewCol(TargetNameCol, e.Name),
handler.NewCol(TargetURLCol, e.URL),
handler.NewCol(TargetTargetType, e.TargetType),
handler.NewCol(TargetTimeoutCol, e.Timeout),
handler.NewCol(TargetAsyncCol, e.Async),
handler.NewCol(TargetInterruptOnErrorCol, e.InterruptOnError),
},
), nil
}
func (p *targetProjection) reduceTargetChanged(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*target.ChangedEvent](event)
if err != nil {
return nil, err
}
values := []handler.Column{
handler.NewCol(TargetChangeDateCol, e.CreationDate()),
handler.NewCol(TargetSequenceCol, e.Sequence()),
handler.NewCol(TargetResourceOwnerCol, e.Aggregate().ResourceOwner),
}
if e.Name != nil {
values = append(values, handler.NewCol(TargetNameCol, *e.Name))
}
if e.TargetType != nil {
values = append(values, handler.NewCol(TargetTargetType, *e.TargetType))
}
if e.URL != nil {
values = append(values, handler.NewCol(TargetURLCol, *e.URL))
}
if e.Timeout != nil {
values = append(values, handler.NewCol(TargetTimeoutCol, *e.Timeout))
}
if e.Async != nil {
values = append(values, handler.NewCol(TargetAsyncCol, *e.Async))
}
if e.InterruptOnError != nil {
values = append(values, handler.NewCol(TargetInterruptOnErrorCol, *e.InterruptOnError))
}
return handler.NewUpdateStatement(
e,
values,
[]handler.Condition{
handler.NewCond(TargetInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCond(TargetIDCol, e.Aggregate().ID),
},
), nil
}
func (p *targetProjection) reduceTargetRemoved(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*target.RemovedEvent](event)
if err != nil {
return nil, err
}
return handler.NewDeleteStatement(
e,
[]handler.Condition{
handler.NewCond(TargetInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCond(TargetIDCol, e.Aggregate().ID),
},
), nil
}

View File

@@ -0,0 +1,173 @@
package projection
import (
"testing"
"time"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/target"
"github.com/zitadel/zitadel/internal/zerrors"
)
func TestTargetProjection_reduces(t *testing.T) {
type args struct {
event func(t *testing.T) eventstore.Event
}
tests := []struct {
name string
args args
reduce func(event eventstore.Event) (*handler.Statement, error)
want wantReduce
}{
{
name: "reduceTargetAdded",
args: args{
event: getEvent(
testEvent(
target.AddedEventType,
target.AggregateType,
[]byte(`{"name": "name", "targetType":0, "url":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
),
eventstore.GenericEventMapper[target.AddedEvent],
),
},
reduce: (&targetProjection{}).reduceTargetAdded,
want: wantReduce{
aggregateType: eventstore.AggregateType("target"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.targets (instance_id, resource_owner, id, creation_date, change_date, sequence, name, url, target_type, timeout, async, interrupt_on_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
expectedArgs: []interface{}{
"instance-id",
"ro-id",
"agg-id",
anyArg{},
anyArg{},
uint64(15),
"name",
"https://example.com",
domain.TargetTypeWebhook,
3 * time.Second,
true,
true,
},
},
},
},
},
},
{
name: "reduceTargetChanged",
args: args{
event: getEvent(
testEvent(
target.ChangedEventType,
target.AggregateType,
[]byte(`{"name": "name2", "targetType":0, "url":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
),
eventstore.GenericEventMapper[target.ChangedEvent],
),
},
reduce: (&targetProjection{}).reduceTargetChanged,
want: wantReduce{
aggregateType: eventstore.AggregateType("target"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "UPDATE projections.targets SET (change_date, sequence, resource_owner, name, target_type, url, timeout, async, interrupt_on_error) = ($1, $2, $3, $4, $5, $6, $7, $8, $9) WHERE (instance_id = $10) AND (id = $11)",
expectedArgs: []interface{}{
anyArg{},
uint64(15),
"ro-id",
"name2",
domain.TargetTypeWebhook,
"https://example.com",
3 * time.Second,
true,
true,
"instance-id",
"agg-id",
},
},
},
},
},
},
{
name: "reduceTargetRemoved",
args: args{
event: getEvent(
testEvent(
target.RemovedEventType,
target.AggregateType,
[]byte(`{}`),
),
eventstore.GenericEventMapper[target.RemovedEvent],
),
},
reduce: (&targetProjection{}).reduceTargetRemoved,
want: wantReduce{
aggregateType: eventstore.AggregateType("target"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.targets WHERE (instance_id = $1) AND (id = $2)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
},
},
},
},
},
},
{
name: "reduceInstanceRemoved",
args: args{
event: getEvent(
testEvent(
instance.InstanceRemovedEventType,
instance.AggregateType,
nil,
),
instance.InstanceRemovedEventMapper,
),
},
reduce: reduceInstanceRemovedHelper(TargetInstanceIDCol),
want: wantReduce{
aggregateType: eventstore.AggregateType("instance"),
sequence: 15,
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.targets WHERE (instance_id = $1)",
expectedArgs: []interface{}{
"agg-id",
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
event := baseEvent(t)
got, err := tt.reduce(event)
if ok := zerrors.IsErrorInvalidArgument(err); !ok {
t.Errorf("no wrong event mapping: %v, got: %v", err, got)
}
event = tt.args.event(t)
got, err = tt.reduce(event)
assertReduce(t, got, err, TargetTable, tt.want)
})
}
}