mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-12 11:04:25 +00:00
b5564572bc
This implementation increases parallel write capabilities of the eventstore. Please have a look at the technical advisories: [05](https://zitadel.com/docs/support/advisory/a10005) and [06](https://zitadel.com/docs/support/advisory/a10006). The implementation of eventstore.push is rewritten and stored events are migrated to a new table `eventstore.events2`. If you are using cockroach: make sure that the database user of ZITADEL has `VIEWACTIVITY` grant. This is used to query events.
242 lines
6.2 KiB
Go
242 lines
6.2 KiB
Go
package eventstore_test
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"os"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/cockroachdb/cockroach-go/v2/testserver"
|
|
"github.com/zitadel/logging"
|
|
|
|
"github.com/zitadel/zitadel/cmd/initialise"
|
|
"github.com/zitadel/zitadel/internal/database"
|
|
"github.com/zitadel/zitadel/internal/database/cockroach"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
es_sql "github.com/zitadel/zitadel/internal/eventstore/repository/sql"
|
|
new_es "github.com/zitadel/zitadel/internal/eventstore/v3"
|
|
)
|
|
|
|
var (
|
|
testCRDBClient *database.DB
|
|
queriers map[string]eventstore.Querier = make(map[string]eventstore.Querier)
|
|
pushers map[string]eventstore.Pusher = make(map[string]eventstore.Pusher)
|
|
clients map[string]*database.DB = make(map[string]*database.DB)
|
|
)
|
|
|
|
func TestMain(m *testing.M) {
|
|
ts, err := testserver.NewTestServer()
|
|
if err != nil {
|
|
logging.WithFields("error", err).Fatal("unable to start db")
|
|
}
|
|
|
|
testCRDBClient = &database.DB{
|
|
Database: new(testDB),
|
|
}
|
|
testCRDBClient.DB, err = sql.Open("postgres", ts.PGURL().String())
|
|
if err != nil {
|
|
logging.WithFields("error", err).Fatal("unable to connect to db")
|
|
}
|
|
if err = testCRDBClient.Ping(); err != nil {
|
|
logging.WithFields("error", err).Fatal("unable to ping db")
|
|
}
|
|
|
|
v2 := &es_sql.CRDB{DB: testCRDBClient}
|
|
queriers["v2(inmemory)"] = v2
|
|
clients["v2(inmemory)"] = testCRDBClient
|
|
|
|
pushers["v3(inmemory)"] = new_es.NewEventstore(testCRDBClient)
|
|
clients["v3(inmemory)"] = testCRDBClient
|
|
|
|
if localDB, err := connectLocalhost(); err == nil {
|
|
if err = initDB(localDB); err != nil {
|
|
logging.WithFields("error", err).Fatal("migrations failed")
|
|
}
|
|
pushers["v3(singlenode)"] = new_es.NewEventstore(localDB)
|
|
clients["v3(singlenode)"] = localDB
|
|
}
|
|
|
|
// pushers["v2(inmemory)"] = v2
|
|
|
|
defer func() {
|
|
testCRDBClient.Close()
|
|
ts.Stop()
|
|
}()
|
|
|
|
if err = initDB(testCRDBClient); err != nil {
|
|
logging.WithFields("error", err).Fatal("migrations failed")
|
|
}
|
|
|
|
os.Exit(m.Run())
|
|
}
|
|
|
|
func initDB(db *database.DB) error {
|
|
initialise.ReadStmts("cockroach")
|
|
config := new(database.Config)
|
|
config.SetConnector(&cockroach.Config{
|
|
User: cockroach.User{
|
|
Username: "zitadel",
|
|
},
|
|
Database: "zitadel",
|
|
})
|
|
err := initialise.Init(db,
|
|
initialise.VerifyUser(config.Username(), ""),
|
|
initialise.VerifyDatabase(config.DatabaseName()),
|
|
initialise.VerifyGrant(config.DatabaseName(), config.Username()))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = initialise.VerifyZitadel(db, *config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// create old events
|
|
_, err = db.Exec(oldEventsTable)
|
|
return err
|
|
}
|
|
|
|
func connectLocalhost() (*database.DB, error) {
|
|
client, err := sql.Open("pgx", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err = client.Ping(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &database.DB{
|
|
DB: client,
|
|
Database: new(testDB),
|
|
}, nil
|
|
}
|
|
|
|
type testDB struct{}
|
|
|
|
func (_ *testDB) Timetravel(time.Duration) string { return " AS OF SYSTEM TIME '-1 ms' " }
|
|
|
|
func (*testDB) DatabaseName() string { return "db" }
|
|
|
|
func (*testDB) Username() string { return "user" }
|
|
|
|
func (*testDB) Type() string { return "cockroach" }
|
|
|
|
func generateCommand(aggregateType eventstore.AggregateType, aggregateID string, opts ...func(*testEvent)) eventstore.Command {
|
|
e := &testEvent{
|
|
BaseEvent: eventstore.BaseEvent{
|
|
Agg: &eventstore.Aggregate{
|
|
ID: aggregateID,
|
|
Type: aggregateType,
|
|
ResourceOwner: "ro",
|
|
Version: "v1",
|
|
},
|
|
Service: "svc",
|
|
EventType: "test.created",
|
|
},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(e)
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
type testEvent struct {
|
|
eventstore.BaseEvent
|
|
uniqueConstraints []*eventstore.UniqueConstraint
|
|
}
|
|
|
|
func (e *testEvent) Payload() any {
|
|
return e.BaseEvent.Data
|
|
}
|
|
|
|
func (e *testEvent) UniqueConstraints() []*eventstore.UniqueConstraint {
|
|
return e.uniqueConstraints
|
|
}
|
|
|
|
func canceledCtx() context.Context {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
cancel()
|
|
return ctx
|
|
}
|
|
|
|
func fillUniqueData(unique_type, field, instanceID string) error {
|
|
_, err := testCRDBClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field, instance_id) VALUES ($1, $2, $3)", unique_type, field, instanceID)
|
|
return err
|
|
}
|
|
|
|
func generateAddUniqueConstraint(table, uniqueField string) func(e *testEvent) {
|
|
return func(e *testEvent) {
|
|
e.uniqueConstraints = append(e.uniqueConstraints,
|
|
&eventstore.UniqueConstraint{
|
|
UniqueType: table,
|
|
UniqueField: uniqueField,
|
|
Action: eventstore.UniqueConstraintAdd,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
func generateRemoveUniqueConstraint(table, uniqueField string) func(e *testEvent) {
|
|
return func(e *testEvent) {
|
|
e.uniqueConstraints = append(e.uniqueConstraints,
|
|
&eventstore.UniqueConstraint{
|
|
UniqueType: table,
|
|
UniqueField: uniqueField,
|
|
Action: eventstore.UniqueConstraintRemove,
|
|
},
|
|
)
|
|
}
|
|
}
|
|
|
|
func withTestData(data any) func(e *testEvent) {
|
|
return func(e *testEvent) {
|
|
d, err := json.Marshal(data)
|
|
if err != nil {
|
|
panic("marshal data failed")
|
|
}
|
|
e.BaseEvent.Data = d
|
|
}
|
|
}
|
|
|
|
func cleanupEventstore(client *database.DB) func() {
|
|
return func() {
|
|
_, err := client.Exec("TRUNCATE eventstore.events")
|
|
if err != nil {
|
|
logging.Warnf("unable to truncate events: %v", err)
|
|
}
|
|
_, err = client.Exec("TRUNCATE eventstore.events2")
|
|
if err != nil {
|
|
logging.Warnf("unable to truncate events: %v", err)
|
|
}
|
|
_, err = client.Exec("TRUNCATE eventstore.unique_constraints")
|
|
if err != nil {
|
|
logging.Warnf("unable to truncate unique constraints: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
const oldEventsTable = `CREATE TABLE IF NOT EXISTS eventstore.events (
|
|
id UUID DEFAULT gen_random_uuid()
|
|
, event_type TEXT NOT NULL
|
|
, aggregate_type TEXT NOT NULL
|
|
, aggregate_id TEXT NOT NULL
|
|
, aggregate_version TEXT NOT NULL
|
|
, event_sequence BIGINT NOT NULL
|
|
, previous_aggregate_sequence BIGINT
|
|
, previous_aggregate_type_sequence INT8
|
|
, creation_date TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
, created_at TIMESTAMPTZ NOT NULL DEFAULT clock_timestamp()
|
|
, event_data JSONB
|
|
, editor_user TEXT NOT NULL
|
|
, editor_service TEXT
|
|
, resource_owner TEXT NOT NULL
|
|
, instance_id TEXT NOT NULL
|
|
, "position" DECIMAL NOT NULL
|
|
, in_tx_order INTEGER NOT NULL
|
|
|
|
, PRIMARY KEY (instance_id, aggregate_type, aggregate_id, event_sequence DESC)
|
|
);`
|