mirror of
https://github.com/zitadel/zitadel.git
synced 2025-02-28 23:27:23 +00:00
fix(migration): speed up step 11 (#6085)
This commit is contained in:
parent
07fb6f8284
commit
9020c9d94f
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"embed"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/v2/crdb"
|
||||
"github.com/zitadel/logging"
|
||||
@ -15,10 +14,8 @@ import (
|
||||
var (
|
||||
//go:embed 11/11_add_column.sql
|
||||
addEventCreatedAt string
|
||||
//go:embed 11/11_fetch_events.sql
|
||||
fetchCreatedAt string
|
||||
//go:embed 11/11_fill_column.sql
|
||||
fillCreatedAt string
|
||||
//go:embed 11/11_update_events.sql
|
||||
setCreatedAt string
|
||||
//go:embed 11/11_set_column.sql
|
||||
setCreatedAtDetails string
|
||||
//go:embed 11/postgres/create_index.sql
|
||||
@ -38,70 +35,33 @@ type AddEventCreatedAt struct {
|
||||
func (mig *AddEventCreatedAt) Execute(ctx context.Context) error {
|
||||
// execute step 10 again because events created after the first execution of step 10
|
||||
// could still have the wrong ordering of sequences and creation date
|
||||
logging.WithFields("step", "11").Info("ensure creation dates order")
|
||||
if err := mig.step10.Execute(ctx); err != nil {
|
||||
logging.WithFields("step", "11").WithError(err).Info("ensure creation dates order failed")
|
||||
return err
|
||||
}
|
||||
logging.WithFields("step", "11").Info("ensure creation dates order done")
|
||||
logging.WithFields("step", "11").Info("add created_at column")
|
||||
_, err := mig.dbClient.ExecContext(ctx, addEventCreatedAt)
|
||||
if err != nil {
|
||||
logging.WithFields("step", "11").WithError(err).Info("add created_at column failed")
|
||||
return err
|
||||
}
|
||||
logging.WithFields("step", "11").Info("created_at column added")
|
||||
|
||||
createIndex, err := readStmt(createdAtIndexCreateStmt, "11", mig.dbClient.Type(), "create_index.sql")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logging.WithFields("step", "11").Info("create index")
|
||||
_, err = mig.dbClient.ExecContext(ctx, createIndex)
|
||||
logging.WithFields("step", "11").WithError(err).Info("create index failed")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
logging.WithFields("step", "11").Info("index created")
|
||||
|
||||
for i := 0; ; i++ {
|
||||
logging.WithFields("step", "11", "iteration", i).Info("begin set created_at iteration")
|
||||
var count int
|
||||
err = crdb.ExecuteTx(ctx, mig.dbClient.DB, nil, func(tx *sql.Tx) error {
|
||||
rows, err := tx.Query(fetchCreatedAt, mig.BulkAmount)
|
||||
var count int64
|
||||
err := crdb.ExecuteTx(ctx, mig.dbClient.DB, nil, func(tx *sql.Tx) error {
|
||||
res, err := tx.Exec(setCreatedAt, mig.BulkAmount)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
type date struct {
|
||||
instanceID string
|
||||
eventSequence uint64
|
||||
creationDate time.Time
|
||||
}
|
||||
dates := make([]*date, 0, 20)
|
||||
|
||||
for rows.Next() {
|
||||
count++
|
||||
|
||||
d := new(date)
|
||||
err = rows.Scan(&d.instanceID, &d.eventSequence, &d.creationDate)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
dates = append(dates, d)
|
||||
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, d := range dates {
|
||||
_, err = tx.Exec(fillCreatedAt, d.creationDate, d.instanceID, d.eventSequence)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
count, _ = res.RowsAffected()
|
||||
logging.WithFields("affected", count).Info("created_at updated")
|
||||
|
||||
return nil
|
||||
})
|
||||
@ -109,7 +69,7 @@ func (mig *AddEventCreatedAt) Execute(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
logging.WithFields("step", "11", "iteration", i, "count", count).Info("set created_at iteration done")
|
||||
if count < 20 {
|
||||
if count < int64(mig.BulkAmount) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -1,13 +0,0 @@
|
||||
SELECT
|
||||
instance_id
|
||||
, event_sequence
|
||||
, creation_date
|
||||
FROM
|
||||
eventstore.events
|
||||
WHERE
|
||||
created_at IS NULL
|
||||
ORDER BY
|
||||
event_sequence DESC
|
||||
, instance_id
|
||||
LIMIT $1
|
||||
FOR UPDATE
|
@ -1 +0,0 @@
|
||||
UPDATE eventstore.events SET created_at = $1 WHERE instance_id = $2 AND event_sequence = $3
|
21
cmd/setup/11/11_update_events.sql
Normal file
21
cmd/setup/11/11_update_events.sql
Normal file
@ -0,0 +1,21 @@
|
||||
UPDATE eventstore.events SET
|
||||
created_at = creation_date
|
||||
FROM (
|
||||
SELECT
|
||||
e.event_sequence as seq
|
||||
, e.instance_id as i_id
|
||||
, e.creation_date as cd
|
||||
FROM
|
||||
eventstore.events e
|
||||
WHERE
|
||||
created_at IS NULL
|
||||
ORDER BY
|
||||
event_sequence ASC
|
||||
, instance_id
|
||||
LIMIT $1
|
||||
) AS e
|
||||
WHERE
|
||||
e.seq = eventstore.events.event_sequence
|
||||
AND e.i_id = eventstore.events.instance_id
|
||||
AND e.cd = eventstore.events.creation_date
|
||||
;
|
Loading…
x
Reference in New Issue
Block a user