fix(init): prepare database (#3191)

* fix(init): prepare database

* fix(defaults): cockroach local defaults
This commit is contained in:
Silvan
2022-02-11 11:02:47 +01:00
committed by GitHub
parent 9d471d0d30
commit e8ab237ada
13 changed files with 375 additions and 48 deletions

View File

@@ -0,0 +1,7 @@
package initialise
import "github.com/caos/zitadel/internal/database"
type Config struct {
Database database.Config
}

View File

@@ -2,21 +2,53 @@ package initialise
import (
_ "embed"
"fmt"
"github.com/caos/logging"
"github.com/spf13/cobra"
"github.com/spf13/viper"
//sql import
_ "github.com/lib/pq"
)
var (
conn string
)
func New() *cobra.Command {
return &cobra.Command{
cmd := &cobra.Command{
Use: "init",
Short: "initialize ZITADEL instance",
Long: `init sets up the minimum requirements to start ZITADEL.
Prereqesits:
- cockroachdb`,
RunE: func(cmd *cobra.Command, args []string) error {
logging.New().Info("hello world")
return nil
config := new(Config)
if err := viper.Unmarshal(config); err != nil {
return err
}
return initialise(config)
},
}
// cmd.PersistentFlags().StringArrayVar(&configFiles, "config", nil, "path to config file to overwrite system defaults")
//TODO(hust): simplify to multiple flags
cmd.PersistentFlags().StringVar(&conn, "connection", "", "connection string to connect with a user which is allowed to create the database and user")
return cmd
}
func initialise(config *Config) error {
logging.Info("initialization started")
if conn == "" {
return fmt.Errorf("connection not defined")
}
if err := prepareDB(config.Database); err != nil {
return err
}
return prepareZitadel(config.Database)
}

View File

@@ -0,0 +1,87 @@
package initialise
import (
"database/sql"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/database"
)
func prepareDB(config database.Config) error {
db, err := sql.Open("postgres", conn)
if err != nil {
return err
}
logging.Info("verify user")
if err = verifyUser(db, config); err != nil {
return err
}
logging.Info("verify database")
if err = verifyDB(db, config); err != nil {
return err
}
logging.Info("verify grant")
if err = verifyGrant(db, config); err != nil {
return err
}
return db.Close()
}
func verifyUser(db *sql.DB, config database.Config) error {
exists, err := existsUser(db, config)
if exists || err != nil {
return err
}
return createUser(db, config)
}
func existsUser(db *sql.DB, config database.Config) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT username FROM [show roles] WHERE username = $1)", config.User)
err = row.Scan(&exists)
return exists, err
}
func createUser(db *sql.DB, config database.Config) error {
_, err := db.Exec("CREATE USER $1 WITH PASSWORD $2", config.User, &sql.NullString{String: config.Password, Valid: config.Password != ""})
return err
}
func verifyDB(db *sql.DB, config database.Config) error {
exists, err := existsDatabase(db, config)
if exists || err != nil {
return err
}
return createDatabase(db, config)
}
func existsDatabase(db *sql.DB, config database.Config) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT database_name FROM [show databases] WHERE database_name = $1)", config.Database)
err = row.Scan(&exists)
return exists, err
}
func createDatabase(db *sql.DB, config database.Config) error {
_, err := db.Exec("CREATE DATABASE " + config.Database)
return err
}
func verifyGrant(db *sql.DB, config database.Config) error {
exists, err := hasGrant(db, config)
if exists || err != nil {
return err
}
return grant(db, config)
}
func hasGrant(db *sql.DB, config database.Config) (has bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT * FROM [SHOW GRANTS ON DATABASE "+config.Database+"] where grantee = $1 AND privilege_type = 'ALL')", config.User)
err = row.Scan(&has)
return has, err
}
func grant(db *sql.DB, config database.Config) error {
_, err := db.Exec("GRANT ALL ON DATABASE " + config.Database + " TO " + config.User)
return err
}

View File

