mirror of
https://github.com/zitadel/zitadel.git
synced 2025-03-01 06:27:23 +00:00
fix(eventstore): correct creation date of events (#5683)
* fix: add setup step to correct creation dates * fix(eventstore): replace now with statement ts * fix(step10): correct number * fix: handle wrong instance domain removed events
This commit is contained in:
parent
c420de1533
commit
8da8fbe6ce
@ -1,7 +1,8 @@
|
|||||||
module.exports = {
|
module.exports = {
|
||||||
branches: [
|
branches: [
|
||||||
{name: 'main'},
|
{name: 'main'},
|
||||||
{name: '1.87.x', range: '1.87.x', channel: '1.87.x'}
|
{name: '1.87.x', range: '1.87.x', channel: '1.87.x'},
|
||||||
|
{name: 'next'}
|
||||||
],
|
],
|
||||||
plugins: [
|
plugins: [
|
||||||
"@semantic-release/commit-analyzer"
|
"@semantic-release/commit-analyzer"
|
||||||
|
53
cmd/setup/10.go
Normal file
53
cmd/setup/10.go
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
package setup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
_ "embed"
|
||||||
|
|
||||||
|
"github.com/zitadel/logging"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
//go:embed 10.sql
|
||||||
|
correctCreationDate10 string
|
||||||
|
)
|
||||||
|
|
||||||
|
type CorrectCreationDate struct {
|
||||||
|
dbClient *database.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *CorrectCreationDate) Execute(ctx context.Context) (err error) {
|
||||||
|
tx, err := mig.dbClient.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if mig.dbClient.Type() == "cockroach" {
|
||||||
|
if _, err := tx.Exec("SET experimental_enable_temp_tables=on"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
logging.OnError(tx.Rollback()).Debug("rollback failed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
err = tx.Commit()
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
res, err := tx.ExecContext(ctx, correctCreationDate10)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
affected, _ := res.RowsAffected()
|
||||||
|
logging.WithFields("count", affected).Info("creation dates changed")
|
||||||
|
if affected == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *CorrectCreationDate) String() string {
|
||||||
|
return "10_correct_creation_date"
|
||||||
|
}
|
28
cmd/setup/10.sql
Normal file
28
cmd/setup/10.sql
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
CREATE temporary TABLE IF NOT EXISTS wrong_events (
|
||||||
|
instance_id STRING
|
||||||
|
, event_sequence INT8
|
||||||
|
, current_cd TIMESTAMPTZ
|
||||||
|
, next_cd TIMESTAMPTZ
|
||||||
|
);
|
||||||
|
|
||||||
|
TRUNCATE wrong_events;
|
||||||
|
|
||||||
|
INSERT INTO wrong_events (
|
||||||
|
SELECT * FROM (
|
||||||
|
SELECT
|
||||||
|
instance_id
|
||||||
|
, event_sequence
|
||||||
|
, creation_date AS current_cd
|
||||||
|
, lead(creation_date) OVER (
|
||||||
|
PARTITION BY instance_id
|
||||||
|
ORDER BY event_sequence DESC
|
||||||
|
) AS next_cd
|
||||||
|
FROM
|
||||||
|
eventstore.events
|
||||||
|
) WHERE
|
||||||
|
current_cd < next_cd
|
||||||
|
ORDER BY
|
||||||
|
event_sequence DESC
|
||||||
|
);
|
||||||
|
|
||||||
|
UPDATE eventstore.events e SET creation_date = we.next_cd FROM wrong_events we WHERE e.event_sequence = we.event_sequence and e.instance_id = we.instance_id;
|
@ -65,6 +65,7 @@ type Steps struct {
|
|||||||
s7LogstoreTables *LogstoreTables
|
s7LogstoreTables *LogstoreTables
|
||||||
s8AuthTokens *AuthTokenIndexes
|
s8AuthTokens *AuthTokenIndexes
|
||||||
s9EventstoreIndexes2 *EventstoreIndexesNew
|
s9EventstoreIndexes2 *EventstoreIndexesNew
|
||||||
|
s10EventstoreCreationDate *CorrectCreationDate
|
||||||
}
|
}
|
||||||
|
|
||||||
type encryptionKeyConfig struct {
|
type encryptionKeyConfig struct {
|
||||||
|
@ -88,6 +88,7 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
|||||||
steps.s7LogstoreTables = &LogstoreTables{dbClient: dbClient.DB, username: config.Database.Username(), dbType: config.Database.Type()}
|
steps.s7LogstoreTables = &LogstoreTables{dbClient: dbClient.DB, username: config.Database.Username(), dbType: config.Database.Type()}
|
||||||
steps.s8AuthTokens = &AuthTokenIndexes{dbClient: dbClient}
|
steps.s8AuthTokens = &AuthTokenIndexes{dbClient: dbClient}
|
||||||
steps.s9EventstoreIndexes2 = New09(dbClient)
|
steps.s9EventstoreIndexes2 = New09(dbClient)
|
||||||
|
steps.s10EventstoreCreationDate = &CorrectCreationDate{dbClient: dbClient}
|
||||||
|
|
||||||
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil)
|
err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil)
|
||||||
logging.OnError(err).Fatal("unable to start projections")
|
logging.OnError(err).Fatal("unable to start projections")
|
||||||
@ -123,6 +124,8 @@ func Setup(config *Config, steps *Steps, masterKey string) {
|
|||||||
logging.OnError(err).Fatal("unable to migrate step 8")
|
logging.OnError(err).Fatal("unable to migrate step 8")
|
||||||
err = migration.Migrate(ctx, eventstoreClient, steps.s9EventstoreIndexes2)
|
err = migration.Migrate(ctx, eventstoreClient, steps.s9EventstoreIndexes2)
|
||||||
logging.OnError(err).Fatal("unable to migrate step 9")
|
logging.OnError(err).Fatal("unable to migrate step 9")
|
||||||
|
err = migration.Migrate(ctx, eventstoreClient, steps.s10EventstoreCreationDate)
|
||||||
|
logging.OnError(err).Fatal("unable to migrate step 10")
|
||||||
|
|
||||||
for _, repeatableStep := range repeatableSteps {
|
for _, repeatableStep := range repeatableSteps {
|
||||||
err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
|
err = migration.Migrate(ctx, eventstoreClient, repeatableStep)
|
||||||
|
@ -59,12 +59,16 @@ func (wm *SystemConfigWriteModel) Reduce() error {
|
|||||||
}
|
}
|
||||||
wm.Instances[e.Aggregate().InstanceID].Domains = append(wm.Instances[e.Aggregate().InstanceID].Domains, e.Domain)
|
wm.Instances[e.Aggregate().InstanceID].Domains = append(wm.Instances[e.Aggregate().InstanceID].Domains, e.Domain)
|
||||||
case *instance.DomainRemovedEvent:
|
case *instance.DomainRemovedEvent:
|
||||||
domains := wm.Instances[e.Aggregate().InstanceID].Domains
|
instance, ok := wm.Instances[e.Aggregate().InstanceID]
|
||||||
for i, domain := range domains {
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, domain := range instance.Domains {
|
||||||
if domain == e.Domain {
|
if domain == e.Domain {
|
||||||
domains[i] = domains[len(domains)-1]
|
instance.Domains[i] = instance.Domains[len(instance.Domains)-1]
|
||||||
domains[len(domains)-1] = ""
|
instance.Domains[len(instance.Domains)-1] = ""
|
||||||
wm.Instances[e.Aggregate().InstanceID].Domains = domains[:len(domains)-1]
|
wm.Instances[e.Aggregate().InstanceID].Domains = instance.Domains[:len(instance.Domains)-1]
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ const (
|
|||||||
" $2::VARCHAR AS aggregate_type," +
|
" $2::VARCHAR AS aggregate_type," +
|
||||||
" $3::VARCHAR AS aggregate_id," +
|
" $3::VARCHAR AS aggregate_id," +
|
||||||
" $4::VARCHAR AS aggregate_version," +
|
" $4::VARCHAR AS aggregate_version," +
|
||||||
" NOW() AS creation_date," +
|
" statement_timestamp() AS creation_date," +
|
||||||
" $5::JSONB AS event_data," +
|
" $5::JSONB AS event_data," +
|
||||||
" $6::VARCHAR AS editor_user," +
|
" $6::VARCHAR AS editor_user," +
|
||||||
" $7::VARCHAR AS editor_service," +
|
" $7::VARCHAR AS editor_service," +
|
||||||
|
Loading…
x
Reference in New Issue
Block a user