// Source: zitadel/internal/eventstore/local_crdb_test.go

package eventstore_test
import (
"context"
"encoding/json"
2020-10-23 16:16:46 +02:00
"os"
"testing"
"time"
2020-10-23 16:16:46 +02:00
"github.com/cockroachdb/cockroach-go/v2/testserver"
pgxdecimal "github.com/jackc/pgx-shopspring-decimal"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgx/v5/stdlib"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/cmd/initialise"
feat(database): support for postgres (#3998) * beginning with postgres statements * try pgx * use pgx * database * init works for postgres * arrays working * init for cockroach * init * start tests * tests * TESTS * ch * ch * chore: use go 1.18 * read stmts * fix typo * tests * connection string * add missing error handler * cleanup * start all apis * go mod tidy * old update * switch back to minute * on conflict * replace string slice with `database.StringArray` in db models * fix tests and start * update go version in dockerfile * setup go * clean up * remove notification migration * update * docs: add deploy guide for postgres * fix: revert sonyflake * use `database.StringArray` for daos * use `database.StringArray` every where * new tables * index naming, metadata primary key, project grant role key type * docs(postgres): change to beta * chore: correct compose * fix(defaults): add empty postgres config * refactor: remove unused code * docs: add postgres to self hosted * fix broken link * so? * change title * add mdx to link * fix stmt * update goreleaser in test-code * docs: improve postgres example * update more projections * fix: add beta log for postgres * revert index name change * prerelease * fix: add sequence to v1 "reduce paniced" * log if nil * add logging * fix: log output * fix(import): check if org exists and user * refactor: imports * fix(user): ignore malformed events * refactor: method naming * fix: test * refactor: correct errors.Is call * ci: don't build dev binaries on main * fix(go releaser): update version to 1.11.0 * fix(user): projection should not break * fix(user): handle error properly * docs: correct config example * Update .releaserc.js * Update .releaserc.js Co-authored-by: Livio Amstutz <livio.a@gmail.com> Co-authored-by: Elio Bischof <eliobischof@gmail.com>
2022-08-31 09:52:43 +02:00
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/database/cockroach"
"github.com/zitadel/zitadel/internal/eventstore"
es_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
2020-10-23 16:16:46 +02:00
)
var (
testCRDBClient *database.DB
queriers map[string]eventstore.Querier = make(map[string]eventstore.Querier)
pushers map[string]eventstore.Pusher = make(map[string]eventstore.Pusher)
clients map[string]*database.DB = make(map[string]*database.DB)
2020-10-23 16:16:46 +02:00
)
func TestMain(m *testing.M) {
opts := make([]testserver.TestServerOpt, 0, 1)
if version := os.Getenv("ZITADEL_CRDB_VERSION"); version != "" {
opts = append(opts, testserver.CustomVersionOpt(version))
}
ts, err := testserver.NewTestServer(opts...)
2020-10-23 16:16:46 +02:00
if err != nil {
logging.WithFields("error", err).Fatal("unable to start db")
2020-10-23 16:16:46 +02:00
}
testCRDBClient = &database.DB{
Database: new(testDB),
}
config, err := pgxpool.ParseConfig(ts.PGURL().String())
2020-10-23 16:16:46 +02:00
if err != nil {
logging.WithFields("error", err).Fatal("unable to parse db config")
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
logging.WithFields("error", err).Fatal("unable to create db pool")
}
testCRDBClient.DB = stdlib.OpenDBFromPool(pool)
if err = testCRDBClient.Ping(); err != nil {
logging.WithFields("error", err).Fatal("unable to ping db")
2020-10-23 16:16:46 +02:00
}
v2 := &es_sql.CRDB{DB: testCRDBClient}
queriers["v2(inmemory)"] = v2
clients["v2(inmemory)"] = testCRDBClient
pushers["v3(inmemory)"] = new_es.NewEventstore(testCRDBClient)
clients["v3(inmemory)"] = testCRDBClient
if localDB, err := connectLocalhost(); err == nil {
if err = initDB(localDB); err != nil {
logging.WithFields("error", err).Fatal("migrations failed")
}
pushers["v3(singlenode)"] = new_es.NewEventstore(localDB)
clients["v3(singlenode)"] = localDB
}
// pushers["v2(inmemory)"] = v2
2020-10-23 16:16:46 +02:00
defer func() {
testCRDBClient.Close()
ts.Stop()
}()
if err = initDB(testCRDBClient); err != nil {
logging.WithFields("error", err).Fatal("migrations failed")
2020-10-23 16:16:46 +02:00
}
os.Exit(m.Run())
}
func initDB(db *database.DB) error {
feat(database): support for postgres (#3998) * beginning with postgres statements * try pgx * use pgx * database * init works for postgres * arrays working * init for cockroach * init * start tests * tests * TESTS * ch * ch * chore: use go 1.18 * read stmts * fix typo * tests * connection string * add missing error handler * cleanup * start all apis * go mod tidy * old update * switch back to minute * on conflict * replace string slice with `database.StringArray` in db models * fix tests and start * update go version in dockerfile * setup go * clean up * remove notification migration * update * docs: add deploy guide for postgres * fix: revert sonyflake * use `database.StringArray` for daos * use `database.StringArray` every where * new tables * index naming, metadata primary key, project grant role key type * docs(postgres): change to beta * chore: correct compose * fix(defaults): add empty postgres config * refactor: remove unused code * docs: add postgres to self hosted * fix broken link * so? * change title * add mdx to link * fix stmt * update goreleaser in test-code * docs: improve postgres example * update more projections * fix: add beta log for postgres * revert index name change * prerelease * fix: add sequence to v1 "reduce paniced" * log if nil * add logging * fix: log output * fix(import): check if org exists and user * refactor: imports * fix(user): ignore malformed events * refactor: method naming * fix: test * refactor: correct errors.Is call * ci: don't build dev binaries on main * fix(go releaser): update version to 1.11.0 * fix(user): projection should not break * fix(user): handle error properly * docs: correct config example * Update .releaserc.js * Update .releaserc.js Co-authored-by: Livio Amstutz <livio.a@gmail.com> Co-authored-by: Elio Bischof <eliobischof@gmail.com>
2022-08-31 09:52:43 +02:00
initialise.ReadStmts("cockroach")
config := new(database.Config)
config.SetConnector(&cockroach.Config{
User: cockroach.User{
Username: "zitadel",
},
Database: "zitadel",
})
err := initialise.Init(db,
initialise.VerifyUser(config.Username(), ""),
initialise.VerifyDatabase(config.DatabaseName()),
initialise.VerifyGrant(config.DatabaseName(), config.Username()),
initialise.VerifySettings(config.DatabaseName(), config.Username()))
2020-10-23 16:16:46 +02:00
if err != nil {
return err
}
err = initialise.VerifyZitadel(context.Background(), db, *config)
if err != nil {
return err
}
// create old events
_, err = db.Exec(oldEventsTable)
return err
}
func connectLocalhost() (*database.DB, error) {
config, err := pgxpool.ParseConfig("postgresql://root@localhost:26257/defaultdb?sslmode=disable")
if err != nil {
return nil, err
}
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
pgxdecimal.Register(conn.TypeMap())
return nil
}
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
return nil, err
}
client := stdlib.OpenDBFromPool(pool)
if err = client.Ping(); err != nil {
return nil, err
}
return &database.DB{
DB: client,
Database: new(testDB),
}, nil
2020-10-23 16:16:46 +02:00
}
// testDB is a stub that answers the database metadata methods with fixed
// values.
type testDB struct{}

