Eventstore (#22)

* feat: eventstore repository

* fix: remove gorm

* version

* feat: pkg

* feat: eventstore without eventstore-lib

* rename files

* gnueg

* fix: add object

* fix: global model

* feat(eventstore): sdk

* fix(eventstore): search query

* fix(eventstore): rename app to eventstore

* delete empty test

* remove unused func

* merge master

* fix(eventstore): tests

* fix(models): delete unused struct

* feat(eventstore): implemented push events

* feat(eventstore): overwrite context data

* feat(types): SQL-config

* feat(eventstore): options to overwrite editor

* Update internal/eventstore/models/field.go

Co-Authored-By: livio-a <livio.a@gmail.com>

* fix(eventstore): code quality

* fix(eventstore): rename modifier* to editor*

* fix(eventstore): delete editor_org

Co-authored-by: Fabiennne <fabienne.gerschwiler@gmail.com>
Co-authored-by: livio-a <livio.a@gmail.com>
This commit is contained in:
Silvan
2020-04-06 06:42:21 +02:00
committed by GitHub
parent d6e97ff1fc
commit fbeab4c582
43 changed files with 11050 additions and 689 deletions

View File

@@ -0,0 +1,23 @@
package eventstore
import (
"github.com/caos/zitadel/internal/eventstore/internal/repository/sql"
"github.com/caos/zitadel/internal/eventstore/models"
)
type Config struct {
Repository sql.Config
ServiceName string
}
func Start(conf Config) (Eventstore, error) {
repo, err := sql.Start(conf.Repository)
if err != nil {
return nil, err
}
return &eventstore{
repo: repo,
aggregateCreator: models.NewAggregateCreator(conf.ServiceName),
}, nil
}

View File

@@ -0,0 +1,59 @@
package eventstore
import (
"context"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/internal/repository"
"github.com/caos/zitadel/internal/eventstore/models"
)
type Eventstore interface {
AggregateCreator() *models.AggregateCreator
Health(ctx context.Context) error
PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error
FilterEvents(ctx context.Context, searchQuery *models.SearchQuery) (events []*models.Event, err error)
}
var _ Eventstore = (*eventstore)(nil)
type eventstore struct {
repo repository.Repository
aggregateCreator *models.AggregateCreator
}
func (es *eventstore) AggregateCreator() *models.AggregateCreator {
return es.aggregateCreator
}
func (es *eventstore) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
for _, aggregate := range aggregates {
if len(aggregate.Events) == 0 {
return errors.ThrowInvalidArgument(nil, "EVENT-cNhIj", "no events in aggregate")
}
for _, event := range aggregate.Events {
if err = event.Validate(); err != nil {
return err
}
}
}
err = es.repo.PushAggregates(ctx, aggregates...)
if err != nil {
return err
}
for _, aggregate := range aggregates {
if aggregate.Appender != nil {
aggregate.Appender(aggregate.Events...)
}
}
return nil
}
func (es *eventstore) FilterEvents(ctx context.Context, searchQuery *models.SearchQuery) ([]*models.Event, error) {
if err := searchQuery.Validate(); err != nil {
return nil, err
}
return es.repo.Filter(ctx, searchQuery)
}

View File

@@ -0,0 +1,3 @@
package eventstore
//go:generate mockgen -package mock -destination ./mock/eventstore.mock.go github.com/caos/zitadel/internal/eventstore Eventstore

View File

@@ -0,0 +1,9 @@
package eventstore
import (
"context"
)
func (app *eventstore) Health(ctx context.Context) error {
return app.repo.Health(ctx)
}

View File

@@ -0,0 +1,3 @@
package repository
//go:generate mockgen -package mock -destination ./mock/repository.mock.go github.com/caos/zitadel/internal/eventstore/internal/repository Repository

View File

@@ -0,0 +1,34 @@
package mock
import (
"context"
"testing"
"github.com/caos/zitadel/internal/eventstore/models"
gomock "github.com/golang/mock/gomock"
)
func NewMock(t *testing.T) *MockRepository {
return NewMockRepository(gomock.NewController(t))
}
func (m *MockRepository) ExpectFilter(query *models.SearchQuery, eventAmount int) *MockRepository {
events := make([]*models.Event, eventAmount)
m.EXPECT().Filter(context.Background(), query).Return(events, nil).MaxTimes(1)
return m
}
func (m *MockRepository) ExpectFilterFail(query *models.SearchQuery, err error) *MockRepository {
m.EXPECT().Filter(context.Background(), query).Return(nil, err).MaxTimes(1)
return m
}
func (m *MockRepository) ExpectPush(aggregates ...*models.Aggregate) *MockRepository {
m.EXPECT().PushEvents(context.Background(), aggregates).Return(nil).MaxTimes(1)
return m
}
func (m *MockRepository) ExpectPushError(err error, aggregates ...*models.Aggregate) *MockRepository {
m.EXPECT().PushEvents(context.Background(), aggregates).Return(err).MaxTimes(1)
return m
}

View File

