mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:17:32 +00:00
feat: uniqueness (#1190)
* feat: uniqueness on events * fix: some tests * fix: add unique username * fix: nice error message * fix: add unique test * fix: add unique test * fix: add unique constraint to events * fix: correct unique constraint on user events * fix: remove user constraint * fix: add unique constraints * fix: add unique constraints * fix: add unique constraints * fix: unnique constraints without interface * fix: unique idp config * fix: unique constraint comments * fix: unique constraints in one table * fix: unique constraints error * fix: fix unique constraint on create user * fix: fix unique constraint on create project * fix: fix unique constraint on idp configs
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
context "context"
|
||||
eventstore "github.com/caos/zitadel/internal/eventstore"
|
||||
models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
eventstore0 "github.com/caos/zitadel/internal/eventstore/v2"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
reflect "reflect"
|
||||
)
|
||||
@@ -129,3 +130,17 @@ func (mr *MockEventstoreMockRecorder) Subscribe(arg0 ...interface{}) *gomock.Cal
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockEventstore)(nil).Subscribe), arg0...)
|
||||
}
|
||||
|
||||
// V2 mocks base method
|
||||
func (m *MockEventstore) V2() *eventstore0.Eventstore {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "V2")
|
||||
ret0, _ := ret[0].(*eventstore0.Eventstore)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// V2 indicates an expected call of V2
|
||||
func (mr *MockEventstoreMockRecorder) V2() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V2", reflect.TypeOf((*MockEventstore)(nil).V2))
|
||||
}
|
||||
|
@@ -5,92 +5,11 @@
|
||||
package mock
|
||||
|
||||
import (
|
||||
models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
reflect "reflect"
|
||||
time "time"
|
||||
)
|
||||
|
||||
// MockHandler is a mock of Handler interface
|
||||
type MockHandler struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockHandlerMockRecorder
|
||||
}
|
||||
|
||||
// MockHandlerMockRecorder is the mock recorder for MockHandler
|
||||
type MockHandlerMockRecorder struct {
|
||||
mock *MockHandler
|
||||
}
|
||||
|
||||
// NewMockHandler creates a new mock instance
|
||||
func NewMockHandler(ctrl *gomock.Controller) *MockHandler {
|
||||
mock := &MockHandler{ctrl: ctrl}
|
||||
mock.recorder = &MockHandlerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockHandler) EXPECT() *MockHandlerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// ViewModel mocks base method
|
||||
func (m *MockHandler) ViewModel() string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ViewModel")
|
||||
ret0, _ := ret[0].(string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ViewModel indicates an expected call of ViewModel
|
||||
func (mr *MockHandlerMockRecorder) ViewModel() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ViewModel", reflect.TypeOf((*MockHandler)(nil).ViewModel))
|
||||
}
|
||||
|
||||
// EventQuery mocks base method
|
||||
func (m *MockHandler) EventQuery() (*models.SearchQuery, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "EventQuery")
|
||||
ret0, _ := ret[0].(*models.SearchQuery)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// EventQuery indicates an expected call of EventQuery
|
||||
func (mr *MockHandlerMockRecorder) EventQuery() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventQuery", reflect.TypeOf((*MockHandler)(nil).EventQuery))
|
||||
}
|
||||
|
||||
// Reduce mocks base method
|
||||
func (m *MockHandler) Process(arg0 *models.Event) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Reduce", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Reduce indicates an expected call of Reduce
|
||||
func (mr *MockHandlerMockRecorder) Process(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reduce", reflect.TypeOf((*MockHandler)(nil).Process), arg0)
|
||||
}
|
||||
|
||||
// MinimumCycleDuration mocks base method
|
||||
func (m *MockHandler) MinimumCycleDuration() time.Duration {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "MinimumCycleDuration")
|
||||
ret0, _ := ret[0].(time.Duration)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// MinimumCycleDuration indicates an expected call of MinimumCycleDuration
|
||||
func (mr *MockHandlerMockRecorder) MinimumCycleDuration() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MinimumCycleDuration", reflect.TypeOf((*MockHandler)(nil).MinimumCycleDuration))
|
||||
}
|
||||
|
||||
// MockLocker is a mock of Locker interface
|
||||
type MockLocker struct {
|
||||
ctrl *gomock.Controller
|
||||
|
@@ -18,6 +18,8 @@ type EventPusher interface {
|
||||
// * struct which can be marshalled to json
|
||||
// * pointer to struct which can be marshalled to json
|
||||
Data() interface{}
|
||||
//UniqueConstraints should be added for unique attributes of an event, if nil constraints will not be checked
|
||||
UniqueConstraints() []*EventUniqueConstraint
|
||||
}
|
||||
|
||||
type EventReader interface {
|
||||
|
@@ -81,3 +81,13 @@ func NewBaseEventForPush(ctx context.Context, typ EventType) *BaseEvent {
|
||||
EventType: typ,
|
||||
}
|
||||
}
|
||||
|
||||
func NewBaseEventForPushWithResourceOwner(ctx context.Context, typ EventType, resourceOwner string) *BaseEvent {
|
||||
svcName := service.FromContext(ctx)
|
||||
return &BaseEvent{
|
||||
User: authz.GetCtxData(ctx).UserID,
|
||||
Service: svcName,
|
||||
EventType: typ,
|
||||
resourceOwner: resourceOwner,
|
||||
}
|
||||
}
|
||||
|
@@ -50,12 +50,12 @@ func (es *Eventstore) PushAggregate(ctx context.Context, writeModel queryReducer
|
||||
//PushAggregates maps the events of all aggregates to an eventstore event
|
||||
// based on the pushMapper
|
||||
func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...Aggregater) ([]EventReader, error) {
|
||||
events, err := es.aggregatesToEvents(aggregates)
|
||||
events, uniqueConstraints, err := es.aggregatesToEvents(aggregates)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = es.repo.Push(ctx, events...)
|
||||
err = es.repo.Push(ctx, events, uniqueConstraints...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -63,13 +63,14 @@ func (es *Eventstore) PushAggregates(ctx context.Context, aggregates ...Aggregat
|
||||
return es.mapEvents(events)
|
||||
}
|
||||
|
||||
func (es *Eventstore) aggregatesToEvents(aggregates []Aggregater) ([]*repository.Event, error) {
|
||||
func (es *Eventstore) aggregatesToEvents(aggregates []Aggregater) ([]*repository.Event, []*repository.UniqueConstraint, error) {
|
||||
events := make([]*repository.Event, 0, len(aggregates))
|
||||
uniqueConstraints := make([]*repository.UniqueConstraint, 0)
|
||||
for _, aggregate := range aggregates {
|
||||
for _, event := range aggregate.Events() {
|
||||
data, err := eventData(event)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
events = append(events, &repository.Event{
|
||||
AggregateID: aggregate.ID(),
|
||||
@@ -81,9 +82,21 @@ func (es *Eventstore) aggregatesToEvents(aggregates []Aggregater) ([]*repository
|
||||
Version: repository.Version(aggregate.Version()),
|
||||
Data: data,
|
||||
})
|
||||
if event.UniqueConstraints() != nil {
|
||||
for _, constraint := range event.UniqueConstraints() {
|
||||
uniqueConstraints = append(uniqueConstraints,
|
||||
&repository.UniqueConstraint{
|
||||
UniqueType: constraint.UniqueType,
|
||||
UniqueField: constraint.UniqueField,
|
||||
Action: uniqueConstraintActionToRepository(constraint.Action),
|
||||
ErrorMessage: constraint.ErrorMessage,
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return events, nil
|
||||
return events, uniqueConstraints, nil
|
||||
}
|
||||
|
||||
//FilterEvents filters the stored events based on the searchQuery
|
||||
@@ -209,3 +222,14 @@ func eventData(event EventPusher) ([]byte, error) {
|
||||
}
|
||||
return nil, errors.ThrowInvalidArgument(nil, "V2-91NRm", "wrong type of event data")
|
||||
}
|
||||
|
||||
func uniqueConstraintActionToRepository(action UniqueConstraintAction) repository.UniqueConstraintAction {
|
||||
switch action {
|
||||
case UniqueConstraintAdd:
|
||||
return repository.UniqueConstraintAdd
|
||||
case UniqueConstraintRemove:
|
||||
return repository.UniqueConstraintRemoved
|
||||
default:
|
||||
return repository.UniqueConstraintAdd
|
||||
}
|
||||
}
|
||||
|
@@ -63,6 +63,10 @@ func (e *testEvent) Data() interface{} {
|
||||
return e.data()
|
||||
}
|
||||
|
||||
func (e *testEvent) UniqueConstraint() []EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
func testFilterMapper(event *repository.Event) (EventReader, error) {
|
||||
if event == nil {
|
||||
return newTestEvent("hodor", nil, false), nil
|
||||
@@ -543,7 +547,7 @@ func TestEventstore_aggregatesToEvents(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
es := &Eventstore{}
|
||||
events, err := es.aggregatesToEvents(tt.args.aggregates)
|
||||
events, _, err := es.aggregatesToEvents(tt.args.aggregates)
|
||||
if (err != nil) != tt.res.wantErr {
|
||||
t.Errorf("Eventstore.aggregatesToEvents() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||
return
|
||||
@@ -576,7 +580,7 @@ func (repo *testRepo) Health(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (repo *testRepo) Push(ctx context.Context, events ...*repository.Event) error {
|
||||
func (repo *testRepo) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
|
||||
if repo.err != nil {
|
||||
return repo.err
|
||||
}
|
||||
|
@@ -29,7 +29,6 @@ func NewUserAggregate(id string) *UserAggregate {
|
||||
"test.user",
|
||||
"caos",
|
||||
"v1",
|
||||
0,
|
||||
),
|
||||
}
|
||||
}
|
||||
@@ -84,6 +83,10 @@ func (e *UserAddedEvent) Data() interface{} {
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *UserAddedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// User first name changed event start
|
||||
// ------------------------------------------------------------
|
||||
@@ -122,6 +125,10 @@ func (e *UserFirstNameChangedEvent) Data() interface{} {
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *UserFirstNameChangedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// User password checked event start
|
||||
// ------------------------------------------------------------
|
||||
@@ -152,6 +159,10 @@ func (e *UserPasswordCheckedEvent) Data() interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *UserPasswordCheckedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// User deleted event
|
||||
// ------------------------------------------------------------
|
||||
@@ -182,6 +193,10 @@ func (e *UserDeletedEvent) Data() interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *UserDeletedEvent) UniqueConstraints() []*eventstore.EventUniqueConstraint {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ------------------------------------------------------------
|
||||
// Users read model start
|
||||
// ------------------------------------------------------------
|
||||
|
@@ -9,8 +9,9 @@ type Repository interface {
|
||||
//Health checks if the connection to the storage is available
|
||||
Health(ctx context.Context) error
|
||||
// PushEvents adds all events of the given aggregates to the eventstreams of the aggregates.
|
||||
// if unique constraints are pushed, they will be added to the unique table for checking unique constraint violations
|
||||
// This call is transaction save. The transaction will be rolled back if one event fails
|
||||
Push(ctx context.Context, events ...*Event) error
|
||||
Push(ctx context.Context, events []*Event, uniqueConstraints ...*UniqueConstraint) error
|
||||
// Filter returns all events matching the given search query
|
||||
Filter(ctx context.Context, searchQuery *SearchQuery) (events []*Event, err error)
|
||||
//LatestSequence returns the latests sequence found by the the search query
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"github.com/lib/pq"
|
||||
"regexp"
|
||||
"strconv"
|
||||
|
||||
@@ -88,6 +89,18 @@ const (
|
||||
" FROM data " +
|
||||
" ) " +
|
||||
"RETURNING id, event_sequence, previous_sequence, creation_date, resource_owner"
|
||||
uniqueInsert = `INSERT INTO eventstore.unique_constraints
|
||||
(
|
||||
unique_type,
|
||||
unique_field
|
||||
)
|
||||
VALUES (
|
||||
$1,
|
||||
$2
|
||||
)`
|
||||
|
||||
uniqueDelete = `DELETE FROM eventstore.unique_constraints
|
||||
WHERE unique_type = $1 and unique_field = $2`
|
||||
)
|
||||
|
||||
type CRDB struct {
|
||||
@@ -102,7 +115,7 @@ func (db *CRDB) Health(ctx context.Context) error { return db.client.Ping() }
|
||||
|
||||
// Push adds all events to the eventstreams of the aggregates.
|
||||
// This call is transaction save. The transaction will be rolled back if one event fails
|
||||
func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
|
||||
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
|
||||
err := crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
|
||||
stmt, err := tx.PrepareContext(ctx, crdbInsert)
|
||||
if err != nil {
|
||||
@@ -136,6 +149,10 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
|
||||
}
|
||||
}
|
||||
|
||||
err = db.handleUniqueConstraints(ctx, tx, uniqueConstraints...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, &caos_errs.CaosError{}) {
|
||||
@@ -145,6 +162,39 @@ func (db *CRDB) Push(ctx context.Context, events ...*repository.Event) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// handleUniqueConstraints adds or removes unique constraints
|
||||
func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueConstraints ...*repository.UniqueConstraint) (err error) {
|
||||
if uniqueConstraints == nil || len(uniqueConstraints) == 0 || (len(uniqueConstraints) == 1 && uniqueConstraints[0] == nil) {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, uniqueConstraint := range uniqueConstraints {
|
||||
if uniqueConstraint.Action == repository.UniqueConstraintAdd {
|
||||
_, err := tx.ExecContext(ctx, uniqueInsert, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField)
|
||||
if err != nil {
|
||||
logging.LogWithFields("SQL-IP3js",
|
||||
"unique_type", uniqueConstraint.UniqueType,
|
||||
"unique_field", uniqueConstraint.UniqueField).WithError(err).Info("insert unique constraint failed")
|
||||
|
||||
if db.isUniqueViolationError(err) {
|
||||
return caos_errs.ThrowAlreadyExists(err, "SQL-M0dsf", uniqueConstraint.ErrorMessage)
|
||||
}
|
||||
|
||||
return caos_errs.ThrowInternal(err, "SQL-dM9ds", "unable to create unique constraint ")
|
||||
}
|
||||
} else if uniqueConstraint.Action == repository.UniqueConstraintRemoved {
|
||||
_, err := tx.ExecContext(ctx, uniqueDelete, uniqueConstraint.UniqueType, uniqueConstraint.UniqueField)
|
||||
if err != nil {
|
||||
logging.LogWithFields("SQL-M0vsf",
|
||||
"unique_type", uniqueConstraint.UniqueType,
|
||||
"unique_field", uniqueConstraint.UniqueField).WithError(err).Info("delete unique constraint failed")
|
||||
return caos_errs.ThrowInternal(err, "SQL-2M9fs", "unable to remove unique constraint ")
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Filter returns all events matching the given search query
|
||||
func (db *CRDB) Filter(ctx context.Context, searchQuery *repository.SearchQuery) (events []*repository.Event, err error) {
|
||||
events = []*repository.Event{}
|
||||
@@ -262,3 +312,12 @@ func (db *CRDB) placeholder(query string) string {
|
||||
}
|
||||
return replaced
|
||||
}
|
||||
|
||||
func (db *CRDB) isUniqueViolationError(err error) bool {
|
||||
if pqErr, ok := err.(*pq.Error); ok {
|
||||
if pqErr.Code == "23505" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
@@ -266,11 +266,15 @@ func TestCRDB_columnName(t *testing.T) {
|
||||
|
||||
func TestCRDB_Push_OneAggregate(t *testing.T) {
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
events []*repository.Event
|
||||
ctx context.Context
|
||||
events []*repository.Event
|
||||
uniqueConstraints *repository.UniqueConstraint
|
||||
uniqueDataType string
|
||||
uniqueDataField string
|
||||
}
|
||||
type eventsRes struct {
|
||||
pushedEventsCount int
|
||||
uniqueCount int
|
||||
aggType repository.AggregateType
|
||||
aggID []string
|
||||
}
|
||||
@@ -334,26 +338,84 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "push 1 event and add unique constraint",
|
||||
args: args{
|
||||
ctx: context.Background(),
|
||||
events: []*repository.Event{
|
||||
generateEvent(t, "10"),
|
||||
},
|
||||
uniqueConstraints: generateAddUniqueConstraint(t, "usernames", "field"),
|
||||
},
|
||||
res: res{
|
||||
wantErr: false,
|
||||
eventsRes: eventsRes{
|
||||
pushedEventsCount: 1,
|
||||
uniqueCount: 1,
|
||||
aggID: []string{"10"},
|
||||
aggType: repository.AggregateType(t.Name()),
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "push 1 event and remove unique constraint",
|
||||
args: args{
|
||||
ctx: context.Background(),
|
||||
events: []*repository.Event{
|
||||
generateEvent(t, "11"),
|
||||
},
|
||||
uniqueConstraints: generateRemoveUniqueConstraint(t, "usernames", "testremove"),
|
||||
uniqueDataType: "usernames",
|
||||
uniqueDataField: "testremove",
|
||||
},
|
||||
res: res{
|
||||
wantErr: false,
|
||||
eventsRes: eventsRes{
|
||||
pushedEventsCount: 1,
|
||||
uniqueCount: 0,
|
||||
aggID: []string{"11"},
|
||||
aggType: repository.AggregateType(t.Name()),
|
||||
}},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
db := &CRDB{
|
||||
client: testCRDBClient,
|
||||
}
|
||||
if err := db.Push(tt.args.ctx, tt.args.events...); (err != nil) != tt.res.wantErr {
|
||||
if tt.args.uniqueDataType != "" && tt.args.uniqueDataField != "" {
|
||||
err := fillUniqueData(tt.args.uniqueDataType, tt.args.uniqueDataField)
|
||||
if err != nil {
|
||||
t.Error("unable to prefill insert unique data: ", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := db.Push(tt.args.ctx, tt.args.events, tt.args.uniqueConstraints); (err != nil) != tt.res.wantErr {
|
||||
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||
}
|
||||
|
||||
countRow := testCRDBClient.QueryRow("SELECT COUNT(*) FROM eventstore.events where aggregate_type = $1 AND aggregate_id = ANY($2)", tt.res.eventsRes.aggType, pq.Array(tt.res.eventsRes.aggID))
|
||||
var count int
|
||||
err := countRow.Scan(&count)
|
||||
countEventRow := testCRDBClient.QueryRow("SELECT COUNT(*) FROM eventstore.events where aggregate_type = $1 AND aggregate_id = ANY($2)", tt.res.eventsRes.aggType, pq.Array(tt.res.eventsRes.aggID))
|
||||
var eventCount int
|
||||
err := countEventRow.Scan(&eventCount)
|
||||
if err != nil {
|
||||
t.Error("unable to query inserted rows: ", err)
|
||||
return
|
||||
}
|
||||
if count != tt.res.eventsRes.pushedEventsCount {
|
||||
t.Errorf("expected push count %d got %d", tt.res.eventsRes.pushedEventsCount, count)
|
||||
if eventCount != tt.res.eventsRes.pushedEventsCount {
|
||||
t.Errorf("expected push count %d got %d", tt.res.eventsRes.pushedEventsCount, eventCount)
|
||||
}
|
||||
if tt.args.uniqueConstraints != nil {
|
||||
countUniqueRow := testCRDBClient.QueryRow("SELECT COUNT(*) FROM eventstore.unique_constraints where unique_type = $1 AND unique_field = $2", tt.args.uniqueConstraints.UniqueType, tt.args.uniqueConstraints.UniqueField)
|
||||
var uniqueCount int
|
||||
err := countUniqueRow.Scan(&uniqueCount)
|
||||
if err != nil {
|
||||
t.Error("unable to query inserted rows: ", err)
|
||||
return
|
||||
}
|
||||
if uniqueCount != tt.res.eventsRes.uniqueCount {
|
||||
t.Errorf("expected unique count %d got %d", tt.res.eventsRes.uniqueCount, uniqueCount)
|
||||
}
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -445,7 +507,7 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
|
||||
db := &CRDB{
|
||||
client: testCRDBClient,
|
||||
}
|
||||
if err := db.Push(context.Background(), tt.args.events...); (err != nil) != tt.res.wantErr {
|
||||
if err := db.Push(context.Background(), tt.args.events); (err != nil) != tt.res.wantErr {
|
||||
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||
}
|
||||
|
||||
@@ -616,7 +678,7 @@ func TestCRDB_Push_Parallel(t *testing.T) {
|
||||
for _, events := range tt.args.events {
|
||||
wg.Add(1)
|
||||
go func(events []*repository.Event) {
|
||||
err := db.Push(context.Background(), events...)
|
||||
err := db.Push(context.Background(), events)
|
||||
if err != nil {
|
||||
errsMu.Lock()
|
||||
errs = append(errs, err)
|
||||
@@ -728,7 +790,7 @@ func TestCRDB_Filter(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup initial data for query
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
|
||||
t.Errorf("error in setup = %v", err)
|
||||
return
|
||||
}
|
||||
@@ -814,7 +876,7 @@ func TestCRDB_LatestSequence(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup initial data for query
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
|
||||
t.Errorf("error in setup = %v", err)
|
||||
return
|
||||
}
|
||||
@@ -956,7 +1018,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
|
||||
db := &CRDB{
|
||||
client: testCRDBClient,
|
||||
}
|
||||
if err := db.Push(context.Background(), tt.args.events...); err != nil {
|
||||
if err := db.Push(context.Background(), tt.args.events); err != nil {
|
||||
t.Errorf("CRDB.Push() error = %v", err)
|
||||
}
|
||||
|
||||
@@ -1036,3 +1098,25 @@ func generateEventWithData(t *testing.T, aggregateID string, data []byte) *repos
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
func generateAddUniqueConstraint(t *testing.T, table, uniqueField string) *repository.UniqueConstraint {
|
||||
t.Helper()
|
||||
e := &repository.UniqueConstraint{
|
||||
UniqueType: table,
|
||||
UniqueField: uniqueField,
|
||||
Action: repository.UniqueConstraintAdd,
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
||||
func generateRemoveUniqueConstraint(t *testing.T, table, uniqueField string) *repository.UniqueConstraint {
|
||||
t.Helper()
|
||||
e := &repository.UniqueConstraint{
|
||||
UniqueType: table,
|
||||
UniqueField: uniqueField,
|
||||
Action: repository.UniqueConstraintRemoved,
|
||||
}
|
||||
|
||||
return e
|
||||
}
|
||||
|
@@ -80,6 +80,11 @@ func executeMigrations() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func fillUniqueData(unique_type, field string) error {
|
||||
_, err := testCRDBClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field) VALUES ($1, $2)", unique_type, field)
|
||||
return err
|
||||
}
|
||||
|
||||
type migrationPaths []string
|
||||
|
||||
type version struct {
|
||||
|
@@ -521,7 +521,7 @@ func Test_query_events_with_crdb(t *testing.T) {
|
||||
}
|
||||
|
||||
// setup initial data for query
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents...); err != nil {
|
||||
if err := db.Push(context.Background(), tt.fields.existingEvents); err != nil {
|
||||
t.Errorf("error in setup = %v", err)
|
||||
return
|
||||
}
|
||||
|
29
internal/eventstore/v2/repository/unique_constraint.go
Normal file
29
internal/eventstore/v2/repository/unique_constraint.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package repository
|
||||
|
||||
//UniqueCheck represents all information about a unique attribute
|
||||
type UniqueConstraint struct {
|
||||
//UniqueField is the field which should be unique
|
||||
UniqueField string
|
||||
|
||||
//UniqueType is the type of the unique field
|
||||
UniqueType string
|
||||
|
||||
//Action defines if unique constraint should be added or removed
|
||||
Action UniqueConstraintAction
|
||||
|
||||
//ErrorMessage is the message key which should be returned if constraint is violated
|
||||
ErrorMessage string
|
||||
}
|
||||
|
||||
type UniqueConstraintAction int32
|
||||
|
||||
const (
|
||||
UniqueConstraintAdd UniqueConstraintAction = iota
|
||||
UniqueConstraintRemoved
|
||||
|
||||
uniqueConstraintActionCount
|
||||
)
|
||||
|
||||
func (f UniqueConstraintAction) Valid() bool {
|
||||
return f >= 0 && f < uniqueConstraintActionCount
|
||||
}
|
43
internal/eventstore/v2/unique_constraint.go
Normal file
43
internal/eventstore/v2/unique_constraint.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package eventstore
|
||||
|
||||
type EventUniqueConstraint struct {
|
||||
// UniqueType is the table name for the unique constraint
|
||||
UniqueType string
|
||||
//UniqueField is the unique key
|
||||
UniqueField string
|
||||
//Action defines if unique constraint should be added or removed
|
||||
Action UniqueConstraintAction
|
||||
//ErrorMessage defines the translation file key for the error message
|
||||
ErrorMessage string
|
||||
}
|
||||
|
||||
type UniqueConstraintAction int32
|
||||
|
||||
const (
|
||||
UniqueConstraintAdd UniqueConstraintAction = iota
|
||||
UniqueConstraintRemove
|
||||
|
||||
uniqueConstraintActionCount
|
||||
)
|
||||
|
||||
func NewAddEventUniqueConstraint(
|
||||
uniqueType,
|
||||
uniqueField,
|
||||
errMessage string) *EventUniqueConstraint {
|
||||
return &EventUniqueConstraint{
|
||||
UniqueType: uniqueType,
|
||||
UniqueField: uniqueField,
|
||||
ErrorMessage: errMessage,
|
||||
Action: UniqueConstraintAdd,
|
||||
}
|
||||
}
|
||||
|
||||
func NewRemoveEventUniqueConstraint(
|
||||
uniqueType,
|
||||
uniqueField string) *EventUniqueConstraint {
|
||||
return &EventUniqueConstraint{
|
||||
UniqueType: uniqueType,
|
||||
UniqueField: uniqueField,
|
||||
Action: UniqueConstraintRemove,
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user