mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-12 03:47:33 +00:00
fix(eventstore): set application name during push to instance id (#8918)
# Which Problems Are Solved Noisy neighbours can introduce projection latencies because the projections only query events older than the start timestamp of the oldest push transaction. # How the Problems Are Solved During push we set the application name to `zitadel_es_pusher_<instance_id>` instead of `zitadel_es_pusher` which is used to query events by projections.
This commit is contained in:
@@ -402,8 +402,8 @@ func Test_prepareCondition(t *testing.T) {
|
||||
useV1: true,
|
||||
},
|
||||
res: res{
|
||||
clause: " WHERE aggregate_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')",
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}},
|
||||
clause: " WHERE aggregate_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))",
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -419,8 +419,8 @@ func Test_prepareCondition(t *testing.T) {
|
||||
},
|
||||
},
|
||||
res: res{
|
||||
clause: ` WHERE aggregate_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')`,
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}},
|
||||
clause: ` WHERE aggregate_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))`,
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, database.TextArray[string]{}},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -439,8 +439,8 @@ func Test_prepareCondition(t *testing.T) {
|
||||
useV1: true,
|
||||
},
|
||||
res: res{
|
||||
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')",
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}},
|
||||
clause: " WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND creation_date::TIMESTAMP < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))",
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -458,8 +458,8 @@ func Test_prepareCondition(t *testing.T) {
|
||||
},
|
||||
},
|
||||
res: res{
|
||||
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = 'zitadel_es_pusher')`,
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}},
|
||||
clause: ` WHERE aggregate_type = ANY(?) AND aggregate_id = ? AND event_type = ANY(?) AND hlc_to_timestamp("position") < (SELECT COALESCE(MIN(start), NOW())::TIMESTAMP FROM crdb_internal.cluster_transactions where application_name = ANY(?))`,
|
||||
values: []interface{}{[]eventstore.AggregateType{"user", "org"}, "1234", []eventstore.EventType{"user.created", "org.created"}, database.TextArray[string]{}},
|
||||
},
|
||||
},
|
||||
}
|
||||
@@ -687,8 +687,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQuery(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user")},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
|
||||
),
|
||||
},
|
||||
res: res{
|
||||
@@ -709,8 +709,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQuery(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence LIMIT \$2`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence LIMIT \$3`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
|
||||
),
|
||||
},
|
||||
res: res{
|
||||
@@ -731,8 +731,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQuery(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$2`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC LIMIT \$3`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
|
||||
),
|
||||
},
|
||||
res: res{
|
||||
@@ -754,8 +754,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQuery(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$2`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), uint64(5)},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events AS OF SYSTEM TIME '-1 ms' WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC LIMIT \$3`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}, uint64(5)},
|
||||
),
|
||||
},
|
||||
res: res{
|
||||
@@ -776,8 +776,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQueryErr(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user")},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
|
||||
sql.ErrConnDone),
|
||||
},
|
||||
res: res{
|
||||
@@ -798,8 +798,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQueryScanErr(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user")},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE aggregate_type = \$1 AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$2\)\) ORDER BY event_sequence DESC`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), database.TextArray[string]{}},
|
||||
&repository.Event{Seq: 100}),
|
||||
},
|
||||
res: res{
|
||||
@@ -832,8 +832,8 @@ func Test_query_events_mocked(t *testing.T) {
|
||||
},
|
||||
fields: fields{
|
||||
mock: newMockClient(t).expectQuery(t,
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \(aggregate_type = \$1 OR \(aggregate_type = \$2 AND aggregate_id = \$3\)\) AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = 'zitadel_es_pusher'\) ORDER BY event_sequence DESC LIMIT \$4`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", uint64(5)},
|
||||
`SELECT creation_date, event_type, event_sequence, event_data, editor_user, resource_owner, instance_id, aggregate_type, aggregate_id, aggregate_version FROM eventstore.events WHERE \(aggregate_type = \$1 OR \(aggregate_type = \$2 AND aggregate_id = \$3\)\) AND creation_date::TIMESTAMP < \(SELECT COALESCE\(MIN\(start\), NOW\(\)\)::TIMESTAMP FROM crdb_internal\.cluster_transactions where application_name = ANY\(\$4\)\) ORDER BY event_sequence DESC LIMIT \$5`,
|
||||
[]driver.Value{eventstore.AggregateType("user"), eventstore.AggregateType("org"), "asdf42", database.TextArray[string]{}, uint64(5)},
|
||||
),
|
||||
},
|
||||
res: res{
|
||||
|
Reference in New Issue
Block a user