start sqlite migrations

This commit is contained in:
adlerhurst
2020-09-30 10:00:05 +02:00
parent 7da344be26
commit e4d8478b04
36 changed files with 2044 additions and 180 deletions

View File

@@ -1,22 +0,0 @@
package eventstore
//AggregateType is the object name
type AggregateType string
//Aggregate represents an object
type Aggregate struct {
//ID id is the unique identifier of the aggregate
// the client must generate it by it's own
ID string
//Type describes the meaning of this aggregate
// it could an object like user
Type AggregateType
//ResourceOwner is the organisation which owns this aggregate
// an aggregate can only be managed by one organisation
// use the ID of the org
ResourceOwner string
//Events describe all the changes made on an aggregate
Events []*Event
}

View File

@@ -0,0 +1,283 @@
package eventstore_test
import (
"encoding/json"
"fmt"
"log"
)
//ReadModel is the minimum representation of a View model.
// it might be saved in a database or in memory
type ReadModel struct {
ProcessedSequence uint64
ID string
events []Event
}
//Append adds all the events to the aggregate.
// The function doesn't compute the new state of the read model
func (a *ReadModel) Append(events ...Event) {
a.events = append(a.events, events...)
}
type ProjectReadModel struct {
ReadModel
Apps []*AppReadModel
Name string
}
func (p *ProjectReadModel) Append(events ...Event) {
for _, event := range events {
switch event.(type) {
case *AddAppEvent:
app := new(AppReadModel)
app.Append(event)
p.Apps = append(p.Apps, app)
case *UpdateAppEvent:
for _, app := range p.Apps {
app.Append(event)
}
}
}
p.events = append(p.events, events...)
}
//Reduce calculates the new state of the read model
func (p *ProjectReadModel) Reduce() error {
for i := range p.Apps {
if err := p.Apps[i].Reduce(); err != nil {
return err
}
}
for _, event := range p.events {
switch e := event.(type) {
case *CreateProjectEvent:
p.ID = e.ID
p.Name = e.Name
case *RemoveAppEvent:
for i := len(p.Apps) - 1; i >= 0; i-- {
app := p.Apps[i]
if app.ID == e.GetID() {
p.Apps[i] = p.Apps[len(p.Apps)-1]
p.Apps[len(p.Apps)-1] = nil
p.Apps = p.Apps[:len(p.Apps)-1]
}
}
}
p.ProcessedSequence = event.GetSequence()
}
return nil
}
type AppReadModel struct {
ReadModel
Name string
}
//Reduce calculates the new state of the read model
func (a *AppReadModel) Reduce() error {
for _, event := range a.events {
switch e := event.(type) {
case *AddAppEvent:
a.Name = e.Name
a.ID = e.GetID()
case *UpdateAppEvent:
a.Name = e.Name
}
a.ProcessedSequence = event.GetSequence()
}
return nil
}
//Event is the minimal representation of a event
// which can be processed by the read models
type Event interface {
//GetSequence returns the event sequence
GetSequence() uint64
//GetID returns the id of the aggregate. It's not the id of the event
GetID() string
}
//DefaultEvent is the implementation of Event
type DefaultEvent struct {
Sequence uint64 `json:"-"`
ID string `json:"-"`
}
func (e *DefaultEvent) GetID() string {
return e.ID
}
func (e *DefaultEvent) GetSequence() uint64 {
return e.Sequence
}
type CreateProjectEvent struct {
DefaultEvent
Name string `json:"name,omitempty"`
}
//CreateProjectEventFromEventstore returns the specific type
// of the general EventstoreEvent
func CreateProjectEventFromEventstore(event *EventstoreEvent) (Event, error) {
e := &CreateProjectEvent{
DefaultEvent: DefaultEvent{Sequence: event.Sequence, ID: event.AggregateID},
}
err := json.Unmarshal(event.Data, e)
return e, err
}
type AddAppEvent struct {
ProjectID string `json:"-"`
AppID string `json:"id"`
Sequence uint64 `json:"-"`
Name string `json:"name,omitempty"`
}
func (e *AddAppEvent) GetID() string {
return e.AppID
}
func (e *AddAppEvent) GetSequence() uint64 {
return e.Sequence
}
func AppAddedEventFromEventstore(event *EventstoreEvent) (Event, error) {
e := &AddAppEvent{
Sequence: event.Sequence,
ProjectID: event.AggregateID,
}
err := json.Unmarshal(event.Data, e)
return e, err
}
type UpdateAppEvent struct {
ProjectID string `json:"-"`
AppID string `json:"id"`
Sequence uint64 `json:"-"`
Name string `json:"name,omitempty"`
}
func (e *UpdateAppEvent) GetID() string {
return e.AppID
}
func (e *UpdateAppEvent) GetSequence() uint64 {
return e.Sequence
}
func AppUpdatedEventFromEventstore(event *EventstoreEvent) (Event, error) {
e := &UpdateAppEvent{
Sequence: event.Sequence,
ProjectID: event.AggregateID,
}
err := json.Unmarshal(event.Data, e)
return e, err
}
type RemoveAppEvent struct {
ProjectID string `json:"-"`
AppID string `json:"id"`
Sequence uint64 `json:"-"`
}
func (e *RemoveAppEvent) GetID() string {
return e.AppID
}
func (e *RemoveAppEvent) GetSequence() uint64 {
return e.Sequence
}
func AppRemovedEventFromEventstore(event *EventstoreEvent) (Event, error) {
e := &RemoveAppEvent{
Sequence: event.Sequence,
ProjectID: event.AggregateID,
}
err := json.Unmarshal(event.Data, e)
return e, err
}
func main() {
eventstore := &Eventstore{
eventMapper: map[string]func(*EventstoreEvent) (Event, error){
"project.added": CreateProjectEventFromEventstore,
"app.added": AppAddedEventFromEventstore,
"app.updated": AppUpdatedEventFromEventstore,
"app.removed": AppRemovedEventFromEventstore,
},
events: []*EventstoreEvent{
{
AggregateID: "p1",
EventType: "project.added",
Sequence: 1,
Data: []byte(`{"name":"hodor"}`),
},
{
AggregateID: "123",
EventType: "app.added",
Sequence: 2,
Data: []byte(`{"id":"a1", "name": "ap 1"}`),
},
{
AggregateID: "123",
EventType: "app.updated",
Sequence: 3,
Data: []byte(`{"id":"a1", "name":"app 1"}`),
},
{
AggregateID: "123",
EventType: "app.added",
Sequence: 4,
Data: []byte(`{"id":"a2", "name": "app 2"}`),
},
{
AggregateID: "123",
EventType: "app.removed",
Sequence: 5,
Data: []byte(`{"id":"a1"}`),
},
},
}
events, err := eventstore.GetEvents()
if err != nil {
log.Panic(err)
}
p := &ProjectReadModel{Apps: []*AppReadModel{}}
p.Append(events...)
p.Reduce()
fmt.Printf("%+v\n", p)
for _, app := range p.Apps {
fmt.Printf("%+v\n", app)
}
}
//Eventstore is a simple abstraction of the eventstore framework
type Eventstore struct {
eventMapper map[string]func(*EventstoreEvent) (Event, error)
events []*EventstoreEvent
}
func (es *Eventstore) GetEvents() (events []Event, err error) {
events = make([]Event, len(es.events))
for i, event := range es.events {
events[i], err = es.eventMapper[event.EventType](event)
if err != nil {
return nil, err
}
}
return events, nil
}
type EventstoreEvent struct {
AggregateID string
Sequence uint64
EventType string
Data []byte
}

