perf: query data AS OF SYSTEM TIME (#5231)

Queries the data in the storage layser at the timestamp when the call hit the API layer
This commit is contained in:
Silvan
2023-02-27 22:36:43 +01:00
committed by GitHub
parent 80003939ad
commit e38abdcdf3
170 changed files with 3101 additions and 3169 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/lib/pq"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
caos_errs "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -97,19 +98,19 @@ const (
)
type CRDB struct {
client *sql.DB
*database.DB
}
func NewCRDB(client *sql.DB) *CRDB {
func NewCRDB(client *database.DB) *CRDB {
return &CRDB{client}
}
func (db *CRDB) Health(ctx context.Context) error { return db.client.Ping() }
func (db *CRDB) Health(ctx context.Context) error { return db.Ping() }
// Push adds all events to the eventstreams of the aggregates.
// This call is transaction save. The transaction will be rolled back if one event fails
func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueConstraints ...*repository.UniqueConstraint) error {
err := crdb.ExecuteTx(ctx, db.client, nil, func(tx *sql.Tx) error {
err := crdb.ExecuteTx(ctx, db.DB.DB, nil, func(tx *sql.Tx) error {
var (
previousAggregateSequence Sequence
@@ -159,7 +160,7 @@ func (db *CRDB) Push(ctx context.Context, events []*repository.Event, uniqueCons
var instanceRegexp = regexp.MustCompile(`eventstore\.i_[0-9a-zA-Z]{1,}_seq`)
func (db *CRDB) CreateInstance(ctx context.Context, instanceID string) error {
row := db.client.QueryRowContext(ctx, "SELECT CONCAT('eventstore.i_', $1::TEXT, '_seq')", instanceID)
row := db.QueryRowContext(ctx, "SELECT CONCAT('eventstore.i_', $1::TEXT, '_seq')", instanceID)
if row.Err() != nil {
return caos_errs.ThrowInvalidArgument(row.Err(), "SQL-7gtFA", "Errors.InvalidArgument")
}
@@ -168,7 +169,7 @@ func (db *CRDB) CreateInstance(ctx context.Context, instanceID string) error {
return caos_errs.ThrowInvalidArgument(err, "SQL-7gtFA", "Errors.InvalidArgument")
}
if _, err := db.client.ExecContext(ctx, "CREATE SEQUENCE "+sequenceName); err != nil {
if _, err := db.ExecContext(ctx, "CREATE SEQUENCE "+sequenceName); err != nil {
return caos_errs.ThrowInternal(err, "SQL-7gtFA", "Errors.Internal")
}
@@ -249,7 +250,7 @@ func (db *CRDB) InstanceIDs(ctx context.Context, searchQuery *repository.SearchQ
}
func (db *CRDB) db() *sql.DB {
return db.client
return db.DB.DB
}
func (db *CRDB) orderByEventSequence(desc bool) string {

View File

@@ -437,7 +437,10 @@ func TestCRDB_Push_OneAggregate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
if tt.args.uniqueDataType != "" && tt.args.uniqueDataField != "" {
err := fillUniqueData(tt.args.uniqueDataType, tt.args.uniqueDataField, tt.args.uniqueDataInstanceID)
@@ -561,7 +564,10 @@ func TestCRDB_Push_MultipleAggregate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
if err := db.Push(context.Background(), tt.args.events); (err != nil) != tt.res.wantErr {
t.Errorf("CRDB.Push() error = %v, wantErr %v", err, tt.res.wantErr)
@@ -638,7 +644,7 @@ func TestCRDB_CreateInstance(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{DB: testCRDBClient},
}
if err := db.CreateInstance(context.Background(), tt.args.instanceID); (err != nil) != tt.res.wantErr {
@@ -776,7 +782,10 @@ func TestCRDB_Push_Parallel(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
wg := sync.WaitGroup{}
@@ -897,7 +906,10 @@ func TestCRDB_Filter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
// setup initial data for query
@@ -987,7 +999,10 @@ func TestCRDB_LatestSequence(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
// setup initial data for query
@@ -1131,7 +1146,10 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: testCRDBClient,
DB: &database.DB{
DB: testCRDBClient,
Database: new(testDB),
},
}
if err := db.Push(context.Background(), tt.args.events); err != nil {
t.Errorf("CRDB.Push() error = %v", err)

View File

@@ -4,6 +4,7 @@ import (
"database/sql"
"os"
"testing"
"time"
"github.com/cockroachdb/cockroach-go/v2/testserver"
"github.com/zitadel/logging"
@@ -53,8 +54,8 @@ func initDB(db *sql.DB) error {
err := initialise.Init(db,
initialise.VerifyUser(config.Username(), ""),
initialise.VerifyDatabase(config.Database()),
initialise.VerifyGrant(config.Database(), config.Username()))
initialise.VerifyDatabase(config.DatabaseName()),
initialise.VerifyGrant(config.DatabaseName(), config.Username()))
if err != nil {
return err
}
@@ -66,3 +67,13 @@ func fillUniqueData(unique_type, field, instanceID string) error {
_, err := testCRDBClient.Exec("INSERT INTO eventstore.unique_constraints (unique_type, unique_field, instance_id) VALUES ($1, $2, $3)", unique_type, field, instanceID)
return err
}
type testDB struct{}
func (_ *testDB) Timetravel(time.Duration) string { return " AS OF SYSTEM TIME '-1 ms' " }
func (*testDB) DatabaseName() string { return "db" }
func (*testDB) Username() string { return "user" }
func (*testDB) Type() string { return "type" }

View File

@@ -10,6 +10,8 @@ import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/database/dialect"
z_errors "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -24,6 +26,7 @@ type querier interface {
instanceIDsQuery() string
db() *sql.DB
orderByEventSequence(desc bool) string
dialect.Database
}
type scan func(dest ...interface{}) error
@@ -34,6 +37,11 @@ func query(ctx context.Context, criteria querier, searchQuery *repository.Search
if where == "" || query == "" {
return z_errors.ThrowInvalidArgument(nil, "SQL-rWeBw", "invalid query factory")
}
if searchQuery.Tx == nil {
if travel := prepareTimeTravel(ctx, criteria, searchQuery.AllowTimeTravel); travel != "" {
query += travel
}
}
query += where
if searchQuery.Columns == repository.ColumnsEvent {
@@ -85,6 +93,14 @@ func prepareColumns(criteria querier, columns repository.Columns) (string, func(
}
}
func prepareTimeTravel(ctx context.Context, criteria querier, allow bool) string {
if !allow {
return ""
}
took := call.Took(ctx)
return criteria.Timetravel(took)
}
func maxSequenceScanner(row scan, dest interface{}) (err error) {
sequence, ok := dest.(*Sequence)
if !ok {

View File

@@ -10,6 +10,7 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/eventstore/repository"
)
@@ -537,7 +538,10 @@ func Test_query_events_with_crdb(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
db := &CRDB{
client: tt.fields.client,
DB: &database.DB{
DB: tt.fields.client,
Database: new(testDB),
},
}
// setup initial data for query
@@ -657,6 +661,36 @@ func Test_query_events_mocked(t *testing.T) {
wantErr: false,
},
},
{
name: "with limit and order by desc as of system time",
args: args{
dest: &[]*repository.Event{},
query: &repository.SearchQuery{
Columns: repository.ColumnsEvent,
Desc: true,
Limit: 5,
AllowTimeTravel: true,
Filters: [][]*repository.Filter{
{
{
Field: repository.FieldAggregateType,
Value: repository.AggregateType("user"),
Operation: repository.OperationEquals,
},
},
},
},
},
fields: fields{
mock: newMockClient(t).expectQuery(t,
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`,
[]driver.Value{repository.AggregateType("user"), uint64(5)},
),
},
res: res{
wantErr: false,
},
},
{
name: "error sql conn closed",
args: args{
@@ -786,9 +820,11 @@ func Test_query_events_mocked(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
crdb := &CRDB{}
crdb := &CRDB{DB: &database.DB{
Database: new(testDB),
}}
if tt.fields.mock != nil {
crdb.client = tt.fields.mock.client
crdb.DB.DB = tt.fields.mock.client
}
err := query(context.Background(), crdb, tt.args.query, tt.args.dest)