@@ -0,0 +1,97 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/caos/zitadel/internal/eventstore/internal/repository (interfaces: Repository)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
models "github.com/caos/zitadel/internal/eventstore/models"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockRepository is a mock of Repository interface
type MockRepository struct {
ctrl *gomock.Controller
recorder *MockRepositoryMockRecorder
}
// MockRepositoryMockRecorder is the mock recorder for MockRepository
type MockRepositoryMockRecorder struct {
mock *MockRepository
}
// NewMockRepository creates a new mock instance
func NewMockRepository(ctrl *gomock.Controller) *MockRepository {
mock := &MockRepository{ctrl: ctrl}
mock.recorder = &MockRepositoryMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockRepository) EXPECT() *MockRepositoryMockRecorder {
return m.recorder
}
// Filter mocks base method
func (m *MockRepository) Filter(arg0 context.Context, arg1 *models.SearchQuery) ([]*models.Event, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Filter", arg0, arg1)
ret0, _ := ret[0].([]*models.Event)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Filter indicates an expected call of Filter
func (mr *MockRepositoryMockRecorder) Filter(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockRepository)(nil).Filter), arg0, arg1)
}
// Health mocks base method
func (m *MockRepository) Health(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Health", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Health indicates an expected call of Health
func (mr *MockRepositoryMockRecorder) Health(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Health", reflect.TypeOf((*MockRepository)(nil).Health), arg0)
}
// PushAggregates mocks base method
func (m *MockRepository) PushAggregates(arg0 context.Context, arg1 ...*models.Aggregate) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "PushAggregates", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// PushAggregates indicates an expected call of PushAggregates
func (mr *MockRepositoryMockRecorder) PushAggregates(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushAggregates", reflect.TypeOf((*MockRepository)(nil).PushAggregates), varargs...)
}
// PushEvents mocks base method
func (m *MockRepository) PushEvents(arg0 context.Context, arg1 [][]*models.Event) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PushEvents", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// PushEvents indicates an expected call of PushEvents
func (mr *MockRepositoryMockRecorder) PushEvents(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushEvents", reflect.TypeOf((*MockRepository)(nil).PushEvents), arg0, arg1)
}

View File

@@ -0,0 +1,17 @@
package repository
import (
"context"
"github.com/caos/zitadel/internal/eventstore/models"
)
type Repository interface {
Health(ctx context.Context) error
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error
// Filter returns all events matching the given search query
Filter(ctx context.Context, searchQuery *models.SearchQuery) (events []*models.Event, err error)
}

View File

@@ -0,0 +1,24 @@
package sql
import (
// postgres dialect
"database/sql"
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/errors"
_ "github.com/lib/pq"
)
type Config struct {
SQL types.SQL
}
func Start(conf Config) (*SQL, error) {
client, err := sql.Open("postgres", conf.SQL.ConnectionString())
if err != nil {
return nil, errors.ThrowPreconditionFailed(err, "SQL-9qBtr", "unable to open database connection")
}
return &SQL{
client: client,
}, nil
}

View File

@@ -0,0 +1,188 @@
package sql
import (
"database/sql"
"fmt"
"regexp"
"testing"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/lib/pq"
)
const (
selectEscaped = `SELECT id, creation_date, event_type, event_sequence, previous_sequence, event_data, editor_service, editor_user, resource_owner, aggregate_type, aggregate_id, aggregate_version FROM eventstore\.events`
)
var (
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence LIMIT \$1`).String()
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence DESC`).String()
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 ORDER BY event_sequence LIMIT \$2`).String()
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 AND aggregate_type IN \(\$2\) ORDER BY event_sequence LIMIT \$3`).String()
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence`).String()
expectedInsertStatement = regexp.MustCompile(`insert into eventstore\.events ` +
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence\) ` +
`select \$1, \$2, \$3, \$4, coalesce\(\$5, now\(\)\), \$6, \$7, \$8, \$9, ` +
`case \(select exists\(select event_sequence from eventstore\.events where aggregate_type = \$10 AND aggregate_id = \$11\)\) ` +
`WHEN true then \(select event_sequence from eventstore\.events where aggregate_type = \$12 AND aggregate_id = \$13 order by event_sequence desc limit 1\) ` +
`ELSE NULL ` +
`end ` +
`where \(` +
`\(select count\(id\) from eventstore\.events where event_sequence >= \$14 AND aggregate_type = \$15 AND aggregate_id = \$16\) = 1 OR ` +
`\(\(select count\(id\) from eventstore\.events where aggregate_type = \$17 and aggregate_id = \$18\) = 0 AND \$19 = 0\)\) RETURNING id, event_sequence, creation_date`).String()
)
type dbMock struct {
sqlClient *sql.DB
mock sqlmock.Sqlmock
}
func (db *dbMock) close() {
db.sqlClient.Close()
}
func mockDB(t *testing.T) *dbMock {
mockDB := dbMock{}
var err error
mockDB.sqlClient, mockDB.mock, err = sqlmock.New()
if err != nil {
t.Fatalf("error occured while creating stub db %v", err)
}
mockDB.mock.MatchExpectationsInOrder(true)
return &mockDB
}
func (db *dbMock) expectBegin(err error) *dbMock {
if err != nil {
db.mock.ExpectBegin().WillReturnError(err)
} else {
db.mock.ExpectBegin()
}
return db
}
func (db *dbMock) expectSavepoint() *dbMock {
db.mock.ExpectExec("SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
return db
}
func (db *dbMock) expectReleaseSavepoint(err error) *dbMock {
expectation := db.mock.ExpectExec("RELEASE SAVEPOINT")
if err == nil {
expectation.WillReturnResult(sqlmock.NewResult(1, 1))
} else {
expectation.WillReturnError(err)
}
return db
}
func (db *dbMock) expectCommit(err error) *dbMock {
if err != nil {
db.mock.ExpectCommit().WillReturnError(err)
} else {
db.mock.ExpectCommit()
}
return db
}
func (db *dbMock) expectRollback(err error) *dbMock {
if err != nil {
db.mock.ExpectRollback().WillReturnError(err)
} else {
db.mock.ExpectRollback()
}
return db
}
func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returnedSequence uint64) *dbMock {
db.mock.ExpectQuery(expectedInsertStatement).
WithArgs(
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID,
e.PreviousSequence, e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, e.PreviousSequence,
).
WillReturnRows(
sqlmock.NewRows([]string{"id", "event_sequence", "creation_date"}).
AddRow(returnedID, returnedSequence, time.Now().UTC()),
)
return db
}
func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock {
db.mock.ExpectQuery(expectedInsertStatement).
WithArgs(
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID,
e.PreviousSequence, e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, e.PreviousSequence,
).
WillReturnError(sql.ErrTxDone)
return db
}
func (db *dbMock) expectFilterEventsLimit(limit uint64, eventCount int) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
for i := 0; i < eventCount; i++ {
rows.AddRow(fmt.Sprint("event", i), time.Now())
}
db.mock.ExpectQuery(expectedFilterEventsLimitFormat).
WithArgs(limit).
WillReturnRows(rows)
return db
}
func (db *dbMock) expectFilterEventsDesc(eventCount int) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
for i := eventCount; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
}
db.mock.ExpectQuery(expectedFilterEventsDescFormat).
WillReturnRows(rows)
return db
}
func (db *dbMock) expectFilterEventsAggregateIDLimit(aggregateID string, limit uint64) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
for i := limit; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
}
db.mock.ExpectQuery(expectedFilterEventsAggregateIDLimit).
WithArgs(aggregateID, limit).
WillReturnRows(rows)
return db
}
func (db *dbMock) expectFilterEventsAggregateIDTypeLimit(aggregateID, aggregateType string, limit uint64) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
for i := limit; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
}
db.mock.ExpectQuery(expectedFilterEventsAggregateIDTypeLimit).
WithArgs(aggregateID, pq.Array([]string{aggregateType}), limit).
WillReturnRows(rows)
return db
}
func (db *dbMock) expectFilterEventsError(returnedErr error) *dbMock {
db.mock.ExpectQuery(expectedGetAllEvents).
WillReturnError(returnedErr)
return db
}
func (db *dbMock) expectPrepareInsert() *dbMock {
db.mock.ExpectPrepare(expectedInsertStatement)
return db
}