View File

@@ -2,11 +2,204 @@ package eventstore
import (
"context"
"sync"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
type Eventstore interface {
Health(ctx context.Context) error
PushAggregates(ctx context.Context, aggregates ...*Aggregate) error
FilterEvents(ctx context.Context, searchQuery *SearchQueryFactory) (events []*Event, err error)
LatestSequence(ctx context.Context, searchQuery *SearchQueryFactory) (uint64, error)
type Event interface {
//CheckPrevious ensures the event order if true
// if false the previous sequence is not checked on push
CheckPrevious() bool
EditorService() string
EditorUser() string
Type() EventType
Data() interface{}
PreviousSequence() uint64
}
type eventAppender interface {
//AppendEvents appends the passed events to an internal list of events
AppendEvents(...Event) error
}
type reducer interface {
//Reduce handles the events of the internal events list
// it only appends the newly added events
Reduce() error
}
type aggregater interface {
eventAppender
reducer
//ID returns the aggreagte id
ID() string
//Type returns the aggregate type
Type() AggregateType
//Events returns the events which will be pushed
Events() []Event
//ResourceOwner returns the organisation id which manages this aggregate
ResourceOwner() string
//Version represents the semantic version of the aggregate
Version() Version
}
type readModeler interface {
eventAppender
reducer
}
type Eventstore struct {
repo repository.Repository
interceptorMutex sync.Mutex
eventMapper map[EventType]eventTypeInterceptors
}
type eventTypeInterceptors struct {
pushMapper func(Event) (*repository.Event, error)
filterMapper func(*repository.Event) (Event, error)
}
//Health checks if the eventstore can properly work
// It checks if the repository can serve load
func (es *Eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}
//PushAggregates maps the events of all aggregates to an eventstore event
// based on the pushMapper
func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...aggregater) ([]Event, error) {
events, err := es.aggregatesToEvents(aggregates)
if err != nil {
return nil, err
}
err = es.repo.Push(ctx, events...)
if err != nil {
return nil, err
}
return es.mapEvents(events)
}
func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository.Event, error) {
events := make([]*repository.Event, 0, len(aggregates))
for _, aggregate := range aggregates {
var previousEvent *repository.Event
for _, event := range aggregate.Events() {
events = append(events, &repository.Event{
AggregateID: aggregate.ID(),
AggregateType: repository.AggregateType(aggregate.Type()),
ResourceOwner: aggregate.ResourceOwner(),
EditorService: event.EditorService(),
EditorUser: event.EditorUser(),
Type: repository.EventType(event.Type()),
Version: repository.Version(aggregate.Version()),
PreviousEvent: previousEvent,
Data: event.Data(),
})
if previousEvent != nil && event.CheckPrevious() {
events[len(events)-1].PreviousSequence = event.PreviousSequence()
}
previousEvent = events[len(events)-1]
}
}
return events, nil
}
//FilterEvents filters the stored events based on the searchQuery
// and maps the events to the defined event structs
func (es *Eventstore) FilterEvents(ctx context.Context, queryFactory *SearchQueryFactory) ([]Event, error) {
query, err := queryFactory.Build()
if err != nil {
return nil, err
}
events, err := es.repo.Filter(ctx, query)
if err != nil {
return nil, err
}
return es.mapEvents(events)
}
func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Event, err error) {
mappedEvents = make([]Event, len(events))
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
for i, event := range events {
interceptors, ok := es.eventMapper[EventType(event.Type)]
if !ok || interceptors.filterMapper == nil {
return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
}
mappedEvents[i], err = interceptors.filterMapper(event)
if err != nil {
return nil, err
}
}
return mappedEvents, nil
}
//FilterToAggregate filters the events based on the searchQuery, appends all events to the aggregate and reduces the aggregate
func (es *Eventstore) FilterToAggregate(ctx context.Context, searchQuery *SearchQueryFactory, aggregate aggregater) (err error) {
events, err := es.FilterEvents(ctx, searchQuery)
if err != nil {
return err
}
if err = aggregate.AppendEvents(events...); err != nil {
return err
}
return aggregate.Reduce()
}
//FilterToReadModel filters the events based on the searchQuery, appends all events to the readModel and reduces the readModel
func (es *Eventstore) FilterToReadModel(ctx context.Context, searchQuery *SearchQueryFactory, readModel readModeler) (err error) {
events, err := es.FilterEvents(ctx, searchQuery)
if err != nil {
return err
}
if err = readModel.AppendEvents(events...); err != nil {
return err
}
return readModel.Reduce()
}
func (es *Eventstore) LatestSequence(ctx context.Context, searchQuery *SearchQueryFactory) (uint64, error) {
return 0, nil
}
//RegisterPushEventMapper registers a function for mapping an eventstore event to an event
func (es *Eventstore) RegisterFilterEventMapper(eventType EventType, mapper func(*repository.Event) (Event, error)) error {
if eventType == "" || mapper == nil {
return errors.ThrowInvalidArgument(nil, "V2-IPpUR", "eventType and mapper must be filled")
}
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
interceptor := es.eventMapper[eventType]
interceptor.filterMapper = mapper
es.eventMapper[eventType] = interceptor
return nil
}
//RegisterPushEventMapper registers a function for mapping an event to an eventstore event
func (es *Eventstore) RegisterPushEventMapper(eventType EventType, mapper func(Event) (*repository.Event, error)) error {
if eventType == "" || mapper == nil {
return errors.ThrowInvalidArgument(nil, "V2-Kexpp", "eventType and mapper must be filled")
}
es.interceptorMutex.Lock()
defer es.interceptorMutex.Unlock()
interceptor := es.eventMapper[eventType]
interceptor.pushMapper = mapper
es.eventMapper[eventType] = interceptor
return nil
}

