feat: actions v2 execution targets command side (#7384)

Adds the API to create, update, delete targets for execution in a new ExecutionService (v3alpha)
This commit is contained in:
Stefan Benz
2024-02-15 06:39:10 +01:00
committed by GitHub
parent 518c8f486e
commit 198bc017b8
33 changed files with 2552 additions and 18 deletions

View File

@@ -0,0 +1,177 @@
package command
import (
"context"
"net/url"
"time"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/repository/target"
"github.com/zitadel/zitadel/internal/zerrors"
)
type AddTarget struct {
models.ObjectRoot
Name string
TargetType domain.TargetType
URL string
Timeout time.Duration
Async bool
InterruptOnError bool
}
func (a *AddTarget) IsValid() error {
if a.Name == "" {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-ddqbm9us5p", "Errors.Target.Invalid")
}
if a.Timeout == 0 {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-39f35d8uri", "Errors.Target.NoTimeout")
}
_, err := url.Parse(a.URL)
if err != nil || a.URL == "" {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-1r2k6qo6wg", "Errors.Target.InvalidURL")
}
return nil
}
func (c *Commands) AddTarget(ctx context.Context, add *AddTarget, resourceOwner string) (_ *domain.ObjectDetails, err error) {
if resourceOwner == "" {
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-brml926e2d", "Errors.IDMissing")
}
if err := add.IsValid(); err != nil {
return nil, err
}
if add.AggregateID == "" {
add.AggregateID, err = c.idGenerator.Next()
if err != nil {
return nil, err
}
}
wm := NewTargetWriteModel(add.AggregateID, resourceOwner)
pushedEvents, err := c.eventstore.Push(ctx, target.NewAddedEvent(
ctx,
TargetAggregateFromWriteModel(&wm.WriteModel),
add.Name,
add.TargetType,
add.URL,
add.Timeout,
add.Async,
add.InterruptOnError,
))
if err != nil {
return nil, err
}
if err := AppendAndReduce(wm, pushedEvents...); err != nil {
return nil, err
}
return writeModelToObjectDetails(&wm.WriteModel), nil
}
type ChangeTarget struct {
models.ObjectRoot
Name *string
TargetType *domain.TargetType
URL *string
Timeout *time.Duration
Async *bool
InterruptOnError *bool
}
func (a *ChangeTarget) IsValid() error {
if a.AggregateID == "" {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-1l6ympeagp", "Errors.IDMissing")
}
if a.Name != nil && *a.Name == "" {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-d1wx4lm0zr", "Errors.Target.Invalid")
}
if a.Timeout != nil && *a.Timeout == 0 {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-08b39vdi57", "Errors.Target.NoTimeout")
}
if a.URL != nil {
_, err := url.Parse(*a.URL)
if err != nil || *a.URL == "" {
return zerrors.ThrowInvalidArgument(nil, "COMMAND-jsbaera7b6", "Errors.Target.InvalidURL")
}
}
return nil
}
func (c *Commands) ChangeTarget(ctx context.Context, change *ChangeTarget, resourceOwner string) (*domain.ObjectDetails, error) {
if resourceOwner == "" {
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-zqibgg0wwh", "Errors.IDMissing")
}
if err := change.IsValid(); err != nil {
return nil, err
}
existing, err := c.getTargetWriteModelByID(ctx, change.AggregateID, resourceOwner)
if err != nil {
return nil, err
}
if !existing.State.Exists() {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-xj14f2cccn", "Errors.Target.NotFound")
}
changedEvent := existing.NewChangedEvent(
ctx,
TargetAggregateFromWriteModel(&existing.WriteModel),
change.Name,
change.TargetType,
change.URL,
change.Timeout,
change.Async,
change.InterruptOnError)
if changedEvent == nil {
return writeModelToObjectDetails(&existing.WriteModel), nil
}
pushedEvents, err := c.eventstore.Push(ctx, changedEvent)
if err != nil {
return nil, err
}
err = AppendAndReduce(existing, pushedEvents...)
if err != nil {
return nil, err
}
return writeModelToObjectDetails(&existing.WriteModel), nil
}
func (c *Commands) DeleteTarget(ctx context.Context, id, resourceOwner string) (*domain.ObjectDetails, error) {
if id == "" || resourceOwner == "" {
return nil, zerrors.ThrowInvalidArgument(nil, "COMMAND-obqos2l3no", "Errors.IDMissing")
}
existing, err := c.getTargetWriteModelByID(ctx, id, resourceOwner)
if err != nil {
return nil, err
}
if !existing.State.Exists() {
return nil, zerrors.ThrowNotFound(nil, "COMMAND-k4s7ucu0ax", "Errors.Target.NotFound")
}
if err := c.pushAppendAndReduce(ctx,
existing,
target.NewRemovedEvent(ctx,
TargetAggregateFromWriteModel(&existing.WriteModel),
existing.Name,
),
); err != nil {
return nil, err
}
return writeModelToObjectDetails(&existing.WriteModel), nil
}
func (c *Commands) getTargetWriteModelByID(ctx context.Context, id string, resourceOwner string) (*TargetWriteModel, error) {
wm := NewTargetWriteModel(id, resourceOwner)
err := c.eventstore.FilterToQueryReducer(ctx, wm)
if err != nil {
return nil, err
}
return wm, nil
}

View File

@@ -0,0 +1,127 @@
package command
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/action"
"github.com/zitadel/zitadel/internal/repository/target"
)
type TargetWriteModel struct {
eventstore.WriteModel
Name string
TargetType domain.TargetType
URL string
Timeout time.Duration
Async bool
InterruptOnError bool
State domain.TargetState
}
func NewTargetWriteModel(id string, resourceOwner string) *TargetWriteModel {
return &TargetWriteModel{
WriteModel: eventstore.WriteModel{
AggregateID: id,
ResourceOwner: resourceOwner,
InstanceID: resourceOwner,
},
}
}
func (wm *TargetWriteModel) Reduce() error {
for _, event := range wm.Events {
switch e := event.(type) {
case *target.AddedEvent:
wm.Name = e.Name
wm.TargetType = e.TargetType
wm.URL = e.URL
wm.Timeout = e.Timeout
wm.Async = e.Async
wm.State = domain.TargetActive
case *target.ChangedEvent:
if e.Name != nil {
wm.Name = *e.Name
}
if e.TargetType != nil {
wm.TargetType = *e.TargetType
}
if e.URL != nil {
wm.URL = *e.URL
}
if e.Timeout != nil {
wm.Timeout = *e.Timeout
}
if e.Async != nil {
wm.Async = *e.Async
}
if e.InterruptOnError != nil {
wm.InterruptOnError = *e.InterruptOnError
}
case *action.RemovedEvent:
wm.State = domain.TargetRemoved
}
}
return wm.WriteModel.Reduce()
}
func (wm *TargetWriteModel) Query() *eventstore.SearchQueryBuilder {
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
ResourceOwner(wm.ResourceOwner).
AddQuery().
AggregateTypes(target.AggregateType).
AggregateIDs(wm.AggregateID).
EventTypes(target.AddedEventType,
target.ChangedEventType,
target.RemovedEventType).
Builder()
}
func (wm *TargetWriteModel) NewChangedEvent(
ctx context.Context,
agg *eventstore.Aggregate,
name *string,
targetType *domain.TargetType,
url *string,
timeout *time.Duration,
async *bool,
interruptOnError *bool,
) *target.ChangedEvent {
changes := make([]target.Changes, 0)
if name != nil && wm.Name != *name {
changes = append(changes, target.ChangeName(wm.Name, *name))
}
if targetType != nil && wm.TargetType != *targetType {
changes = append(changes, target.ChangeTargetType(*targetType))
}
if url != nil && wm.URL != *url {
changes = append(changes, target.ChangeURL(*url))
}
if timeout != nil && wm.Timeout != *timeout {
changes = append(changes, target.ChangeTimeout(*timeout))
}
if async != nil && wm.Async != *async {
changes = append(changes, target.ChangeAsync(*async))
}
if interruptOnError != nil && wm.InterruptOnError != *interruptOnError {
changes = append(changes, target.ChangeInterruptOnError(*interruptOnError))
}
if len(changes) == 0 {
return nil
}
return target.NewChangedEvent(ctx, agg, changes)
}
func TargetAggregateFromWriteModel(wm *eventstore.WriteModel) *eventstore.Aggregate {
return &eventstore.Aggregate{
ID: wm.AggregateID,
Type: target.AggregateType,
ResourceOwner: wm.ResourceOwner,
InstanceID: wm.InstanceID,
Version: target.AggregateVersion,
}
}