View File

@@ -0,0 +1,156 @@
package sql
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/lib/pq"
)
const (
selectStmt = "SELECT" +
" id" +
", creation_date" +
", event_type" +
", event_sequence" +
", previous_sequence" +
", event_data" +
", editor_service" +
", editor_user" +
", resource_owner" +
", aggregate_type" +
", aggregate_id" +
", aggregate_version" +
" FROM eventstore.events"
)
func (db *SQL) Filter(ctx context.Context, searchQuery *es_models.SearchQuery) (events []*models.Event, err error) {
where, values := prepareWhere(searchQuery)
query := selectStmt + where
query += " ORDER BY event_sequence"
if searchQuery.Desc {
query += " DESC"
}
if searchQuery.Limit > 0 {
values = append(values, searchQuery.Limit)
query += " LIMIT ?"
}
query = numberPlaceholder(query, "?", "$")
rows, err := db.client.Query(query, values...)
if err != nil {
logging.Log("SQL-HP3Uk").WithError(err).Info("query failed")
return nil, errors.ThrowInternal(err, "SQL-IJuyR", "unable to filter events")
}
defer rows.Close()
events = make([]*es_models.Event, 0, searchQuery.Limit)
for rows.Next() {
event := new(models.Event)
events = append(events, event)
rows.Scan(
&event.ID,
&event.CreationDate,
&event.Type,
&event.Sequence,
&event.PreviousSequence,
&event.Data,
&event.EditorService,
&event.EditorUser,
&event.ResourceOwner,
&event.AggregateType,
&event.AggregateID,
&event.AggregateVersion,
)
}
return events, nil
}
func numberPlaceholder(query, old, new string) string {
for i, hasChanged := 1, true; hasChanged; i++ {
newQuery := strings.Replace(query, old, new+strconv.Itoa(i), 1)
hasChanged = query != newQuery
query = newQuery
}
return query
}
func prepareWhere(searchQuery *es_models.SearchQuery) (clause string, values []interface{}) {
values = make([]interface{}, len(searchQuery.Filters))
clauses := make([]string, len(searchQuery.Filters))
if len(values) == 0 {
return clause, values
}
for i, filter := range searchQuery.Filters {
value := filter.GetValue()
switch value.(type) {
case []bool, []float64, []int64, []string, *[]bool, *[]float64, *[]int64, *[]string:
value = pq.Array(value)
}
clauses[i] = getCondition(filter)
values[i] = value
}
return " WHERE " + strings.Join(clauses, " AND "), values
}
func getCondition(filter *es_models.Filter) string {
field := getField(filter.GetField())
operation := getOperation(filter.GetOperation())
format := prepareConditionFormat(filter.GetOperation())
return fmt.Sprintf(format, field, operation)
}
func prepareConditionFormat(operation es_models.Operation) string {
if operation == es_models.Operation_In {
return "%s %s (?)"
}
return "%s %s ?"
}
func getField(field es_models.Field) string {
switch field {
case es_models.Field_AggregateID:
return "aggregate_id"
case es_models.Field_AggregateType:
return "aggregate_type"
case es_models.Field_LatestSequence:
return "event_sequence"
case es_models.Field_ResourceOwner:
return "resource_owner"
case es_models.Field_ModifierService:
return "editor_service"
case es_models.Field_ModifierUser:
return "editor_user"
}
return ""
}
func getOperation(operation es_models.Operation) string {
switch operation {
case es_models.Operation_Equals:
return "="
case es_models.Operation_Greater:
return ">"
case es_models.Operation_Less:
return "<"
case es_models.Operation_In:
return "IN"
}
return ""
}

View File

