Project commands (#26)

* feat: eventstore repository

* fix: remove gorm

* version

* feat: pkg

* feat: add some files for project

* feat: eventstore without eventstore-lib

* rename files

* gnueg

* fix: key json

* fix: add object

* fix: change imports

* fix: internal models

* fix: some imports

* fix: global model

* fix: add some functions on repo

* feat(eventstore): sdk

* fix(eventstore): search query

* fix(eventstore): rename app to eventstore

* delete empty test

* remove unused func

* merge master

* fix(eventstore): tests

* fix(models): delete unused struct

* fix: some funcitons

* feat(eventstore): implemented push events

* fix: move project eventstore to project package

* fix: change project eventstore funcs

* feat(eventstore): overwrite context data

* fix: change project eventstore

* fix: add project repo to mgmt server

* feat(types): SQL-config

* fix: commented code

* feat(eventstore): options to overwrite editor

* feat: auth interceptor and cockroach migrations

* fix: migrations

* fix: fix filter

* fix: not found on getbyid

* fix: add sequence

* fix: add some tests

* fix(eventstore): nullable sequence

* fix: add some tests

* merge

* fix: add some tests

* fix(migrations): correct statements for sequence

* fix: add some tests

* fix: add some tests

* fix: changes from mr

* Update internal/eventstore/models/field.go

Co-Authored-By: livio-a <livio.a@gmail.com>

* fix(eventstore): code quality

* fix: add types to aggregate/Event-types

* fix(eventstore): rename modifier* to editor*

* fix(eventstore): delete editor_org

* fix(migrations): remove editor_org field,
rename modifier_* to editor_*

* fix: generate files

* fix(eventstore): tests

* fix(eventstore): rename modifier to editor

* fix(migrations): add cluster migration,
fix(migrations): fix typo of host in clean clsuter

* fix(eventstore): move health

* fix(eventstore): AggregateTypeFilter aggregateType as param

* code quality

* feat: start implementing project members

* feat: remove member funcs

* feat: remove member model

* feat: remove member events

* feat: remove member repo model

* fix: better error func testing

* Update docs/local.md

Co-Authored-By: Silvan <silvan.reusser@gmail.com>

* Update docs/local.md

Co-Authored-By: Silvan <silvan.reusser@gmail.com>

* fix: mr requests

* fix: md file

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
Co-authored-by: livio-a <livio.a@gmail.com>
This commit is contained in:
Fabi
2020-04-07 13:23:04 +02:00
committed by GitHub
parent 007fc9e9bd
commit c07ed83c41
61 changed files with 5259 additions and 3481 deletions

View File

@@ -57,3 +57,7 @@ func (es *eventstore) FilterEvents(ctx context.Context, searchQuery *models.Sear
}
return es.repo.Filter(ctx, searchQuery)
}
func (es *eventstore) Health(ctx context.Context) error {
return es.repo.Health(ctx)
}

View File

@@ -1,9 +0,0 @@
package eventstore
import (
"context"
)
func (app *eventstore) Health(ctx context.Context) error {
return app.repo.Health(ctx)
}

View File

@@ -24,11 +24,11 @@ func (m *MockRepository) ExpectFilterFail(query *models.SearchQuery, err error)
}
func (m *MockRepository) ExpectPush(aggregates ...*models.Aggregate) *MockRepository {
m.EXPECT().PushEvents(context.Background(), aggregates).Return(nil).MaxTimes(1)
m.EXPECT().PushAggregates(context.Background(), aggregates).Return(nil).MaxTimes(1)
return m
}
func (m *MockRepository) ExpectPushError(err error, aggregates ...*models.Aggregate) *MockRepository {
m.EXPECT().PushEvents(context.Background(), aggregates).Return(err).MaxTimes(1)
m.EXPECT().PushAggregates(context.Background(), aggregates).Return(err).MaxTimes(1)
return m
}

View File