// Timetravel ignores the requested duration and always returns the same
// " AS OF SYSTEM TIME '-1 ms' " clause.
func (*testDB) Timetravel(time.Duration) string {
	return " AS OF SYSTEM TIME '-1 ms' "
}

// DatabaseName returns the fixed database name "db".
func (*testDB) DatabaseName() string {
	return "db"
}

// Username returns the fixed user name "user".
func (*testDB) Username() string {
	return "user"
}

// Type returns the fixed database type "cockroach".
func (*testDB) Type() string {
	return "cockroach"
}
// generateCommand builds a test command of type "test.created" for the given
// aggregate. The aggregate uses resource owner "ro" and version "v1", and the
// event is attributed to service "svc". Each option in opts is applied to the
// event, in order, before it is returned.
func generateCommand(aggregateType eventstore.AggregateType, aggregateID string, opts ...func(*testEvent)) eventstore.Command {
	aggregate := &eventstore.Aggregate{
		ID:            aggregateID,
		Type:          aggregateType,
		ResourceOwner: "ro",
		Version:       "v1",
	}

	event := new(testEvent)
	event.BaseEvent = eventstore.BaseEvent{
		Agg:       aggregate,
		Service:   "svc",
		EventType: "test.created",
	}

	for i := range opts {
		opts[i](event)
	}
	return event
}
// testEvent is the command used by the tests. It embeds eventstore.BaseEvent
// and carries the unique constraints that the command reports.
type testEvent struct {
	eventstore.BaseEvent

	uniqueConstraints []*eventstore.UniqueConstraint
}

