mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:37:32 +00:00
feat(eventstore): sdk (#39)
* sdk * fix(sdk): return correct error type * AppendEventError instead of Aggregater error * fix(tests): tests * fix(tests): wantErr to is error func
This commit is contained in:
@@ -2,8 +2,10 @@ package eventsourcing
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
caos_errs "github.com/caos/zitadel/internal/errors"
|
||||
es_int "github.com/caos/zitadel/internal/eventstore"
|
||||
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
)
|
||||
|
||||
@@ -20,22 +22,17 @@ func StartProject(conf ProjectConfig) (*ProjectEventstore, error) {
|
||||
}
|
||||
|
||||
func (es *ProjectEventstore) ProjectByID(ctx context.Context, project *proj_model.Project) (*proj_model.Project, error) {
|
||||
filter, err := ProjectByIDQuery(project.ID, project.Sequence)
|
||||
query, err := ProjectByIDQuery(project.ID, project.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
events, err := es.Eventstore.FilterEvents(ctx, filter)
|
||||
|
||||
p := ProjectFromModel(project)
|
||||
err = es_sdk.Filter(ctx, es.FilterEvents, p.AppendEvents, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(events) == 0 {
|
||||
return nil, caos_errs.ThrowNotFound(nil, "EVENT-8due3", "Could not find project events")
|
||||
}
|
||||
foundProject, err := ProjectFromEvents(nil, events...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ProjectToModel(foundProject), nil
|
||||
return ProjectToModel(p), nil
|
||||
}
|
||||
|
||||
func (es *ProjectEventstore) CreateProject(ctx context.Context, project *proj_model.Project) (*proj_model.Project, error) {
|
||||
@@ -44,34 +41,29 @@ func (es *ProjectEventstore) CreateProject(ctx context.Context, project *proj_mo
|
||||
}
|
||||
project.State = proj_model.Active
|
||||
repoProject := ProjectFromModel(project)
|
||||
projectAggregate, err := ProjectCreateAggregate(ctx, es.Eventstore.AggregateCreator(), repoProject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = es.PushAggregates(ctx, projectAggregate)
|
||||
|
||||
createAggregate := ProjectCreateAggregate(es.AggregateCreator(), repoProject)
|
||||
err := es_sdk.Push(ctx, es.PushAggregates, repoProject.AppendEvents, createAggregate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repoProject.AppendEvents(projectAggregate.Events...)
|
||||
return ProjectToModel(repoProject), nil
|
||||
}
|
||||
|
||||
func (es *ProjectEventstore) UpdateProject(ctx context.Context, existing *proj_model.Project, new *proj_model.Project) (*proj_model.Project, error) {
|
||||
if !new.IsValid() {
|
||||
func (es *ProjectEventstore) UpdateProject(ctx context.Context, existingProject *proj_model.Project, project *proj_model.Project) (*proj_model.Project, error) {
|
||||
if !project.IsValid() {
|
||||
return nil, caos_errs.ThrowPreconditionFailed(nil, "EVENT-9dk45", "Name is required")
|
||||
}
|
||||
repoExisting := ProjectFromModel(existing)
|
||||
repoNew := ProjectFromModel(new)
|
||||
projectAggregate, err := ProjectUpdateAggregate(ctx, es.AggregateCreator(), repoExisting, repoNew)
|
||||
repoExisting := ProjectFromModel(existingProject)
|
||||
repoNew := ProjectFromModel(project)
|
||||
|
||||
updateAggregate := ProjectUpdateAggregate(es.AggregateCreator(), repoExisting, repoNew)
|
||||
err := es_sdk.Push(ctx, es.PushAggregates, repoExisting.AppendEvents, updateAggregate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = es.PushAggregates(ctx, projectAggregate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
repoExisting.AppendEvents(projectAggregate.Events...)
|
||||
|
||||
return ProjectToModel(repoExisting), nil
|
||||
}
|
||||
|
||||
@@ -79,16 +71,10 @@ func (es *ProjectEventstore) DeactivateProject(ctx context.Context, existing *pr
|
||||
if !existing.IsActive() {
|
||||
return nil, caos_errs.ThrowPreconditionFailed(nil, "EVENT-die45", "project must be active")
|
||||
}
|
||||
|
||||
repoExisting := ProjectFromModel(existing)
|
||||
projectAggregate, err := ProjectDeactivateAggregate(ctx, es.AggregateCreator(), repoExisting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = es.PushAggregates(ctx, projectAggregate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
repoExisting.AppendEvents(projectAggregate.Events...)
|
||||
aggregate := ProjectDeactivateAggregate(es.AggregateCreator(), repoExisting)
|
||||
es_sdk.Push(ctx, es.PushAggregates, repoExisting.AppendEvents, aggregate)
|
||||
return ProjectToModel(repoExisting), nil
|
||||
}
|
||||
|
||||
@@ -96,15 +82,9 @@ func (es *ProjectEventstore) ReactivateProject(ctx context.Context, existing *pr
|
||||
if existing.IsActive() {
|
||||
return nil, caos_errs.ThrowPreconditionFailed(nil, "EVENT-die45", "project must be inactive")
|
||||
}
|
||||
|
||||
repoExisting := ProjectFromModel(existing)
|
||||
projectAggregate, err := ProjectReactivateAggregate(ctx, es.AggregateCreator(), repoExisting)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = es.PushAggregates(ctx, projectAggregate)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
repoExisting.AppendEvents(projectAggregate.Events...)
|
||||
aggregate := ProjectReactivateAggregate(es.AggregateCreator(), repoExisting)
|
||||
es_sdk.Push(ctx, es.PushAggregates, repoExisting.AppendEvents, aggregate)
|
||||
return ProjectToModel(repoExisting), nil
|
||||
}
|
||||
|
@@ -2,11 +2,48 @@ package eventsourcing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/caos/logging"
|
||||
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/project/model"
|
||||
)
|
||||
|
||||
const (
|
||||
projectVersion = "v1"
|
||||
)
|
||||
|
||||
type Project struct {
|
||||
es_models.ObjectRoot
|
||||
Name string `json:"name,omitempty"`
|
||||
State int32 `json:"-"`
|
||||
}
|
||||
|
||||
func ProjectFromModel(project *model.Project) *Project {
|
||||
return &Project{
|
||||
ObjectRoot: es_models.ObjectRoot{
|
||||
ID: project.ObjectRoot.ID,
|
||||
Sequence: project.Sequence,
|
||||
ChangeDate: project.ChangeDate,
|
||||
CreationDate: project.CreationDate,
|
||||
},
|
||||
Name: project.Name,
|
||||
State: model.ProjectStateToInt(project.State),
|
||||
}
|
||||
}
|
||||
|
||||
func ProjectToModel(project *Project) *model.Project {
|
||||
return &model.Project{
|
||||
ObjectRoot: es_models.ObjectRoot{
|
||||
ID: project.ID,
|
||||
ChangeDate: project.ChangeDate,
|
||||
CreationDate: project.CreationDate,
|
||||
Sequence: project.Sequence,
|
||||
},
|
||||
Name: project.Name,
|
||||
State: model.ProjectStateFromInt(project.State),
|
||||
}
|
||||
}
|
||||
|
||||
func ProjectFromEvents(project *Project, events ...*es_models.Event) (*Project, error) {
|
||||
if project == nil {
|
||||
project = &Project{}
|
||||
@@ -15,6 +52,14 @@ func ProjectFromEvents(project *Project, events ...*es_models.Event) (*Project,
|
||||
return project, project.AppendEvents(events...)
|
||||
}
|
||||
|
||||
func (p *Project) Changes(changed *Project) map[string]interface{} {
|
||||
changes := make(map[string]interface{}, 1)
|
||||
if changed.Name != "" && p.Name != changed.Name {
|
||||
changes["name"] = changed.Name
|
||||
}
|
||||
return changes
|
||||
}
|
||||
|
||||
func (p *Project) AppendEvents(events ...*es_models.Event) error {
|
||||
for _, event := range events {
|
||||
if err := p.AppendEvent(event); err != nil {
|
@@ -2,9 +2,10 @@ package eventsourcing
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
|
||||
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/project/model"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestProjectFromEvents(t *testing.T) {
|
||||
@@ -167,3 +168,47 @@ func TestAppendReactivatedEvent(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestChanges(t *testing.T) {
|
||||
type args struct {
|
||||
existing *Project
|
||||
new *Project
|
||||
}
|
||||
type res struct {
|
||||
changesLen int
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
res res
|
||||
}{
|
||||
{
|
||||
name: "project name changes",
|
||||
args: args{
|
||||
existing: &Project{Name: "Name"},
|
||||
new: &Project{Name: "NameChanged"},
|
||||
},
|
||||
res: res{
|
||||
changesLen: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no changes",
|
||||
args: args{
|
||||
existing: &Project{Name: "Name"},
|
||||
new: &Project{Name: "Name"},
|
||||
},
|
||||
res: res{
|
||||
changesLen: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
changes := tt.args.existing.Changes(tt.args.new)
|
||||
if len(changes) != tt.res.changesLen {
|
||||
t.Errorf("got wrong changes len: expected: %v, actual: %v ", tt.res.changesLen, len(changes))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@@ -5,6 +5,7 @@ import (
|
||||
"strconv"
|
||||
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/sony/sonyflake"
|
||||
@@ -12,50 +13,6 @@ import (
|
||||
|
||||
var idGenerator = sonyflake.NewSonyflake(sonyflake.Settings{})
|
||||
|
||||
const (
|
||||
projectVersion = "v1"
|
||||
)
|
||||
|
||||
type Project struct {
|
||||
es_models.ObjectRoot
|
||||
Name string `json:"name,omitempty"`
|
||||
State int32 `json:"-"`
|
||||
}
|
||||
|
||||
func (p *Project) Changes(changed *Project) map[string]interface{} {
|
||||
changes := make(map[string]interface{}, 1)
|
||||
if changed.Name != "" && p.Name != changed.Name {
|
||||
changes["name"] = changed.Name
|
||||
}
|
||||
return changes
|
||||
}
|
||||
|
||||
func ProjectFromModel(project *model.Project) *Project {
|
||||
return &Project{
|
||||
ObjectRoot: es_models.ObjectRoot{
|
||||
ID: project.ObjectRoot.ID,
|
||||
Sequence: project.Sequence,
|
||||
ChangeDate: project.ChangeDate,
|
||||
CreationDate: project.CreationDate,
|
||||
},
|
||||
Name: project.Name,
|
||||
State: model.ProjectStateToInt(project.State),
|
||||
}
|
||||
}
|
||||
|
||||
func ProjectToModel(project *Project) *model.Project {
|
||||
return &model.Project{
|
||||
ObjectRoot: es_models.ObjectRoot{
|
||||
ID: project.ID,
|
||||
ChangeDate: project.ChangeDate,
|
||||
CreationDate: project.CreationDate,
|
||||
Sequence: project.Sequence,
|
||||
},
|
||||
Name: project.Name,
|
||||
State: model.ProjectStateFromInt(project.State),
|
||||
}
|
||||
}
|
||||
|
||||
func ProjectByIDQuery(id string, latestSequence uint64) (*es_models.SearchQuery, error) {
|
||||
if id == "" {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-dke74", "id should be filled")
|
||||
@@ -74,58 +31,61 @@ func ProjectAggregate(ctx context.Context, aggCreator *es_models.AggregateCreato
|
||||
return aggCreator.NewAggregate(ctx, id, model.ProjectAggregate, projectVersion, sequence)
|
||||
}
|
||||
|
||||
func ProjectCreateAggregate(ctx context.Context, aggCreator *es_models.AggregateCreator, project *Project) (*es_models.Aggregate, error) {
|
||||
if project == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-kdie6", "project should not be nil")
|
||||
}
|
||||
var err error
|
||||
id, err := idGenerator.NextID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
project.ID = strconv.FormatUint(id, 10)
|
||||
func ProjectCreateAggregate(aggCreator *es_models.AggregateCreator, project *Project) func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
return func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
if project == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-kdie6", "project should not be nil")
|
||||
}
|
||||
var err error
|
||||
id, err := idGenerator.NextID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
project.ID = strconv.FormatUint(id, 10)
|
||||
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, project.ID, project.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, project.ID, project.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return agg.AppendEvent(model.ProjectAdded, project)
|
||||
return agg.AppendEvent(model.ProjectAdded, project)
|
||||
}
|
||||
}
|
||||
|
||||
func ProjectUpdateAggregate(ctx context.Context, aggCreator *es_models.AggregateCreator, existing *Project, new *Project) (*es_models.Aggregate, error) {
|
||||
if existing == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-dk93d", "existing project should not be nil")
|
||||
func ProjectUpdateAggregate(aggCreator *es_models.AggregateCreator, existing *Project, new *Project) func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
return func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
if existing == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-dk93d", "existing project should not be nil")
|
||||
}
|
||||
if new == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-dhr74", "new project should not be nil")
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, existing.ID, existing.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
changes := existing.Changes(new)
|
||||
return agg.AppendEvent(model.ProjectChanged, changes)
|
||||
}
|
||||
if new == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-dhr74", "new project should not be nil")
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, existing.ID, existing.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
changes := existing.Changes(new)
|
||||
return agg.AppendEvent(model.ProjectChanged, changes)
|
||||
}
|
||||
|
||||
func ProjectDeactivateAggregate(ctx context.Context, aggCreator *es_models.AggregateCreator, existing *Project) (*es_models.Aggregate, error) {
|
||||
if existing == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-ueh45", "existing project should not be nil")
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, existing.ID, existing.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return agg.AppendEvent(model.ProjectDeactivated, nil)
|
||||
func ProjectDeactivateAggregate(aggCreator *es_models.AggregateCreator, project *Project) func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
return projectStateAggregate(aggCreator, project, model.ProjectDeactivated)
|
||||
}
|
||||
|
||||
func ProjectReactivateAggregate(ctx context.Context, aggCreator *es_models.AggregateCreator, existing *Project) (*es_models.Aggregate, error) {
|
||||
if existing == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-37dur", "existing project should not be nil")
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, existing.ID, existing.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return agg.AppendEvent(model.ProjectReactivated, nil)
|
||||
func ProjectReactivateAggregate(aggCreator *es_models.AggregateCreator, project *Project) func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
return projectStateAggregate(aggCreator, project, model.ProjectReactivated)
|
||||
}
|
||||
|
||||
func projectStateAggregate(aggCreator *es_models.AggregateCreator, project *Project, state models.EventType) func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
return func(ctx context.Context) (*es_models.Aggregate, error) {
|
||||
if project == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "EVENT-37dur", "existing project should not be nil")
|
||||
}
|
||||
agg, err := ProjectAggregate(ctx, aggCreator, project.ID, project.Sequence)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return agg.AppendEvent(state, nil)
|
||||
}
|
||||
}
|
||||
|
@@ -2,57 +2,14 @@ package eventsourcing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/caos/zitadel/internal/api/auth"
|
||||
caos_errs "github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/project/model"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestChanges(t *testing.T) {
|
||||
type args struct {
|
||||
existing *Project
|
||||
new *Project
|
||||
}
|
||||
type res struct {
|
||||
changesLen int
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
res res
|
||||
}{
|
||||
{
|
||||
name: "project name changes",
|
||||
args: args{
|
||||
existing: &Project{Name: "Name"},
|
||||
new: &Project{Name: "NameChanged"},
|
||||
},
|
||||
res: res{
|
||||
changesLen: 1,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no changes",
|
||||
args: args{
|
||||
existing: &Project{Name: "Name"},
|
||||
new: &Project{Name: "Name"},
|
||||
},
|
||||
res: res{
|
||||
changesLen: 0,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
changes := tt.args.existing.Changes(tt.args.new)
|
||||
if len(changes) != tt.res.changesLen {
|
||||
t.Errorf("got wrong changes len: expected: %v, actual: %v ", tt.res.changesLen, len(changes))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProjectByIDQuery(t *testing.T) {
|
||||
type args struct {
|
||||
id string
|
||||
@@ -231,7 +188,7 @@ func TestProjectCreateAggregate(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
agg, err := ProjectCreateAggregate(tt.args.ctx, tt.args.aggCreator, tt.args.new)
|
||||
agg, err := ProjectCreateAggregate(tt.args.aggCreator, tt.args.new)(tt.args.ctx)
|
||||
|
||||
if !tt.res.wantErr && len(agg.Events) != tt.res.eventLen {
|
||||
t.Errorf("got wrong event len: expected: %v, actual: %v ", tt.res.eventLen, len(agg.Events))
|
||||
@@ -312,7 +269,7 @@ func TestProjectUpdateAggregate(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
agg, err := ProjectUpdateAggregate(tt.args.ctx, tt.args.aggCreator, tt.args.existing, tt.args.new)
|
||||
agg, err := ProjectUpdateAggregate(tt.args.aggCreator, tt.args.existing, tt.args.new)(tt.args.ctx)
|
||||
|
||||
if !tt.res.wantErr && len(agg.Events) != tt.res.eventLen {
|
||||
t.Errorf("got wrong event len: expected: %v, actual: %v ", tt.res.eventLen, len(agg.Events))
|
||||
@@ -376,7 +333,7 @@ func TestProjectDeactivateAggregate(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
agg, err := ProjectDeactivateAggregate(tt.args.ctx, tt.args.aggCreator, tt.args.existing)
|
||||
agg, err := ProjectDeactivateAggregate(tt.args.aggCreator, tt.args.existing)(tt.args.ctx)
|
||||
|
||||
if !tt.res.wantErr && len(agg.Events) != tt.res.eventLen {
|
||||
t.Errorf("got wrong event len: expected: %v, actual: %v ", tt.res.eventLen, len(agg.Events))
|
||||
@@ -437,7 +394,7 @@ func TestProjectReactivateAggregate(t *testing.T) {
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
agg, err := ProjectReactivateAggregate(tt.args.ctx, tt.args.aggCreator, tt.args.existing)
|
||||
agg, err := ProjectReactivateAggregate(tt.args.aggCreator, tt.args.existing)(tt.args.ctx)
|
||||
|
||||
if !tt.res.wantErr && len(agg.Events) != tt.res.eventLen {
|
||||
t.Errorf("got wrong event len: expected: %v, actual: %v ", tt.res.eventLen, len(agg.Events))
|
||||
|
Reference in New Issue
Block a user