View File

@@ -1,28 +0,0 @@
package eventstore
import "context"
type inMemoryEventstore struct {
events []*Event
}
func (es *inMemoryEventstore) Health(ctx context.Context) error {
return nil
}
func (es *inMemoryEventstore) PushAggregates(ctx context.Context, aggregates ...*Aggregate) error {
return nil
}
func (es *inMemoryEventstore) FilterEvents(ctx context.Context, searchQuery *SearchQueryFactory) (events []*Event, err error) {
return nil, nil
}
func (es *inMemoryEventstore) LatestSequence(ctx context.Context, searchQuery *SearchQueryFactory) (uint64, error) {
query, err := searchQuery.Build()
if err != nil {
return 0, err
}
query.Filters[0].
return 0, nil
}

View File

@@ -1,24 +1,266 @@
package eventstore
import "testing"
import (
"reflect"
"testing"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
type testEvent struct {
description string
shouldCheckPrevious bool
}
func (e *testEvent) CheckPrevious() bool {
return e.shouldCheckPrevious
}
func testPushMapper(Event) (*repository.Event, error) {
return &repository.Event{AggregateID: "aggregateID"}, nil
}
func Test_eventstore_RegisterPushEventMapper(t *testing.T) {
type fields struct {
eventMapper map[EventType]eventTypeInterceptors
}
type args struct {
eventType EventType
mapper func(Event) (*repository.Event, error)
}
type res struct {
event *repository.Event
isErr func(error) bool
}
func TestPushAggregates(t *testing.T) {
type res struct{}
type args struct{}
tests := []struct {
name string
args args
res *SearchQueryFactory
}{}
name string
fields fields
args args
res res
}{
{
name: "no event type",
args: args{
eventType: "",
mapper: testPushMapper,
},
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
res: res{
isErr: errors.IsErrorInvalidArgument,
},
},
{
name: "no event mapper",
args: args{
eventType: "event.type",
mapper: nil,
},
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
res: res{
isErr: errors.IsErrorInvalidArgument,
},
},
{
name: "new interceptor",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
args: args{
eventType: "new.event",
mapper: testPushMapper,
},
res: res{
event: &repository.Event{AggregateID: "aggregateID"},
},
},
{
name: "existing interceptor new push mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"existing": {},
},
},
args: args{
eventType: "new.event",
mapper: testPushMapper,
},
res: res{
event: &repository.Event{AggregateID: "aggregateID"},
},
},
{
name: "existing interceptor existing push mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"existing": {
pushMapper: func(Event) (*repository.Event, error) {
return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
},
},
},
},
args: args{
eventType: "new.event",
mapper: testPushMapper,
},
res: res{
event: &repository.Event{AggregateID: "aggregateID"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// factory := NewSearchQueryFactory(tt.args.aggregateTypes...)
// for _, setter := range tt.args.setters {
// factory = setter(factory)
// }
// if !reflect.DeepEqual(factory, tt.res) {
// t.Errorf("NewSearchQueryFactory() = %v, want %v", factory, tt.res)
// }
es := &Eventstore{
eventMapper: tt.fields.eventMapper,
}
err := es.RegisterPushEventMapper(tt.args.eventType, tt.args.mapper)
if (tt.res.isErr != nil && !tt.res.isErr(err)) || (tt.res.isErr == nil && err != nil) {
t.Errorf("wrong error got: %v", err)
return
}
if tt.res.isErr != nil {
return
}
mapper := es.eventMapper[tt.args.eventType]
event, err := mapper.pushMapper(nil)
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(tt.res.event, event) {
t.Errorf("events should be deep equal. \ngot %v\nwant %v", event, tt.res.event)
}
})
}
}
func testFilterMapper(*repository.Event) (Event, error) {
return &testEvent{description: "hodor"}, nil
}
func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
type fields struct {
eventMapper map[EventType]eventTypeInterceptors
}
type args struct {
eventType EventType
mapper func(*repository.Event) (Event, error)
}
type res struct {
event Event
isErr func(error) bool
}
tests := []struct {
name string
fields fields
args args
res res
}{
{
name: "no event type",
args: args{
eventType: "",
mapper: testFilterMapper,
},
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
res: res{
isErr: errors.IsErrorInvalidArgument,
},
},
{
name: "no event mapper",
args: args{
eventType: "event.type",
mapper: nil,
},
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
res: res{
isErr: errors.IsErrorInvalidArgument,
},
},
{
name: "new interceptor",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{},
},
args: args{
eventType: "event.type",
mapper: testFilterMapper,
},
res: res{
event: &testEvent{description: "hodor"},
},
},
{
name: "existing interceptor new filter mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {},
},
},
args: args{
eventType: "new.event",
mapper: testFilterMapper,
},
res: res{
event: &testEvent{description: "hodor"},
},
},
{
name: "existing interceptor existing filter mapper",
fields: fields{
eventMapper: map[EventType]eventTypeInterceptors{
"event.type": {
filterMapper: func(*repository.Event) (Event, error) {
return nil, errors.ThrowUnimplemented(nil, "V2-1qPvn", "unimplemented")
},
},
},
},
args: args{
eventType: "new.event",
mapper: testFilterMapper,
},
res: res{
event: &testEvent{description: "hodor"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
es := &Eventstore{
eventMapper: tt.fields.eventMapper,
}
err := es.RegisterFilterEventMapper(tt.args.eventType, tt.args.mapper)
if (tt.res.isErr != nil && !tt.res.isErr(err)) || (tt.res.isErr == nil && err != nil) {
t.Errorf("wrong error got: %v", err)
return
}
if tt.res.isErr != nil {
return
}
mapper := es.eventMapper[tt.args.eventType]
event, err := mapper.filterMapper(nil)
if err != nil {
t.Errorf("unexpected error %v", err)
}
if !reflect.DeepEqual(tt.res.event, event) {
t.Errorf("events should be deep equal. \ngot %v\nwant %v", event, tt.res.event)
}
})
}
}