@@ -0,0 +1,155 @@
package sql
import (
"context"
"database/sql"
"testing"
"github.com/caos/zitadel/internal/errors"
es_models "github.com/caos/zitadel/internal/eventstore/models"
)
func TestSQL_Filter(t *testing.T) {
type fields struct {
client *dbMock
}
type args struct {
events *mockEvents
searchQuery *es_models.SearchQuery
}
tests := []struct {
name string
fields fields
args args
eventsLen int
wantErr bool
isErrFunc func(error) bool
}{
{
name: "only limit filter",
fields: fields{
client: mockDB(t).expectFilterEventsLimit(34, 3),
},
args: args{
events: &mockEvents{t: t},
searchQuery: es_models.NewSearchQuery().SetLimit(34),
},
eventsLen: 3,
wantErr: false,
},
{
name: "only desc filter",
fields: fields{
client: mockDB(t).expectFilterEventsDesc(34),
},
args: args{
events: &mockEvents{t: t},
searchQuery: es_models.NewSearchQuery().OrderDesc(),
},
eventsLen: 34,
wantErr: false,
},
{
name: "no events found",
fields: fields{
client: mockDB(t).expectFilterEventsError(sql.ErrNoRows),
},
args: args{
events: &mockEvents{t: t},
searchQuery: &es_models.SearchQuery{},
},
wantErr: true,
isErrFunc: errors.IsInternal,
},
{
name: "filter fails because sql internal error",
fields: fields{
client: mockDB(t).expectFilterEventsError(sql.ErrConnDone),
},
args: args{
events: &mockEvents{t: t},
searchQuery: &es_models.SearchQuery{},
},
wantErr: true,
isErrFunc: errors.IsInternal,
},
{
name: "filter by aggregate id",
fields: fields{
client: mockDB(t).expectFilterEventsAggregateIDLimit("hop", 5),
},
args: args{
events: &mockEvents{t: t},
searchQuery: es_models.NewSearchQuery().SetLimit(5).AggregateIDFilter("hop"),
},
wantErr: false,
isErrFunc: nil,
},
{
name: "filter by aggregate id and aggregate type",
fields: fields{
client: mockDB(t).expectFilterEventsAggregateIDTypeLimit("hop", "user", 5),
},
args: args{
events: &mockEvents{t: t},
searchQuery: es_models.NewSearchQuery().SetLimit(5).AggregateIDFilter("hop").AggregateTypeFilter("user"),
},
wantErr: false,
isErrFunc: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sql := &SQL{
client: tt.fields.client.sqlClient,
}
events, err := sql.Filter(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("SQL.UnlockAggregates() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.eventsLen != 0 && len(events) != tt.eventsLen {
t.Errorf("events has wrong length got: %d want %d", len(events), tt.eventsLen)
}
if tt.wantErr && !tt.isErrFunc(err) {
t.Errorf("got wrong error %v", err)
}
if err := tt.fields.client.mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled expectations: %s", err)
}
tt.fields.client.close()
})
}
}
func Test_getCondition(t *testing.T) {
type args struct {
filter *es_models.Filter
}
tests := []struct {
name string
args args
want string
}{
{
name: "single value",
args: args{
filter: es_models.NewFilter(es_models.Field_LatestSequence, 34, es_models.Operation_Greater),
},
want: "event_sequence > ?",
},
{
name: "list value",
args: args{
filter: es_models.NewFilter(es_models.Field_AggregateType, []string{"a", "b"}, es_models.Operation_In),
},
want: "aggregate_type IN (?)",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := getCondition(tt.args.filter); got != tt.want {
t.Errorf("getCondition() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,91 @@
package sql
import (
"context"
"database/sql"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/cockroachdb/cockroach-go/crdb"
)
const insertStmt = "insert into eventstore.events " +
"(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, previous_sequence) " +
"select $1, $2, $3, $4, coalesce($5, now()), $6, $7, $8, $9, " +
// case is to set the highest sequence or NULL in previous_sequence
"case (select exists(select event_sequence from eventstore.events where aggregate_type = $10 AND aggregate_id = $11)) " +
"WHEN true then (select event_sequence from eventstore.events where aggregate_type = $12 AND aggregate_id = $13 order by event_sequence desc limit 1) " +
"ELSE NULL " +
"end " +
"where (" +
// exactly one event of requested aggregate must have a >= sequence (last inserted event)
"(select count(id) from eventstore.events where event_sequence >= $14 AND aggregate_type = $15 AND aggregate_id = $16) = 1 OR " +
// previous sequence = 0, no events must exist for the requested aggregate
"((select count(id) from eventstore.events where aggregate_type = $17 and aggregate_id = $18) = 0 AND $19 = 0)) " +
"RETURNING id, event_sequence, creation_date"
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
err = crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
stmt, err := tx.Prepare(insertStmt)
if err != nil {
tx.Rollback()
logging.Log("SQL-9ctx5").WithError(err).Warn("prepare failed")
return errors.ThrowInternal(err, "SQL-juCgA", "prepare failed")
}
for _, aggregate := range aggregates {
err = insertEvents(stmt, aggregate.Events)
if err != nil {
tx.Rollback()
return err
}
}
return nil
})
if _, ok := err.(*errors.CaosError); !ok && err != nil {
err = errors.ThrowInternal(err, "SQL-DjgtG", "unable to store events")
}
return err
}
func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
previousSequence := events[0].PreviousSequence
for _, event := range events {
event.PreviousSequence = previousSequence
if event.Data == nil || len(event.Data) == 0 {
//json decoder failes with EOF if json text is empty
event.Data = []byte("{}")
}
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, event.Data, event.EditorUser, event.EditorService, event.ResourceOwner,
event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID,
event.PreviousSequence, event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID, event.PreviousSequence)
if err != nil {
logging.Log("SQL-EXA0q").WithError(err).Info("query failed")
return errors.ThrowInternal(err, "SQL-SBP37", "unable to create event")
}
defer rows.Close()
rowInserted := false
for rows.Next() {
rowInserted = true
err = rows.Scan(&event.ID, &event.Sequence, &event.CreationDate)
logging.Log("SQL-rAvLD").OnError(err).Info("unable to scan result into event")
}
if !rowInserted {
return errors.ThrowAlreadyExists(nil, "SQL-GKcAa", "wrong sequence")
}
previousSequence = event.Sequence
}
return nil
}

View File

@@ -0,0 +1,323 @@
package sql
import (
"context"
"database/sql"
"reflect"
"runtime"
"testing"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/models"
)
type mockEvents struct {
events []*models.Event
t *testing.T
}
func TestSQL_PushAggregates(t *testing.T) {
type fields struct {
client *dbMock
}
type args struct {
aggregates []*models.Aggregate
}
tests := []struct {
name string
fields fields
args args
isError func(error) bool
shouldCheckEvents bool
}{
{
name: "no aggregates",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert().
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{aggregates: []*models.Aggregate{}},
shouldCheckEvents: false,
isError: noErr,
},
{
name: "no aggregates release fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert().
expectReleaseSavepoint(sql.ErrConnDone).
expectCommit(nil),
},
args: args{aggregates: []*models.Aggregate{}},
isError: errors.IsInternal,
shouldCheckEvents: false,
},
{
name: "one aggregate two events success",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert().
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Type: "eventTyp",
Data: []byte("{}"),
AggregateVersion: "v0.0.1",
},
"asdfölk-234", 45).
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc2",
EditorUser: "usr2",
ResourceOwner: "ro2",
PreviousSequence: 45,
Type: "eventTyp",
Data: []byte("{}"),
AggregateVersion: "v0.0.1",
}, "asdfölk-233", 46).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
Events: []*models.Event{
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc2",
EditorUser: "usr2",
ResourceOwner: "ro2",
Type: "eventTyp",
PreviousSequence: 0,
},
},
},
},
},
shouldCheckEvents: true,
isError: noErr,
},
{
name: "two aggregates one event per aggregate success",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectPrepareInsert().
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Data: []byte("{}"),
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, "asdfölk-233", 47).
expectInsertEvent(&models.Event{
AggregateID: "aggID2",
AggregateType: "aggType2",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 40,
Data: []byte("{}"),
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}, "asdfölk-233", 48).
expectReleaseSavepoint(nil).
expectCommit(nil),
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
Events: []*models.Event{
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
},
},
&models.Aggregate{
Events: []*models.Event{
&models.Event{
AggregateID: "aggID2",
AggregateType: "aggType2",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 40,
},
},
},
},
},
shouldCheckEvents: true,
isError: noErr,
},
{
name: "first event fails no action with second event",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectSavepoint().
expectInsertEventError(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Data: []byte("{}"),
Type: "eventTyp",
AggregateVersion: "v0.0.1",
}).
expectReleaseSavepoint(nil).
expectRollback(nil),
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
Events: []*models.Event{
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 0,
},
},
},
},
},
isError: errors.IsInternal,
shouldCheckEvents: false,
},
{
name: "one event, release savepoint fails",
fields: fields{
client: mockDB(t).
expectBegin(nil).
expectPrepareInsert().
expectSavepoint().
expectInsertEvent(&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
PreviousSequence: 34,
Type: "eventTyp",
Data: []byte("{}"),
AggregateVersion: "v0.0.1",
}, "asdfölk-233", 47).
expectReleaseSavepoint(sql.ErrConnDone).
expectCommit(nil).
expectRollback(nil),
},
args: args{
aggregates: []*models.Aggregate{
&models.Aggregate{
Events: []*models.Event{
&models.Event{
AggregateID: "aggID",
AggregateType: "aggType",
AggregateVersion: "v0.0.1",
EditorService: "svc",
EditorUser: "usr",
ResourceOwner: "ro",
Type: "eventTyp",
PreviousSequence: 34,
},
},
},
},
},
isError: errors.IsInternal,
shouldCheckEvents: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sql := &SQL{
client: tt.fields.client.sqlClient,
}
err := sql.PushAggregates(context.Background(), tt.args.aggregates...)
if err != nil && !tt.isError(err) {
t.Errorf("wrong error type = %v, errFunc %s", err, functionName(tt.isError))
}
if !tt.shouldCheckEvents {
return
}
for _, aggregate := range tt.args.aggregates {
for _, event := range aggregate.Events {
if event.Sequence == 0 {
t.Error("sequence of returned event is not set")
}
if event.AggregateType == "" || event.AggregateID == "" {
t.Error("aggregate of event is not set")
}
}
}
if err := tt.fields.client.mock.ExpectationsWereMet(); err != nil {
t.Errorf("not all database expectaions met: %s", err)
}
})
}
}
func noErr(err error) bool {
return err == nil
}
func functionName(i interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(i).Pointer()).Name()
}

