From 296f1c3c71aa73ca04d04cc091866bcf6c50f664 Mon Sep 17 00:00:00 2001 From: Silvan Date: Wed, 1 Sep 2021 11:25:52 +0200 Subject: [PATCH] fix(eventstore): fill new column with data (#2288) * fix: smaller outage on events migration first part * fix: fill old events with sequence * fix: migration add transactions * fix: mig * rename mig * replace migration with setup step * regenerate mock * add step 20 to config * log * simplify step * limit 1000 * limit 500 --- internal/command/setup_step20.go | 26 +++++++++ internal/domain/step.go | 1 + internal/eventstore/eventstore.go | 4 ++ internal/eventstore/eventstore_test.go | 2 + .../repository/mock/repository.mock.go | 41 ++++++++----- internal/eventstore/repository/repository.go | 2 + internal/eventstore/repository/sql/setup.go | 58 +++++++++++++++++++ internal/setup/config.go | 2 + migrations/cockroach/V1.67__update_events.sql | 1 + 9 files changed, 124 insertions(+), 13 deletions(-) create mode 100644 internal/command/setup_step20.go create mode 100644 internal/eventstore/repository/sql/setup.go create mode 100644 migrations/cockroach/V1.67__update_events.sql diff --git a/internal/command/setup_step20.go b/internal/command/setup_step20.go new file mode 100644 index 0000000000..4f7a2c9f7b --- /dev/null +++ b/internal/command/setup_step20.go @@ -0,0 +1,26 @@ +package command + +import ( + "context" + + "github.com/caos/zitadel/internal/domain" + "github.com/caos/zitadel/internal/eventstore" +) + +type Step20 struct{} + +func (s *Step20) Step() domain.Step { + return domain.Step20 +} + +func (s *Step20) execute(ctx context.Context, commandSide *Commands) error { + return commandSide.SetupStep20(ctx, s) +} + +func (c *Commands) SetupStep20(ctx context.Context, step *Step20) error { + fn := func(iam *IAMWriteModel) ([]eventstore.EventPusher, error) { + err := c.eventstore.Step20(ctx, iam.Events[len(iam.Events)-1].Sequence()) + return nil, err + } + return c.setup(ctx, step, fn) +} diff --git a/internal/domain/step.go b/internal/domain/step.go index bdfd1bf259..883682306b 100644 --- a/internal/domain/step.go +++ b/internal/domain/step.go @@ -22,6 +22,7 @@ const ( Step17 Step18 Step19 + Step20 //StepCount marks the the length of possible steps (StepCount-1 == last possible step) StepCount ) diff --git a/internal/eventstore/eventstore.go b/internal/eventstore/eventstore.go index ef4691e4bb..d25fb5c511 100644 --- a/internal/eventstore/eventstore.go +++ b/internal/eventstore/eventstore.go @@ -229,3 +229,7 @@ func uniqueConstraintActionToRepository(action UniqueConstraintAction) repositor return repository.UniqueConstraintAdd } } + +func (es *Eventstore) Step20(ctx context.Context, latestSequence uint64) error { + return es.repo.Step20(ctx, latestSequence) +} diff --git a/internal/eventstore/eventstore_test.go b/internal/eventstore/eventstore_test.go index 94dbaa1849..351de3b588 100644 --- a/internal/eventstore/eventstore_test.go +++ b/internal/eventstore/eventstore_test.go @@ -549,6 +549,8 @@ func (repo *testRepo) Health(ctx context.Context) error { return nil } +func (repo *testRepo) Step20(context.Context, uint64) error { return nil } + func (repo *testRepo) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error { if repo.err != nil { return repo.err diff --git a/internal/eventstore/repository/mock/repository.mock.go b/internal/eventstore/repository/mock/repository.mock.go index 70f3d1283a..419186eef0 100644 --- a/internal/eventstore/repository/mock/repository.mock.go +++ b/internal/eventstore/repository/mock/repository.mock.go @@ -6,35 +6,36 @@ package mock import ( context "context" + reflect "reflect" + repository "github.com/caos/zitadel/internal/eventstore/repository" gomock "github.com/golang/mock/gomock" - reflect "reflect" ) -// MockRepository is a mock of Repository interface +// MockRepository is a mock of Repository interface. type MockRepository struct { ctrl *gomock.Controller recorder *MockRepositoryMockRecorder } -// MockRepositoryMockRecorder is the mock recorder for MockRepository +// MockRepositoryMockRecorder is the mock recorder for MockRepository. type MockRepositoryMockRecorder struct { mock *MockRepository } -// NewMockRepository creates a new mock instance +// NewMockRepository creates a new mock instance. func NewMockRepository(ctrl *gomock.Controller) *MockRepository { mock := &MockRepository{ctrl: ctrl} mock.recorder = &MockRepositoryMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockRepository) EXPECT() *MockRepositoryMockRecorder { return m.recorder } -// Filter mocks base method +// Filter mocks base method. func (m *MockRepository) Filter(arg0 context.Context, arg1 *repository.SearchQuery) ([]*repository.Event, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Filter", arg0, arg1) @@ -43,13 +44,13 @@ func (m *MockRepository) Filter(arg0 context.Context, arg1 *repository.SearchQue return ret0, ret1 } -// Filter indicates an expected call of Filter +// Filter indicates an expected call of Filter. func (mr *MockRepositoryMockRecorder) Filter(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockRepository)(nil).Filter), arg0, arg1) } -// Health mocks base method +// Health mocks base method. func (m *MockRepository) Health(arg0 context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Health", arg0) @@ -57,13 +58,13 @@ func (m *MockRepository) Health(arg0 context.Context) error { return ret0 } -// Health indicates an expected call of Health +// Health indicates an expected call of Health. func (mr *MockRepositoryMockRecorder) Health(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Health", reflect.TypeOf((*MockRepository)(nil).Health), arg0) } -// LatestSequence mocks base method +// LatestSequence mocks base method. func (m *MockRepository) LatestSequence(arg0 context.Context, arg1 *repository.SearchQuery) (uint64, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LatestSequence", arg0, arg1) @@ -72,13 +73,13 @@ func (m *MockRepository) LatestSequence(arg0 context.Context, arg1 *repository.S return ret0, ret1 } -// LatestSequence indicates an expected call of LatestSequence +// LatestSequence indicates an expected call of LatestSequence. func (mr *MockRepositoryMockRecorder) LatestSequence(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LatestSequence", reflect.TypeOf((*MockRepository)(nil).LatestSequence), arg0, arg1) } -// Push mocks base method +// Push mocks base method. func (m *MockRepository) Push(arg0 context.Context, arg1 []*repository.Event, arg2 ...*repository.UniqueConstraint) error { m.ctrl.T.Helper() varargs := []interface{}{arg0, arg1} @@ -90,9 +91,23 @@ func (m *MockRepository) Push(arg0 context.Context, arg1 []*repository.Event, ar return ret0 } -// Push indicates an expected call of Push +// Push indicates an expected call of Push. func (mr *MockRepositoryMockRecorder) Push(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() varargs := append([]interface{}{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Push", reflect.TypeOf((*MockRepository)(nil).Push), varargs...) } + +// Step20 mocks base method. +func (m *MockRepository) Step20(arg0 context.Context, arg1 uint64) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Step20", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Step20 indicates an expected call of Step20. +func (mr *MockRepositoryMockRecorder) Step20(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Step20", reflect.TypeOf((*MockRepository)(nil).Step20), arg0, arg1) +} diff --git a/internal/eventstore/repository/repository.go b/internal/eventstore/repository/repository.go index 74d05ff0fc..da591b3bbf 100644 --- a/internal/eventstore/repository/repository.go +++ b/internal/eventstore/repository/repository.go @@ -16,4 +16,6 @@ type Repository interface { Filter(ctx context.Context, searchQuery *SearchQuery) (events []*Event, err error) //LatestSequence returns the latests sequence found by the the search query LatestSequence(ctx context.Context, queryFactory *SearchQuery) (uint64, error) + + Step20(ctx context.Context, latestSequence uint64) error } diff --git a/internal/eventstore/repository/sql/setup.go b/internal/eventstore/repository/sql/setup.go new file mode 100644 index 0000000000..4f57d4f2cc --- /dev/null +++ b/internal/eventstore/repository/sql/setup.go @@ -0,0 +1,58 @@ +package sql + +import ( + "context" + + "github.com/caos/logging" + repo "github.com/caos/zitadel/internal/eventstore/repository" +) + +func (db *CRDB) Step20(ctx context.Context, latestSequence uint64) error { + currentSequence := uint64(0) + limit := uint64(500) + previousSequences := make(map[repo.AggregateType]Sequence) + for currentSequence < latestSequence { + events, err := db.Filter(ctx, &repo.SearchQuery{ + Columns: repo.ColumnsEvent, + Limit: limit, + Filters: [][]*repo.Filter{ + { + &repo.Filter{ + Field: repo.FieldSequence, + Operation: repo.OperationGreater, + Value: currentSequence, + }, + }, + }, + }) + if err != nil { + return err + } + + tx, err := db.client.Begin() + if err != nil { + return err + } + + for _, event := range events { + if _, err := tx.Exec("SAVEPOINT event_update"); err != nil { + return err + } + seq := Sequence(previousSequences[event.AggregateType]) + if _, err = tx.Exec("UPDATE eventstore.events SET previous_aggregate_type_sequence = $1 WHERE event_sequence = $2", &seq, event.Sequence); err != nil { + return err + } + if _, err = tx.Exec("RELEASE SAVEPOINT event_update"); err != nil { + return err + } + previousSequences[event.AggregateType] = Sequence(event.Sequence) + currentSequence = event.Sequence + } + + if err = tx.Commit(); err != nil { + return err + } + logging.LogWithFields("SQL-bXVwS", "currentSeq", currentSequence, "events", len(events)).Info("events updated") + } + return nil +} diff --git a/internal/setup/config.go b/internal/setup/config.go index a169d38b8c..3d75ba14e6 100644 --- a/internal/setup/config.go +++ b/internal/setup/config.go @@ -25,6 +25,7 @@ type IAMSetUp struct { Step17 *command.Step17 Step18 *command.Step18 Step19 *command.Step19 + Step20 *command.Step20 } func (setup *IAMSetUp) Steps(currentDone domain.Step) ([]command.Step, error) { @@ -50,6 +51,7 @@ func (setup *IAMSetUp) Steps(currentDone domain.Step) ([]command.Step, error) { setup.Step17, setup.Step18, setup.Step19, + setup.Step20, } { if step.Step() <= currentDone { continue diff --git a/migrations/cockroach/V1.67__update_events.sql b/migrations/cockroach/V1.67__update_events.sql new file mode 100644 index 0000000000..5a27f8be80 --- /dev/null +++ b/migrations/cockroach/V1.67__update_events.sql @@ -0,0 +1 @@ +GRANT UPDATE ON TABLE eventstore.events TO eventstore; \ No newline at end of file