@@ -81,17 +81,3 @@ func (mr *MockRepositoryMockRecorder) PushAggregates(arg0 interface{}, arg1 ...i
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushAggregates", reflect.TypeOf((*MockRepository)(nil).PushAggregates), varargs...)
}
// PushEvents mocks base method
func (m *MockRepository) PushEvents(arg0 context.Context, arg1 [][]*models.Event) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PushEvents", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// PushEvents indicates an expected call of PushEvents
func (mr *MockRepositoryMockRecorder) PushEvents(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PushEvents", reflect.TypeOf((*MockRepository)(nil).PushEvents), arg0, arg1)
}

View File

@@ -17,10 +17,11 @@ const (
)
var (
eventColumns = []string{"id", "creation_date", "event_type", "event_sequence", "previous_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "aggregate_type", "aggregate_id", "aggregate_version"}
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence LIMIT \$1`).String()
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence DESC`).String()
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 ORDER BY event_sequence LIMIT \$2`).String()
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 AND aggregate_type IN \(\$2\) ORDER BY event_sequence LIMIT \$3`).String()
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` WHERE aggregate_id = \$1 AND aggregate_type = ANY\(\$2\) ORDER BY event_sequence LIMIT \$3`).String()
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` ORDER BY event_sequence`).String()
expectedInsertStatement = regexp.MustCompile(`insert into eventstore\.events ` +
@@ -31,8 +32,8 @@ var (
`ELSE NULL ` +
`end ` +
`where \(` +
`\(select count\(id\) from eventstore\.events where event_sequence >= \$14 AND aggregate_type = \$15 AND aggregate_id = \$16\) = 1 OR ` +
`\(\(select count\(id\) from eventstore\.events where aggregate_type = \$17 and aggregate_id = \$18\) = 0 AND \$19 = 0\)\) RETURNING id, event_sequence, creation_date`).String()
`\(select count\(id\) from eventstore\.events where event_sequence >= COALESCE\(\$14, 0\) AND aggregate_type = \$15 AND aggregate_id = \$16\) = 1 OR ` +
`\(\(select count\(id\) from eventstore\.events where aggregate_type = \$17 and aggregate_id = \$18\) = 0 AND COALESCE\(\$19, 0\) = 0\)\) RETURNING id, event_sequence, creation_date`).String()
)
type dbMock struct {
@@ -107,8 +108,8 @@ func (db *dbMock) expectInsertEvent(e *models.Event, returnedID string, returned
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID,
e.PreviousSequence, e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, e.PreviousSequence,
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
).
WillReturnRows(
sqlmock.NewRows([]string{"id", "event_sequence", "creation_date"}).
@@ -124,8 +125,8 @@ func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock {
e.Type, e.AggregateType, e.AggregateID, e.AggregateVersion, sqlmock.AnyArg(), e.Data, e.EditorUser, e.EditorService, e.ResourceOwner,
e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID,
e.PreviousSequence, e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, e.PreviousSequence,
Sequence(e.PreviousSequence), e.AggregateType, e.AggregateID,
e.AggregateType, e.AggregateID, Sequence(e.PreviousSequence),
).
WillReturnError(sql.ErrTxDone)
@@ -133,9 +134,9 @@ func (db *dbMock) expectInsertEventError(e *models.Event) *dbMock {
}
func (db *dbMock) expectFilterEventsLimit(limit uint64, eventCount int) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
rows := sqlmock.NewRows(eventColumns)
for i := 0; i < eventCount; i++ {
rows.AddRow(fmt.Sprint("event", i), time.Now())
rows.AddRow(fmt.Sprint("event", i), time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0")
}
db.mock.ExpectQuery(expectedFilterEventsLimitFormat).
WithArgs(limit).
@@ -144,9 +145,9 @@ func (db *dbMock) expectFilterEventsLimit(limit uint64, eventCount int) *dbMock
}
func (db *dbMock) expectFilterEventsDesc(eventCount int) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
rows := sqlmock.NewRows(eventColumns)
for i := eventCount; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
rows.AddRow(fmt.Sprint("event", i), time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0")
}
db.mock.ExpectQuery(expectedFilterEventsDescFormat).
WillReturnRows(rows)
@@ -154,9 +155,9 @@ func (db *dbMock) expectFilterEventsDesc(eventCount int) *dbMock {
}
func (db *dbMock) expectFilterEventsAggregateIDLimit(aggregateID string, limit uint64) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
rows := sqlmock.NewRows(eventColumns)
for i := limit; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
rows.AddRow(fmt.Sprint("event", i), time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0")
}
db.mock.ExpectQuery(expectedFilterEventsAggregateIDLimit).
WithArgs(aggregateID, limit).
@@ -165,9 +166,9 @@ func (db *dbMock) expectFilterEventsAggregateIDLimit(aggregateID string, limit u
}
func (db *dbMock) expectFilterEventsAggregateIDTypeLimit(aggregateID, aggregateType string, limit uint64) *dbMock {
rows := sqlmock.NewRows([]string{"id", "creation_date"})
rows := sqlmock.NewRows(eventColumns)
for i := limit; i > 0; i-- {
rows.AddRow(fmt.Sprint("event", i), time.Now())
rows.AddRow(fmt.Sprint("event", i), time.Now(), "eventType", Sequence(i+1), Sequence(i), nil, "svc", "hodor", "org", "aggType", "aggID", "v1.0.0")
}
db.mock.ExpectQuery(expectedFilterEventsAggregateIDTypeLimit).
WithArgs(aggregateID, pq.Array([]string{aggregateType}), limit).

View File

@@ -57,14 +57,14 @@ func (db *SQL) Filter(ctx context.Context, searchQuery *es_models.SearchQuery) (
for rows.Next() {
event := new(models.Event)
events = append(events, event)
var previousSequence Sequence
rows.Scan(
err = rows.Scan(
&event.ID,
&event.CreationDate,
&event.Type,
&event.Sequence,
&event.PreviousSequence,
&previousSequence,
&event.Data,
&event.EditorService,
&event.EditorUser,
@@ -73,6 +73,14 @@ func (db *SQL) Filter(ctx context.Context, searchQuery *es_models.SearchQuery) (
&event.AggregateID,
&event.AggregateVersion,
)
if err != nil {
logging.Log("SQL-wHNPo").WithError(err).Warn("unable to scan row")
return nil, errors.ThrowInternal(err, "SQL-BfZwF", "unable to scan row")
}
event.PreviousSequence = uint64(previousSequence)
events = append(events, event)
}
return events, nil
@@ -98,7 +106,7 @@ func prepareWhere(searchQuery *es_models.SearchQuery) (clause string, values []i
for i, filter := range searchQuery.Filters {
value := filter.GetValue()
switch value.(type) {
case []bool, []float64, []int64, []string, *[]bool, *[]float64, *[]int64, *[]string:
case []bool, []float64, []int64, []string, []models.AggregateType, *[]bool, *[]float64, *[]int64, *[]string, *[]models.AggregateType:
value = pq.Array(value)
}
@@ -118,7 +126,7 @@ func getCondition(filter *es_models.Filter) string {
func prepareConditionFormat(operation es_models.Operation) string {
if operation == es_models.Operation_In {
return "%s %s (?)"
return "%s %s ANY(?)"
}
return "%s %s ?"
}
@@ -133,9 +141,9 @@ func getField(field es_models.Field) string {
return "event_sequence"
case es_models.Field_ResourceOwner:
return "resource_owner"
case es_models.Field_ModifierService:
case es_models.Field_EditorService:
return "editor_service"
case es_models.Field_ModifierUser:
case es_models.Field_EditorUser:
return "editor_user"
}
return ""
@@ -143,14 +151,12 @@ func getField(field es_models.Field) string {
func getOperation(operation es_models.Operation) string {
switch operation {
case es_models.Operation_Equals:
case es_models.Operation_Equals, es_models.Operation_In:
return "="
case es_models.Operation_Greater:
return ">"
case es_models.Operation_Less:
return "<"
case es_models.Operation_In:
return "IN"
}
return ""
}

View File

@@ -105,7 +105,7 @@ func TestSQL_Filter(t *testing.T) {
}
events, err := sql.Filter(context.Background(), tt.args.searchQuery)
if (err != nil) != tt.wantErr {
t.Errorf("SQL.UnlockAggregates() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("SQL.Filter() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.eventsLen != 0 && len(events) != tt.eventsLen {
t.Errorf("events has wrong length got: %d want %d", len(events), tt.eventsLen)
@@ -142,7 +142,7 @@ func Test_getCondition(t *testing.T) {
args: args{
filter: es_models.NewFilter(es_models.Field_AggregateType, []string{"a", "b"}, es_models.Operation_In),
},
want: "aggregate_type IN (?)",
want: "aggregate_type = ANY(?)",
},
}
for _, tt := range tests {

View File

@@ -20,9 +20,9 @@ const insertStmt = "insert into eventstore.events " +
"end " +
"where (" +
// exactly one event of requested aggregate must have a >= sequence (last inserted event)
"(select count(id) from eventstore.events where event_sequence >= $14 AND aggregate_type = $15 AND aggregate_id = $16) = 1 OR " +
"(select count(id) from eventstore.events where event_sequence >= COALESCE($14, 0) AND aggregate_type = $15 AND aggregate_id = $16) = 1 OR " +
// previous sequence = 0, no events must exist for the requested aggregate
"((select count(id) from eventstore.events where aggregate_type = $17 and aggregate_id = $18) = 0 AND $19 = 0)) " +
"((select count(id) from eventstore.events where aggregate_type = $17 and aggregate_id = $18) = 0 AND COALESCE($19, 0) = 0)) " +
"RETURNING id, event_sequence, creation_date"
func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) (err error) {
@@ -52,10 +52,8 @@ func (db *SQL) PushAggregates(ctx context.Context, aggregates ...*models.Aggrega
}
func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
previousSequence := events[0].PreviousSequence
currentSequence := Sequence(events[0].PreviousSequence)
for _, event := range events {
event.PreviousSequence = previousSequence
if event.Data == nil || len(event.Data) == 0 {
//json decoder failes with EOF if json text is empty
event.Data = []byte("{}")
@@ -64,8 +62,8 @@ func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
rows, err := stmt.Query(event.Type, event.AggregateType, event.AggregateID, event.AggregateVersion, event.CreationDate, event.Data, event.EditorUser, event.EditorService, event.ResourceOwner,
event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID,
event.PreviousSequence, event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID, event.PreviousSequence)
currentSequence, event.AggregateType, event.AggregateID,
event.AggregateType, event.AggregateID, currentSequence)
if err != nil {
logging.Log("SQL-EXA0q").WithError(err).Info("query failed")
@@ -76,7 +74,7 @@ func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
rowInserted := false
for rows.Next() {
rowInserted = true
err = rows.Scan(&event.ID, &event.Sequence, &event.CreationDate)
err = rows.Scan(&event.ID, &currentSequence, &event.CreationDate)
logging.Log("SQL-rAvLD").OnError(err).Info("unable to scan result into event")
}
@@ -84,7 +82,7 @@ func insertEvents(stmt *sql.Stmt, events []*models.Event) error {
return errors.ThrowAlreadyExists(nil, "SQL-GKcAa", "wrong sequence")
}
previousSequence = event.Sequence
event.Sequence = uint64(currentSequence)
}
return nil

View File

@@ -0,0 +1,27 @@
package sql
import (
"database/sql/driver"
)
// Sequence represents a number that may be null.
// Sequence implements the sql.Scanner interface so
type Sequence uint64
// Scan implements the Scanner interface.
func (n *Sequence) Scan(value interface{}) error {
if value == nil {
*n = 0
return nil
}
*n = Sequence(value.(int64))
return nil
}
// Value implements the driver Valuer interface.
func (seq Sequence) Value() (driver.Value, error) {
if seq == 0 {
return nil, nil
}
return int64(seq), nil
}

View File

@@ -7,6 +7,6 @@ const (
Field_AggregateID
Field_LatestSequence
Field_ResourceOwner
Field_ModifierService
Field_ModifierUser
Field_EditorService
Field_EditorUser
)

View File

@@ -33,7 +33,7 @@ func (q *SearchQuery) AggregateIDFilter(id string) *SearchQuery {
return q.setFilter(NewFilter(Field_AggregateID, id, Operation_Equals))
}
func (q *SearchQuery) AggregateTypeFilter(types ...string) *SearchQuery {
func (q *SearchQuery) AggregateTypeFilter(types ...AggregateType) *SearchQuery {
return q.setFilter(NewFilter(Field_AggregateType, types, Operation_In))
}