mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-13 19:09:16 +00:00
feat(eventstore): order by creation_date
and sequence
(#5568)
* feat(eventstore): order by `creation_date` and `sequence` * fix(logstore): use correct event type --------- Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
@@ -255,10 +255,10 @@ func (db *CRDB) db() *sql.DB {
|
|||||||
|
|
||||||
func (db *CRDB) orderByEventSequence(desc bool) string {
|
func (db *CRDB) orderByEventSequence(desc bool) string {
|
||||||
if desc {
|
if desc {
|
||||||
return " ORDER BY event_sequence DESC"
|
return " ORDER BY creation_date DESC, event_sequence DESC"
|
||||||
}
|
}
|
||||||
|
|
||||||
return " ORDER BY event_sequence"
|
return " ORDER BY creation_date, event_sequence"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *CRDB) eventQuery() string {
|
func (db *CRDB) eventQuery() string {
|
||||||
|
@@ -1166,7 +1166,7 @@ func TestCRDB_Push_ResourceOwner(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rows, err := testCRDBClient.Query("SELECT resource_owner FROM eventstore.events WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY event_sequence", tt.fields.aggregateType, tt.fields.aggregateIDs)
|
rows, err := testCRDBClient.Query("SELECT resource_owner FROM eventstore.events WHERE aggregate_type = $1 AND aggregate_id = ANY($2) ORDER BY creation_date, event_sequence", tt.fields.aggregateType, tt.fields.aggregateIDs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error("unable to query inserted rows: ", err)
|
t.Error("unable to query inserted rows: ", err)
|
||||||
return
|
return
|
||||||
|
@@ -595,7 +595,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
|
||||||
[]driver.Value{repository.AggregateType("user")},
|
[]driver.Value{repository.AggregateType("user")},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@@ -624,7 +624,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence LIMIT \$2`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date, event_sequence LIMIT \$2`,
|
||||||
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@@ -653,7 +653,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$2`,
|
||||||
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@@ -683,7 +683,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC LIMIT \$2`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$2`,
|
||||||
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
[]driver.Value{repository.AggregateType("user"), uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
@@ -712,7 +712,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQueryErr(t,
|
mock: newMockClient(t).expectQueryErr(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
|
||||||
[]driver.Value{repository.AggregateType("user")},
|
[]driver.Value{repository.AggregateType("user")},
|
||||||
sql.ErrConnDone),
|
sql.ErrConnDone),
|
||||||
},
|
},
|
||||||
@@ -741,7 +741,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY event_sequence DESC`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) ORDER BY creation_date DESC, event_sequence DESC`,
|
||||||
[]driver.Value{repository.AggregateType("user")},
|
[]driver.Value{repository.AggregateType("user")},
|
||||||
&repository.Event{Sequence: 100}),
|
&repository.Event{Sequence: 100}),
|
||||||
},
|
},
|
||||||
@@ -809,7 +809,7 @@ func Test_query_events_mocked(t *testing.T) {
|
|||||||
},
|
},
|
||||||
fields: fields{
|
fields: fields{
|
||||||
mock: newMockClient(t).expectQuery(t,
|
mock: newMockClient(t).expectQuery(t,
|
||||||
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY event_sequence DESC LIMIT \$4`,
|
`SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, previous_aggregate_type_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \( aggregate_type = \$1 \) OR \( aggregate_type = \$2 AND aggregate_id = \$3 \) ORDER BY creation_date DESC, event_sequence DESC LIMIT \$4`,
|
||||||
[]driver.Value{repository.AggregateType("user"), repository.AggregateType("org"), "asdf42", uint64(5)},
|
[]driver.Value{repository.AggregateType("user"), repository.AggregateType("org"), "asdf42", uint64(5)},
|
||||||
),
|
),
|
||||||
},
|
},
|
||||||
|
@@ -17,11 +17,11 @@ const (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
eventColumns = []string{"creation_date", "event_type", "event_sequence", "previous_aggregate_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "instance_id", "aggregate_type", "aggregate_id", "aggregate_version"}
|
eventColumns = []string{"creation_date", "event_type", "event_sequence", "previous_aggregate_sequence", "event_data", "editor_service", "editor_user", "resource_owner", "instance_id", "aggregate_type", "aggregate_id", "aggregate_version"}
|
||||||
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence LIMIT \$2`).String()
|
expectedFilterEventsLimitFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date, event_sequence LIMIT \$2`).String()
|
||||||
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence DESC`).String()
|
expectedFilterEventsDescFormat = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date DESC, event_sequence DESC`).String()
|
||||||
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY event_sequence LIMIT \$3`).String()
|
expectedFilterEventsAggregateIDLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY creation_date, event_sequence LIMIT \$3`).String()
|
||||||
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY event_sequence LIMIT \$3`).String()
|
expectedFilterEventsAggregateIDTypeLimit = regexp.MustCompile(selectEscaped + ` AND aggregate_id = \$2 \) ORDER BY creation_date, event_sequence LIMIT \$3`).String()
|
||||||
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` \) ORDER BY event_sequence`).String()
|
expectedGetAllEvents = regexp.MustCompile(selectEscaped + ` \) ORDER BY creation_date, event_sequence`).String()
|
||||||
|
|
||||||
expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
|
expectedInsertStatement = regexp.MustCompile(`INSERT INTO eventstore\.events ` +
|
||||||
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, instance_id, previous_aggregate_sequence, previous_aggregate_type_sequence\) ` +
|
`\(event_type, aggregate_type, aggregate_id, aggregate_version, creation_date, event_data, editor_user, editor_service, resource_owner, instance_id, previous_aggregate_sequence, previous_aggregate_type_sequence\) ` +
|
||||||
|
@@ -51,10 +51,11 @@ func buildQuery(ctx context.Context, db dialect.Database, queryFactory *es_model
|
|||||||
query += where
|
query += where
|
||||||
|
|
||||||
if searchQuery.Columns == es_models.Columns_Event {
|
if searchQuery.Columns == es_models.Columns_Event {
|
||||||
query += " ORDER BY event_sequence"
|
order := " ORDER BY creation_date, event_sequence"
|
||||||
if searchQuery.Desc {
|
if searchQuery.Desc {
|
||||||
query += " DESC"
|
order = " ORDER BY creation_date DESC, event_sequence DESC"
|
||||||
}
|
}
|
||||||
|
query += order
|
||||||
}
|
}
|
||||||
|
|
||||||
if searchQuery.Limit > 0 {
|
if searchQuery.Limit > 0 {
|
||||||
|
@@ -436,7 +436,7 @@ func Test_buildQuery(t *testing.T) {
|
|||||||
queryFactory: es_models.NewSearchQueryFactory().OrderDesc().AddQuery().AggregateTypes("user").Factory(),
|
queryFactory: es_models.NewSearchQueryFactory().OrderDesc().AddQuery().AggregateTypes("user").Factory(),
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence DESC",
|
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date DESC, event_sequence DESC",
|
||||||
rowScanner: true,
|
rowScanner: true,
|
||||||
values: []interface{}{es_models.AggregateType("user")},
|
values: []interface{}{es_models.AggregateType("user")},
|
||||||
},
|
},
|
||||||
@@ -447,7 +447,7 @@ func Test_buildQuery(t *testing.T) {
|
|||||||
queryFactory: es_models.NewSearchQueryFactory().Limit(5).AddQuery().AggregateTypes("user").Factory(),
|
queryFactory: es_models.NewSearchQueryFactory().Limit(5).AddQuery().AggregateTypes("user").Factory(),
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence LIMIT $2",
|
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date, event_sequence LIMIT $2",
|
||||||
rowScanner: true,
|
rowScanner: true,
|
||||||
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
|
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
|
||||||
limit: 5,
|
limit: 5,
|
||||||
@@ -459,7 +459,7 @@ func Test_buildQuery(t *testing.T) {
|
|||||||
queryFactory: es_models.NewSearchQueryFactory().Limit(5).OrderDesc().AddQuery().AggregateTypes("user").Factory(),
|
queryFactory: es_models.NewSearchQueryFactory().Limit(5).OrderDesc().AddQuery().AggregateTypes("user").Factory(),
|
||||||
},
|
},
|
||||||
res: res{
|
res: res{
|
||||||
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY event_sequence DESC LIMIT $2",
|
query: "SELECT creation_date, event_type, event_sequence, previous_aggregate_sequence, event_data, editor_service, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE ( aggregate_type = $1 ) ORDER BY creation_date DESC, event_sequence DESC LIMIT $2",
|
||||||
rowScanner: true,
|
rowScanner: true,
|
||||||
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
|
values: []interface{}{es_models.AggregateType("user"), uint64(5)},
|
||||||
limit: 5,
|
limit: 5,
|
||||||
|
Reference in New Issue
Block a user