mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:47:32 +00:00
fix: v2 setup sequence (#3437)
* add/register human command done * validations * crypto * move clientid * keys * fix: clientID * remove v2 package * tests * tests running * fix: add init instance to eventstore * fix: mig * test(eventstore): create instance * revert old code * instance domain from ctx * chore: rename zitadel app ids * comments * fix: test * fix: mock * fix: test
This commit is contained in:
@@ -153,6 +153,25 @@ func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueCons
|
||||
return err
|
||||
}
|
||||
|
||||
var instanceRegexp = regexp.MustCompile(`eventstore\.i_[0-9a-zA-Z]{1,}_seq`)
|
||||
|
||||
func (db *CRDB) CreateInstance(ctx context.Context, instanceID string) error {
|
||||
row := db.client.QueryRowContext(ctx, "SELECT CONCAT('eventstore.i_', $1, '_seq')", instanceID)
|
||||
if row.Err() != nil {
|
||||
return caos_errs.ThrowInvalidArgument(row.Err(), "SQL-7gtFA", "Errors.InvalidArgument")
|
||||
}
|
||||
var sequenceName string
|
||||
if err := row.Scan(&sequenceName); err != nil || !instanceRegexp.MatchString(sequenceName) {
|
||||
return caos_errs.ThrowInvalidArgument(err, "SQL-7gtFA", "Errors.InvalidArgument")
|
||||
}
|
||||
|
||||
if _, err := db.client.ExecContext(ctx, "CREATE SEQUENCE "+sequenceName); err != nil {
|
||||
return caos_errs.ThrowInternal(err, "SQL-7gtFA", "Errors.Internal")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleUniqueConstraints adds or removes unique constraints
|
||||
func (db *CRDB) handleUniqueConstraints(ctx context.Context, tx *sql.Tx, uniqueConstraints ...*repository.UniqueConstraint) (err error) {
|
||||
if len(uniqueConstraints) == 0 || (len(uniqueConstraints) == 1 && uniqueConstraints[0] == nil) {
|
||||
|
@@ -561,6 +561,84 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRDB_CreateInstance(t *testing.T) {
|
||||
type args struct {
|
||||
instanceID string
|
||||
}
|
||||
type res struct {
|
||||
wantErr bool
|
||||
exists bool
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
res res
|
||||
}{
|
||||
{
|
||||
name: "no number",
|
||||
args: args{
|
||||
instanceID: "asdf;use defaultdb;DROP DATABASE zitadel;--",
|
||||
},
|
||||
res: res{
|
||||
wantErr: true,
|
||||
exists: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no instance id",
|
||||
args: args{
|
||||
instanceID: "",
|
||||
},
|
||||
res: res{
|
||||
wantErr: true,
|
||||
exists: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "correct number",
|
||||
args: args{
|
||||
instanceID: "1235",
|
||||
},
|
||||
res: res{
|
||||
wantErr: false,
|
||||
exists: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "correct text",
|
||||
args: args{
|
||||
instanceID: "system",
|
||||
},
|
||||
res: res{
|
||||
wantErr: false,
|
||||
exists: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
db := &CRDB{
|
||||
client: testCRDBClient,
|
||||
}
|
||||
|
||||
if err := db.CreateInstance(context.Background(), tt.args.instanceID); (err != nil) != tt.res.wantErr {
|
||||
t.Errorf("CRDB.CreateInstance() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||
}
|
||||
|
||||
sequenceRow := testCRDBClient.QueryRow("SELECT EXISTS(SELECT 1 FROM [SHOW SEQUENCES FROM eventstore] WHERE sequence_name like $1)", "i_"+tt.args.instanceID+"%")
|
||||
var exists bool
|
||||
err := sequenceRow.Scan(&exists)
|
||||
if err != nil {
|
||||
t.Error("unable to query inserted rows: ", err)
|
||||
return
|
||||
}
|
||||
if exists != tt.res.exists {
|
||||
t.Errorf("expected exists %v got %v", tt.res.exists, exists)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCRDB_Push_Parallel(t *testing.T) {
|
||||
type args struct {
|
||||
events [][]*repository.Event
|
||||
|
@@ -1,58 +0,0 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/caos/logging"
|
||||
repo "github.com/caos/zitadel/internal/eventstore/repository"
|
||||
)
|
||||
|
||||
func (db *CRDB) Step20(ctx context.Context, latestSequence uint64) error {
|
||||
currentSequence := uint64(0)
|
||||
limit := uint64(500)
|
||||
previousSequences := make(map[repo.AggregateType]Sequence)
|
||||
for currentSequence < latestSequence {
|
||||
events, err := db.Filter(ctx, &repo.SearchQuery{
|
||||
Columns: repo.ColumnsEvent,
|
||||
Limit: limit,
|
||||
Filters: [][]*repo.Filter{
|
||||
{
|
||||
&repo.Filter{
|
||||
Field: repo.FieldSequence,
|
||||
Operation: repo.OperationGreater,
|
||||
Value: currentSequence,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tx, err := db.client.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, event := range events {
|
||||
if _, err := tx.Exec("SAVEPOINT event_update"); err != nil {
|
||||
return err
|
||||
}
|
||||
seq := Sequence(previousSequences[event.AggregateType])
|
||||
if _, err = tx.Exec("UPDATE eventstore.events SET previous_aggregate_type_sequence = $1 WHERE event_sequence = $2", &seq, event.Sequence); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err = tx.Exec("RELEASE SAVEPOINT event_update"); err != nil {
|
||||
return err
|
||||
}
|
||||
previousSequences[event.AggregateType] = Sequence(event.Sequence)
|
||||
currentSequence = event.Sequence
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
logging.WithFields("currentSeq", currentSequence, "events", len(events)).Info("events updated")
|
||||
}
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user