View File

@@ -0,0 +1,17 @@
package sql
import (
"context"
"database/sql"
//sql import
_ "github.com/lib/pq"
)
type SQL struct {
client *sql.DB
}
func (db *SQL) Health(ctx context.Context) error {
return db.client.Ping()
}

View File

@@ -0,0 +1,97 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/caos/zitadel/internal/eventstore (interfaces: Eventstore)
// Package mock is a generated GoMock package.
package mock
import (
context "context"
models "github.com/caos/zitadel/internal/eventstore/models"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockEventstore is a mock of Eventstore interface
type MockEventstore struct {
ctrl *gomock.Controller
recorder *MockEventstoreMockRecorder
}
// MockEventstoreMockRecorder is the mock recorder for MockEventstore
type MockEventstoreMockRecorder struct {
mock *MockEventstore
}
// NewMockEventstore creates a new mock instance
func NewMockEventstore(ctrl *gomock.Controller) *MockEventstore {
mock := &MockEventstore{ctrl: ctrl}
mock.recorder = &MockEventstoreMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockEventstore) EXPECT() *MockEventstoreMockRecorder {
return m.recorder
}
// AggregateCreator mocks base method
func (m *MockEventstore) AggregateCreator() *models.AggregateCreator {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AggregateCreator")
ret0, _ := ret[0].(*models.AggregateCreator)
return ret0
}
// AggregateCreator indicates an expected call of AggregateCreator
func (mr *MockEventstoreMockRecorder) AggregateCreator() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateCreator", reflect.TypeOf((*MockEventstore)(nil).AggregateCreator))
}
// FilterEvents mocks base method
func (m *MockEventstore) FilterEvents(arg0 context.Context, arg1 *models.SearchQuery) ([]*models.Event, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FilterEvents", arg0, arg1)
ret0, _ := ret[0].([]*models.Event)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FilterEvents indicates an expected call of FilterEvents
func (mr *MockEventstoreMockRecorder) FilterEvents(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterEvents", reflect.TypeOf((*MockEventstore)(nil).FilterEvents), arg0, arg1)
}
// Health mocks base method
func (m *MockEventstore) Health(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Health", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Health indicates an expected call of Health
func (mr *MockEventstoreMockRecorder) Health(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Health", reflect.TypeOf((*MockEventstore)(nil).Health), arg0)
}
// PushAggregates mocks base method
func (m *MockEventstore) PushAggregates(arg0 context.Context, arg1 ...*models.Aggregate) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "PushAggregates", varargs...)
ret0, _ := ret[0].(error)
return ret0
}
// PushAggregates indicates an expected call of PushAggregates
func (mr *MockEventstoreMockRecorder) PushAggregates(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushAggregates", reflect.TypeOf((*MockEventstore)(nil).PushAggregates), varargs...)
}

View File

@@ -0,0 +1,88 @@
package models
import (
"time"
"github.com/caos/zitadel/internal/errors"
)
type AggregateType string
func (at AggregateType) String() string {
return string(at)
}
type Aggregates []*Aggregate
type Aggregate struct {
id string
typ AggregateType
latestSequence uint64
version Version
editorService string
editorUser string
resourceOwner string
Events []*Event
Appender appender
}
type appender func(...*Event)
func (a *Aggregate) AppendEvent(typ EventType, payload interface{}) (*Aggregate, error) {
if string(typ) == "" {
return a, errors.ThrowInvalidArgument(nil, "MODEL-TGoCb", "no event type")
}
data, err := eventData(payload)
if err != nil {
return a, err
}
e := &Event{
CreationDate: time.Now(),
Data: data,
Type: typ,
PreviousSequence: a.latestSequence,
AggregateID: a.id,
AggregateType: a.typ,
AggregateVersion: a.version,
EditorService: a.editorService,
EditorUser: a.editorUser,
ResourceOwner: a.resourceOwner,
}
a.Events = append(a.Events, e)
return a, nil
}
func (a *Aggregate) Validate() error {
if a == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-yi5AC", "aggregate is nil")
}
if a.id == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-FSjKV", "id not set")
}
if string(a.typ) == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-aj4t2", "type not set")
}
if err := a.version.Validate(); err != nil {
return errors.ThrowPreconditionFailed(err, "MODEL-PupjX", "invalid version")
}
if a.editorService == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-clYbY", "editor service not set")
}
if a.editorUser == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-Xcssi", "editor user not set")
}
if a.resourceOwner == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-eBYUW", "resource owner not set")
}
return nil
}
func (a *Aggregate) SetAppender(appendFn appender) *Aggregate {
a.Appender = appendFn
return a
}

View File