// Payload returns the data stored on the embedded base event.
func (ev *testEvent) Payload() any {
	payload := ev.BaseEvent.Data
	return payload
}

// UniqueConstraints returns the unique constraints attached to the event.
func (ev *testEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
	return ev.uniqueConstraints
}
// canceledCtx returns a context derived from context.Background that has
// already been canceled by the time the caller receives it.
func canceledCtx() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	// The deferred cancel runs before the caller gets the context back.
	defer cancel()
	return ctx
}
// fillUniqueData inserts one row into eventstore.unique_constraints through
// testCRDBClient and returns the error of that insert, if any.
func fillUniqueData(unique_type, field, instanceID string) error {
	const insertStmt = "INSERT INTO eventstore.unique_constraints (unique_type, unique_field, instance_id) VALUES ($1, $2, $3)"

	if _, err := testCRDBClient.Exec(insertStmt, unique_type, field, instanceID); err != nil {
		return err
	}
	return nil
}
// generateAddUniqueConstraint returns an option that appends an "add" unique
// constraint for the given table and field to a test event.
func generateAddUniqueConstraint(table, uniqueField string) func(e *testEvent) {
	return func(e *testEvent) {
		// Built per invocation so every event gets its own constraint value.
		constraint := &eventstore.UniqueConstraint{
			UniqueType:  table,
			UniqueField: uniqueField,
			Action:      eventstore.UniqueConstraintAdd,
		}
		e.uniqueConstraints = append(e.uniqueConstraints, constraint)
	}
}
// generateRemoveUniqueConstraint returns an option that appends a "remove"
// unique constraint for the given table and field to a test event.
func generateRemoveUniqueConstraint(table, uniqueField string) func(e *testEvent) {
	return func(e *testEvent) {
		// Built per invocation so every event gets its own constraint value.
		constraint := &eventstore.UniqueConstraint{
			UniqueType:  table,
			UniqueField: uniqueField,
			Action:      eventstore.UniqueConstraintRemove,
		}
		e.uniqueConstraints = append(e.uniqueConstraints, constraint)
	}
}
// withTestData returns an option that JSON-encodes data and stores the result
// as the data of the test event's base event. The encoding happens each time
// the option is applied.
//
// The option panics if data cannot be marshalled; the panic message includes
// the marshal error so the offending value can be diagnosed.
func withTestData(data any) func(e *testEvent) {
	return func(e *testEvent) {
		d, err := json.Marshal(data)
		if err != nil {
			panic("marshal data failed: " + err.Error())
		}
		e.BaseEvent.Data = d
	}
}
// cleanupEventstore returns a function that truncates the eventstore tables
// (events, events2 and unique_constraints) through client.
//
// Cleanup is best effort: a failing truncate is logged as a warning that
// names the affected table, and the remaining tables are still truncated.
func cleanupEventstore(client *database.DB) func() {
	return func() {
		tables := []string{
			"eventstore.events",
			"eventstore.events2",
			"eventstore.unique_constraints",
		}
		for _, table := range tables {
			if _, err := client.Exec("TRUNCATE " + table); err != nil {
				logging.Warnf("unable to truncate %s: %v", table, err)
			}
		}
	}
}
// oldEventsTable is the SQL statement that creates the eventstore.events
// table if it does not exist yet. initDB executes it as its last step
// ("create old events").
const oldEventsTable = `CREATE TABLE IF NOT EXISTS eventstore.events (
id UUID DEFAULT gen_random_uuid()
, event_type TEXT NOT NULL
, aggregate_type TEXT NOT NULL
, aggregate_id TEXT NOT NULL
, aggregate_version TEXT NOT NULL
, event_sequence BIGINT NOT NULL
, previous_aggregate_sequence BIGINT
, previous_aggregate_type_sequence INT8
, creation_date TIMESTAMPTZ NOT NULL DEFAULT now()
, created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp()
, event_data JSONB
, editor_user TEXT NOT NULL
, editor_service TEXT
, resource_owner TEXT NOT NULL
, instance_id TEXT NOT NULL
, "position" DECIMAL NOT NULL
, in_tx_order INTEGER NOT NULL
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence DESC)
);`