@@ -0,0 +1,114 @@
package initialise
import (
"database/sql"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/database"
)
const (
eventstoreSchema = "eventstore"
projectionsSchema = "projections"
eventsTable = "events"
)
func prepareZitadel(config database.Config) error {
db, err := database.Connect(config)
if err != nil {
return err
}
logging.Info("verify projections schema")
if err := verifySchema(db, config, projectionsSchema); err != nil {
return err
}
logging.Info("verify eventstore schema")
if err := verifySchema(db, config, eventstoreSchema); err != nil {
return err
}
logging.Info("verify events table")
if err := verifyEvents(db, config); err != nil {
return err
}
return db.Close()
}
func verifySchema(db *sql.DB, config database.Config, schema string) error {
exists, err := existsSchema(db, config, schema)
if exists || err != nil {
return err
}
return createSchema(db, config, schema)
}
func existsSchema(db *sql.DB, config database.Config, schema string) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT schema_name FROM [SHOW SCHEMAS] WHERE schema_name = $1)", schema)
err = row.Scan(&exists)
return exists, err
}
func createSchema(db *sql.DB, config database.Config, schema string) error {
_, err := db.Exec("CREATE SCHEMA " + schema)
return err
}
func verifyEvents(db *sql.DB, config database.Config) error {
exists, err := existsEvents(db, config)
if exists || err != nil {
return err
}
return createEvents(db, config)
}
func existsEvents(db *sql.DB, config database.Config) (exists bool, err error) {
row := db.QueryRow("SELECT EXISTS(SELECT table_name FROM [SHOW TABLES] WHERE table_name = $1)", eventsTable)
err = row.Scan(&exists)
return exists, err
}
func createEvents(db *sql.DB, config database.Config) error {
tx, err := db.Begin()
if err != nil {
return err
}
if _, err = tx.Exec("SET experimental_enable_hash_sharded_indexes = on"); err != nil {
tx.Rollback()
return err
}
stmt := `CREATE TABLE eventstore.events (
id UUID DEFAULT gen_random_uuid()
, event_type TEXT NOT NULL
, aggregate_type TEXT NOT NULL
, aggregate_id TEXT NOT NULL
, aggregate_version TEXT NOT NULL
, event_sequence BIGINT NOT NULL
, previous_aggregate_sequence BIGINT
, previous_aggregate_type_sequence INT8
, creation_date TIMESTAMPTZ NOT NULL DEFAULT now()
, event_data JSONB
, editor_user TEXT NOT NULL
, editor_service TEXT NOT NULL
, resource_owner TEXT NOT NULL
, PRIMARY KEY (event_sequence DESC) USING HASH WITH BUCKET_COUNT = 10
, INDEX agg_type_agg_id (aggregate_type, aggregate_id)
, INDEX agg_type (aggregate_type)
, INDEX agg_type_seq (aggregate_type, event_sequence DESC)
STORING (id, event_type, aggregate_id, aggregate_version, previous_aggregate_sequence, creation_date, event_data, editor_user, editor_service, resource_owner, previous_aggregate_type_sequence)
, INDEX changes_idx (aggregate_type, aggregate_id, creation_date) USING HASH WITH BUCKET_COUNT = 10
, INDEX max_sequence (aggregate_type, aggregate_id, event_sequence DESC)
, CONSTRAINT previous_sequence_unique UNIQUE (previous_aggregate_sequence DESC)
, CONSTRAINT prev_agg_type_seq_unique UNIQUE(previous_aggregate_type_sequence)
)`
if _, err = tx.Exec(stmt); err != nil {
tx.Rollback()
return err
}
return tx.Commit()
}

View File

@@ -15,7 +15,7 @@ func New() *cobra.Command {
Requirements:
- cockroachdb`,
RunE: func(cmd *cobra.Command, args []string) error {
logging.New().Info("hello world")
logging.Info("hello world")
return nil
},
}

View File

@@ -15,7 +15,7 @@ func New() *cobra.Command {
Requirements:
- cockroachdb`,
RunE: func(cmd *cobra.Command, args []string) error {
logging.New().Info("hello world")
logging.Info("hello world")
logging.WithFields("field", 1).Info("hello world")
return nil
},

View File

@@ -1,4 +1,20 @@
Log:
Level: debug
Formatter:
Format: text
Format: text
database:
host: localhost
port: 26257
user: zitadel
database: zitadel
password:
maxOpenConns: 3
ssl:
mode: disable
rootCert:
cert:
key:
options:
# MaxConnLifetime: 30m
# MaxConnIdleTime: 30m