@@ -0,0 +1,56 @@
package models
import (
"context"
"github.com/caos/zitadel/internal/api/auth"
)
type AggregateCreator struct {
serviceName string
}
func NewAggregateCreator(serviceName string) *AggregateCreator {
return &AggregateCreator{serviceName: serviceName}
}
type option func(*Aggregate)
func (c *AggregateCreator) NewAggregate(ctx context.Context, id string, typ AggregateType, version Version, latestSequence uint64, opts ...option) (*Aggregate, error) {
ctxData := auth.GetCtxData(ctx)
editorUser := ctxData.UserID
resourceOwner := ctxData.OrgID
aggregate := &Aggregate{
id: id,
typ: typ,
latestSequence: latestSequence,
version: version,
Events: make([]*Event, 0, 2),
editorService: c.serviceName,
editorUser: editorUser,
resourceOwner: resourceOwner,
}
for _, opt := range opts {
opt(aggregate)
}
if err := aggregate.Validate(); err != nil {
return nil, err
}
return aggregate, nil
}
func OverwriteEditorUser(userID string) func(*Aggregate) {
return func(a *Aggregate) {
a.editorUser = userID
}
}
func OverwriteResourceOwner(resourceOwner string) func(*Aggregate) {
return func(a *Aggregate) {
a.resourceOwner = resourceOwner
}
}

View File