View File

@@ -0,0 +1,676 @@
package command
import (
"context"
"testing"
"time"
"github.com/muhlemmer/gu"
"github.com/stretchr/testify/assert"
"github.com/zitadel/zitadel/internal/domain"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/id"
"github.com/zitadel/zitadel/internal/id/mock"
"github.com/zitadel/zitadel/internal/repository/target"
"github.com/zitadel/zitadel/internal/zerrors"
)
func TestCommands_AddTarget(t *testing.T) {
type fields struct {
eventstore *eventstore.Eventstore
idGenerator id.Generator
}
type args struct {
ctx context.Context
add *AddTarget
resourceOwner string
}
type res struct {
id string
details *domain.ObjectDetails
err func(error) bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
"no resourceowner, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
add: &AddTarget{},
resourceOwner: "",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"no name, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
add: &AddTarget{},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"no timeout, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"no url, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
Timeout: time.Second,
URL: "",
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"no parsable url, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
Timeout: time.Second,
URL: "://",
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"unique constraint failed, error",
fields{
eventstore: eventstoreExpect(t,
expectPushFailed(
zerrors.ThrowPreconditionFailed(nil, "id", "name already exists"),
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
time.Second,
false,
false,
),
),
),
idGenerator: mock.ExpectID(t, "id1"),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
URL: "https://example.com",
Timeout: time.Second,
TargetType: domain.TargetTypeWebhook,
},
resourceOwner: "org1",
},
res{
err: zerrors.IsPreconditionFailed,
},
},
{
"push ok",
fields{
eventstore: eventstoreExpect(t,
expectPush(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
time.Second,
false,
false,
),
),
),
idGenerator: mock.ExpectID(t, "id1"),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
TargetType: domain.TargetTypeWebhook,
Timeout: time.Second,
URL: "https://example.com",
},
resourceOwner: "org1",
},
res{
id: "id1",
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
{
"push full ok",
fields{
eventstore: eventstoreExpect(t,
expectPush(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
time.Second,
true,
true,
),
),
),
idGenerator: mock.ExpectID(t, "id1"),
},
args{
ctx: context.Background(),
add: &AddTarget{
Name: "name",
TargetType: domain.TargetTypeWebhook,
URL: "https://example.com",
Timeout: time.Second,
Async: true,
InterruptOnError: true,
},
resourceOwner: "org1",
},
res{
id: "id1",
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Commands{
eventstore: tt.fields.eventstore,
idGenerator: tt.fields.idGenerator,
}
details, err := c.AddTarget(tt.args.ctx, tt.args.add, tt.args.resourceOwner)
if tt.res.err == nil {
assert.NoError(t, err)
}
if tt.res.err != nil && !tt.res.err(err) {
t.Errorf("got wrong err: %v ", err)
}
if tt.res.err == nil {
assert.Equal(t, tt.res.id, tt.args.add.AggregateID)
assert.Equal(t, tt.res.details, details)
}
})
}
}
func TestCommands_ChangeTarget(t *testing.T) {
type fields struct {
eventstore *eventstore.Eventstore
}
type args struct {
ctx context.Context
change *ChangeTarget
resourceOwner string
}
type res struct {
details *domain.ObjectDetails
err func(error) bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
"resourceowner missing, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{},
resourceOwner: "",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"id missing, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"name empty, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
Name: gu.Ptr(""),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"timeout empty, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
Timeout: gu.Ptr(time.Duration(0)),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"url empty, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
URL: gu.Ptr(""),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"url not parsable, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
URL: gu.Ptr("://"),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"not found, error",
fields{
eventstore: eventstoreExpect(t,
expectFilter(),
),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
ObjectRoot: models.ObjectRoot{
AggregateID: "id1",
},
Name: gu.Ptr("name"),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsNotFound,
},
},
{
"no changes",
fields{
eventstore: eventstoreExpect(t,
expectFilter(
eventFromEventPusher(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
0,
false,
false,
),
),
),
),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
ObjectRoot: models.ObjectRoot{
AggregateID: "id1",
},
TargetType: gu.Ptr(domain.TargetTypeWebhook),
},
resourceOwner: "org1",
},
res{
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
{
"unique constraint failed, error",
fields{
eventstore: eventstoreExpect(t,
expectFilter(
eventFromEventPusher(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
0,
false,
false,
),
),
),
expectPushFailed(
zerrors.ThrowPreconditionFailed(nil, "id", "name already exists"),
target.NewChangedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
[]target.Changes{
target.ChangeName("name", "name2"),
},
),
),
),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
ObjectRoot: models.ObjectRoot{
AggregateID: "id1",
},
Name: gu.Ptr("name2"),
},
resourceOwner: "org1",
},
res{
err: zerrors.IsPreconditionFailed,
},
},
{
"push ok",
fields{
eventstore: eventstoreExpect(t,
expectFilter(
eventFromEventPusher(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
0,
false,
false,
),
),
),
expectPush(
target.NewChangedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
[]target.Changes{
target.ChangeName("name", "name2"),
},
),
),
),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
ObjectRoot: models.ObjectRoot{
AggregateID: "id1",
},
Name: gu.Ptr("name2"),
},
resourceOwner: "org1",
},
res{
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
{
"push full ok",
fields{
eventstore: eventstoreExpect(t,
expectFilter(
eventFromEventPusher(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
0,
false,
false,
),
),
),
expectPush(
target.NewChangedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
[]target.Changes{
target.ChangeName("name", "name2"),
target.ChangeURL("https://example2.com"),
target.ChangeTargetType(domain.TargetTypeRequestResponse),
target.ChangeTimeout(time.Second),
target.ChangeAsync(true),
target.ChangeInterruptOnError(true),
},
),
),
),
},
args{
ctx: context.Background(),
change: &ChangeTarget{
ObjectRoot: models.ObjectRoot{
AggregateID: "id1",
},
Name: gu.Ptr("name2"),
URL: gu.Ptr("https://example2.com"),
TargetType: gu.Ptr(domain.TargetTypeRequestResponse),
Timeout: gu.Ptr(time.Second),
Async: gu.Ptr(true),
InterruptOnError: gu.Ptr(true),
},
resourceOwner: "org1",
},
res{
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Commands{
eventstore: tt.fields.eventstore,
}
details, err := c.ChangeTarget(tt.args.ctx, tt.args.change, tt.args.resourceOwner)
if tt.res.err == nil {
assert.NoError(t, err)
}
if tt.res.err != nil && !tt.res.err(err) {
t.Errorf("got wrong err: %v ", err)
}
if tt.res.err == nil {
assert.Equal(t, tt.res.details, details)
}
})
}
}
func TestCommands_DeleteTarget(t *testing.T) {
type fields struct {
eventstore *eventstore.Eventstore
}
type args struct {
ctx context.Context
id string
resourceOwner string
}
type res struct {
details *domain.ObjectDetails
err func(error) bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
"id missing, error",
fields{
eventstore: eventstoreExpect(t),
},
args{
ctx: context.Background(),
id: "",
resourceOwner: "org1",
},
res{
err: zerrors.IsErrorInvalidArgument,
},
},
{
"not found, error",
fields{
eventstore: eventstoreExpect(t,
expectFilter(),
),
},
args{
ctx: context.Background(),
id: "id1",
resourceOwner: "org1",
},
res{
err: zerrors.IsNotFound,
},
},
{
"remove ok",
fields{
eventstore: eventstoreExpect(t,
expectFilter(
eventFromEventPusher(
target.NewAddedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
domain.TargetTypeWebhook,
"https://example.com",
0,
false,
false,
),
),
),
expectPush(
target.NewRemovedEvent(context.Background(),
target.NewAggregate("id1", "org1"),
"name",
),
),
),
},
args{
ctx: context.Background(),
id: "id1",
resourceOwner: "org1",
},
res{
details: &domain.ObjectDetails{
ResourceOwner: "org1",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Commands{
eventstore: tt.fields.eventstore,
}
details, err := c.DeleteTarget(tt.args.ctx, tt.args.id, tt.args.resourceOwner)
if tt.res.err == nil {
assert.NoError(t, err)
}
if tt.res.err != nil && !tt.res.err(err) {
t.Errorf("got wrong err: %v ", err)
}
if tt.res.err == nil {
assert.Equal(t, tt.res.details, details)
}
})
}
}