feat: add assets to eventstore and event (#1674)

* fix: add assets to eventstore and event

* fix: project member, grant member, app changed tests

* fix: asset migrations

* feat: add asset tests

* feat: add asset tests

* Update internal/eventstore/repository/repository.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* feat: add asset tests

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2021-04-27 12:58:18 +02:00
committed by GitHub
parent eaf966e3d2
commit f51f0ede5c
102 changed files with 1079 additions and 34 deletions

View File

@@ -97,9 +97,19 @@ const (
$1,
$2
)`
uniqueDelete = `DELETE FROM eventstore.unique_constraints
WHERE unique_type = $1 and unique_field = $2`
assetInsert = `INSERT INTO eventstore.assets
(
id,
asset
)
VALUES (
$1,
$2
)`
assetDelete = `DELETE FROM eventstore.assets
WHERE id = $1`
)
type CRDB struct {
@@ -114,7 +124,7 @@ func (db *CRDB) Health(ctx context.Context) error { return db.client.Ping() }
// Push adds all events to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, assets []*repository.Asset, uniqueConstraints ...*repository.UniqueConstraint) error {
err := crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
stmt, err := tx.PrepareContext(ctx, crdbInsert)
if err != nil {
@@ -152,6 +162,11 @@ func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueCons
if err != nil {
return err
}
err = db.handleAssets(ctx, tx, assets...)
if err != nil {
return err
}
return nil
})
if err != nil && !errors.Is(err, &caos_errs.CaosError{}) {
@@ -194,6 +209,32 @@ func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueC
return nil
}
// handleAssets adds or removes an asset
func (db *CRDB) handleAssets(ctx context.Context, tx *sql.Tx, assets ...*repository.Asset) (err error) {
if assets == nil || len(assets) == 0 || (len(assets) == 1 && assets[0] == nil) {
return nil
}
for _, asset := range assets {
if asset.Action == repository.AssetAdded {
_, err := tx.ExecContext(ctx, assetInsert, asset.ID, asset.Asset)
if err != nil {
logging.LogWithFields("SQL-M39fs",
"asset-id", asset.ID).WithError(err).Info("insert asset failed")
return caos_errs.ThrowInternal(err, "SQL-4M0gs", "unable to create asset")
}
} else if asset.Action == repository.AssetRemoved {
_, err := tx.ExecContext(ctx, assetDelete, asset.ID)
if err != nil {
logging.LogWithFields("SQL-3M9fs",
"asset-id", asset.ID).WithError(err).Info("delete asset failed")
return caos_errs.ThrowInternal(err, "SQL-Md9ds", "unable to remove unique constraint ")
}
}
}
return nil
}
// Filter returns all events matching the given search query
func (db *CRDB) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) {
events = []*repository.Event{}

View File

@@ -5,9 +5,10 @@ import (
"sync"
"testing"
"github.com/caos/zitadel/internal/eventstore/repository"
"github.com/lib/pq"
_ "github.com/lib/pq"
"github.com/caos/zitadel/internal/eventstore/repository"
)
func TestCRDB_placeholder(t *testing.T) {
@@ -269,12 +270,16 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
ctx context.Context
events []*repository.Event
uniqueConstraints *repository.UniqueConstraint
assets *repository.Asset
uniqueDataType string
uniqueDataField string
assetID string
asset []byte
}
type eventsRes struct {
pushedEventsCount int
uniqueCount int
assetCount int
aggType repository.AggregateType
aggID []string
}
@@ -376,6 +381,46 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
aggType: repository.AggregateType(t.Name()),
}},
},
{
name: "push 1 event and add asset",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "12"),
},
assets: generateAddAsset(t, "asset12", []byte{1}),
assetID: "asset12",
asset: []byte{1},
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 1,
assetCount: 1,
aggID: []string{"12"},
aggType: repository.AggregateType(t.Name()),
}},
},
{
name: "push 1 event and remove asset",
args: args{
ctx: context.Background(),
events: []*repository.Event{
generateEvent(t, "13"),
},
assets: generateRemoveAsset(t, "asset13"),
assetID: "asset13",
asset: []byte{1},
},
res: res{
wantErr: false,
eventsRes: eventsRes{
pushedEventsCount: 1,
assetCount: 0,
aggID: []string{"13"},
aggType: repository.AggregateType(t.Name()),
}},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -389,7 +434,14 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
return
}
}
if err := db.Push(tt.args.ctx, tt.args.events, tt.args.uniqueConstraints); (err != nil) != tt.res.wantErr {
if tt.args.uniqueDataType != "" && tt.args.uniqueDataField != "" {
err := fillAssets(tt.args.assetID, tt.args.asset)
if err != nil {
t.Error("unable to prefill insert unique data: ", err)
return
}
}
if err := db.Push(tt.args.ctx, tt.args.events, []*repository.Asset{tt.args.assets}, tt.args.uniqueConstraints); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -415,7 +467,18 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
t.Errorf("expected unique count %d got %d", tt.res.eventsRes.uniqueCount, uniqueCount)
}
}
if tt.args.assets != nil {
countAssetRow := testCRDBClient.QueryRow("SELECT COUNT(*) FROM eventstore.assets where id = $1", tt.args.assets.ID)
var assetCount int
err := countAssetRow.Scan(&assetCount)
if err != nil {
t.Error("unable to query inserted rows: ", err)
return
}
if assetCount != tt.res.eventsRes.assetCount {
t.Errorf("expected asset count %d got %d", tt.res.eventsRes.assetCount, assetCount)
}
}
})
}
}
@@ -423,6 +486,7 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
func TestCRDB_Push_MultipleAggregate(t *testing.T) {
type args struct {
events []*repository.Event
assets []*repository.Asset
}
type eventsRes struct {
pushedEventsCount int
@@ -507,7 +571,7 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
if err := db.Push(context.Background(), tt.args.events); (err != nil) != tt.res.wantErr {
if err := db.Push(context.Background(), tt.args.events, tt.args.assets); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
}
@@ -651,7 +715,7 @@ func TestCRDB_Push_Parallel(t *testing.T) {
for _, events := range tt.args.events {
wg.Add(1)
go func(events []*repository.Event) {
err := db.Push(context.Background(), events)
err := db.Push(context.Background(), events, nil)
if err != nil {
errsMu.Lock()
errs = append(errs, err)
@@ -698,6 +762,7 @@ func TestCRDB_Filter(t *testing.T) {
}
type fields struct {
existingEvents []*repository.Event
assets []*repository.Asset
}
type res struct {
eventCount int
@@ -763,7 +828,7 @@ func TestCRDB_Filter(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.assets); err != nil {
t.Errorf("error in setup = %v", err)
return
}
@@ -786,6 +851,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
}
type fields struct {
existingEvents []*repository.Event
existingAssets []*repository.Asset
}
type res struct {
sequence uint64
@@ -849,7 +915,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.existingAssets); err != nil {
t.Errorf("error in setup = %v", err)
return
}
@@ -869,6 +935,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
func TestCRDB_Push_ResourceOwner(t *testing.T) {
type args struct {
events []*repository.Event
assets []*repository.Asset
}
type res struct {
resourceOwners []string
@@ -991,7 +1058,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
}
if err := db.Push(context.Background(), tt.args.events); err != nil {
if err := db.Push(context.Background(), tt.args.events, tt.args.assets); err != nil {
t.Errorf("CRDB.Push() error = %v", err)
}
@@ -1093,3 +1160,22 @@ func generateRemoveUniqueConstraint(t *testing.T, table, uniqueField string) *re
return e
}
func generateAddAsset(t *testing.T, id string, asset []byte) *repository.Asset {
t.Helper()
e := &repository.Asset{
ID: id,
Asset: asset,
Action: repository.AssetAdded,
}
return e
}
func generateRemoveAsset(t *testing.T, id string) *repository.Asset {
t.Helper()
e := &repository.Asset{
ID: id,
Action: repository.AssetRemoved,
}
return e
}

View File

@@ -100,6 +100,11 @@ func fillUniqueData(unique_type, field string) error {
return err
}
func fillAssets(id string, asset []byte) error {
_, err := testCRDBClient.Exec("INSERT INTO eventstore.assets (id, asset) VALUES ($1, $2)", id, asset)
return err
}
type migrationPaths []string
type version struct {

View File

@@ -304,6 +304,7 @@ func Test_query_events_with_crdb(t *testing.T) {
}
type fields struct {
existingEvents []*repository.Event
existingAssets []*repository.Asset
client *sql.DB
}
type res struct {
@@ -521,7 +522,7 @@ func Test_query_events_with_crdb(t *testing.T) {
}
// setup initial data for query
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
if err := db.Push(context.Background(), tt.fields.existingEvents, tt.fields.existingAssets); err != nil {
t.Errorf("error in setup = %v", err)
return
}