@@ -0,0 +1,118 @@
package models
import (
"context"
"reflect"
"testing"
)
func TestAggregateCreator_NewAggregate(t *testing.T) {
type args struct {
ctx context.Context
id string
typ AggregateType
version Version
opts []option
}
tests := []struct {
name string
creator *AggregateCreator
args args
want *Aggregate
wantErr bool
}{
{
name: "no ctxdata and no options",
creator: &AggregateCreator{serviceName: "admin"},
wantErr: true,
want: nil,
args: args{
ctx: context.Background(),
id: "hodor",
typ: "user",
version: "v1.0.0",
},
},
{
name: "no id error",
creator: &AggregateCreator{serviceName: "admin"},
wantErr: true,
want: nil,
args: args{
ctx: context.Background(),
typ: "user",
version: "v1.0.0",
opts: []option{
OverwriteEditorUser("hodor"),
OverwriteResourceOwner("org"),
},
},
},
{
name: "no type error",
creator: &AggregateCreator{serviceName: "admin"},
wantErr: true,
want: nil,
args: args{
ctx: context.Background(),
id: "hodor",
version: "v1.0.0",
opts: []option{
OverwriteEditorUser("hodor"),
OverwriteResourceOwner("org"),
},
},
},
{
name: "invalid version error",
creator: &AggregateCreator{serviceName: "admin"},
wantErr: true,
want: nil,
args: args{
ctx: context.Background(),
id: "hodor",
typ: "user",
opts: []option{
OverwriteEditorUser("hodor"),
OverwriteResourceOwner("org"),
},
},
},
{
name: "create ok",
creator: &AggregateCreator{serviceName: "admin"},
wantErr: false,
want: &Aggregate{
id: "hodor",
Events: make([]*Event, 0, 2),
typ: "user",
version: "v1.0.0",
editorService: "admin",
editorUser: "hodor",
resourceOwner: "org",
},
args: args{
ctx: context.Background(),
id: "hodor",
typ: "user",
version: "v1.0.0",
opts: []option{
OverwriteEditorUser("hodor"),
OverwriteResourceOwner("org"),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.creator.NewAggregate(tt.args.ctx, tt.args.id, tt.args.typ, tt.args.version, 0, tt.args.opts...)
if (err != nil) != tt.wantErr {
t.Errorf("AggregateCreator.NewAggregate() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("AggregateCreator.NewAggregate() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,173 @@
package models
import (
"testing"
)
func TestAggregate_AppendEvent(t *testing.T) {
type fields struct {
aggregate *Aggregate
}
type args struct {
typ EventType
payload interface{}
}
tests := []struct {
name string
fields fields
args args
want *Aggregate
wantErr bool
}{
{
name: "no event type error",
fields: fields{aggregate: &Aggregate{}},
args: args{},
want: &Aggregate{},
wantErr: true,
},
{
name: "invalid payload error",
fields: fields{aggregate: &Aggregate{}},
args: args{typ: "user", payload: 134},
want: &Aggregate{},
wantErr: true,
},
{
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{&Event{Type: "user.deactivated"}}},
wantErr: false,
},
{
name: "event added",
fields: fields{aggregate: &Aggregate{Events: []*Event{&Event{}}}},
args: args{typ: "user.deactivated"},
want: &Aggregate{Events: []*Event{&Event{}, &Event{Type: "user.deactivated"}}},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.fields.aggregate.AppendEvent(tt.args.typ, tt.args.payload)
if (err != nil) != tt.wantErr {
t.Errorf("Aggregate.AppendEvent() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(tt.fields.aggregate.Events) != len(got.Events) {
t.Errorf("events len should be %d but was %d", len(tt.fields.aggregate.Events), len(got.Events))
}
})
}
}
func TestAggregate_Validate(t *testing.T) {
type fields struct {
aggregate *Aggregate
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "aggregate nil error",
wantErr: true,
},
{
name: "aggregate empty error",
wantErr: true,
fields: fields{aggregate: &Aggregate{}},
},
{
name: "no id error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "no type error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "invalid version error",
wantErr: true,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateType: "user",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
{
name: "validation ok",
wantErr: false,
fields: fields{aggregate: &Aggregate{
id: "aggID",
typ: "user",
version: "v1.0.0",
editorService: "svc",
editorUser: "hodor",
resourceOwner: "org",
latestSequence: 5,
Events: []*Event{&Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.fields.aggregate.Validate(); (err != nil) != tt.wantErr {
t.Errorf("Aggregate.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@@ -0,0 +1,89 @@
package models
import (
"encoding/json"
"reflect"
"time"
"github.com/caos/zitadel/internal/errors"
)
type EventType string
func (et EventType) String() string {
return string(et)
}
type Event struct {
ID string
Sequence uint64
CreationDate time.Time
Type EventType
PreviousSequence uint64
Data []byte
AggregateID string
AggregateType AggregateType
AggregateVersion Version
EditorService string
EditorUser string
ResourceOwner string
}
func eventData(i interface{}) ([]byte, error) {
switch v := i.(type) {
case []byte:
return v, nil
case map[string]interface{}:
bytes, err := json.Marshal(v)
if err != nil {
return nil, errors.ThrowInvalidArgument(err, "MODEL-s2fgE", "unable to marshal data")
}
return bytes, nil
case nil:
return nil, nil
default:
t := reflect.TypeOf(i)
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
if t.Kind() != reflect.Struct {
return nil, errors.ThrowInvalidArgument(nil, "MODEL-rjWdN", "data is not valid")
}
bytes, err := json.Marshal(v)
if err != nil {
return nil, errors.ThrowInvalidArgument(err, "MODEL-Y2OpM", "unable to marshal data")
}
return bytes, nil
}
}
func (e *Event) Validate() error {
if e == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-oEAG4", "event is nil")
}
if string(e.Type) == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-R2sB0", "type not defined")
}
if e.AggregateID == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-A6WwL", "aggregate id not set")
}
if e.AggregateType == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-EzdyK", "aggregate type not set")
}
if err := e.AggregateVersion.Validate(); err != nil {
return err
}
if e.EditorService == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-4Yqik", "editor service not set")
}
if e.EditorUser == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-L3NHO", "editor user not set")
}
if e.ResourceOwner == "" {
return errors.ThrowPreconditionFailed(nil, "MODEL-omFVT", "resource ow")
}
return nil
}

View File

@@ -0,0 +1,198 @@
package models
import (
"reflect"
"testing"
)
func Test_eventData(t *testing.T) {
type args struct {
i interface{}
}
tests := []struct {
name string
args args
want []byte
wantErr bool
}{
{
name: "from bytes",
args: args{[]byte(`{"hodor":"asdf"}`)},
want: []byte(`{"hodor":"asdf"}`),
wantErr: false,
},
{
name: "from pointer",
args: args{&struct {
Hodor string `json:"hodor"`
}{Hodor: "asdf"}},
want: []byte(`{"hodor":"asdf"}`),
wantErr: false,
},
{
name: "from struct",
args: args{struct {
Hodor string `json:"hodor"`
}{Hodor: "asdf"}},
want: []byte(`{"hodor":"asdf"}`),
wantErr: false,
},
{
name: "from map",
args: args{
map[string]interface{}{"hodor": "asdf"},
},
want: []byte(`{"hodor":"asdf"}`),
wantErr: false,
},
{
name: "from nil",
args: args{},
want: nil,
wantErr: false,
},
{
name: "invalid data",
args: args{876},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := eventData(tt.args.i)
if (err != nil) != tt.wantErr {
t.Errorf("eventData() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("eventData() = %s, want %s", got, tt.want)
}
})
}
}
func TestEvent_Validate(t *testing.T) {
type fields struct {
event *Event
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "event nil",
wantErr: true,
},
{
name: "event empty",
fields: fields{event: &Event{}},
wantErr: true,
},
{
name: "no aggregate id",
fields: fields{event: &Event{
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
wantErr: true,
},
{
name: "no aggregate type",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
wantErr: true,
},
{
name: "no aggregate version",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
wantErr: true,
},
{
name: "no editor service",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
wantErr: true,
},
{
name: "no editor user",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
ResourceOwner: "org",
Type: "born",
}},
wantErr: true,
},
{
name: "no resource owner",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
Type: "born",
}},
wantErr: true,
},
{
name: "no type",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
}},
wantErr: true,
},
{
name: "all fields set",
fields: fields{event: &Event{
AggregateID: "hodor",
AggregateType: "user",
AggregateVersion: "v1.0.0",
EditorService: "management",
EditorUser: "hodor",
ResourceOwner: "org",
Type: "born",
}},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.fields.event.Validate(); (err != nil) != tt.wantErr {
t.Errorf("Event.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@@ -0,0 +1,12 @@
package models
type Field int32
const (
Field_AggregateType Field = 1 + iota
Field_AggregateID
Field_LatestSequence
Field_ResourceOwner
Field_ModifierService
Field_ModifierUser
)

View File

@@ -0,0 +1,46 @@
package models
import (
"github.com/caos/zitadel/internal/errors"
)
type Filter struct {
field Field
value interface{}
operation Operation
}
//NewFilter is used in tests. Use searchQuery.*Filter() instead
func NewFilter(field Field, value interface{}, operation Operation) *Filter {
return &Filter{
field: field,
value: value,
operation: operation,
}
}
func (f *Filter) GetField() Field {
return f.field
}
func (f *Filter) GetOperation() Operation {
return f.operation
}
func (f *Filter) GetValue() interface{} {
return f.value
}
func (f *Filter) Validate() error {
if f == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-z6KcG", "filter is nil")
}
if f.field <= 0 {
return errors.ThrowPreconditionFailed(nil, "MODEL-zw62U", "field not definded")
}
if f.value == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-GJ9ct", "no value definded")
}
if f.operation <= 0 {
return errors.ThrowPreconditionFailed(nil, "MODEL-RrQTy", "operation not definded")
}
return nil
}

View File

@@ -0,0 +1,104 @@
package models
import (
"reflect"
"testing"
)
func TestNewFilter(t *testing.T) {
type args struct {
field Field
value interface{}
operation Operation
}
tests := []struct {
name string
args args
want *Filter
}{
{
name: "aggregateID equals",
args: args{
field: Field_AggregateID,
value: "hodor",
operation: Operation_Equals,
},
want: &Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewFilter(tt.args.field, tt.args.value, tt.args.operation); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewFilter() = %v, want %v", got, tt.want)
}
})
}
}
func TestFilter_Validate(t *testing.T) {
type fields struct {
field Field
value interface{}
operation Operation
isNil bool
}
tests := []struct {
name string
fields fields
wantErr bool
}{
{
name: "correct filter",
fields: fields{
field: Field_LatestSequence,
operation: Operation_Greater,
value: uint64(235),
},
wantErr: false,
},
{
name: "filter is nil",
fields: fields{isNil: true},
wantErr: true,
},
{
name: "no field error",
fields: fields{
operation: Operation_Greater,
value: uint64(235),
},
wantErr: true,
},
{
name: "no value error",
fields: fields{
field: Field_LatestSequence,
operation: Operation_Greater,
},
wantErr: true,
},
{
name: "no operation error",
fields: fields{
field: Field_LatestSequence,
value: uint64(235),
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var f *Filter
if !tt.fields.isNil {
f = &Filter{
field: tt.fields.field,
value: tt.fields.value,
operation: tt.fields.operation,
}
}
if err := f.Validate(); (err != nil) != tt.wantErr {
t.Errorf("Filter.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

View File

@@ -0,0 +1,25 @@
package models
import (
"time"
)
type ObjectRoot struct {
ID string `json:"-"`
Sequence uint64 `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
}
func (o *ObjectRoot) AppendEvent(event *Event) {
if o.ID == "" {
o.ID = event.AggregateID
}
o.ChangeDate = event.CreationDate
if event.PreviousSequence == 0 {
o.CreationDate = o.ChangeDate
}
o.Sequence = event.Sequence
}

View File

@@ -0,0 +1,81 @@
package models
import (
"testing"
"time"
)
func TestObjectRoot_AppendEvent(t *testing.T) {
type fields struct {
ID string
Sequence uint64
CreationDate time.Time
ChangeDate time.Time
}
type args struct {
event *Event
isNewRoot bool
}
tests := []struct {
name string
fields fields
args args
}{
{
"new root",
fields{},
args{
&Event{
AggregateID: "aggID",
Sequence: 34555,
CreationDate: time.Now(),
},
true,
},
},
{
"existing root",
fields{
"agg",
234,
time.Now().Add(-24 * time.Hour),
time.Now().Add(-12 * time.Hour),
},
args{
&Event{
AggregateID: "agg",
Sequence: 34555425,
CreationDate: time.Now(),
PreviousSequence: 22,
},
false,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
o := &ObjectRoot{
ID: tt.fields.ID,
Sequence: tt.fields.Sequence,
CreationDate: tt.fields.CreationDate,
ChangeDate: tt.fields.ChangeDate,
}
o.AppendEvent(tt.args.event)
if tt.args.isNewRoot {
if !o.CreationDate.Equal(tt.args.event.CreationDate) {
t.Error("creationDate should be equal to event on new root")
}
} else {
if o.CreationDate.Equal(o.ChangeDate) {
t.Error("creationDate and changedate should differ")
}
}
if o.Sequence != tt.args.event.Sequence {
t.Errorf("sequence not equal to event: event: %d root: %d", tt.args.event.Sequence, o.Sequence)
}
if !o.ChangeDate.Equal(tt.args.event.CreationDate) {
t.Errorf("changedate should be equal to event creation date: event: %v root: %v", tt.args.event.CreationDate, o.ChangeDate)
}
})
}
}

View File

@@ -0,0 +1,10 @@
package models
type Operation int32
const (
Operation_Equals Operation = 1 + iota
Operation_Greater
Operation_Less
Operation_In
)

View File

@@ -0,0 +1,74 @@
package models
import "github.com/caos/zitadel/internal/errors"
type SearchQuery struct {
Limit uint64
Desc bool
Filters []*Filter
}
func NewSearchQuery() *SearchQuery {
return &SearchQuery{
Filters: make([]*Filter, 0, 4),
}
}
func (q *SearchQuery) SetLimit(limit uint64) *SearchQuery {
q.Limit = limit
return q
}
func (q *SearchQuery) OrderDesc() *SearchQuery {
q.Desc = true
return q
}
func (q *SearchQuery) OrderAsc() *SearchQuery {
q.Desc = false
return q
}
func (q *SearchQuery) AggregateIDFilter(id string) *SearchQuery {
return q.setFilter(NewFilter(Field_AggregateID, id, Operation_Equals))
}
func (q *SearchQuery) AggregateTypeFilter(types ...string) *SearchQuery {
return q.setFilter(NewFilter(Field_AggregateType, types, Operation_In))
}
func (q *SearchQuery) LatestSequenceFilter(sequence uint64) *SearchQuery {
sortOrder := Operation_Greater
if q.Desc {
sortOrder = Operation_Less
}
return q.setFilter(NewFilter(Field_LatestSequence, sequence, sortOrder))
}
func (q *SearchQuery) ResourceOwnerFilter(resourceOwner string) *SearchQuery {
return q.setFilter(NewFilter(Field_ResourceOwner, resourceOwner, Operation_Equals))
}
func (q *SearchQuery) setFilter(filter *Filter) *SearchQuery {
for i, f := range q.Filters {
if f.field == filter.field {
q.Filters[i] = filter
return q
}
}
q.Filters = append(q.Filters, filter)
return q
}
func (q *SearchQuery) Validate() error {
if q == nil {
return errors.ThrowPreconditionFailed(nil, "MODEL-J5xQi", "search query is nil")
}
for _, filter := range q.Filters {
if err := filter.Validate(); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,59 @@
package models
import (
"reflect"
"testing"
)
func TestSearchQuery_setFilter(t *testing.T) {
type fields struct {
query *SearchQuery
}
type args struct {
filters []*Filter
}
tests := []struct {
name string
fields fields
args args
want *SearchQuery
}{
{
name: "set idFilter",
fields: fields{query: NewSearchQuery()},
args: args{filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"}}},
want: &SearchQuery{Filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"}}},
},
{
name: "overwrite idFilter",
fields: fields{query: NewSearchQuery()},
args: args{filters: []*Filter{
&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "hodor"},
&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"},
}},
want: &SearchQuery{Filters: []*Filter{&Filter{field: Field_AggregateID, operation: Operation_Equals, value: "ursli"}}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.fields.query
for _, filter := range tt.args.filters {
got = got.setFilter(filter)
}
for _, wantFilter := range tt.want.Filters {
found := false
for _, gotFilter := range got.Filters {
if gotFilter.field == wantFilter.field {
found = true
if !reflect.DeepEqual(wantFilter, gotFilter) {
t.Errorf("filter not as expected: want: %v got %v", wantFilter, gotFilter)
}
}
}
if !found {
t.Errorf("filter field %v not found", wantFilter.field)
}
}
})
}
}

View File

@@ -0,0 +1,22 @@
package models
import (
"regexp"
"github.com/caos/zitadel/internal/errors"
)
var versionRegexp = regexp.MustCompile(`^v[0-9]+(\.[0-9]+){0,2}$`)
type Version string
func (v Version) Validate() error {
if !versionRegexp.MatchString(string(v)) {
return errors.ThrowPreconditionFailed(nil, "MODEL-luDuS", "version is not semver")
}
return nil
}
func (v Version) String() string {
return string(v)
}

View File

@@ -0,0 +1,39 @@
package models
import "testing"
func TestVersion_Validate(t *testing.T) {
tests := []struct {
name string
v Version
wantErr bool
}{
{
"correct version",
"v1.23.23",
false,
},
{
"no v prefix",
"1.2.2",
true,
},
{
"letters in version",
"v1.as.3",
true,
},
{
"no version",
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := tt.v.Validate(); (err != nil) != tt.wantErr {
t.Errorf("Version.Validate() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}