mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-04 23:45:07 +00:00
fix(setup): optimise events indexes (#5316)
This commit is contained in:
parent
48f9815b7c
commit
3dbb6f7c67
@ -1,36 +1,23 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"embed"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 04/cockroach/index.sql
|
||||
//go:embed 04/postgres/index.sql
|
||||
stmts embed.FS
|
||||
stmts04 embed.FS
|
||||
)
|
||||
|
||||
type EventstoreIndexes struct {
|
||||
dbClient *sql.DB
|
||||
dbType string
|
||||
}
|
||||
|
||||
func (mig *EventstoreIndexes) Execute(ctx context.Context) error {
|
||||
stmt, err := readStmt(stmts, "04", mig.dbType, "index.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
func New04(db *database.DB) *EventstoreIndexesNew {
|
||||
return &EventstoreIndexesNew{
|
||||
dbClient: db,
|
||||
name: "04_eventstore_indexes",
|
||||
step: "04",
|
||||
fileName: "index.sql",
|
||||
stmts: stmts04,
|
||||
}
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mig *EventstoreIndexes) String() string {
|
||||
return "04_eventstore_indexes"
|
||||
}
|
||||
|
||||
func readStmt(fs embed.FS, folder, typ, filename string) (string, error) {
|
||||
stmt, err := fs.ReadFile(folder + "/" + typ + "/" + filename)
|
||||
return string(stmt), err
|
||||
}
|
||||
|
@ -2,21 +2,27 @@ package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"embed"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 08.sql
|
||||
tokenIndexes08 string
|
||||
//go:embed 08/cockroach/08.sql
|
||||
//go:embed 08/postgres/08.sql
|
||||
tokenIndexes08 embed.FS
|
||||
)
|
||||
|
||||
type AuthTokenIndexes struct {
|
||||
dbClient *sql.DB
|
||||
dbClient *database.DB
|
||||
}
|
||||
|
||||
func (mig *AuthTokenIndexes) Execute(ctx context.Context) error {
|
||||
_, err := mig.dbClient.ExecContext(ctx, tokenIndexes08)
|
||||
stmt, err := readStmt(tokenIndexes08, "08", mig.dbClient.Type(), "08.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
|
5
cmd/setup/08/postgres/08.sql
Normal file
5
cmd/setup/08/postgres/08.sql
Normal file
@ -0,0 +1,5 @@
|
||||
CREATE INDEX IF NOT EXISTS inst_refresh_tkn_idx ON auth.tokens(instance_id, refresh_token_id);
|
||||
CREATE INDEX IF NOT EXISTS inst_app_tkn_idx ON auth.tokens(instance_id, application_id);
|
||||
CREATE INDEX IF NOT EXISTS inst_ro_tkn_idx ON auth.tokens(instance_id, resource_owner);
|
||||
DROP INDEX IF EXISTS auth.user_user_agent_idx;
|
||||
CREATE INDEX IF NOT EXISTS inst_usr_agnt_tkn_idx ON auth.tokens(instance_id, user_id, user_agent_id);
|
23
cmd/setup/09.go
Normal file
23
cmd/setup/09.go
Normal file
@ -0,0 +1,23 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"embed"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
)
|
||||
|
||||
var (
|
||||
//go:embed 09/cockroach/index.sql
|
||||
//go:embed 09/postgres/index.sql
|
||||
stmts09 embed.FS
|
||||
)
|
||||
|
||||
func New09(db *database.DB) *EventstoreIndexesNew {
|
||||
return &EventstoreIndexesNew{
|
||||
dbClient: db,
|
||||
name: "09_optimise_indexes",
|
||||
step: "09",
|
||||
fileName: "index.sql",
|
||||
stmts: stmts09,
|
||||
}
|
||||
}
|
65
cmd/setup/09/cockroach/index.sql
Normal file
65
cmd/setup/09/cockroach/index.sql
Normal file
@ -0,0 +1,65 @@
|
||||
-- replace agg_type_agg_id
|
||||
BEGIN;
|
||||
DROP INDEX eventstore.events@agg_type_agg_id;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INDEX agg_type_agg_id ON eventstore.events (
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
) STORING (
|
||||
event_type
|
||||
, aggregate_version
|
||||
, previous_aggregate_sequence
|
||||
, previous_aggregate_type_sequence
|
||||
, creation_date
|
||||
, event_data
|
||||
, editor_user
|
||||
, editor_service
|
||||
, resource_owner
|
||||
);
|
||||
COMMIT;
|
||||
|
||||
-- replace agg_type
|
||||
BEGIN;
|
||||
DROP INDEX eventstore.events@agg_type;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INDEX agg_type ON eventstore.events (
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, event_sequence
|
||||
) STORING (
|
||||
event_type
|
||||
, aggregate_id
|
||||
, aggregate_version
|
||||
, previous_aggregate_sequence
|
||||
, previous_aggregate_type_sequence
|
||||
, creation_date
|
||||
, event_data
|
||||
, editor_user
|
||||
, editor_service
|
||||
, resource_owner
|
||||
);
|
||||
COMMIT;
|
||||
|
||||
-- drop unused index
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.events@agg_type_seq;
|
||||
COMMIT;
|
||||
|
||||
-- index to search event payload
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.events@event_search;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INVERTED INDEX event_search ON eventstore.events (
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, event_type
|
||||
, event_data
|
||||
);
|
||||
COMMIT;
|
39
cmd/setup/09/postgres/index.sql
Normal file
39
cmd/setup/09/postgres/index.sql
Normal file
@ -0,0 +1,39 @@
|
||||
-- replace agg_type_agg_id
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.agg_type_agg_id;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INDEX agg_type_agg_id ON eventstore.events (
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, aggregate_id
|
||||
);
|
||||
COMMIT;
|
||||
|
||||
-- replace agg_type
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.agg_type;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INDEX agg_type ON eventstore.events (
|
||||
instance_id
|
||||
, aggregate_type
|
||||
, event_sequence
|
||||
);
|
||||
COMMIT;
|
||||
|
||||
-- drop unused index
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.agg_type_seq;
|
||||
COMMIT;
|
||||
|
||||
-- index to search event payload
|
||||
BEGIN;
|
||||
DROP INDEX IF EXISTS eventstore.event_search;
|
||||
COMMIT;
|
||||
|
||||
BEGIN;
|
||||
CREATE INDEX event_search ON eventstore.events USING gin (event_data);
|
||||
COMMIT;
|
@ -59,11 +59,12 @@ type Steps struct {
|
||||
s1ProjectionTable *ProjectionTable
|
||||
s2AssetsTable *AssetTable
|
||||
FirstInstance *FirstInstance
|
||||
s4EventstoreIndexes *EventstoreIndexes
|
||||
s4EventstoreIndexes *EventstoreIndexesNew
|
||||
s5LastFailed *LastFailed
|
||||
s6OwnerRemoveColumns *OwnerRemoveColumns
|
||||
s7LogstoreTables *LogstoreTables
|
||||
s8AuthTokens *AuthTokenIndexes
|
||||
s9EventstoreIndexes2 *EventstoreIndexesNew
|
||||
}
|
||||
|
||||
type encryptionKeyConfig struct {
|
||||
|
29
cmd/setup/eventstore_index.go
Normal file
29
cmd/setup/eventstore_index.go
Normal file
@ -0,0 +1,29 @@
|
||||
package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/database"
|
||||
)
|
||||
|
||||
type EventstoreIndexesNew struct {
|
||||
dbClient *database.DB
|
||||
name string
|
||||
step string
|
||||
fileName string
|
||||
stmts embed.FS
|
||||
}
|
||||
|
||||
func (mig *EventstoreIndexesNew) Execute(ctx context.Context) error {
|
||||
stmt, err := readStmt(mig.stmts, mig.step, mig.dbClient.Type(), mig.fileName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = mig.dbClient.ExecContext(ctx, stmt)
|
||||
return err
|
||||
}
|
||||
|
||||
func (mig *EventstoreIndexesNew) String() string {
|
||||
return mig.name
|
||||
}
|
@ -2,6 +2,7 @@ package setup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
_ "embed"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
@ -81,11 +82,12 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
||||
steps.FirstInstance.externalSecure = config.ExternalSecure
|
||||
steps.FirstInstance.externalPort = config.ExternalPort
|
||||
|
||||
steps.s4EventstoreIndexes = &EventstoreIndexes{dbClient: dbClient.DB, dbType: config.Database.Type()}
|
||||
steps.s4EventstoreIndexes = New04(dbClient)
|
||||
steps.s5LastFailed = &LastFailed{dbClient: dbClient.DB}
|
||||
steps.s6OwnerRemoveColumns = &OwnerRemoveColumns{dbClient: dbClient.DB}
|
||||
steps.s7LogstoreTables = &LogstoreTables{dbClient: dbClient.DB, username: config.Database.Username(), dbType: config.Database.Type()}
|
||||
steps.s8AuthTokens = &AuthTokenIndexes{dbClient: dbClient.DB}
|
||||
steps.s8AuthTokens = &AuthTokenIndexes{dbClient: dbClient}
|
||||
steps.s9EventstoreIndexes2 = New09(dbClient)
|
||||
|
||||
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil)
|
||||
logging.OnError(err).Fatal("unable to start projections")
|
||||
@ -119,9 +121,16 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
||||
logging.OnError(err).Fatal("unable to migrate step 7")
|
||||
err = migration.Migrate(ctx, eventstoreClient, steps.s8AuthTokens)
|
||||
logging.OnError(err).Fatal("unable to migrate step 8")
|
||||
err = migration.Migrate(ctx, eventstoreClient, steps.s9EventstoreIndexes2)
|
||||
logging.OnError(err).Fatal("unable to migrate step 9")
|
||||
|
||||
for _, repeatableStep := range repeatableSteps {
|
||||
err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
|
||||
logging.OnError(err).Fatalf("unable to migrate repeatable step: %s", repeatableStep.String())
|
||||
}
|
||||
}
|
||||
|
||||
func readStmt(fs embed.FS, folder, typ, filename string) (string, error) {
|
||||
stmt, err := fs.ReadFile(folder + "/" + typ + "/" + filename)
|
||||
return string(stmt), err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user