2020-10-05 17:09:26 +00:00
package sql
2020-10-02 14:21:51 +00:00
import (
2020-10-21 17:00:41 +00:00
"context"
2020-10-02 14:21:51 +00:00
"database/sql"
2020-10-21 17:00:41 +00:00
"database/sql/driver"
2020-10-02 14:21:51 +00:00
"reflect"
2023-10-19 10:19:10 +00:00
"strconv"
2020-10-02 14:21:51 +00:00
"testing"
"time"
2020-10-21 17:00:41 +00:00
"github.com/DATA-DOG/go-sqlmock"
2023-10-19 15:21:31 +00:00
"github.com/stretchr/testify/assert"
2022-01-06 07:29:58 +00:00
2023-02-27 21:36:43 +00:00
"github.com/zitadel/zitadel/internal/database"
2023-10-19 10:19:10 +00:00
"github.com/zitadel/zitadel/internal/database/cockroach"
2022-04-26 23:01:45 +00:00
"github.com/zitadel/zitadel/internal/errors"
2023-10-19 10:19:10 +00:00
"github.com/zitadel/zitadel/internal/eventstore"
2022-04-26 23:01:45 +00:00
"github.com/zitadel/zitadel/internal/eventstore/repository"
2020-10-02 14:21:51 +00:00
)
func Test_getCondition ( t * testing . T ) {
type args struct {
2020-10-05 17:09:26 +00:00
filter * repository . Filter
2020-10-02 14:21:51 +00:00
}
tests := [ ] struct {
name string
args args
want string
} {
{
name : "equals" ,
2020-10-06 19:28:09 +00:00
args : args { filter : repository . NewFilter ( repository . FieldAggregateID , "" , repository . OperationEquals ) } ,
2020-10-02 14:21:51 +00:00
want : "aggregate_id = ?" ,
} ,
{
name : "greater" ,
2020-10-06 19:28:09 +00:00
args : args { filter : repository . NewFilter ( repository . FieldSequence , 0 , repository . OperationGreater ) } ,
2023-10-19 10:19:10 +00:00
want : ` "sequence" > ? ` ,
2020-10-02 14:21:51 +00:00
} ,
{
name : "less" ,
2020-10-06 19:28:09 +00:00
args : args { filter : repository . NewFilter ( repository . FieldSequence , 5000 , repository . OperationLess ) } ,
2023-10-19 10:19:10 +00:00
want : ` "sequence" < ? ` ,
2020-10-02 14:21:51 +00:00
} ,
{
name : "in list" ,
2023-10-19 10:19:10 +00:00
args : args { filter : repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "movies" , "actors" } , repository . OperationIn ) } ,
2020-10-02 14:21:51 +00:00
want : "aggregate_type = ANY(?)" ,
} ,
{
name : "invalid operation" ,
2023-10-19 10:19:10 +00:00
args : args { filter : repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "movies" , "actors" } , repository . Operation ( - 1 ) ) } ,
2020-10-02 14:21:51 +00:00
want : "" ,
} ,
{
name : "invalid field" ,
2023-10-19 10:19:10 +00:00
args : args { filter : repository . NewFilter ( repository . Field ( - 1 ) , [ ] eventstore . AggregateType { "movies" , "actors" } , repository . OperationEquals ) } ,
2020-10-02 14:21:51 +00:00
want : "" ,
} ,
{
name : "invalid field and operation" ,
2023-10-19 10:19:10 +00:00
args : args { filter : repository . NewFilter ( repository . Field ( - 1 ) , [ ] eventstore . AggregateType { "movies" , "actors" } , repository . Operation ( - 1 ) ) } ,
2020-10-02 14:21:51 +00:00
want : "" ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
2020-10-05 18:39:36 +00:00
db := & CRDB { }
2023-10-19 10:19:10 +00:00
if got := getCondition ( db , tt . args . filter , false ) ; got != tt . want {
2020-10-02 14:21:51 +00:00
t . Errorf ( "getCondition() = %v, want %v" , got , tt . want )
}
} )
}
}
func Test_prepareColumns ( t * testing . T ) {
2023-10-19 15:21:31 +00:00
var reducedEvents [ ] eventstore . Event
2020-10-05 18:39:36 +00:00
type fields struct {
dbRow [ ] interface { }
}
2020-10-02 14:21:51 +00:00
type args struct {
2023-10-19 10:19:10 +00:00
columns eventstore . Columns
2020-10-02 14:21:51 +00:00
dest interface { }
dbErr error
2023-10-19 10:19:10 +00:00
useV1 bool
2020-10-02 14:21:51 +00:00
}
type res struct {
query string
expected interface { }
dbErr func ( error ) bool
}
tests := [ ] struct {
2020-10-05 18:39:36 +00:00
name string
args args
res res
fields fields
2020-10-02 14:21:51 +00:00
} {
{
name : "invalid columns" ,
2023-10-19 10:19:10 +00:00
args : args { columns : eventstore . Columns ( - 1 ) } ,
2020-10-02 14:21:51 +00:00
res : res {
query : "" ,
dbErr : func ( err error ) bool { return err == nil } ,
} ,
} ,
{
name : "max column" ,
args : args {
2023-10-19 10:19:10 +00:00
columns : eventstore . ColumnsMaxSequence ,
dest : new ( sql . NullFloat64 ) ,
useV1 : true ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
query : ` SELECT event_sequence FROM eventstore.events ` ,
expected : sql . NullFloat64 { Float64 : 43 , Valid : true } ,
2020-10-02 14:21:51 +00:00
} ,
2020-10-05 18:39:36 +00:00
fields : fields {
2023-10-19 10:19:10 +00:00
dbRow : [ ] interface { } { sql . NullFloat64 { Float64 : 43 , Valid : true } } ,
} ,
} ,
{
name : "max column v2" ,
args : args {
columns : eventstore . ColumnsMaxSequence ,
dest : new ( sql . NullFloat64 ) ,
} ,
res : res {
query : ` SELECT "position" FROM eventstore.events2 ` ,
expected : sql . NullFloat64 { Float64 : 43 , Valid : true } ,
} ,
fields : fields {
dbRow : [ ] interface { } { sql . NullFloat64 { Float64 : 43 , Valid : true } } ,
2020-10-05 18:39:36 +00:00
} ,
2020-10-02 14:21:51 +00:00
} ,
{
name : "max sequence wrong dest type" ,
args : args {
2023-10-19 10:19:10 +00:00
columns : eventstore . ColumnsMaxSequence ,
2020-10-02 14:21:51 +00:00
dest : new ( uint64 ) ,
} ,
res : res {
2023-10-19 10:19:10 +00:00
query : ` SELECT "position" FROM eventstore.events2 ` ,
2020-10-02 14:21:51 +00:00
dbErr : errors . IsErrorInvalidArgument ,
} ,
} ,
{
2020-10-05 18:39:36 +00:00
name : "events" ,
2020-10-02 14:21:51 +00:00
args : args {
2023-10-19 10:19:10 +00:00
columns : eventstore . ColumnsEvent ,
2023-10-19 15:21:31 +00:00
dest : eventstore . Reducer ( func ( event eventstore . Event ) error {
reducedEvents = append ( reducedEvents , event )
return nil
} ) ,
useV1 : true ,
2023-10-19 10:19:10 +00:00
} ,
res : res {
query : ` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events ` ,
expected : [ ] eventstore . Event {
2023-10-19 15:21:31 +00:00
& repository . Event { AggregateID : "hodor" , AggregateType : "user" , Seq : 5 , Data : nil } ,
2023-10-19 10:19:10 +00:00
} ,
} ,
fields : fields {
dbRow : [ ] interface { } { time . Time { } , eventstore . EventType ( "" ) , uint64 ( 5 ) , sql . RawBytes ( nil ) , "" , sql . NullString { } , "" , eventstore . AggregateType ( "user" ) , "hodor" , eventstore . Version ( "" ) } ,
} ,
} ,
{
name : "events v2" ,
args : args {
columns : eventstore . ColumnsEvent ,
2023-10-19 15:21:31 +00:00
dest : eventstore . Reducer ( func ( event eventstore . Event ) error {
reducedEvents = append ( reducedEvents , event )
return nil
} ) ,
2023-10-19 10:19:10 +00:00
} ,
res : res {
query : ` SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 ` ,
expected : [ ] eventstore . Event {
2023-10-19 15:21:31 +00:00
& repository . Event { AggregateID : "hodor" , AggregateType : "user" , Seq : 5 , Pos : 42 , Data : nil , Version : "v1" } ,
2023-10-19 10:19:10 +00:00
} ,
} ,
fields : fields {
dbRow : [ ] interface { } { time . Time { } , eventstore . EventType ( "" ) , uint64 ( 5 ) , sql . NullFloat64 { Float64 : 42 , Valid : true } , sql . RawBytes ( nil ) , "" , sql . NullString { } , "" , eventstore . AggregateType ( "user" ) , "hodor" , uint8 ( 1 ) } ,
} ,
} ,
{
name : "event null position" ,
args : args {
columns : eventstore . ColumnsEvent ,
2023-10-19 15:21:31 +00:00
dest : eventstore . Reducer ( func ( event eventstore . Event ) error {
reducedEvents = append ( reducedEvents , event )
return nil
} ) ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
query : ` SELECT created_at, event_type, "sequence", "position", payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision FROM eventstore.events2 ` ,
expected : [ ] eventstore . Event {
2023-10-19 15:21:31 +00:00
& repository . Event { AggregateID : "hodor" , AggregateType : "user" , Seq : 5 , Pos : 0 , Data : nil , Version : "v1" } ,
2020-10-05 18:39:36 +00:00
} ,
} ,
fields : fields {
2023-10-19 10:19:10 +00:00
dbRow : [ ] interface { } { time . Time { } , eventstore . EventType ( "" ) , uint64 ( 5 ) , sql . NullFloat64 { Float64 : 0 , Valid : false } , sql . RawBytes ( nil ) , "" , sql . NullString { } , "" , eventstore . AggregateType ( "user" ) , "hodor" , uint8 ( 1 ) } ,
2020-10-02 14:21:51 +00:00
} ,
} ,
{
2020-10-05 18:39:36 +00:00
name : "events wrong dest type" ,
2020-10-02 14:21:51 +00:00
args : args {
2023-10-19 10:19:10 +00:00
columns : eventstore . ColumnsEvent ,
2020-10-05 18:39:36 +00:00
dest : [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
useV1 : true ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
query : ` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events ` ,
2020-10-02 14:21:51 +00:00
dbErr : errors . IsErrorInvalidArgument ,
} ,
} ,
{
name : "event query error" ,
args : args {
2023-10-19 10:19:10 +00:00
columns : eventstore . ColumnsEvent ,
2023-10-19 15:21:31 +00:00
dest : eventstore . Reducer ( func ( event eventstore . Event ) error {
reducedEvents = append ( reducedEvents , event )
return nil
} ) ,
dbErr : sql . ErrConnDone ,
useV1 : true ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
query : ` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events ` ,
2020-10-02 14:21:51 +00:00
dbErr : errors . IsInternal ,
} ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
2020-10-05 18:39:36 +00:00
crdb := & CRDB { }
2023-10-19 10:19:10 +00:00
query , rowScanner := prepareColumns ( crdb , tt . args . columns , tt . args . useV1 )
2020-10-02 14:21:51 +00:00
if query != tt . res . query {
2020-10-05 18:39:36 +00:00
t . Errorf ( "prepareColumns() got = %s, want %s" , query , tt . res . query )
2020-10-02 14:21:51 +00:00
}
if tt . res . query == "" && rowScanner != nil {
t . Errorf ( "row scanner should be nil" )
}
if rowScanner == nil {
return
}
2020-10-05 18:39:36 +00:00
err := rowScanner ( prepareTestScan ( tt . args . dbErr , tt . fields . dbRow ) , tt . args . dest )
if err != nil && tt . res . dbErr == nil || err != nil && ! tt . res . dbErr ( err ) || err == nil && tt . res . dbErr != nil {
t . Errorf ( "wrong error type in rowScanner got: %v" , err )
return
}
if tt . res . dbErr != nil && tt . res . dbErr ( err ) {
return
}
2023-10-19 10:19:10 +00:00
if equalizer , ok := tt . res . expected . ( interface { Equal ( time . Time ) bool } ) ; ok {
equalizer . Equal ( tt . args . dest . ( * sql . NullTime ) . Time )
return
}
2023-10-19 15:21:31 +00:00
if _ , ok := tt . args . dest . ( eventstore . Reducer ) ; ok {
assert . Equal ( t , tt . res . expected , reducedEvents )
reducedEvents = nil
return
}
2023-10-19 10:19:10 +00:00
got := reflect . Indirect ( reflect . ValueOf ( tt . args . dest ) ) . Interface ( )
if ! reflect . DeepEqual ( got , tt . res . expected ) {
t . Errorf ( "unexpected result from rowScanner \nwant: %+v \ngot: %+v" , tt . res . expected , got )
2020-10-02 14:21:51 +00:00
}
} )
}
}
2020-10-05 17:09:26 +00:00
func prepareTestScan ( err error , res [ ] interface { } ) scan {
2020-10-02 14:21:51 +00:00
return func ( dests ... interface { } ) error {
if err != nil {
return err
}
if len ( dests ) != len ( res ) {
return errors . ThrowInvalidArgumentf ( nil , "SQL-NML1q" , "expected len %d got %d" , len ( res ) , len ( dests ) )
}
for i , r := range res {
2023-10-19 10:19:10 +00:00
_ , ok := dests [ i ] . ( * eventstore . Version )
if ok {
val , ok := r . ( uint8 )
if ok {
r = eventstore . Version ( "" + strconv . Itoa ( int ( val ) ) )
}
}
2020-10-02 14:21:51 +00:00
reflect . ValueOf ( dests [ i ] ) . Elem ( ) . Set ( reflect . ValueOf ( r ) )
}
return nil
}
}
func Test_prepareCondition ( t * testing . T ) {
type args struct {
2023-10-19 10:19:10 +00:00
query * repository . SearchQuery
useV1 bool
2020-10-02 14:21:51 +00:00
}
type res struct {
clause string
values [ ] interface { }
}
tests := [ ] struct {
name string
args args
res res
} {
{
name : "nil filters" ,
args : args {
2023-10-19 10:19:10 +00:00
query : & repository . SearchQuery { } ,
useV1 : true ,
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "nil filters v2" ,
args : args {
query : & repository . SearchQuery { } ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "empty filters" ,
args : args {
2023-10-19 10:19:10 +00:00
query : & repository . SearchQuery {
SubQueries : [ ] [ ] * repository . Filter { } ,
} ,
useV1 : true ,
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "empty filters v2" ,
args : args {
query : & repository . SearchQuery {
SubQueries : [ ] [ ] * repository . Filter { } ,
} ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "invalid condition" ,
args : args {
2023-10-19 10:19:10 +00:00
query : & repository . SearchQuery {
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateID , "wrong" , repository . Operation ( - 1 ) ) ,
} ,
} ,
} ,
useV1 : true ,
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "invalid condition v2" ,
args : args {
query : & repository . SearchQuery {
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateID , "wrong" , repository . Operation ( - 1 ) ) ,
} ,
2021-07-06 11:55:57 +00:00
} ,
2020-10-02 14:21:51 +00:00
} ,
} ,
res : res {
clause : "" ,
values : nil ,
} ,
} ,
{
name : "array as condition value" ,
args : args {
2023-10-19 10:19:10 +00:00
query : & repository . SearchQuery {
AwaitOpenTransactions : true ,
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "user" , "org" } , repository . OperationIn ) ,
} ,
2021-07-06 11:55:57 +00:00
} ,
2020-10-02 14:21:51 +00:00
} ,
2023-10-19 10:19:10 +00:00
useV1 : true ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
clause : " WHERE aggregate_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')" ,
values : [ ] interface { } { [ ] eventstore . AggregateType { "user" , "org" } } ,
} ,
} ,
{
name : "array as condition value v2" ,
args : args {
query : & repository . SearchQuery {
AwaitOpenTransactions : true ,
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "user" , "org" } , repository . OperationIn ) ,
} ,
} ,
} ,
} ,
res : res {
clause : ` WHERE aggregate_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher') ` ,
values : [ ] interface { } { [ ] eventstore . AggregateType { "user" , "org" } } ,
2020-10-02 14:21:51 +00:00
} ,
} ,
{
name : "multiple filters" ,
args : args {
2023-10-19 10:19:10 +00:00
query : & repository . SearchQuery {
AwaitOpenTransactions : true ,
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "user" , "org" } , repository . OperationIn ) ,
repository . NewFilter ( repository . FieldAggregateID , "1234" , repository . OperationEquals ) ,
repository . NewFilter ( repository . FieldEventType , [ ] eventstore . EventType { "user.created" , "org.created" } , repository . OperationIn ) ,
} ,
2021-07-06 11:55:57 +00:00
} ,
2020-10-02 14:21:51 +00:00
} ,
2023-10-19 10:19:10 +00:00
useV1 : true ,
2020-10-02 14:21:51 +00:00
} ,
res : res {
2023-10-19 10:19:10 +00:00
clause : " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')" ,
values : [ ] interface { } { [ ] eventstore . AggregateType { "user" , "org" } , "1234" , [ ] eventstore . EventType { "user.created" , "org.created" } } ,
} ,
} ,
{
name : "multiple filters v2" ,
args : args {
query : & repository . SearchQuery {
AwaitOpenTransactions : true ,
SubQueries : [ ] [ ] * repository . Filter {
{
repository . NewFilter ( repository . FieldAggregateType , [ ] eventstore . AggregateType { "user" , "org" } , repository . OperationIn ) ,
repository . NewFilter ( repository . FieldAggregateID , "1234" , repository . OperationEquals ) ,
repository . NewFilter ( repository . FieldEventType , [ ] eventstore . EventType { "user.created" , "org.created" } , repository . OperationIn ) ,
} ,
} ,
} ,
} ,
res : res {
clause : ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher') ` ,
values : [ ] interface { } { [ ] eventstore . AggregateType { "user" , "org" } , "1234" , [ ] eventstore . EventType { "user.created" , "org.created" } } ,
2020-10-02 14:21:51 +00:00
} ,
} ,
}
2023-10-19 10:19:10 +00:00
crdb := NewCRDB ( & database . DB { Database : new ( cockroach . Config ) } )
2020-10-02 14:21:51 +00:00
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
2023-10-19 10:19:10 +00:00
gotClause , gotValues := prepareConditions ( crdb , tt . args . query , tt . args . useV1 )
2020-10-02 14:21:51 +00:00
if gotClause != tt . res . clause {
t . Errorf ( "prepareCondition() gotClause = %v, want %v" , gotClause , tt . res . clause )
}
if len ( gotValues ) != len ( tt . res . values ) {
t . Errorf ( "wrong length of gotten values got = %d, want %d" , len ( gotValues ) , len ( tt . res . values ) )
return
}
for i , value := range gotValues {
if ! reflect . DeepEqual ( value , tt . res . values [ i ] ) {
t . Errorf ( "prepareCondition() gotValues = %v, want %v" , gotValues , tt . res . values )
}
}
} )
}
}
2020-10-21 17:00:41 +00:00
func Test_query_events_with_crdb ( t * testing . T ) {
type args struct {
2023-10-19 10:19:10 +00:00
searchQuery * eventstore . SearchQueryBuilder
2020-10-21 17:00:41 +00:00
}
type fields struct {
2023-10-19 10:19:10 +00:00
existingEvents [ ] eventstore . Command
2020-10-21 17:00:41 +00:00
client * sql . DB
}
type res struct {
eventCount int
}
tests := [ ] struct {
name string
fields fields
args args
res res
wantErr bool
} {
{
name : "aggregate type filter no events" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
AddQuery ( ) .
AggregateTypes ( "not found" ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command {
2021-01-15 08:32:59 +00:00
generateEvent ( t , "300" ) ,
generateEvent ( t , "300" ) ,
generateEvent ( t , "300" ) ,
2020-10-21 17:00:41 +00:00
} ,
} ,
res : res {
eventCount : 0 ,
} ,
wantErr : false ,
} ,
{
name : "aggregate type filter events found" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
AddQuery ( ) .
AggregateTypes ( eventstore . AggregateType ( t . Name ( ) ) ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command {
2021-01-15 08:32:59 +00:00
generateEvent ( t , "301" ) ,
generateEvent ( t , "302" ) ,
generateEvent ( t , "302" ) ,
generateEvent ( t , "303" , func ( e * repository . Event ) { e . AggregateType = "not in list" } ) ,
2020-10-21 17:00:41 +00:00
} ,
} ,
res : res {
eventCount : 3 ,
} ,
wantErr : false ,
} ,
{
name : "aggregate type and id filter events found" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
AddQuery ( ) .
AggregateTypes ( eventstore . AggregateType ( t . Name ( ) ) ) .
AggregateIDs ( "303" ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command {
2021-01-15 08:32:59 +00:00
generateEvent ( t , "303" ) ,
generateEvent ( t , "303" ) ,
generateEvent ( t , "303" ) ,
generateEvent ( t , "304" , func ( e * repository . Event ) { e . AggregateType = "not in list" } ) ,
generateEvent ( t , "305" ) ,
2020-10-21 17:00:41 +00:00
} ,
} ,
res : res {
eventCount : 3 ,
} ,
wantErr : false ,
} ,
{
name : "resource owner filter events found" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
ResourceOwner ( "caos" ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command {
2022-01-06 07:29:58 +00:00
generateEvent ( t , "306" , func ( e * repository . Event ) { e . ResourceOwner = sql . NullString { String : "caos" , Valid : true } } ) ,
generateEvent ( t , "307" , func ( e * repository . Event ) { e . ResourceOwner = sql . NullString { String : "caos" , Valid : true } } ) ,
generateEvent ( t , "308" , func ( e * repository . Event ) { e . ResourceOwner = sql . NullString { String : "caos" , Valid : true } } ) ,
generateEvent ( t , "309" , func ( e * repository . Event ) { e . ResourceOwner = sql . NullString { String : "orgID" , Valid : true } } ) ,
generateEvent ( t , "309" , func ( e * repository . Event ) { e . ResourceOwner = sql . NullString { String : "orgID" , Valid : true } } ) ,
2020-10-21 17:00:41 +00:00
} ,
} ,
res : res {
eventCount : 3 ,
} ,
wantErr : false ,
} ,
{
name : "event type filter events found" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
AddQuery ( ) .
EventTypes ( "user.created" , "user.updated" ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command {
generateEvent ( t , "311" , func ( e * repository . Event ) { e . Typ = "user.created" } ) ,
generateEvent ( t , "311" , func ( e * repository . Event ) { e . Typ = "user.updated" } ) ,
generateEvent ( t , "311" , func ( e * repository . Event ) { e . Typ = "user.deactivated" } ) ,
generateEvent ( t , "311" , func ( e * repository . Event ) { e . Typ = "user.locked" } ) ,
generateEvent ( t , "312" , func ( e * repository . Event ) { e . Typ = "user.created" } ) ,
generateEvent ( t , "312" , func ( e * repository . Event ) { e . Typ = "user.updated" } ) ,
generateEvent ( t , "312" , func ( e * repository . Event ) { e . Typ = "user.deactivated" } ) ,
generateEvent ( t , "312" , func ( e * repository . Event ) { e . Typ = "user.reactivated" } ) ,
generateEvent ( t , "313" , func ( e * repository . Event ) { e . Typ = "user.locked" } ) ,
2020-10-21 17:00:41 +00:00
} ,
} ,
res : res {
eventCount : 7 ,
} ,
wantErr : false ,
} ,
{
name : "fail because no filter" ,
args : args {
2023-10-19 10:19:10 +00:00
searchQuery : eventstore . NewSearchQueryBuilder ( eventstore . Columns ( - 1 ) ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
client : testCRDBClient ,
2023-10-19 10:19:10 +00:00
existingEvents : [ ] eventstore . Command { } ,
2020-10-21 17:00:41 +00:00
} ,
res : res {
eventCount : 0 ,
} ,
wantErr : true ,
} ,
}
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
db := & CRDB {
2023-02-27 21:36:43 +00:00
DB : & database . DB {
DB : tt . fields . client ,
Database : new ( testDB ) ,
} ,
2020-10-21 17:00:41 +00:00
}
// setup initial data for query
2023-10-19 10:19:10 +00:00
if _ , err := db . Push ( context . Background ( ) , tt . fields . existingEvents ... ) ; err != nil {
2020-10-21 17:00:41 +00:00
t . Errorf ( "error in setup = %v" , err )
return
}
2023-10-19 10:19:10 +00:00
events := [ ] eventstore . Event { }
2023-10-19 15:21:31 +00:00
if err := query ( context . Background ( ) , db , tt . args . searchQuery , eventstore . Reducer ( func ( event eventstore . Event ) error {
events = append ( events , event )
return nil
} ) , true ) ; ( err != nil ) != tt . wantErr {
2020-10-21 17:00:41 +00:00
t . Errorf ( "CRDB.query() error = %v, wantErr %v" , err , tt . wantErr )
}
} )
}
}
func Test_query_events_mocked ( t * testing . T ) {
2020-10-02 14:21:51 +00:00
type args struct {
2023-10-19 10:19:10 +00:00
query * eventstore . SearchQueryBuilder
2020-10-21 17:00:41 +00:00
dest interface { }
2020-10-02 14:21:51 +00:00
}
type res struct {
2020-10-21 17:00:41 +00:00
wantErr bool
}
type fields struct {
mock * dbMock
2020-10-02 14:21:51 +00:00
}
tests := [ ] struct {
2020-10-21 17:00:41 +00:00
name string
args args
fields fields
res res
2020-10-02 14:21:51 +00:00
} {
{
name : "with order by desc" ,
args : args {
2020-10-21 17:00:41 +00:00
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2020-10-02 14:21:51 +00:00
} ,
2020-10-21 17:00:41 +00:00
fields : fields {
mock : newMockClient ( t ) . expectQuery ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) } ,
2020-10-21 17:00:41 +00:00
) ,
} ,
2020-10-02 14:21:51 +00:00
res : res {
2020-10-21 17:00:41 +00:00
wantErr : false ,
2020-10-02 14:21:51 +00:00
} ,
} ,
{
name : "with limit" ,
args : args {
2020-10-21 17:00:41 +00:00
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderAsc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 5 ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2020-10-02 14:21:51 +00:00
} ,
2020-10-21 17:00:41 +00:00
fields : fields {
mock : newMockClient ( t ) . expectQuery ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence LIMIT \$2 ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) , uint64 ( 5 ) } ,
2020-10-21 17:00:41 +00:00
) ,
} ,
2020-10-02 14:21:51 +00:00
res : res {
2020-10-21 17:00:41 +00:00
wantErr : false ,
2020-10-02 14:21:51 +00:00
} ,
} ,
{
name : "with limit and order by desc" ,
args : args {
2020-10-21 17:00:41 +00:00
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 5 ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2020-10-02 14:21:51 +00:00
} ,
2020-10-21 17:00:41 +00:00
fields : fields {
mock : newMockClient ( t ) . expectQuery ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$2 ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) , uint64 ( 5 ) } ,
2020-10-21 17:00:41 +00:00
) ,
} ,
res : res {
wantErr : false ,
} ,
} ,
2023-02-27 21:36:43 +00:00
{
name : "with limit and order by desc as of system time" ,
args : args {
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 5 ) .
AllowTimeTravel ( ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2023-02-27 21:36:43 +00:00
} ,
fields : fields {
mock : newMockClient ( t ) . expectQuery ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$2 ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) , uint64 ( 5 ) } ,
2023-02-27 21:36:43 +00:00
) ,
} ,
res : res {
wantErr : false ,
} ,
} ,
2020-10-21 17:00:41 +00:00
{
name : "error sql conn closed" ,
args : args {
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 0 ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
mock : newMockClient ( t ) . expectQueryErr ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) } ,
2020-10-21 17:00:41 +00:00
sql . ErrConnDone ) ,
} ,
res : res {
wantErr : true ,
} ,
} ,
{
name : "error unexpected dest" ,
args : args {
dest : nil ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 0 ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Builder ( ) ,
2020-10-21 17:00:41 +00:00
} ,
fields : fields {
2023-08-22 12:49:02 +00:00
mock : newMockClient ( t ) . expectQueryScanErr ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) } ,
& repository . Event { Seq : 100 } ) ,
2020-10-21 17:00:41 +00:00
} ,
2020-10-02 14:21:51 +00:00
res : res {
2020-10-21 17:00:41 +00:00
wantErr : true ,
2020-10-02 14:21:51 +00:00
} ,
} ,
2020-10-05 20:03:21 +00:00
{
name : "error no columns" ,
args : args {
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . Columns ( - 1 ) ) ,
2020-10-05 20:03:21 +00:00
} ,
res : res {
2020-10-21 17:00:41 +00:00
wantErr : true ,
2020-10-05 20:03:21 +00:00
} ,
} ,
2021-07-06 11:55:57 +00:00
{
name : "with subqueries" ,
args : args {
dest : & [ ] * repository . Event { } ,
2023-10-19 10:19:10 +00:00
query : eventstore . NewSearchQueryBuilder ( eventstore . ColumnsEvent ) .
OrderDesc ( ) .
AwaitOpenTransactions ( ) .
Limit ( 5 ) .
AddQuery ( ) .
AggregateTypes ( "user" ) .
Or ( ) .
AggregateTypes ( "org" ) .
AggregateIDs ( "asdf42" ) .
Builder ( ) ,
2021-07-06 11:55:57 +00:00
} ,
fields : fields {
mock : newMockClient ( t ) . expectQuery ( t ,
2023-10-19 10:19:10 +00:00
` SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \(aggregate_type = \$1 OR \(aggregate_type = \$2 AND aggregate_id = \$3\)\) AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$4 ` ,
[ ] driver . Value { eventstore . AggregateType ( "user" ) , eventstore . AggregateType ( "org" ) , "asdf42" , uint64 ( 5 ) } ,
2021-07-06 11:55:57 +00:00
) ,
} ,
res : res {
wantErr : false ,
} ,
} ,
2020-10-02 14:21:51 +00:00
}
2023-10-19 10:19:10 +00:00
crdb := NewCRDB ( & database . DB { Database : new ( testDB ) } )
2020-10-02 14:21:51 +00:00
for _ , tt := range tests {
t . Run ( tt . name , func ( t * testing . T ) {
2020-10-21 17:00:41 +00:00
if tt . fields . mock != nil {
2023-02-27 21:36:43 +00:00
crdb . DB . DB = tt . fields . mock . client
2020-10-02 14:21:51 +00:00
}
2020-10-21 17:00:41 +00:00
2023-10-19 10:19:10 +00:00
err := query ( context . Background ( ) , crdb , tt . args . query , tt . args . dest , true )
2020-10-21 17:00:41 +00:00
if ( err != nil ) != tt . res . wantErr {
t . Errorf ( "query() error = %v, wantErr %v" , err , tt . res . wantErr )
2020-10-02 14:21:51 +00:00
}
2020-10-21 17:00:41 +00:00
if tt . fields . mock == nil {
return
2020-10-02 14:21:51 +00:00
}
2020-10-21 17:00:41 +00:00
if err := tt . fields . mock . mock . ExpectationsWereMet ( ) ; err != nil {
t . Errorf ( "not all expectaions met: %v" , err )
2020-10-02 14:21:51 +00:00
}
} )
}
}
2020-10-21 17:00:41 +00:00
// dbMock pairs a sqlmock expectation recorder with the *sql.DB client that
// is handed to the code under test.
type dbMock struct {
// mock is the sqlmock handle on which expectations are registered and
// later verified.
mock sqlmock . Sqlmock
// client is the database handle installed into the CRDB used by the tests.
client * sql . DB
}
func ( m * dbMock ) expectQuery ( t * testing . T , expectedQuery string , args [ ] driver . Value , events ... * repository . Event ) * dbMock {
2023-08-22 12:49:02 +00:00
m . mock . ExpectBegin ( )
query := m . mock . ExpectQuery ( expectedQuery ) . WithArgs ( args ... )
m . mock . ExpectCommit ( )
2023-10-19 10:19:10 +00:00
rows := sqlmock . NewRows ( [ ] string { "sequence" } )
2023-08-22 12:49:02 +00:00
for _ , event := range events {
2023-10-19 10:19:10 +00:00
rows = rows . AddRow ( event . Seq )
2023-08-22 12:49:02 +00:00
}
query . WillReturnRows ( rows ) . RowsWillBeClosed ( )
return m
}
func ( m * dbMock ) expectQueryScanErr ( t * testing . T , expectedQuery string , args [ ] driver . Value , events ... * repository . Event ) * dbMock {
m . mock . ExpectBegin ( )
2020-10-21 17:00:41 +00:00
query := m . mock . ExpectQuery ( expectedQuery ) . WithArgs ( args ... )
2023-08-22 12:49:02 +00:00
m . mock . ExpectRollback ( )
2023-10-19 10:19:10 +00:00
rows := sqlmock . NewRows ( [ ] string { "sequence" } )
2020-10-21 17:00:41 +00:00
for _ , event := range events {
2023-10-19 10:19:10 +00:00
rows = rows . AddRow ( event . Seq )
2020-10-21 17:00:41 +00:00
}
query . WillReturnRows ( rows ) . RowsWillBeClosed ( )
return m
}
func ( m * dbMock ) expectQueryErr ( t * testing . T , expectedQuery string , args [ ] driver . Value , err error ) * dbMock {
2023-08-22 12:49:02 +00:00
m . mock . ExpectBegin ( )
2020-10-21 17:00:41 +00:00
m . mock . ExpectQuery ( expectedQuery ) . WithArgs ( args ... ) . WillReturnError ( err )
return m
}
// newMockClient creates a sqlmock-backed database client together with its
// expectation recorder. It is marked as a test helper; if the mock cannot be
// created the calling test is failed and stopped immediately.
func newMockClient(t *testing.T) *dbMock {
	t.Helper()

	client, recorder, err := sqlmock.New()
	if err != nil {
		// Fatalf logs the message and stops the test, so nothing below runs.
		t.Fatalf("unable to create mock client: %v", err)
	}

	return &dbMock{
		mock:   recorder,
		client: client,
	}
}