fix: init sub commands (#3218)

* fix(init): add sub commands

* fix(init): admin user in config,
test(init): verify functions

* refactor: config, remove second commands

* refactor: init steps

* chore: fix link in readme

* chore: numerate sql files

* Update cmd/admin/initialise/sql/README.md

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* Update cmd/admin/initialise/sql/README.md

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix(init): remove unused index

* user

* fix database username in defaults.yaml

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Silvan
2022-02-16 13:30:49 +01:00
committed by GitHub
parent 389eb4a27a
commit 4272ea6fe1
22 changed files with 639 additions and 185 deletions

View File

@@ -2,8 +2,8 @@ package initialise
import (
"database/sql"
_ "embed"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/database"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@@ -12,7 +12,19 @@ import (
const (
eventstoreSchema = "eventstore"
projectionsSchema = "projections"
eventsTable = "events"
)
var (
searchEventsTable = "SELECT table_name FROM [SHOW TABLES] WHERE table_name = 'events'"
searchSchema = "SELECT schema_name FROM [SHOW SCHEMAS] WHERE schema_name = $1"
//go:embed sql/06_enable_hash_sharded_indexes.sql
enableHashShardedIdx string
//go:embed sql/07_events_table.sql
createEventsStmt string
//go:embed sql/05_projections.sql
createProjectionsStmt string
//go:embed sql/04_eventstore.sql
createEventstoreStmt string
)
func newZitadel() *cobra.Command {
@@ -29,7 +41,7 @@ Prereqesits:
if err := viper.Unmarshal(config); err != nil {
return err
}
return initialise(config, verifyUser)
return verifyZitadel(config.Database)
},
}
}
@@ -40,93 +52,32 @@ func verifyZitadel(config database.Config) error {
return err
}
if err := verifySchema(db, config, projectionsSchema); err != nil {
if err := verify(db, exists(searchSchema, projectionsSchema), exec(createProjectionsStmt)); err != nil {
return err
}
if err := verifySchema(db, config, eventstoreSchema); err != nil {
if err := verify(db, exists(searchSchema, eventstoreSchema), exec(createEventstoreStmt)); err != nil {
return err
}
if err := verifyEvents(db, config); err != nil {
if err := verify(db, exists(searchSchema, projectionsSchema), createEvents); err != nil {
return err
}
return db.Close()
}
func verifySchema(db *sql.DB, config database.Config, schema string) error {
logging.WithFields("schema", schema).Info("verify schema")
exists, err := existsSchema(db, config, schema)
if exists || err != nil {
return err
}
return createSchema(db, config, schema)
}
func existsSchema(db *sql.DB, config database.Config, schema string) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT schema_name FROM [SHOW SCHEMAS] WHERE schema_name = $1)", schema)
err = row.Scan(&exists)
return exists, err
}
func createSchema(db *sql.DB, config database.Config, schema string) error {
_, err := db.Exec("CREATE SCHEMA " + schema)
return err
}
func verifyEvents(db *sql.DB, config database.Config) error {
logging.Info("verify events table")
exists, err := existsEvents(db, config)
if exists || err != nil {
return err
}
return createEvents(db, config)
}
func existsEvents(db *sql.DB, config database.Config) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT table_name FROM [SHOW TABLES] WHERE table_name = $1)", eventsTable)
err = row.Scan(&exists)
return exists, err
}
func createEvents(db *sql.DB, config database.Config) error {
func createEvents(db *sql.DB) error {
tx, err := db.Begin()
if err != nil {
return err
}
if _, err = tx.Exec("SET experimental_enable_hash_sharded_indexes = on"); err != nil {
if _, err = tx.Exec(enableHashShardedIdx); err != nil {
tx.Rollback()
return err
}
stmt := `CREATE TABLE eventstore.events (
id UUID DEFAULT gen_random_uuid()
, event_type TEXT NOT NULL
, aggregate_type TEXT NOT NULL
, aggregate_id TEXT NOT NULL
, aggregate_version TEXT NOT NULL
, event_sequence BIGINT NOT NULL
, previous_aggregate_sequence BIGINT
, previous_aggregate_type_sequence INT8
, creation_date TIMESTAMPTZ NOT NULL DEFAULT now()
, event_data JSONB
, editor_user TEXT NOT NULL
, editor_service TEXT NOT NULL
, resource_owner TEXT NOT NULL
, PRIMARY KEY (event_sequence DESC) USING HASH WITH BUCKET_COUNT = 10
, INDEX agg_type_agg_id (aggregate_type, aggregate_id)
, INDEX agg_type (aggregate_type)
, INDEX agg_type_seq (aggregate_type, event_sequence DESC)
STORING (id, event_type, aggregate_id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, resource_owner, previous_aggregate_type_sequence)
, INDEX changes_idx (aggregate_type, aggregate_id, creation_date) USING HASH WITH BUCKET_COUNT = 10
, INDEX max_sequence (aggregate_type, aggregate_id, event_sequence DESC)
, CONSTRAINT previous_sequence_unique UNIQUE (previous_aggregate_sequence DESC)
, CONSTRAINT prev_agg_type_seq_unique UNIQUE(previous_aggregate_type_sequence)
)`
if _, err = tx.Exec(stmt); err != nil {
if _, err = tx.Exec(createEventsStmt); err != nil {
tx.Rollback()
return err
}