feat: asset storage (#1696)

* feat: remove assets

* feat: minio implementation

* fix: remove assets from tests

* feat: minio implementation

* feat: Env vars

* fix: sprintf

* fix: sprintf

* Update internal/eventstore/repository/repository.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: error handling

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2021-05-03 10:15:50 +02:00
committed by GitHub
parent a5c6bf5498
commit 667cc30291
110 changed files with 306 additions and 938 deletions

View File

@@ -20,8 +20,6 @@ type EventPusher interface {
// * struct which can be marshalled to json
// * pointer to struct which can be marshalled to json
Data() interface{}
//Assets contain assets in form of []byte, these will be stored to a separate table
Assets() []*Asset
//UniqueConstraints should be added for unique attributes of an event, if nil constraints will not be checked
UniqueConstraints() []*EventUniqueConstraint
}

View File

@@ -39,11 +39,11 @@ func (es *Eventstore) Health(ctx context.Context) error {
//PushEvents pushes the events in a single transaction
// an event needs at least an aggregate
func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher) ([]EventReader, error) {
events, assets, constraints, err := eventsToRepository(pushEvents)
events, constraints, err := eventsToRepository(pushEvents)
if err != nil {
return nil, err
}
err = es.repo.Push(ctx, events, assets, constraints...)
err = es.repo.Push(ctx, events, constraints...)
if err != nil {
return nil, err
}
@@ -57,12 +57,12 @@ func (es *Eventstore) PushEvents(ctx context.Context, pushEvents ...EventPusher)
return eventReaders, nil
}
func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, assets []*repository.Asset, constraints []*repository.UniqueConstraint, err error) {
func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, constraints []*repository.UniqueConstraint, err error) {
events = make([]*repository.Event, len(pushEvents))
for i, event := range pushEvents {
data, err := EventData(event)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
events[i] = &repository.Event{
AggregateID: event.Aggregate().ID,
@@ -77,12 +77,9 @@ func eventsToRepository(pushEvents []EventPusher) (events []*repository.Event, a
if len(event.UniqueConstraints()) > 0 {
constraints = append(constraints, uniqueConstraintsToRepository(event.UniqueConstraints())...)
}
if len(event.Assets()) > 0 {
assets = append(assets, assetsToRepository(event.Assets())...)
}
}
return events, assets, constraints, nil
return events, constraints, nil
}
func uniqueConstraintsToRepository(constraints []*EventUniqueConstraint) (uniqueConstraints []*repository.UniqueConstraint) {

View File

@@ -542,7 +542,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
events, _, _, err := eventsToRepository(tt.args.events)
events, _, err := eventsToRepository(tt.args.events)
if (err != nil) != tt.res.wantErr {
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
return
@@ -575,7 +575,7 @@ func (repo *testRepo) Health(ctx context.Context) error {
return nil
}
func (repo *testRepo) Push(ctx context.Context, events []*repository.Event, assets []*repository.Asset, uniqueConstraints ...*repository.UniqueConstraint) error {
func (repo *testRepo) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
if repo.err != nil {
return repo.err
}

View File

@@ -79,10 +79,10 @@ func (mr *MockRepositoryMockRecorder) LatestSequence(arg0, arg1 interface{}) *go
}
// Push mocks base method
func (m *MockRepository) Push(arg0 context.Context, arg1 []*repository.Event, arg2 []*repository.Asset, arg3 ...*repository.UniqueConstraint) error {
func (m *MockRepository) Push(arg0 context.Context, arg1 []*repository.Event, arg2 ...*repository.UniqueConstraint) error {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1, arg2}
for _, a := range arg3 {
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "Push", varargs...)
@@ -91,8 +91,8 @@ func (m *MockRepository) Push(arg0 context.Context, arg1 []*repository.Event, ar
}
// Push indicates an expected call of Push
func (mr *MockRepositoryMockRecorder) Push(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call {
func (mr *MockRepositoryMockRecorder) Push(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1, arg2}, arg3...)
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockRepository)(nil).Push), varargs...)
}

View File

@@ -24,32 +24,28 @@ func (m *MockRepository) ExpectFilterEvents(events ...*repository.Event) *MockRe
return m
}
func (m *MockRepository) ExpectPush(expectedEvents []*repository.Event, expectedAssets []*repository.Asset, expectedUniqueConstraints ...*repository.UniqueConstraint) *MockRepository {
m.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, events []*repository.Event, assets []*repository.Asset, uniqueConstraints ...*repository.UniqueConstraint) error {
func (m *MockRepository) ExpectPush(expectedEvents []*repository.Event, expectedUniqueConstraints ...*repository.UniqueConstraint) *MockRepository {
m.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
assert.Equal(m.ctrl.T, expectedEvents, events)
if expectedUniqueConstraints == nil {
expectedUniqueConstraints = []*repository.UniqueConstraint{}
}
assert.Equal(m.ctrl.T, expectedUniqueConstraints, uniqueConstraints)
assert.Equal(m.ctrl.T, expectedAssets, assets)
return nil
},
)
return m
}
func (m *MockRepository) ExpectPushFailed(err error, expectedEvents []*repository.Event, expectedAssets []*repository.Asset, expectedUniqueConstraints ...*repository.UniqueConstraint) *MockRepository {
m.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, events []*repository.Event, assets []*repository.Asset, uniqueConstraints ...*repository.UniqueConstraint) error {
func (m *MockRepository) ExpectPushFailed(err error, expectedEvents []*repository.Event, expectedUniqueConstraints ...*repository.UniqueConstraint) *MockRepository {
m.EXPECT().Push(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
assert.Equal(m.ctrl.T, expectedEvents, events)
if expectedUniqueConstraints == nil {
expectedUniqueConstraints = []*repository.UniqueConstraint{}
}
assert.Equal(m.ctrl.T, expectedUniqueConstraints, uniqueConstraints)
assert.Equal(m.ctrl.T, expectedAssets, assets)
return err
},
)

View File

@@ -9,10 +9,9 @@ type Repository interface {
//Health checks if the connection to the storage is available
Health(ctx context.Context) error
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
// if assets are pushed, they will be added to the assets table only referenced by id in eventstore (history)
// if unique constraints are pushed, they will be added to the unique table for checking unique constraint violations
// This call is transaction save. The transaction will be rolled back if one event fails
Push(ctx context.Context, events []*Event, assets []*Asset, uniqueConstraints ...*UniqueConstraint) error
Push(ctx context.Context, events []*Event, uniqueConstraints ...*UniqueConstraint) error
// Filter returns all events matching the given search query
Filter(ctx context.Context, searchQuery *SearchQuery) (events []*Event, err error)
//LatestSequence returns the latests sequence found by the the search query

View File

@@ -99,17 +99,6 @@ const (
)`
uniqueDelete = `DELETE FROM eventstore.unique_constraints
WHERE unique_type = $1 and unique_field = $2`
assetInsert = `INSERT INTO eventstore.assets
(
id,
asset
)
VALUES (
$1,
$2
)`
assetDelete = `DELETE FROM eventstore.assets
WHERE id = $1`
)
type CRDB struct {
@@ -124,7 +113,7 @@ func (db *CRDB) Health(ctx context.Context) error { return db.client.Ping() }
// Push adds all events to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, assets []*repository.Asset, uniqueConstraints ...*repository.UniqueConstraint) error {
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
err := crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
stmt, err := tx.PrepareContext(ctx, crdbInsert)
if err != nil {
@@ -162,11 +151,6 @@ func (db *CRDB) Push(ctx context.Context, events []*repository.Event, assets []*
if err != nil {
return err
}
err = db.handleAssets(ctx, tx, assets...)
if err != nil {
return err
}
return nil
})
if err != nil && !errors.Is(err, &caos_errs.CaosError{}) {
@@ -209,32 +193,6 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC
return nil
}
// handleAssets adds or removes an asset
func (db *CRDB) handleAssets(ctx context.Context, tx *sql.Tx, assets ...*repository.Asset) (err error) {
if assets == nil || len(assets) == 0 || (len(assets) == 1 && assets[0] == nil) {
return nil
}
for _, asset := range assets {
if asset.Action == repository.AssetAdded {
_, err := tx.ExecContext(ctx, assetInsert, asset.ID, asset.Asset)
if err != nil {
logging.LogWithFields("SQL-M39fs",
"asset-id", asset.ID).WithError(err).Info("insert asset failed")
return caos_errs.ThrowInternal(err, "SQL-4M0gs", "unable to create asset")
}
} else if asset.Action == repository.AssetRemoved {
_, err := tx.ExecContext(ctx, assetDelete, asset.ID)
if err != nil {
logging.LogWithFields("SQL-3M9fs",
"asset-id", asset.ID).WithError(err).Info("delete asset failed")
return caos_errs.ThrowInternal(err, "SQL-Md9ds", "unable to remove unique constraint ")
}
}
}
return nil
}
// Filter returns all events matching the given search query
func (db *CRDB) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) {
events = []*repository.Event{}

View File

@@ -270,11 +270,8 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
ctx context.Context
events []*repository.Event
uniqueConstraints *repository.UniqueConstraint
assets *repository.Asset
uniqueDataType string
uniqueDataField string
assetID string
asset []byte
}
type eventsRes struct {
pushedEventsCount int
@@ -388,9 +385,6 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
events: []*repository.Event{
generateEvent(t, "12"),
},
assets: generateAddAsset(t, "asset12", []byte{1}),
assetID: "asset12",
asset: []byte{1},
},
res: res{
wantErr: false,
@@ -408,9 +402,6 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
events: []*repository.Event{
generateEvent(t, "13"),
},
assets: generateRemoveAsset(t, "asset13"),
assetID: "asset13",
asset: []byte{1},
},
res: res{
wantErr: false,
@@ -434,14 +425,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
return
}
}
if tt.args.uniqueDataType != "" && tt.args.uniqueDataField != "" {
err := fillAssets(tt.args.assetID, tt.args.asset)
if err != nil {
t.Error("unable to prefill insert unique data: ", err)
return
}
}
if err := db.Push(tt.args.ctx, tt.args.events, []*repository.Asset{tt.args.assets}, tt.args.uniqueConstraints); (err != nil) != tt.res.wantErr {
if err := db.Push(tt.args.ctx, tt.args.events, tt.args.uniqueConstraints); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -467,18 +451,6 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
t.Errorf("expected unique count %d got %d", tt.res.eventsRes.uniqueCount, uniqueCount)
}
}
if tt.args.assets != nil {
countAssetRow := testCRDBClient.QueryRow("SELECT COUNT(*) FROM eventstore.assets where id = $1", tt.args.assets.ID)
var assetCount int
err := countAssetRow.Scan(&assetCount)
if err != nil {
t.Error("unable to query inserted rows: ", err)
return
}
if assetCount != tt.res.eventsRes.assetCount {
t.Errorf("expected asset count %d got %d", tt.res.eventsRes.assetCount, assetCount)
}
}
})
}
}
@@ -486,7 +458,6 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
func TestCRDB_Push_MultipleAggregate(t *testing.T) {
type args struct {
events []*repository.Event
assets []*repository.Asset
}
type eventsRes struct {
pushedEventsCount int
@@ -571,7 +542,7 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
if err := db.Push(context.Background(), tt.args.events, tt.args.assets); (err != nil) != tt.res.wantErr {
if err := db.Push(context.Background(), tt.args.events); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -762,7 +733,6 @@ func TestCRDB_Filter(t *testing.T) {
}
type fields struct {
existingEvents []*repository.Event
assets []*repository.Asset
}
type res struct {
eventCount int
@@ -828,7 +798,7 @@ func TestCRDB_Filter(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.assets); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
t.Errorf("error in setup = %v", err)
return
}
@@ -851,7 +821,6 @@ func TestCRDB_LatestSequence(t *testing.T) {
}
type fields struct {
existingEvents []*repository.Event
existingAssets []*repository.Asset
}
type res struct {
sequence uint64
@@ -915,7 +884,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.existingAssets); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
t.Errorf("error in setup = %v", err)
return
}
@@ -935,7 +904,6 @@ func TestCRDB_LatestSequence(t *testing.T) {
func TestCRDB_Push_ResourceOwner(t *testing.T) {
type args struct {
events []*repository.Event
assets []*repository.Asset
}
type res struct {
resourceOwners []string
@@ -1058,7 +1026,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
if err := db.Push(context.Background(), tt.args.events, tt.args.assets); err != nil {
if err := db.Push(context.Background(), tt.args.events); err != nil {
t.Errorf("CRDB.Push() error = %v", err)
}

View File

@@ -522,7 +522,7 @@ func Test_query_events_with_crdb(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.existingAssets); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
t.Errorf("error in setup = %v", err)
return
}