mirror of
				https://github.com/zitadel/zitadel.git
				synced 2025-11-04 06:38:48 +00:00 
			
		
		
		
	fix: push timeout (#4882)
* push with timeout * test: config for eventstore
This commit is contained in:
		@@ -320,6 +320,9 @@ Actions:
 | 
			
		||||
      - localhost
 | 
			
		||||
      - "127.0.0.1"
 | 
			
		||||
 | 
			
		||||
Eventstore:
 | 
			
		||||
  PushTimeout: 15s
 | 
			
		||||
 | 
			
		||||
DefaultInstance:
 | 
			
		||||
  InstanceName:
 | 
			
		||||
  DefaultLanguage: en
 | 
			
		||||
 
 | 
			
		||||
@@ -62,7 +62,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
 | 
			
		||||
	dbClient, err := database.Connect(config.Database, false)
 | 
			
		||||
	logging.OnError(err).Fatal("unable to connect to database")
 | 
			
		||||
 | 
			
		||||
	eventstoreClient, err := eventstore.Start(dbClient)
 | 
			
		||||
	eventstoreClient, err := eventstore.Start(&eventstore.Config{Client: dbClient})
 | 
			
		||||
	logging.OnError(err).Fatal("unable to start eventstore")
 | 
			
		||||
	migration.RegisterMappers(eventstoreClient)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@ import (
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/config/systemdefaults"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/crypto"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/database"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/eventstore"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/id"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/query/projection"
 | 
			
		||||
	static_config "github.com/zitadel/zitadel/internal/static/config"
 | 
			
		||||
@@ -60,6 +61,7 @@ type Config struct {
 | 
			
		||||
	CustomerPortal    string
 | 
			
		||||
	Machine           *id.Config
 | 
			
		||||
	Actions           *actions.Config
 | 
			
		||||
	Eventstore        *eventstore.Config
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func MustNewConfig(v *viper.Viper) *Config {
 | 
			
		||||
 
 | 
			
		||||
@@ -99,7 +99,8 @@ func startZitadel(config *Config, masterKey string) error {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	eventstoreClient, err := eventstore.Start(dbClient)
 | 
			
		||||
	config.Eventstore.Client = dbClient
 | 
			
		||||
	eventstoreClient, err := eventstore.Start(config.Eventstore)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("cannot start eventstore for queries: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -29,7 +29,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore {
 | 
			
		||||
	for _, e := range expects {
 | 
			
		||||
		e(m)
 | 
			
		||||
	}
 | 
			
		||||
	es := eventstore.NewEventstore(m)
 | 
			
		||||
	es := eventstore.NewEventstore(eventstore.TestConfig(m))
 | 
			
		||||
	iam_repo.RegisterEventMappers(es)
 | 
			
		||||
	org.RegisterEventMappers(es)
 | 
			
		||||
	usr_repo.RegisterEventMappers(es)
 | 
			
		||||
 
 | 
			
		||||
@@ -2,10 +2,24 @@ package eventstore
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"database/sql"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/eventstore/repository"
 | 
			
		||||
	z_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func Start(sqlClient *sql.DB) (*Eventstore, error) {
 | 
			
		||||
	return NewEventstore(z_sql.NewCRDB(sqlClient)), nil
 | 
			
		||||
type Config struct {
 | 
			
		||||
	PushTimeout time.Duration
 | 
			
		||||
	Client      *sql.DB
 | 
			
		||||
 | 
			
		||||
	repo repository.Repository
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestConfig(repo repository.Repository) *Config {
 | 
			
		||||
	return &Config{repo: repo}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Start(config *Config) (*Eventstore, error) {
 | 
			
		||||
	config.repo = z_sql.NewCRDB(config.Client)
 | 
			
		||||
	return NewEventstore(config), nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -6,6 +6,7 @@ import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/api/authz"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/errors"
 | 
			
		||||
@@ -18,17 +19,19 @@ type Eventstore struct {
 | 
			
		||||
	repo              repository.Repository
 | 
			
		||||
	interceptorMutex  sync.Mutex
 | 
			
		||||
	eventInterceptors map[EventType]eventTypeInterceptors
 | 
			
		||||
	PushTimeout       time.Duration
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type eventTypeInterceptors struct {
 | 
			
		||||
	eventMapper func(*repository.Event) (Event, error)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewEventstore(repo repository.Repository) *Eventstore {
 | 
			
		||||
func NewEventstore(config *Config) *Eventstore {
 | 
			
		||||
	return &Eventstore{
 | 
			
		||||
		repo:              repo,
 | 
			
		||||
		repo:              config.repo,
 | 
			
		||||
		eventInterceptors: map[EventType]eventTypeInterceptors{},
 | 
			
		||||
		interceptorMutex:  sync.Mutex{},
 | 
			
		||||
		PushTimeout:       config.PushTimeout,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -45,6 +48,13 @@ func (es *Eventstore) Push(ctx context.Context, cmds ...Command) ([]Event, error
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if es.PushTimeout > 0 {
 | 
			
		||||
		var cancel func()
 | 
			
		||||
		ctx, cancel = context.WithTimeout(ctx, es.PushTimeout)
 | 
			
		||||
		defer cancel()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = es.repo.Push(ctx, events, constraints...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
 
 | 
			
		||||
@@ -9,7 +9,6 @@ import (
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/api/authz"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/eventstore"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/eventstore/repository"
 | 
			
		||||
	"github.com/zitadel/zitadel/internal/eventstore/repository/sql"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// ------------------------------------------------------------
 | 
			
		||||
@@ -287,7 +286,12 @@ func (rm *UserReadModel) Reduce() error {
 | 
			
		||||
// ------------------------------------------------------------
 | 
			
		||||
 | 
			
		||||
func TestUserReadModel(t *testing.T) {
 | 
			
		||||
	es := eventstore.NewEventstore(sql.NewCRDB(testCRDBClient))
 | 
			
		||||
	es, err := eventstore.Start(&eventstore.Config{Client: testCRDBClient})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Errorf("unable to start eventstore: %v", err)
 | 
			
		||||
		t.FailNow()
 | 
			
		||||
	}
 | 
			
		||||
	// es := eventstore.NewEventstore(&eventstore.Config{re})
 | 
			
		||||
	es.RegisterFilterEventMapper(UserAddedEventMapper()).
 | 
			
		||||
		RegisterFilterEventMapper(UserFirstNameChangedMapper()).
 | 
			
		||||
		RegisterFilterEventMapper(UserPasswordCheckedMapper()).
 | 
			
		||||
 
 | 
			
		||||
@@ -269,10 +269,11 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "fetch previous fails",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).
 | 
			
		||||
						ExpectFilterEventsError(errFilter),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -301,9 +302,10 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "no successful stmts",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -339,9 +341,10 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "update current sequence fails",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"agg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -381,9 +384,10 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "commit fails",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"agg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -423,9 +427,10 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "correct",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -455,9 +460,10 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "fetch previous stmts no additional stmts",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -486,7 +492,7 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
		{
 | 
			
		||||
			name: "fetch previous stmts additional events",
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
						&repository.Event{
 | 
			
		||||
							AggregateType:             "testAgg",
 | 
			
		||||
@@ -496,6 +502,7 @@ func TestStatementHandler_Update(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
					),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			args: args{
 | 
			
		||||
@@ -617,9 +624,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
				stmtSeq: 6,
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEventsError(errFilter),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			want: want{
 | 
			
		||||
@@ -642,9 +650,10 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
				stmtSeq: 6,
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			want: want{
 | 
			
		||||
@@ -666,7 +675,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
				stmtSeq: 10,
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
						&repository.Event{
 | 
			
		||||
							ID:                        "id",
 | 
			
		||||
@@ -690,6 +699,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
					),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			want: want{
 | 
			
		||||
@@ -712,7 +722,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
				stmtSeq: 10,
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
						&repository.Event{
 | 
			
		||||
							ID:                        "id",
 | 
			
		||||
@@ -726,6 +736,7 @@ func TestProjectionHandler_fetchPreviousStmts(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
					),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				aggregates: []eventstore.AggregateType{"testAgg"},
 | 
			
		||||
			},
 | 
			
		||||
			want: want{
 | 
			
		||||
 
 | 
			
		||||
@@ -68,7 +68,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(),
 | 
			
		||||
						eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
@@ -93,7 +93,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
			"process error",
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
							&repository.Event{
 | 
			
		||||
								ID:                        "id",
 | 
			
		||||
@@ -106,6 +106,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
								AggregateType:             "testAgg",
 | 
			
		||||
							},
 | 
			
		||||
						),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
@@ -131,7 +132,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
			"process ok",
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
							&repository.Event{
 | 
			
		||||
								ID:                        "id",
 | 
			
		||||
@@ -144,6 +145,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
								AggregateType:             "testAgg",
 | 
			
		||||
							},
 | 
			
		||||
						),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
@@ -170,7 +172,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
			"process limit exceeded ok",
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).
 | 
			
		||||
							ExpectFilterEvents(
 | 
			
		||||
								&repository.Event{
 | 
			
		||||
@@ -184,6 +186,7 @@ func TestProjectionHandler_Trigger(t *testing.T) {
 | 
			
		||||
									AggregateType:             "testAgg",
 | 
			
		||||
								},
 | 
			
		||||
							).ExpectFilterEvents(),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
@@ -389,9 +392,10 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
				ctx: context.Background(),
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
					eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
 | 
			
		||||
						AddQuery().
 | 
			
		||||
@@ -415,7 +419,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(),
 | 
			
		||||
					eventstore.TestConfig(es_repo_mock.NewRepo(t).ExpectFilterEvents()),
 | 
			
		||||
				),
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
					eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
 | 
			
		||||
@@ -439,7 +443,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
				ctx: context.Background(),
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
						&repository.Event{
 | 
			
		||||
							ID:                        "id",
 | 
			
		||||
@@ -463,6 +467,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
					),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
					eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
 | 
			
		||||
						AddQuery().
 | 
			
		||||
@@ -485,7 +490,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
				ctx: context.Background(),
 | 
			
		||||
			},
 | 
			
		||||
			fields: fields{
 | 
			
		||||
				eventstore: eventstore.NewEventstore(
 | 
			
		||||
				eventstore: eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
					es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
						&repository.Event{
 | 
			
		||||
							ID:                        "id",
 | 
			
		||||
@@ -509,6 +514,7 @@ func TestProjectionHandler_FetchEvents(t *testing.T) {
 | 
			
		||||
						},
 | 
			
		||||
					),
 | 
			
		||||
				),
 | 
			
		||||
				),
 | 
			
		||||
				query: testQuery(
 | 
			
		||||
					eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
 | 
			
		||||
						AddQuery().
 | 
			
		||||
@@ -694,8 +700,9 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEventsError(ErrFilter),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				triggerProjection: time.NewTimer(0),
 | 
			
		||||
@@ -713,7 +720,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
							&repository.Event{
 | 
			
		||||
								AggregateType:             "system",
 | 
			
		||||
@@ -723,6 +730,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
								Type:                      "system.projections.scheduler.succeeded",
 | 
			
		||||
							}).
 | 
			
		||||
							ExpectInstanceIDsError(ErrFilter),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				triggerProjection: time.NewTimer(0),
 | 
			
		||||
@@ -740,7 +748,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
							&repository.Event{
 | 
			
		||||
								AggregateType:             "system",
 | 
			
		||||
@@ -749,6 +757,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
								InstanceID:                "",
 | 
			
		||||
								Type:                      "system.projections.scheduler.succeeded",
 | 
			
		||||
							}).ExpectInstanceIDs("instanceID1"),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				triggerProjection: time.NewTimer(0),
 | 
			
		||||
@@ -771,7 +780,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
			},
 | 
			
		||||
			fields{
 | 
			
		||||
				eventstore: func(t *testing.T) *eventstore.Eventstore {
 | 
			
		||||
					return eventstore.NewEventstore(
 | 
			
		||||
					return eventstore.NewEventstore(eventstore.TestConfig(
 | 
			
		||||
						es_repo_mock.NewRepo(t).ExpectFilterEvents(
 | 
			
		||||
							&repository.Event{
 | 
			
		||||
								AggregateType:             "system",
 | 
			
		||||
@@ -780,6 +789,7 @@ func TestProjection_schedule(t *testing.T) {
 | 
			
		||||
								InstanceID:                "",
 | 
			
		||||
								Type:                      "system.projections.scheduler.succeeded",
 | 
			
		||||
							}).ExpectInstanceIDs("instanceID1"),
 | 
			
		||||
					),
 | 
			
		||||
					)
 | 
			
		||||
				},
 | 
			
		||||
				triggerProjection: time.NewTimer(0),
 | 
			
		||||
 
 | 
			
		||||
@@ -24,7 +24,7 @@ func eventstoreExpect(t *testing.T, expects ...expect) *eventstore.Eventstore {
 | 
			
		||||
	for _, e := range expects {
 | 
			
		||||
		e(m)
 | 
			
		||||
	}
 | 
			
		||||
	es := eventstore.NewEventstore(m)
 | 
			
		||||
	es := eventstore.NewEventstore(eventstore.TestConfig(m))
 | 
			
		||||
	iam_repo.RegisterEventMappers(es)
 | 
			
		||||
	org.RegisterEventMappers(es)
 | 
			
		||||
	usr_repo.RegisterEventMappers(es)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user