View File

@@ -1,3 +0,0 @@
package eventstore
type ReadModel struct{}

View File

@@ -0,0 +1,22 @@
package repository
//AggregateType is the object name
type AggregateType string
// //Aggregate represents an object
// type Aggregate struct {
// //ID id is the unique identifier of the aggregate
// // the client must generate it by it's own
// ID string
// //Type describes the meaning of this aggregate
// // it could an object like user
// Type AggregateType
// //ResourceOwner is the organisation which owns this aggregate
// // an aggregate can only be managed by one organisation
// // use the ID of the org
// ResourceOwner string
// //Events describe all the changes made on an aggregate
// Events []*Event
// }

View File

@@ -1,6 +1,8 @@
package eventstore
package repository
import "time"
import (
"time"
)
//Event represents all information about a manipulation of an aggregate
type Event struct {
@@ -13,6 +15,10 @@ type Event struct {
// if it's 0 then it's the first event of this aggregate
PreviousSequence uint64
//PreviousEvent is needed in push to update PreviousSequence
// it implements a linked list
PreviousEvent *Event
//CreationDate is the time the event is created
// it's used for human readability.
// Don't use it for event ordering,

View File

@@ -0,0 +1,82 @@
package repository
import "context"
type InMemory struct {
events []*Event
}
func (repo *InMemory) Health(ctx context.Context) error { return nil }
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
func (repo *InMemory) Push(ctx context.Context, events ...*Event) error {
repo.events = append(repo.events, events...)
return nil
}
// Filter returns all events matching the given search query
func (repo *InMemory) Filter(ctx context.Context, searchQuery *SearchQuery) (events []*Event, err error) {
indexes := repo.filter(searchQuery)
events = make([]*Event, len(indexes))
for i, index := range indexes {
events[i] = repo.events[index]
}
return events, nil
}
func (repo *InMemory) filter(query *SearchQuery) []int {
foundIndex := make([]int, 0, query.Limit)
events:
for i, event := range repo.events {
if query.Limit > 0 && uint64(len(foundIndex)) < query.Limit {
return foundIndex
}
for _, filter := range query.Filters {
var value interface{}
switch filter.field {
case Field_AggregateID:
value = event.AggregateID
case Field_EditorService:
value = event.EditorService
case Field_EventType:
value = event.Type
case Field_AggregateType:
value = event.AggregateType
case Field_EditorUser:
value = event.EditorUser
case Field_ResourceOwner:
value = event.ResourceOwner
case Field_LatestSequence:
value = event.Sequence
}
switch filter.operation {
case Operation_Equals:
if filter.value == value {
foundIndex = append(foundIndex, i)
}
case Operation_Greater:
fallthrough
case Operation_Less:
return nil
case Operation_In:
values := filter.Value().([]interface{})
for _, val := range values {
if val == value {
foundIndex = append(foundIndex, i)
continue events
}
}
}
}
}
return foundIndex
}
//LatestSequence returns the latests sequence found by the the search query
func (repo *InMemory) LatestSequence(ctx context.Context, queryFactory *SearchQuery) (uint64, error) {
return 0, nil
}

View File

@@ -0,0 +1,17 @@
package repository
import (
"context"
)
type Repository interface {
Health(ctx context.Context) error
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
Push(ctx context.Context, events ...*Event) error
// Filter returns all events matching the given search query
Filter(ctx context.Context, searchQuery *SearchQuery) (events []*Event, err error)
//LatestSequence returns the latests sequence found by the the search query
LatestSequence(ctx context.Context, queryFactory *SearchQuery) (uint64, error)
}

View File

@@ -1,7 +1,21 @@
package eventstore
package repository
import (
"github.com/caos/zitadel/internal/errors"
import "github.com/caos/zitadel/internal/errors"
type SearchQuery struct {
Columns Columns
Limit uint64
Desc bool
Filters []*Filter
}
type Columns int32
const (
Columns_Event = iota
Columns_Max_Sequence
//insert new columns-types above this ColumnsCount because count is needed for validation
ColumnsCount
)
type Filter struct {
@@ -52,16 +66,16 @@ func (f *Filter) Value() interface{} {
func (f *Filter) Validate() error {
if f == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-z6KcG", "filter is nil")
return errors.ThrowPreconditionFailed(nil, "REPO-z6KcG", "filter is nil")
}
if f.field <= 0 {
return errors.ThrowPreconditionFailed(nil, "MODEL-zw62U", "field not definded")
return errors.ThrowPreconditionFailed(nil, "REPO-zw62U", "field not definded")
}
if f.value == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-GJ9ct", "no value definded")
return errors.ThrowPreconditionFailed(nil, "REPO-GJ9ct", "no value definded")
}
if f.operation <= 0 {
return errors.ThrowPreconditionFailed(nil, "MODEL-RrQTy", "operation not definded")
return errors.ThrowPreconditionFailed(nil, "REPO-RrQTy", "operation not definded")
}
return nil
}

View File

@@ -1,4 +1,4 @@
package eventstore
package repository
import (
"reflect"

View File

@@ -0,0 +1,18 @@
package repository
import (
"regexp"
"github.com/caos/zitadel/internal/errors"
)
var versionRegexp = regexp.MustCompile(`^v[0-9]+(\.[0-9]+){0,2}$`)
type Version string
func (v Version) Validate() error {
if !versionRegexp.MatchString(string(v)) {
return errors.ThrowPreconditionFailed(nil, "MODEL-luDuS", "version is not semver")
}
return nil
}

View File

@@ -1,4 +1,4 @@
package eventstore
package repository
import "testing"

View File

@@ -2,10 +2,11 @@ package eventstore
import (
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
type SearchQueryFactory struct {
columns Columns
columns repository.Columns
limit uint64
desc bool
aggregateTypes []AggregateType
@@ -15,22 +16,16 @@ type SearchQueryFactory struct {
resourceOwner string
}
type searchQuery struct {
Columns Columns
Limit uint64
Desc bool
Filters []*Filter
}
type Columns int32
type Columns repository.Columns
const (
Columns_Event = iota
Columns_Max_Sequence
//insert new columns-types above this columnsCount because count is needed for validation
columnsCount
Columns_Event Columns = repository.Columns_Event
Columns_Max_Sequence Columns = repository.Columns_Max_Sequence
)
type AggregateType repository.AggregateType
type EventType repository.EventType
func NewSearchQueryFactory(aggregateTypes ...AggregateType) *SearchQueryFactory {
return &SearchQueryFactory{
aggregateTypes: aggregateTypes,
@@ -38,7 +33,7 @@ func NewSearchQueryFactory(aggregateTypes ...AggregateType) *SearchQueryFactory
}
func (factory *SearchQueryFactory) Columns(columns Columns) *SearchQueryFactory {
factory.columns = columns
factory.columns = repository.Columns(columns)
return factory
}
@@ -77,17 +72,17 @@ func (factory *SearchQueryFactory) OrderAsc() *SearchQueryFactory {
return factory
}
func (factory *SearchQueryFactory) Build() (*searchQuery, error) {
func (factory *SearchQueryFactory) Build() (*repository.SearchQuery, error) {
if factory == nil ||
len(factory.aggregateTypes) < 1 ||
(factory.columns < 0 || factory.columns >= columnsCount) {
(factory.columns < 0 || factory.columns >= repository.ColumnsCount) {
return nil, errors.ThrowPreconditionFailed(nil, "MODEL-tGAD3", "factory invalid")
}
filters := []*Filter{
filters := []*repository.Filter{
factory.aggregateTypeFilter(),
}
for _, f := range []func() *Filter{
for _, f := range []func() *repository.Filter{
factory.aggregateIDFilter,
factory.eventSequenceFilter,
factory.eventTypeFilter,
@@ -98,55 +93,55 @@ func (factory *SearchQueryFactory) Build() (*searchQuery, error) {
}
}
return &searchQuery{
Columns: factory.columns,
return &repository.SearchQuery{
Columns: repository.Columns(factory.columns),
Limit: factory.limit,
Desc: factory.desc,
Filters: filters,
}, nil
}
func (factory *SearchQueryFactory) aggregateIDFilter() *Filter {
func (factory *SearchQueryFactory) aggregateIDFilter() *repository.Filter {
if len(factory.aggregateIDs) < 1 {
return nil
}
if len(factory.aggregateIDs) == 1 {
return NewFilter(Field_AggregateID, factory.aggregateIDs[0], Operation_Equals)
return repository.NewFilter(repository.Field_AggregateID, factory.aggregateIDs[0], repository.Operation_Equals)
}
return NewFilter(Field_AggregateID, factory.aggregateIDs, Operation_In)
return repository.NewFilter(repository.Field_AggregateID, factory.aggregateIDs, repository.Operation_In)
}
func (factory *SearchQueryFactory) eventTypeFilter() *Filter {
func (factory *SearchQueryFactory) eventTypeFilter() *repository.Filter {
if len(factory.eventTypes) < 1 {
return nil
}
if len(factory.eventTypes) == 1 {
return NewFilter(Field_EventType, factory.eventTypes[0], Operation_Equals)
return repository.NewFilter(repository.Field_EventType, factory.eventTypes[0], repository.Operation_Equals)
}
return NewFilter(Field_EventType, factory.eventTypes, Operation_In)
return repository.NewFilter(repository.Field_EventType, factory.eventTypes, repository.Operation_In)
}
func (factory *SearchQueryFactory) aggregateTypeFilter() *Filter {
func (factory *SearchQueryFactory) aggregateTypeFilter() *repository.Filter {
if len(factory.aggregateTypes) == 1 {
return NewFilter(Field_AggregateType, factory.aggregateTypes[0], Operation_Equals)
return repository.NewFilter(repository.Field_AggregateType, factory.aggregateTypes[0], repository.Operation_Equals)
}
return NewFilter(Field_AggregateType, factory.aggregateTypes, Operation_In)
return repository.NewFilter(repository.Field_AggregateType, factory.aggregateTypes, repository.Operation_In)
}
func (factory *SearchQueryFactory) eventSequenceFilter() *Filter {
func (factory *SearchQueryFactory) eventSequenceFilter() *repository.Filter {
if factory.eventSequence == 0 {
return nil
}
sortOrder := Operation_Greater
sortOrder := repository.Operation_Greater
if factory.desc {
sortOrder = Operation_Less
sortOrder = repository.Operation_Less
}
return NewFilter(Field_LatestSequence, factory.eventSequence, sortOrder)
return repository.NewFilter(repository.Field_LatestSequence, factory.eventSequence, sortOrder)
}
func (factory *SearchQueryFactory) resourceOwnerFilter() *Filter {
func (factory *SearchQueryFactory) resourceOwnerFilter() *repository.Filter {
if factory.resourceOwner == "" {
return nil
}
return NewFilter(Field_ResourceOwner, factory.resourceOwner, Operation_Equals)
return repository.NewFilter(repository.Field_ResourceOwner, factory.resourceOwner, repository.Operation_Equals)
}

View File

@@ -5,6 +5,7 @@ import (
"testing"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
func testSetColumns(columns Columns) func(factory *SearchQueryFactory) *SearchQueryFactory {
@@ -82,10 +83,10 @@ func TestSearchQueryFactorySetters(t *testing.T) {
{
name: "set columns",
args: args{
setters: []func(*SearchQueryFactory) *SearchQueryFactory{testSetColumns(Columns_Max_Sequence)},
setters: []func(*SearchQueryFactory) *SearchQueryFactory{testSetColumns(repository.Columns_Max_Sequence)},
},
res: &SearchQueryFactory{
columns: Columns_Max_Sequence,
columns: repository.Columns_Max_Sequence,
},
},
{
@@ -166,7 +167,7 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
}
type res struct {
isErr func(err error) bool
query *searchQuery
query *repository.SearchQuery
}
tests := []struct {
name string
@@ -201,7 +202,7 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
args: args{
aggregateTypes: []AggregateType{"user"},
setters: []func(*SearchQueryFactory) *SearchQueryFactory{
testSetColumns(columnsCount),
testSetColumns(repository.ColumnsCount),
},
},
res: res{
@@ -216,12 +217,12 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
},
},
},
@@ -234,12 +235,12 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, []AggregateType{"user", "org"}, Operation_In),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, []AggregateType{"user", "org"}, repository.Operation_In),
},
},
},
@@ -256,13 +257,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: true,
Limit: 5,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_LatestSequence, uint64(100), Operation_Less),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_LatestSequence, uint64(100), repository.Operation_Less),
},
},
},
@@ -279,13 +280,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 5,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_LatestSequence, uint64(100), Operation_Greater),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_LatestSequence, uint64(100), repository.Operation_Greater),
},
},
},
@@ -298,18 +299,18 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
testSetLimit(5),
testSetSortOrder(false),
testSetSequence(100),
testSetColumns(Columns_Max_Sequence),
testSetColumns(repository.Columns_Max_Sequence),
},
},
res: res{
isErr: nil,
query: &searchQuery{
Columns: Columns_Max_Sequence,
query: &repository.SearchQuery{
Columns: repository.Columns_Max_Sequence,
Desc: true,
Limit: 5,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_LatestSequence, uint64(100), Operation_Less),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_LatestSequence, uint64(100), repository.Operation_Less),
},
},
},
@@ -324,13 +325,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_AggregateID, "1234", Operation_Equals),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_AggregateID, "1234", repository.Operation_Equals),
},
},
},
@@ -345,13 +346,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_AggregateID, []string{"1234", "0815"}, Operation_In),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_AggregateID, []string{"1234", "0815"}, repository.Operation_In),
},
},
},
@@ -366,13 +367,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_LatestSequence, uint64(8), Operation_Greater),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_LatestSequence, uint64(8), repository.Operation_Greater),
},
},
},
@@ -387,13 +388,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_EventType, EventType("user.created"), Operation_Equals),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_EventType, EventType("user.created"), repository.Operation_Equals),
},
},
},
@@ -408,13 +409,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_EventType, []EventType{"user.created", "user.changed"}, Operation_In),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_EventType, []EventType{"user.created", "user.changed"}, repository.Operation_In),
},
},
},
@@ -429,13 +430,13 @@ func TestSearchQueryFactoryBuild(t *testing.T) {
},
res: res{
isErr: nil,
query: &searchQuery{
query: &repository.SearchQuery{
Columns: 0,
Desc: false,
Limit: 0,
Filters: []*Filter{
NewFilter(Field_AggregateType, AggregateType("user"), Operation_Equals),
NewFilter(Field_ResourceOwner, "hodor", Operation_Equals),
Filters: []*repository.Filter{
repository.NewFilter(repository.Field_AggregateType, AggregateType("user"), repository.Operation_Equals),
repository.NewFilter(repository.Field_ResourceOwner, "hodor", repository.Operation_Equals),
},
},
},

View File

@@ -1,18 +1,7 @@
package eventstore
import (
"regexp"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
var versionRegexp = regexp.MustCompile(`^v[0-9]+(\.[0-9]+){0,2}$`)
type Version string
func (v Version) Validate() error {
if !versionRegexp.MatchString(string(v)) {
return errors.ThrowPreconditionFailed(nil, "MODEL-luDuS", "version is not semver")
}
return nil
}
type Version repository.Version