feat: add action v2 execution on requests and responses (#7637)

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: add execution of targets to grpc calls

* feat: split request and response logic to handle the different context information

* feat: split request and response logic to handle the different context information

* fix: integration test

* fix: import alias

* fix: refactor execution package

* fix: refactor execution interceptor integration and unit tests

* fix: refactor execution interceptor integration and unit tests

* fix: refactor execution interceptor integration and unit tests

* fix: refactor execution interceptor integration and unit tests

* fix: refactor execution interceptor integration and unit tests

* docs: basic documentation for executions and targets

* fix: change order for interceptors

* fix: merge back origin/main

* fix: change target definition command and query side (#7735)

* fix: change target definition command and query side

* fix: correct refactoring name changes

* fix: correct refactoring name changes

* fix: changing execution defintion with target list and type

* fix: changing execution definition with target list and type

* fix: add back search queries for target and include

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* fix: projections change for execution with targets suffix table

* docs: add example to actions v2

* docs: add example to actions v2

* fix: correct integration tests on query for executions

* fix: add separate event for execution v2 as content changed

* fix: add separate event for execution v2 as content changed

* fix: added review comment changes

* fix: added review comment changes

* fix: added review comment changes

---------

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>

* fix: added review comment changes

* fix: added review comment changes

* Update internal/api/grpc/server/middleware/execution_interceptor.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* fix: added review comment changes

* fix: added review comment changes

* fix: added review comment changes

* fix: added review comment changes

* fix: added review comment changes

* fix: added review comment changes

---------

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
Co-authored-by: Elio Bischof <elio@zitadel.com>
This commit is contained in:
Stefan Benz
2024-05-04 11:55:57 +02:00
committed by GitHub
parent 7e345444bf
commit 1c5ecba42a
67 changed files with 4397 additions and 1556 deletions

View File

@@ -3,7 +3,10 @@ package query
import (
"context"
"database/sql"
_ "embed"
"encoding/json"
"errors"
"time"
sq "github.com/Masterminds/squirrel"
@@ -11,6 +14,8 @@ import (
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/query/projection"
exec "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/zerrors"
)
@@ -23,18 +28,10 @@ var (
name: projection.ExecutionIDCol,
table: executionTable,
}
ExecutionColumnCreationDate = Column{
name: projection.ExecutionCreationDateCol,
table: executionTable,
}
ExecutionColumnChangeDate = Column{
name: projection.ExecutionChangeDateCol,
table: executionTable,
}
ExecutionColumnResourceOwner = Column{
name: projection.ExecutionResourceOwnerCol,
table: executionTable,
}
ExecutionColumnInstanceID = Column{
name: projection.ExecutionInstanceIDCol,
table: executionTable,
@@ -43,14 +40,33 @@ var (
name: projection.ExecutionSequenceCol,
table: executionTable,
}
ExecutionColumnTargets = Column{
name: projection.ExecutionTargetsCol,
table: executionTable,
executionTargetsTable = table{
name: projection.ExecutionTable + "_" + projection.ExecutionTargetSuffix,
instanceIDCol: projection.ExecutionTargetInstanceIDCol,
}
ExecutionColumnIncludes = Column{
name: projection.ExecutionIncludesCol,
table: executionTable,
executionTargetsTableAlias = executionTargetsTable.setAlias("execution_targets")
ExecutionTargetsColumnInstanceID = Column{
name: projection.ExecutionTargetInstanceIDCol,
table: executionTargetsTableAlias,
}
ExecutionTargetsColumnExecutionID = Column{
name: projection.ExecutionTargetExecutionIDCol,
table: executionTargetsTableAlias,
}
executionTargetsListCol = Column{
name: "targets",
table: executionTargetsTableAlias,
}
)
var (
//go:embed execution_targets.sql
executionTargetsQuery string
//go:embed targets_by_execution_id.sql
TargetsByExecutionIDQuery string
//go:embed targets_by_execution_ids.sql
TargetsByExecutionIDsQuery string
)
type Executions struct {
@@ -66,8 +82,7 @@ type Execution struct {
ID string
domain.ObjectDetails
Targets database.TextArray[string]
Includes database.TextArray[string]
Targets []*exec.Target
}
type ExecutionSearchQueries struct {
@@ -108,84 +123,301 @@ func NewExecutionTypeSearchQuery(t domain.ExecutionType) (SearchQuery, error) {
return NewTextQuery(ExecutionColumnID, t.String(), TextStartsWith)
}
func NewExecutionTargetSearchQuery(value string) (SearchQuery, error) {
return NewTextQuery(ExecutionColumnTargets, value, TextListContains)
func NewTargetSearchQuery(target string) (SearchQuery, error) {
data, err := targetItemJSONB(domain.ExecutionTargetTypeTarget, target)
if err != nil {
return nil, err
}
return NewListContains(executionTargetsListCol, data)
}
func NewExecutionIncludeSearchQuery(value string) (SearchQuery, error) {
return NewTextQuery(ExecutionColumnIncludes, value, TextListContains)
func NewIncludeSearchQuery(include string) (SearchQuery, error) {
data, err := targetItemJSONB(domain.ExecutionTargetTypeInclude, include)
if err != nil {
return nil, err
}
return NewListContains(executionTargetsListCol, data)
}
func prepareExecutionsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(rows *sql.Rows) (*Executions, error)) {
return sq.Select(
ExecutionColumnID.identifier(),
ExecutionColumnChangeDate.identifier(),
ExecutionColumnResourceOwner.identifier(),
ExecutionColumnSequence.identifier(),
ExecutionColumnTargets.identifier(),
ExecutionColumnIncludes.identifier(),
countColumn.identifier(),
).From(executionTable.identifier()).
PlaceholderFormat(sq.Dollar),
func(rows *sql.Rows) (*Executions, error) {
executions := make([]*Execution, 0)
var count uint64
for rows.Next() {
execution := new(Execution)
err := rows.Scan(
&execution.ID,
&execution.EventDate,
&execution.ResourceOwner,
&execution.Sequence,
&execution.Targets,
&execution.Includes,
&count,
)
if err != nil {
return nil, err
}
executions = append(executions, execution)
}
// marshall executionTargets into the same JSONB structure as in the SQL queries
func targetItemJSONB(t domain.ExecutionTargetType, targetItem string) ([]byte, error) {
var target *executionTarget
switch t {
case domain.ExecutionTargetTypeTarget:
target = &executionTarget{Target: targetItem}
case domain.ExecutionTargetTypeInclude:
target = &executionTarget{Include: targetItem}
case domain.ExecutionTargetTypeUnspecified:
return nil, nil
default:
return nil, nil
}
return json.Marshal([]*executionTarget{target})
}
if err := rows.Close(); err != nil {
return nil, zerrors.ThrowInternal(err, "QUERY-72xfx5jlj7", "Errors.Query.CloseRows")
}
// TargetsByExecutionID query list of targets for best match of a list of IDs, for example:
// [ "request/zitadel.action.v3alpha.ActionService/GetTargetByID",
// "request/zitadel.action.v3alpha.ActionService",
// "request" ]
func (q *Queries) TargetsByExecutionID(ctx context.Context, ids []string) (execution []*ExecutionTarget, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.End() }()
return &Executions{
Executions: executions,
SearchResponse: SearchResponse{
Count: count,
},
}, nil
}
instanceID := authz.GetInstance(ctx).InstanceID()
if instanceID == "" {
return nil, nil
}
err = q.client.QueryContext(ctx,
func(rows *sql.Rows) error {
execution, err = scanExecutionTargets(rows)
return err
},
TargetsByExecutionIDQuery,
instanceID,
database.TextArray[string](ids),
)
return execution, err
}
// TargetsByExecutionIDs query list of targets for best matches of 2 separate lists of IDs, combined for performance, for example:
// [ "request/zitadel.action.v3alpha.ActionService/GetTargetByID",
// "request/zitadel.action.v3alpha.ActionService",
// "request" ]
// and
// [ "response/zitadel.action.v3alpha.ActionService/GetTargetByID",
// "response/zitadel.action.v3alpha.ActionService",
// "response" ]
func (q *Queries) TargetsByExecutionIDs(ctx context.Context, ids1, ids2 []string) (execution []*ExecutionTarget, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.End() }()
instanceID := authz.GetInstance(ctx).InstanceID()
if instanceID == "" {
return nil, nil
}
err = q.client.QueryContext(ctx,
func(rows *sql.Rows) error {
execution, err = scanExecutionTargets(rows)
return err
},
TargetsByExecutionIDsQuery,
instanceID,
database.TextArray[string](ids1),
database.TextArray[string](ids2),
)
return execution, err
}
func prepareExecutionQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(row *sql.Row) (*Execution, error)) {
return sq.Select(
ExecutionColumnInstanceID.identifier(),
ExecutionColumnID.identifier(),
ExecutionColumnChangeDate.identifier(),
ExecutionColumnResourceOwner.identifier(),
ExecutionColumnSequence.identifier(),
ExecutionColumnTargets.identifier(),
ExecutionColumnIncludes.identifier(),
executionTargetsListCol.identifier(),
).From(executionTable.identifier()).
Join("(" + executionTargetsQuery + ") AS " + executionTargetsTableAlias.alias + " ON " +
ExecutionTargetsColumnInstanceID.identifier() + " = " + ExecutionColumnInstanceID.identifier() + " AND " +
ExecutionTargetsColumnExecutionID.identifier() + " = " + ExecutionColumnID.identifier(),
).
PlaceholderFormat(sq.Dollar),
func(row *sql.Row) (*Execution, error) {
execution := new(Execution)
err := row.Scan(
&execution.ID,
&execution.EventDate,
&execution.ResourceOwner,
&execution.Sequence,
&execution.Targets,
&execution.Includes,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, zerrors.ThrowNotFound(err, "QUERY-qzn1xycesh", "Errors.Execution.NotFound")
}
return nil, zerrors.ThrowInternal(err, "QUERY-f8sjvm4tb8", "Errors.Internal")
}
return execution, nil
}
scanExecution
}
func prepareExecutionsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuilder, func(rows *sql.Rows) (*Executions, error)) {
return sq.Select(
ExecutionColumnInstanceID.identifier(),
ExecutionColumnID.identifier(),
ExecutionColumnChangeDate.identifier(),
ExecutionColumnSequence.identifier(),
executionTargetsListCol.identifier(),
countColumn.identifier(),
).From(executionTable.identifier()).
Join("(" + executionTargetsQuery + ") AS " + executionTargetsTableAlias.alias + " ON " +
ExecutionTargetsColumnInstanceID.identifier() + " = " + ExecutionColumnInstanceID.identifier() + " AND " +
ExecutionTargetsColumnExecutionID.identifier() + " = " + ExecutionColumnID.identifier(),
).
PlaceholderFormat(sq.Dollar),
scanExecutions
}
type executionTarget struct {
Position int `json:"position,omitempty"`
Include string `json:"include,omitempty"`
Target string `json:"target,omitempty"`
}
func scanExecution(row *sql.Row) (*Execution, error) {
execution := new(Execution)
targets := make([]byte, 0)
err := row.Scan(
&execution.ResourceOwner,
&execution.ID,
&execution.EventDate,
&execution.Sequence,
&targets,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, zerrors.ThrowNotFound(err, "QUERY-qzn1xycesh", "Errors.Execution.NotFound")
}
return nil, zerrors.ThrowInternal(err, "QUERY-f8sjvm4tb8", "Errors.Internal")
}
executionTargets := make([]*executionTarget, 0)
if err := json.Unmarshal(targets, &executionTargets); err != nil {
return nil, err
}
execution.Targets = make([]*exec.Target, len(executionTargets))
for i := range executionTargets {
if executionTargets[i].Target != "" {
execution.Targets[i] = &exec.Target{Type: domain.ExecutionTargetTypeTarget, Target: executionTargets[i].Target}
}
if executionTargets[i].Include != "" {
execution.Targets[i] = &exec.Target{Type: domain.ExecutionTargetTypeInclude, Target: executionTargets[i].Include}
}
}
return execution, nil
}
func executionTargetsUnmarshal(data []byte) ([]*exec.Target, error) {
executionTargets := make([]*executionTarget, 0)
if err := json.Unmarshal(data, &executionTargets); err != nil {
return nil, err
}
targets := make([]*exec.Target, len(executionTargets))
// position starts with 1
for _, item := range executionTargets {
if item.Target != "" {
targets[item.Position-1] = &exec.Target{Type: domain.ExecutionTargetTypeTarget, Target: item.Target}
}
if item.Include != "" {
targets[item.Position-1] = &exec.Target{Type: domain.ExecutionTargetTypeInclude, Target: item.Include}
}
}
return targets, nil
}
func scanExecutions(rows *sql.Rows) (*Executions, error) {
executions := make([]*Execution, 0)
var count uint64
for rows.Next() {
execution := new(Execution)
targets := make([]byte, 0)
err := rows.Scan(
&execution.ResourceOwner,
&execution.ID,
&execution.EventDate,
&execution.Sequence,
&targets,
&count,
)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, zerrors.ThrowNotFound(err, "QUERY-tbrmno85vp", "Errors.Execution.NotFound")
}
return nil, zerrors.ThrowInternal(err, "QUERY-tyw2ydsj84", "Errors.Internal")
}
execution.Targets, err = executionTargetsUnmarshal(targets)
if err != nil {
return nil, err
}
executions = append(executions, execution)
}
if err := rows.Close(); err != nil {
return nil, zerrors.ThrowInternal(err, "QUERY-yhka3fs3mw", "Errors.Query.CloseRows")
}
return &Executions{
Executions: executions,
SearchResponse: SearchResponse{
Count: count,
},
}, nil
}
type ExecutionTarget struct {
InstanceID string
ExecutionID string
TargetID string
TargetType domain.TargetType
Endpoint string
Timeout time.Duration
InterruptOnError bool
}
func (e *ExecutionTarget) GetExecutionID() string {
return e.ExecutionID
}
func (e *ExecutionTarget) GetTargetID() string {
return e.TargetID
}
func (e *ExecutionTarget) IsInterruptOnError() bool {
return e.InterruptOnError
}
func (e *ExecutionTarget) GetEndpoint() string {
return e.Endpoint
}
func (e *ExecutionTarget) GetTargetType() domain.TargetType {
return e.TargetType
}
func (e *ExecutionTarget) GetTimeout() time.Duration {
return e.Timeout
}
func scanExecutionTargets(rows *sql.Rows) ([]*ExecutionTarget, error) {
targets := make([]*ExecutionTarget, 0)
for rows.Next() {
target := new(ExecutionTarget)
var (
instanceID = &sql.NullString{}
executionID = &sql.NullString{}
targetID = &sql.NullString{}
targetType = &sql.NullInt32{}
endpoint = &sql.NullString{}
timeout = &sql.NullInt64{}
interruptOnError = &sql.NullBool{}
)
err := rows.Scan(
executionID,
instanceID,
targetID,
targetType,
endpoint,
timeout,
interruptOnError,
)
if err != nil {
return nil, err
}
target.InstanceID = instanceID.String
target.ExecutionID = executionID.String
target.TargetID = targetID.String
target.TargetType = domain.TargetType(targetType.Int32)
target.Endpoint = endpoint.String
target.Timeout = time.Duration(timeout.Int64)
target.InterruptOnError = interruptOnError.Bool
targets = append(targets, target)
}
if err := rows.Close(); err != nil {
return nil, zerrors.ThrowInternal(err, "QUERY-37ardr0pki", "Errors.Query.CloseRows")
}
return targets, nil
}

View File

@@ -0,0 +1,11 @@
SELECT instance_id,
execution_id,
JSONB_AGG(
JSON_OBJECT(
'position' : position,
'include' : include,
'target' : target_id
)
) as targets
FROM projections.executions1_targets
GROUP BY instance_id, execution_id

View File

@@ -8,44 +8,56 @@ import (
"regexp"
"testing"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/domain"
exec "github.com/zitadel/zitadel/internal/repository/execution"
"github.com/zitadel/zitadel/internal/zerrors"
)
var (
prepareExecutionsStmt = `SELECT projections.executions.id,` +
` projections.executions.change_date,` +
` projections.executions.resource_owner,` +
` projections.executions.sequence,` +
` projections.executions.targets,` +
` projections.executions.includes,` +
prepareExecutionsStmt = `SELECT projections.executions1.instance_id,` +
` projections.executions1.id,` +
` projections.executions1.change_date,` +
` projections.executions1.sequence,` +
` execution_targets.targets,` +
` COUNT(*) OVER ()` +
` FROM projections.executions`
` FROM projections.executions1` +
` JOIN (` +
`SELECT instance_id, execution_id, JSONB_AGG( JSON_OBJECT( 'position' : position, 'include' : include, 'target' : target_id ) ) as targets` +
` FROM projections.executions1_targets` +
` GROUP BY instance_id, execution_id` +
`)` +
` AS execution_targets` +
` ON execution_targets.instance_id = projections.executions1.instance_id` +
` AND execution_targets.execution_id = projections.executions1.id`
prepareExecutionsCols = []string{
"instance_id",
"id",
"change_date",
"resource_owner",
"sequence",
"targets",
"includes",
"count",
}
prepareExecutionStmt = `SELECT projections.executions.id,` +
` projections.executions.change_date,` +
` projections.executions.resource_owner,` +
` projections.executions.sequence,` +
` projections.executions.targets,` +
` projections.executions.includes` +
` FROM projections.executions`
prepareExecutionStmt = `SELECT projections.executions1.instance_id,` +
` projections.executions1.id,` +
` projections.executions1.change_date,` +
` projections.executions1.sequence,` +
` execution_targets.targets` +
` FROM projections.executions1` +
` JOIN (` +
`SELECT instance_id, execution_id, JSONB_AGG( JSON_OBJECT( 'position' : position, 'include' : include, 'target' : target_id ) ) as targets` +
` FROM projections.executions1_targets` +
` GROUP BY instance_id, execution_id` +
`)` +
` AS execution_targets` +
` ON execution_targets.instance_id = projections.executions1.instance_id` +
` AND execution_targets.execution_id = projections.executions1.id`
prepareExecutionCols = []string{
"instance_id",
"id",
"change_date",
"resource_owner",
"sequence",
"targets",
"includes",
}
)
@@ -81,12 +93,11 @@ func Test_ExecutionPrepares(t *testing.T) {
prepareExecutionsCols,
[][]driver.Value{
{
"ro",
"id",
testNow,
"ro",
uint64(20211109),
database.TextArray[string]{"target"},
database.TextArray[string]{"include"},
[]byte(`[{"position" : 1, "target" : "target"}, {"position" : 2, "include" : "include"}]`),
},
},
),
@@ -103,8 +114,10 @@ func Test_ExecutionPrepares(t *testing.T) {
ResourceOwner: "ro",
Sequence: 20211109,
},
Targets: database.TextArray[string]{"target"},
Includes: database.TextArray[string]{"include"},
Targets: []*exec.Target{
{Type: domain.ExecutionTargetTypeTarget, Target: "target"},
{Type: domain.ExecutionTargetTypeInclude, Target: "include"},
},
},
},
},
@@ -118,20 +131,18 @@ func Test_ExecutionPrepares(t *testing.T) {
prepareExecutionsCols,
[][]driver.Value{
{
"ro",
"id-1",
testNow,
"ro",
uint64(20211109),
database.TextArray[string]{"target1"},
database.TextArray[string]{"include1"},
[]byte(`[{"position" : 1, "target" : "target"}, {"position" : 2, "include" : "include"}]`),
},
{
"ro",
"id-2",
testNow,
"ro",
uint64(20211110),
database.TextArray[string]{"target2"},
database.TextArray[string]{"include2"},
[]byte(`[{"position" : 2, "target" : "target"}, {"position" : 1, "include" : "include"}]`),
},
},
),
@@ -148,8 +159,10 @@ func Test_ExecutionPrepares(t *testing.T) {
ResourceOwner: "ro",
Sequence: 20211109,
},
Targets: database.TextArray[string]{"target1"},
Includes: database.TextArray[string]{"include1"},
Targets: []*exec.Target{
{Type: domain.ExecutionTargetTypeTarget, Target: "target"},
{Type: domain.ExecutionTargetTypeInclude, Target: "include"},
},
},
{
ID: "id-2",
@@ -158,8 +171,10 @@ func Test_ExecutionPrepares(t *testing.T) {
ResourceOwner: "ro",
Sequence: 20211110,
},
Targets: database.TextArray[string]{"target2"},
Includes: database.TextArray[string]{"include2"},
Targets: []*exec.Target{
{Type: domain.ExecutionTargetTypeInclude, Target: "include"},
{Type: domain.ExecutionTargetTypeTarget, Target: "target"},
},
},
},
},
@@ -207,12 +222,11 @@ func Test_ExecutionPrepares(t *testing.T) {
regexp.QuoteMeta(prepareExecutionStmt),
prepareExecutionCols,
[]driver.Value{
"ro",
"id",
testNow,
"ro",
uint64(20211109),
database.TextArray[string]{"target"},
database.TextArray[string]{"include"},
[]byte(`[{"position" : 1, "target" : "target"}, {"position" : 2, "include" : "include"}]`),
},
),
},
@@ -223,8 +237,10 @@ func Test_ExecutionPrepares(t *testing.T) {
ResourceOwner: "ro",
Sequence: 20211109,
},
Targets: database.TextArray[string]{"target"},
Includes: database.TextArray[string]{"include"},
Targets: []*exec.Target{
{Type: domain.ExecutionTargetTypeTarget, Target: "target"},
{Type: domain.ExecutionTargetTypeInclude, Target: "include"},
},
},
},
{

View File

@@ -3,6 +3,7 @@ package projection
import (
"context"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
@@ -11,15 +12,19 @@ import (
)
const (
ExecutionTable = "projections.executions"
ExecutionIDCol = "id"
ExecutionCreationDateCol = "creation_date"
ExecutionChangeDateCol = "change_date"
ExecutionResourceOwnerCol = "resource_owner"
ExecutionInstanceIDCol = "instance_id"
ExecutionSequenceCol = "sequence"
ExecutionTargetsCol = "targets"
ExecutionIncludesCol = "includes"
ExecutionTable = "projections.executions1"
ExecutionIDCol = "id"
ExecutionCreationDateCol = "creation_date"
ExecutionChangeDateCol = "change_date"
ExecutionInstanceIDCol = "instance_id"
ExecutionSequenceCol = "sequence"
ExecutionTargetSuffix = "targets"
ExecutionTargetExecutionIDCol = "execution_id"
ExecutionTargetInstanceIDCol = "instance_id"
ExecutionTargetPositionCol = "position"
ExecutionTargetTargetIDCol = "target_id"
ExecutionTargetIncludeCol = "include"
)
type executionProjection struct{}
@@ -33,19 +38,28 @@ func (*executionProjection) Name() string {
}
func (*executionProjection) Init() *old_handler.Check {
return handler.NewTableCheck(
return handler.NewMultiTableCheck(
handler.NewTable([]*handler.InitColumn{
handler.NewColumn(ExecutionIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionCreationDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(ExecutionChangeDateCol, handler.ColumnTypeTimestamp),
handler.NewColumn(ExecutionResourceOwnerCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionInstanceIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionSequenceCol, handler.ColumnTypeInt64),
handler.NewColumn(ExecutionTargetsCol, handler.ColumnTypeTextArray, handler.Nullable()),
handler.NewColumn(ExecutionIncludesCol, handler.ColumnTypeTextArray, handler.Nullable()),
handler.NewColumn(ExecutionInstanceIDCol, handler.ColumnTypeText),
},
handler.NewPrimaryKey(ExecutionInstanceIDCol, ExecutionIDCol),
),
handler.NewSuffixedTable([]*handler.InitColumn{
handler.NewColumn(ExecutionTargetInstanceIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionTargetExecutionIDCol, handler.ColumnTypeText),
handler.NewColumn(ExecutionTargetPositionCol, handler.ColumnTypeInt64),
handler.NewColumn(ExecutionTargetIncludeCol, handler.ColumnTypeText, handler.Nullable()),
handler.NewColumn(ExecutionTargetTargetIDCol, handler.ColumnTypeText, handler.Nullable()),
},
handler.NewPrimaryKey(ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol, ExecutionTargetPositionCol),
ExecutionTargetSuffix,
handler.WithForeignKey(handler.NewForeignKey("execution", []string{ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol}, []string{ExecutionInstanceIDCol, ExecutionIDCol})),
handler.WithIndex(handler.NewIndex("execution", []string{ExecutionTargetInstanceIDCol, ExecutionTargetExecutionIDCol})),
),
)
}
@@ -55,7 +69,7 @@ func (p *executionProjection) Reducers() []handler.AggregateReducer {
Aggregate: exec.AggregateType,
EventReducers: []handler.EventReducer{
{
Event: exec.SetEventType,
Event: exec.SetEventV2Type,
Reduce: p.reduceExecutionSet,
},
{
@@ -77,21 +91,65 @@ func (p *executionProjection) Reducers() []handler.AggregateReducer {
}
func (p *executionProjection) reduceExecutionSet(event eventstore.Event) (*handler.Statement, error) {
e, err := assertEvent[*exec.SetEvent](event)
e, err := assertEvent[*exec.SetEventV2](event)
if err != nil {
return nil, err
}
columns := []handler.Column{
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
handler.NewCol(ExecutionResourceOwnerCol, e.Aggregate().ResourceOwner),
handler.NewCol(ExecutionCreationDateCol, handler.OnlySetValueOnInsert(ExecutionTable, e.CreationDate())),
handler.NewCol(ExecutionChangeDateCol, e.CreationDate()),
handler.NewCol(ExecutionSequenceCol, e.Sequence()),
handler.NewCol(ExecutionTargetsCol, e.Targets),
handler.NewCol(ExecutionIncludesCol, e.Includes),
stmts := []func(eventstore.Event) handler.Exec{
handler.AddUpsertStatement(
[]handler.Column{
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
},
[]handler.Column{
handler.NewCol(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(ExecutionIDCol, e.Aggregate().ID),
handler.NewCol(ExecutionCreationDateCol, handler.OnlySetValueOnInsert(ExecutionTable, e.CreationDate())),
handler.NewCol(ExecutionChangeDateCol, e.CreationDate()),
handler.NewCol(ExecutionSequenceCol, e.Sequence()),
},
),
// cleanup execution targets to re-insert them
handler.AddDeleteStatement(
[]handler.Condition{
handler.NewCond(ExecutionTargetInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCond(ExecutionTargetExecutionIDCol, e.Aggregate().ID),
},
handler.WithTableSuffix(ExecutionTargetSuffix),
),
}
return handler.NewUpsertStatement(e, columns[0:2], columns), nil
if len(e.Targets) > 0 {
for i, target := range e.Targets {
var targetStr, includeStr string
switch target.Type {
case domain.ExecutionTargetTypeTarget:
targetStr = target.Target
case domain.ExecutionTargetTypeInclude:
includeStr = target.Target
case domain.ExecutionTargetTypeUnspecified:
continue
default:
continue
}
stmts = append(stmts,
handler.AddCreateStatement(
[]handler.Column{
handler.NewCol(ExecutionTargetInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCol(ExecutionTargetExecutionIDCol, e.Aggregate().ID),
handler.NewCol(ExecutionTargetPositionCol, i+1),
handler.NewCol(ExecutionTargetIncludeCol, includeStr),
handler.NewCol(ExecutionTargetTargetIDCol, targetStr),
},
handler.WithTableSuffix(ExecutionTargetSuffix),
),
)
}
}
return handler.NewMultiStatement(e, stmts...), nil
}
func (p *executionProjection) reduceExecutionRemoved(event eventstore.Event) (*handler.Statement, error) {
@@ -99,8 +157,8 @@ func (p *executionProjection) reduceExecutionRemoved(event eventstore.Event) (*h
if err != nil {
return nil, err
}
return handler.NewDeleteStatement(
e,
return handler.NewDeleteStatement(e,
[]handler.Condition{
handler.NewCond(ExecutionInstanceIDCol, e.Aggregate().InstanceID),
handler.NewCond(ExecutionIDCol, e.Aggregate().ID),

View File

@@ -25,11 +25,11 @@ func TestExecutionProjection_reduces(t *testing.T) {
args: args{
event: getEvent(
testEvent(
exec.SetEventType,
exec.SetEventV2Type,
exec.AggregateType,
[]byte(`{"targets": ["target"], "includes": ["include"]}`),
[]byte(`{"targets": [{"type":2,"target":"target"},{"type":1,"target":"include"}]}`),
),
eventstore.GenericEventMapper[exec.SetEvent],
eventstore.GenericEventMapper[exec.SetEventV2],
),
},
reduce: (&executionProjection{}).reduceExecutionSet,
@@ -39,16 +39,40 @@ func TestExecutionProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.executions (instance_id, id, resource_owner, creation_date, change_date, sequence, targets, includes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (instance_id, id) DO UPDATE SET (resource_owner, creation_date, change_date, sequence, targets, includes) = (EXCLUDED.resource_owner, projections.executions.creation_date, EXCLUDED.change_date, EXCLUDED.sequence, EXCLUDED.targets, EXCLUDED.includes)",
expectedStmt: "INSERT INTO projections.executions1 (instance_id, id, creation_date, change_date, sequence) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (instance_id, id) DO UPDATE SET (creation_date, change_date, sequence) = (projections.executions1.creation_date, EXCLUDED.change_date, EXCLUDED.sequence)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
"ro-id",
anyArg{},
anyArg{},
uint64(15),
[]string{"target"},
[]string{"include"},
},
},
{
expectedStmt: "DELETE FROM projections.executions1_targets WHERE (instance_id = $1) AND (execution_id = $2)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
},
},
{
expectedStmt: "INSERT INTO projections.executions1_targets (instance_id, execution_id, position, include, target_id) VALUES ($1, $2, $3, $4, $5)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
1,
"",
"target",
},
},
{
expectedStmt: "INSERT INTO projections.executions1_targets (instance_id, execution_id, position, include, target_id) VALUES ($1, $2, $3, $4, $5)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
2,
"include",
"",
},
},
},
@@ -74,7 +98,7 @@ func TestExecutionProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.executions WHERE (instance_id = $1) AND (id = $2)",
expectedStmt: "DELETE FROM projections.executions1 WHERE (instance_id = $1) AND (id = $2)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
@@ -103,7 +127,7 @@ func TestExecutionProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.executions WHERE (instance_id = $1)",
expectedStmt: "DELETE FROM projections.executions1 WHERE (instance_id = $1)",
expectedArgs: []interface{}{
"agg-id",
},

View File

@@ -269,8 +269,8 @@ func newProjectionsList() {
RestrictionsProjection,
SystemFeatureProjection,
InstanceFeatureProjection,
ExecutionProjection,
TargetProjection,
ExecutionProjection,
UserSchemaProjection,
}
}

View File

@@ -11,7 +11,7 @@ import (
)
const (
TargetTable = "projections.targets"
TargetTable = "projections.targets1"
TargetIDCol = "id"
TargetCreationDateCol = "creation_date"
TargetChangeDateCol = "change_date"
@@ -20,9 +20,8 @@ const (
TargetSequenceCol = "sequence"
TargetNameCol = "name"
TargetTargetType = "target_type"
TargetURLCol = "url"
TargetEndpointCol = "endpoint"
TargetTimeoutCol = "timeout"
TargetAsyncCol = "async"
TargetInterruptOnErrorCol = "interrupt_on_error"
)
@@ -47,10 +46,9 @@ func (*targetProjection) Init() *old_handler.Check {
handler.NewColumn(TargetTargetType, handler.ColumnTypeEnum),
handler.NewColumn(TargetSequenceCol, handler.ColumnTypeInt64),
handler.NewColumn(TargetNameCol, handler.ColumnTypeText),
handler.NewColumn(TargetURLCol, handler.ColumnTypeText, handler.Default("")),
handler.NewColumn(TargetTimeoutCol, handler.ColumnTypeInt64, handler.Default(0)),
handler.NewColumn(TargetAsyncCol, handler.ColumnTypeBool, handler.Default(false)),
handler.NewColumn(TargetInterruptOnErrorCol, handler.ColumnTypeBool, handler.Default(false)),
handler.NewColumn(TargetEndpointCol, handler.ColumnTypeText),
handler.NewColumn(TargetTimeoutCol, handler.ColumnTypeInt64),
handler.NewColumn(TargetInterruptOnErrorCol, handler.ColumnTypeBool),
},
handler.NewPrimaryKey(TargetInstanceIDCol, TargetIDCol),
),
@@ -103,10 +101,9 @@ func (p *targetProjection) reduceTargetAdded(event eventstore.Event) (*handler.S
handler.NewCol(TargetChangeDateCol, e.CreationDate()),
handler.NewCol(TargetSequenceCol, e.Sequence()),
handler.NewCol(TargetNameCol, e.Name),
handler.NewCol(TargetURLCol, e.URL),
handler.NewCol(TargetEndpointCol, e.Endpoint),
handler.NewCol(TargetTargetType, e.TargetType),
handler.NewCol(TargetTimeoutCol, e.Timeout),
handler.NewCol(TargetAsyncCol, e.Async),
handler.NewCol(TargetInterruptOnErrorCol, e.InterruptOnError),
},
), nil
@@ -128,15 +125,12 @@ func (p *targetProjection) reduceTargetChanged(event eventstore.Event) (*handler
if e.TargetType != nil {
values = append(values, handler.NewCol(TargetTargetType, *e.TargetType))
}
if e.URL != nil {
values = append(values, handler.NewCol(TargetURLCol, *e.URL))
if e.Endpoint != nil {
values = append(values, handler.NewCol(TargetEndpointCol, *e.Endpoint))
}
if e.Timeout != nil {
values = append(values, handler.NewCol(TargetTimeoutCol, *e.Timeout))
}
if e.Async != nil {
values = append(values, handler.NewCol(TargetAsyncCol, *e.Async))
}
if e.InterruptOnError != nil {
values = append(values, handler.NewCol(TargetInterruptOnErrorCol, *e.InterruptOnError))
}

View File

@@ -29,7 +29,7 @@ func TestTargetProjection_reduces(t *testing.T) {
testEvent(
target.AddedEventType,
target.AggregateType,
[]byte(`{"name": "name", "targetType":0, "url":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
[]byte(`{"name": "name", "targetType":0, "endpoint":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
),
eventstore.GenericEventMapper[target.AddedEvent],
),
@@ -41,7 +41,7 @@ func TestTargetProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "INSERT INTO projections.targets (instance_id, resource_owner, id, creation_date, change_date, sequence, name, url, target_type, timeout, async, interrupt_on_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
expectedStmt: "INSERT INTO projections.targets1 (instance_id, resource_owner, id, creation_date, change_date, sequence, name, endpoint, target_type, timeout, interrupt_on_error) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)",
expectedArgs: []interface{}{
"instance-id",
"ro-id",
@@ -54,7 +54,6 @@ func TestTargetProjection_reduces(t *testing.T) {
domain.TargetTypeWebhook,
3 * time.Second,
true,
true,
},
},
},
@@ -68,7 +67,7 @@ func TestTargetProjection_reduces(t *testing.T) {
testEvent(
target.ChangedEventType,
target.AggregateType,
[]byte(`{"name": "name2", "targetType":0, "url":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
[]byte(`{"name": "name2", "targetType":0, "endpoint":"https://example.com", "timeout": 3000000000, "async": true, "interruptOnError": true}`),
),
eventstore.GenericEventMapper[target.ChangedEvent],
),
@@ -80,7 +79,7 @@ func TestTargetProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "UPDATE projections.targets SET (change_date, sequence, resource_owner, name, target_type, url, timeout, async, interrupt_on_error) = ($1, $2, $3, $4, $5, $6, $7, $8, $9) WHERE (instance_id = $10) AND (id = $11)",
expectedStmt: "UPDATE projections.targets1 SET (change_date, sequence, resource_owner, name, target_type, endpoint, timeout, interrupt_on_error) = ($1, $2, $3, $4, $5, $6, $7, $8) WHERE (instance_id = $9) AND (id = $10)",
expectedArgs: []interface{}{
anyArg{},
uint64(15),
@@ -90,7 +89,6 @@ func TestTargetProjection_reduces(t *testing.T) {
"https://example.com",
3 * time.Second,
true,
true,
"instance-id",
"agg-id",
},
@@ -118,7 +116,7 @@ func TestTargetProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.targets WHERE (instance_id = $1) AND (id = $2)",
expectedStmt: "DELETE FROM projections.targets1 WHERE (instance_id = $1) AND (id = $2)",
expectedArgs: []interface{}{
"instance-id",
"agg-id",
@@ -147,7 +145,7 @@ func TestTargetProjection_reduces(t *testing.T) {
executer: &testExecuter{
executions: []execution{
{
expectedStmt: "DELETE FROM projections.targets WHERE (instance_id = $1)",
expectedStmt: "DELETE FROM projections.targets1 WHERE (instance_id = $1)",
expectedArgs: []interface{}{
"agg-id",
},

View File

@@ -478,31 +478,31 @@ func (q *SubSelect) comp() sq.Sqlizer {
return selectQuery
}
type ListQuery struct {
type listQuery struct {
Column Column
Data interface{}
Compare ListComparison
}
func NewListQuery(column Column, value interface{}, compare ListComparison) (*ListQuery, error) {
func NewListQuery(column Column, value interface{}, compare ListComparison) (*listQuery, error) {
if compare < 0 || compare >= listCompareMax {
return nil, ErrInvalidCompare
}
if column.isZero() {
return nil, ErrMissingColumn
}
return &ListQuery{
return &listQuery{
Column: column,
Data: value,
Compare: compare,
}, nil
}
func (q *ListQuery) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
func (q *listQuery) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(q.comp())
}
func (q *ListQuery) comp() sq.Sqlizer {
func (q *listQuery) comp() sq.Sqlizer {
if q.Compare != ListIn {
return nil
}
@@ -517,7 +517,7 @@ func (q *ListQuery) comp() sq.Sqlizer {
return sq.Eq{q.Column.identifier(): q.Data}
}
func (q *ListQuery) Col() Column {
func (q *listQuery) Col() Column {
return q.Column
}
@@ -720,6 +720,25 @@ type listContains struct {
args interface{}
}
func NewListContains(c Column, value interface{}) (*listContains, error) {
return &listContains{
col: c,
args: value,
}, nil
}
func (q *listContains) Col() Column {
return q.col
}
func (q *listContains) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
return query.Where(q.comp())
}
func (q *listContains) ToSql() (string, []interface{}, error) {
return q.col.identifier() + " @> ? ", []interface{}{q.args}, nil
}
func (q *listContains) comp() sq.Sqlizer {
return q
}

View File

@@ -521,7 +521,7 @@ func TestNewListQuery(t *testing.T) {
tests := []struct {
name string
args args
want *ListQuery
want *listQuery
wantErr func(error) bool
}{
{
@@ -575,7 +575,7 @@ func TestNewListQuery(t *testing.T) {
data: []interface{}{"hurst"},
compare: ListIn,
},
want: &ListQuery{
want: &listQuery{
Column: testCol,
Data: []interface{}{"hurst"},
Compare: ListIn,
@@ -588,7 +588,7 @@ func TestNewListQuery(t *testing.T) {
data: &SubSelect{Column: testCol, Queries: []SearchQuery{&textQuery{testCol, "horst1", TextEquals}}},
compare: ListIn,
},
want: &ListQuery{
want: &listQuery{
Column: testCol,
Data: &SubSelect{Column: testCol, Queries: []SearchQuery{&textQuery{testCol, "horst1", TextEquals}}},
Compare: ListIn,
@@ -751,7 +751,7 @@ func TestListQuery_comp(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &ListQuery{
s := &listQuery{
Column: tt.fields.Column,
Data: tt.fields.Data,
Compare: tt.fields.Compare,

View File

@@ -52,17 +52,13 @@ var (
table: targetTable,
}
TargetColumnURL = Column{
name: projection.TargetURLCol,
name: projection.TargetEndpointCol,
table: targetTable,
}
TargetColumnTimeout = Column{
name: projection.TargetTimeoutCol,
table: targetTable,
}
TargetColumnAsync = Column{
name: projection.TargetAsyncCol,
table: targetTable,
}
TargetColumnInterruptOnError = Column{
name: projection.TargetInterruptOnErrorCol,
table: targetTable,
@@ -84,9 +80,8 @@ type Target struct {
Name string
TargetType domain.TargetType
URL string
Endpoint string
Timeout time.Duration
Async bool
InterruptOnError bool
}
@@ -138,7 +133,6 @@ func prepareTargetsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuil
TargetColumnTargetType.identifier(),
TargetColumnTimeout.identifier(),
TargetColumnURL.identifier(),
TargetColumnAsync.identifier(),
TargetColumnInterruptOnError.identifier(),
countColumn.identifier(),
).From(targetTable.identifier()).
@@ -156,8 +150,7 @@ func prepareTargetsQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuil
&target.Name,
&target.TargetType,
&target.Timeout,
&target.URL,
&target.Async,
&target.Endpoint,
&target.InterruptOnError,
&count,
)
@@ -190,7 +183,6 @@ func prepareTargetQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuild
TargetColumnTargetType.identifier(),
TargetColumnTimeout.identifier(),
TargetColumnURL.identifier(),
TargetColumnAsync.identifier(),
TargetColumnInterruptOnError.identifier(),
).From(targetTable.identifier()).
PlaceholderFormat(sq.Dollar),
@@ -204,8 +196,7 @@ func prepareTargetQuery(ctx context.Context, db prepareDatabase) (sq.SelectBuild
&target.Name,
&target.TargetType,
&target.Timeout,
&target.URL,
&target.Async,
&target.Endpoint,
&target.InterruptOnError,
)
if err != nil {

View File

@@ -14,18 +14,17 @@ import (
)
var (
prepareTargetsStmt = `SELECT projections.targets.id,` +
` projections.targets.change_date,` +
` projections.targets.resource_owner,` +
` projections.targets.sequence,` +
` projections.targets.name,` +
` projections.targets.target_type,` +
` projections.targets.timeout,` +
` projections.targets.url,` +
` projections.targets.async,` +
` projections.targets.interrupt_on_error,` +
prepareTargetsStmt = `SELECT projections.targets1.id,` +
` projections.targets1.change_date,` +
` projections.targets1.resource_owner,` +
` projections.targets1.sequence,` +
` projections.targets1.name,` +
` projections.targets1.target_type,` +
` projections.targets1.timeout,` +
` projections.targets1.endpoint,` +
` projections.targets1.interrupt_on_error,` +
` COUNT(*) OVER ()` +
` FROM projections.targets`
` FROM projections.targets1`
prepareTargetsCols = []string{
"id",
"change_date",
@@ -34,23 +33,21 @@ var (
"name",
"target_type",
"timeout",
"url",
"async",
"endpoint",
"interrupt_on_error",
"count",
}
prepareTargetStmt = `SELECT projections.targets.id,` +
` projections.targets.change_date,` +
` projections.targets.resource_owner,` +
` projections.targets.sequence,` +
` projections.targets.name,` +
` projections.targets.target_type,` +
` projections.targets.timeout,` +
` projections.targets.url,` +
` projections.targets.async,` +
` projections.targets.interrupt_on_error` +
` FROM projections.targets`
prepareTargetStmt = `SELECT projections.targets1.id,` +
` projections.targets1.change_date,` +
` projections.targets1.resource_owner,` +
` projections.targets1.sequence,` +
` projections.targets1.name,` +
` projections.targets1.target_type,` +
` projections.targets1.timeout,` +
` projections.targets1.endpoint,` +
` projections.targets1.interrupt_on_error` +
` FROM projections.targets1`
prepareTargetCols = []string{
"id",
"change_date",
@@ -59,8 +56,7 @@ var (
"name",
"target_type",
"timeout",
"url",
"async",
"endpoint",
"interrupt_on_error",
}
)
@@ -106,7 +102,6 @@ func Test_TargetPrepares(t *testing.T) {
1 * time.Second,
"https://example.com",
true,
true,
},
},
),
@@ -126,8 +121,7 @@ func Test_TargetPrepares(t *testing.T) {
Name: "target-name",
TargetType: domain.TargetTypeWebhook,
Timeout: 1 * time.Second,
URL: "https://example.com",
Async: true,
Endpoint: "https://example.com",
InterruptOnError: true,
},
},
@@ -151,7 +145,6 @@ func Test_TargetPrepares(t *testing.T) {
1 * time.Second,
"https://example.com",
true,
false,
},
{
"id-2",
@@ -163,14 +156,24 @@ func Test_TargetPrepares(t *testing.T) {
1 * time.Second,
"https://example.com",
false,
true,
},
{
"id-3",
testNow,
"ro",
uint64(20211110),
"target-name3",
domain.TargetTypeAsync,
1 * time.Second,
"https://example.com",
false,
},
},
),
},
object: &Targets{
SearchResponse: SearchResponse{
Count: 2,
Count: 3,
},
Targets: []*Target{
{
@@ -183,9 +186,8 @@ func Test_TargetPrepares(t *testing.T) {
Name: "target-name1",
TargetType: domain.TargetTypeWebhook,
Timeout: 1 * time.Second,
URL: "https://example.com",
Async: true,
InterruptOnError: false,
Endpoint: "https://example.com",
InterruptOnError: true,
},
{
ID: "id-2",
@@ -197,9 +199,21 @@ func Test_TargetPrepares(t *testing.T) {
Name: "target-name2",
TargetType: domain.TargetTypeWebhook,
Timeout: 1 * time.Second,
URL: "https://example.com",
Async: false,
InterruptOnError: true,
Endpoint: "https://example.com",
InterruptOnError: false,
},
{
ID: "id-3",
ObjectDetails: domain.ObjectDetails{
EventDate: testNow,
ResourceOwner: "ro",
Sequence: 20211110,
},
Name: "target-name3",
TargetType: domain.TargetTypeAsync,
Timeout: 1 * time.Second,
Endpoint: "https://example.com",
InterruptOnError: false,
},
},
},
@@ -256,7 +270,6 @@ func Test_TargetPrepares(t *testing.T) {
1 * time.Second,
"https://example.com",
true,
false,
},
),
},
@@ -270,9 +283,8 @@ func Test_TargetPrepares(t *testing.T) {
Name: "target-name",
TargetType: domain.TargetTypeWebhook,
Timeout: 1 * time.Second,
URL: "https://example.com",
Async: true,
InterruptOnError: false,
Endpoint: "https://example.com",
InterruptOnError: true,
},
},
{

View File

@@ -0,0 +1,40 @@
WITH RECURSIVE
dissolved_execution_targets(execution_id, instance_id, position, "include", "target_id")
AS (SELECT execution_id
, instance_id
, ARRAY [position]
, "include"
, "target_id"
FROM matched_targets_and_includes
UNION ALL
SELECT e.execution_id
, p.instance_id
, e.position || p.position
, p."include"
, p."target_id"
FROM dissolved_execution_targets e
JOIN projections.executions1_targets p
ON e.instance_id = p.instance_id
AND e.include IS NOT NULL
AND e.include = p.execution_id),
matched AS (SELECT *
FROM projections.executions1
WHERE instance_id = $1
AND id = ANY($2)
ORDER BY id DESC
LIMIT 1),
matched_targets_and_includes AS (SELECT pos.*
FROM matched m
JOIN
projections.executions1_targets pos
ON m.id = pos.execution_id
AND m.instance_id = pos.instance_id
ORDER BY execution_id,
position)
select e.execution_id, e.instance_id, e.target_id, t.target_type, t.endpoint, t.timeout, t.interrupt_on_error
FROM dissolved_execution_targets e
JOIN projections.targets1 t
ON e.instance_id = t.instance_id
AND e.target_id = t.id
WHERE "include" = ''
ORDER BY position DESC;

View File

@@ -0,0 +1,47 @@
WITH RECURSIVE
dissolved_execution_targets(execution_id, instance_id, position, "include", "target_id")
AS (SELECT execution_id
, instance_id
, ARRAY [position]
, "include"
, "target_id"
FROM matched_targets_and_includes
UNION ALL
SELECT e.execution_id
, p.instance_id
, e.position || p.position
, p."include"
, p."target_id"
FROM dissolved_execution_targets e
JOIN projections.executions1_targets p
ON e.instance_id = p.instance_id
AND e.include IS NOT NULL
AND e.include = p.execution_id),
matched AS ((SELECT *
FROM projections.executions1
WHERE instance_id = $1
AND id = ANY($2)
ORDER BY id DESC
LIMIT 1)
UNION ALL
(SELECT *
FROM projections.executions1
WHERE instance_id = $1
AND id = ANY($3)
ORDER BY id DESC
LIMIT 1)),
matched_targets_and_includes AS (SELECT pos.*
FROM matched m
JOIN
projections.executions1_targets pos
ON m.id = pos.execution_id
AND m.instance_id = pos.instance_id
ORDER BY execution_id,
position)
select e.execution_id, e.instance_id, e.target_id, t.target_type, t.endpoint, t.timeout, t.interrupt_on_error
FROM dissolved_execution_targets e
JOIN projections.targets1 t
ON e.instance_id = t.instance_id
AND e.target_id = t.id
WHERE "include" = ''
ORDER BY position DESC;