From f93a35c7a8817e227009f26722d3221361295c08 Mon Sep 17 00:00:00 2001 From: Livio Spring Date: Wed, 2 Jul 2025 07:57:41 -0400 Subject: [PATCH] feat: implement service ping (#10080) This PR is still WIP and needs changes to at least the tests. # Which Problems Are Solved To be able to report analytical / telemetry data from deployed Zitadel systems back to a central endpoint, we designed a "service ping" functionality. See also https://github.com/zitadel/zitadel/issues/9706. This PR adds the first implementation to allow collecting base data as well as reporting the amount of resources such as organizations, users per organization and more. # How the Problems Are Solved - Added a worker to handle the different `ReportType` variations. - Schedule a periodic job to start a `ServicePingReport` - Configuration added to allow customization of what data will be reported - Setup step to generate and store a `systemID` # Additional Changes None # Additional Context relates to #9869 --- cmd/defaults.yaml | 31 + cmd/setup/60.go | 27 + cmd/setup/config.go | 1 + cmd/setup/setup.go | 2 + cmd/start/config.go | 2 + cmd/start/start.go | 11 + go.mod | 1 + internal/queue/queue.go | 21 + internal/serviceping/client.go | 153 +++ internal/serviceping/config.go | 18 + internal/serviceping/mock/mock_gen.go | 5 + internal/serviceping/mock/queries.mock.go | 72 ++ internal/serviceping/mock/queue.mock.go | 62 ++ internal/serviceping/mock/telemetry.mock.go | 83 ++ internal/serviceping/report.go | 17 + internal/serviceping/worker.go | 252 +++++ internal/serviceping/worker_test.go | 1052 +++++++++++++++++++ internal/v2/system/event.go | 44 + 18 files changed, 1854 insertions(+) create mode 100644 cmd/setup/60.go create mode 100644 internal/serviceping/client.go create mode 100644 internal/serviceping/config.go create mode 100644 internal/serviceping/mock/mock_gen.go create mode 100644 internal/serviceping/mock/queries.mock.go create mode 100644 internal/serviceping/mock/queue.mock.go create mode 
100644 internal/serviceping/mock/telemetry.mock.go create mode 100644 internal/serviceping/report.go create mode 100644 internal/serviceping/worker.go create mode 100644 internal/serviceping/worker_test.go create mode 100644 internal/v2/system/event.go diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index f1fc6a2414..f88616b821 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -1203,6 +1203,37 @@ DefaultInstance: # If an audit log retention is set using an instance limit, it will overwrite the system default. AuditLogRetention: 0s # ZITADEL_AUDITLOGRETENTION +# The ServicePing is a periodic report of analytics data and the usage of ZITADEL. +# It is sent to a central endpoint to help us improve ZITADEL. +# It's enabled by default, but you can opt out either completely or by disabling specific telemetry data. +ServicePing: + # By setting Enabled to false, the service ping is disabled completely. + Enabled: true # ZITADEL_SERVICEPING_ENABLED + # The endpoint to which the reports are sent. The endpoint is used as a base path. Individual reports are sent to the endpoint with a specific path. + Endpoint: "https://zitadel.cloud/api/ping" # ZITADEL_SERVICEPING_ENDPOINT + # Interval at which the service ping is sent to the endpoint. + # The interval is in the format of a cron expression. + # By default, it is set to every day at midnight: + Interval: "0 0 * * *" # ZITADEL_SERVICEPING_INTERVAL + # Maximum number of attempts for each individual report to be sent. + # If one report fails, it will be retried up to this number of times. + # Other reports will still be handled in parallel and have their own retry count. + # This means if the base information only succeeded after 3 attempts, + # the resource count still has 5 attempts to be sent. + MaxAttempts: 5 # ZITADEL_SERVICEPING_MAXATTEMPTS + # The following features can be enabled or disabled individually. + # By default, all features are enabled. 
+ # Note that if the service ping is enabled, base information about the system is always sent. + # This includes the version and the id, creation date and domains of all instances. + # If you disable a feature, it will not be sent in the service ping. + # Some features provide additional configuration options, if enabled. + Telemetry: + # ResourceCount is a periodic report of the number of resources in ZITADEL. + # This includes the number of users, organizations, projects, and other resources. + ResourceCount: + Enabled: true # ZITADEL_SERVICEPING_TELEMETRY_RESOURCECOUNT_ENABLED + BulkSize: 10000 # ZITADEL_SERVICEPING_TELEMETRY_RESOURCECOUNT_BULKSIZE + InternalAuthZ: # Configure the RolePermissionMappings by environment variable using JSON notation: # ZITADEL_INTERNALAUTHZ_ROLEPERMISSIONMAPPINGS='[{"role": "IAM_OWNER", "permissions": ["iam.write"]}, {"role": "ORG_OWNER", "permissions": ["org.write"]}]' diff --git a/cmd/setup/60.go b/cmd/setup/60.go new file mode 100644 index 0000000000..3f606c2212 --- /dev/null +++ b/cmd/setup/60.go @@ -0,0 +1,27 @@ +package setup + +import ( + "context" + _ "embed" + + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/serviceping" + "github.com/zitadel/zitadel/internal/v2/system" +) + +type GenerateSystemID struct { + eventstore *eventstore.Eventstore +} + +func (mig *GenerateSystemID) Execute(ctx context.Context, _ eventstore.Event) error { + id, err := serviceping.GenerateSystemID() + if err != nil { + return err + } + _, err = mig.eventstore.Push(ctx, system.NewIDGeneratedEvent(ctx, id)) + return err +} + +func (mig *GenerateSystemID) String() string { + return "60_generate_system_id" +} diff --git a/cmd/setup/config.go b/cmd/setup/config.go index 7385cc7652..bac73b0ae5 100644 --- a/cmd/setup/config.go +++ b/cmd/setup/config.go @@ -156,6 +156,7 @@ type Steps struct { s57CreateResourceCounts *CreateResourceCounts s58ReplaceLoginNames3View *ReplaceLoginNames3View s59SetupWebkeys 
*SetupWebkeys + s60GenerateSystemID *GenerateSystemID } func MustNewSteps(v *viper.Viper) *Steps { diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index dd23c320c7..15236a73e9 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -217,6 +217,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s56IDPTemplate6SAMLFederatedLogout = &IDPTemplate6SAMLFederatedLogout{dbClient: dbClient} steps.s57CreateResourceCounts = &CreateResourceCounts{dbClient: dbClient} steps.s58ReplaceLoginNames3View = &ReplaceLoginNames3View{dbClient: dbClient} + steps.s60GenerateSystemID = &GenerateSystemID{eventstore: eventstoreClient} err = projection.Create(ctx, dbClient, eventstoreClient, config.Projections, nil, nil, nil) logging.OnError(err).Fatal("unable to start projections") @@ -264,6 +265,7 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) steps.s56IDPTemplate6SAMLFederatedLogout, steps.s57CreateResourceCounts, steps.s58ReplaceLoginNames3View, + steps.s60GenerateSystemID, } { setupErr = executeMigration(ctx, eventstoreClient, step, "migration failed") if setupErr != nil { diff --git a/cmd/start/config.go b/cmd/start/config.go index 78b6f0afe0..c680bf7c05 100644 --- a/cmd/start/config.go +++ b/cmd/start/config.go @@ -32,6 +32,7 @@ import ( "github.com/zitadel/zitadel/internal/logstore" "github.com/zitadel/zitadel/internal/notification/handlers" "github.com/zitadel/zitadel/internal/query/projection" + "github.com/zitadel/zitadel/internal/serviceping" static_config "github.com/zitadel/zitadel/internal/static/config" metrics "github.com/zitadel/zitadel/internal/telemetry/metrics/config" profiler "github.com/zitadel/zitadel/internal/telemetry/profiler/config" @@ -81,6 +82,7 @@ type Config struct { LogStore *logstore.Configs Quotas *QuotasConfig Telemetry *handlers.TelemetryPusherConfig + ServicePing *serviceping.Config } type QuotasConfig struct { diff --git a/cmd/start/start.go b/cmd/start/start.go index 
dbd6289041..06f3554a58 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -99,6 +99,7 @@ import ( "github.com/zitadel/zitadel/internal/notification" "github.com/zitadel/zitadel/internal/query" "github.com/zitadel/zitadel/internal/queue" + "github.com/zitadel/zitadel/internal/serviceping" "github.com/zitadel/zitadel/internal/static" es_v4 "github.com/zitadel/zitadel/internal/v2/eventstore" es_v4_pg "github.com/zitadel/zitadel/internal/v2/eventstore/postgres" @@ -317,10 +318,20 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server ) execution.Start(ctx) + // the service ping and its workers need to be registered before starting the queue + if err := serviceping.Register(ctx, q, queries, eventstoreClient, config.ServicePing); err != nil { + return err + } + if err = q.Start(ctx); err != nil { return err } + // the scheduler / periodic jobs need to be started after the queue already runs + if err = serviceping.Start(config.ServicePing, q); err != nil { + return err + } + router := mux.NewRouter() tlsConfig, err := config.TLS.Config() if err != nil { diff --git a/go.mod b/go.mod index 9d02050b48..f0ab6246d6 100644 --- a/go.mod +++ b/go.mod @@ -67,6 +67,7 @@ require ( github.com/riverqueue/river/riverdriver v0.22.0 github.com/riverqueue/river/rivertype v0.22.0 github.com/riverqueue/rivercontrib/otelriver v0.5.0 + github.com/robfig/cron/v3 v3.0.1 github.com/rs/cors v1.11.1 github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 github.com/shopspring/decimal v1.3.1 diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 22df8c2b5c..44e291bf4d 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -9,6 +9,7 @@ import ( "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivertype" "github.com/riverqueue/rivercontrib/otelriver" + "github.com/robfig/cron/v3" "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/database" @@ -75,6 +76,26 @@ func (q *Queue) AddWorkers(w 
...Worker) { } } +func (q *Queue) AddPeriodicJob(schedule cron.Schedule, jobArgs river.JobArgs, opts ...InsertOpt) (handle rivertype.PeriodicJobHandle) { + if q == nil { + logging.Info("skip adding periodic job because queue is not set") + return + } + options := new(river.InsertOpts) + for _, opt := range opts { + opt(options) + } + return q.client.PeriodicJobs().Add( + river.NewPeriodicJob( + schedule, + func() (river.JobArgs, *river.InsertOpts) { + return jobArgs, options + }, + nil, + ), + ) +} + type InsertOpt func(*river.InsertOpts) func WithMaxAttempts(maxAttempts uint8) InsertOpt { diff --git a/internal/serviceping/client.go b/internal/serviceping/client.go new file mode 100644 index 0000000000..87711aada6 --- /dev/null +++ b/internal/serviceping/client.go @@ -0,0 +1,153 @@ +package serviceping + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/base64" + "fmt" + "io" + "net/http" + + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/query" + analytics "github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta" +) + +const ( + pathBaseInformation = "/instances" + pathResourceCounts = "/resource_counts" +) + +type Client struct { + httpClient *http.Client + endpoint string +} + +func (c Client) ReportBaseInformation(ctx context.Context, in *analytics.ReportBaseInformationRequest, opts ...grpc.CallOption) (*analytics.ReportBaseInformationResponse, error) { + reportResponse := new(analytics.ReportBaseInformationResponse) + err := c.callTelemetryService(ctx, pathBaseInformation, in, reportResponse) + if err != nil { + return nil, err + } + return reportResponse, nil +} + +func (c Client) ReportResourceCounts(ctx context.Context, in *analytics.ReportResourceCountsRequest, opts ...grpc.CallOption) (*analytics.ReportResourceCountsResponse, error) { + 
reportResponse := new(analytics.ReportResourceCountsResponse) + err := c.callTelemetryService(ctx, pathResourceCounts, in, reportResponse) + if err != nil { + return nil, err + } + return reportResponse, nil +} + +func (c Client) callTelemetryService(ctx context.Context, path string, in proto.Message, out proto.Message) error { + requestBody, err := protojson.Marshal(in) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint+path, bytes.NewReader(requestBody)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + + if resp.StatusCode != http.StatusOK { + return &TelemetryError{ + StatusCode: resp.StatusCode, + Body: body, + } + } + + return protojson.UnmarshalOptions{ + AllowPartial: true, + DiscardUnknown: true, + }.Unmarshal(body, out) +} + +func NewClient(config *Config) Client { + return Client{ + httpClient: http.DefaultClient, + endpoint: config.Endpoint, + } +} + +func GenerateSystemID() (string, error) { + randBytes := make([]byte, 64) + if _, err := rand.Read(randBytes); err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(randBytes), nil +} + +func instanceInformationToPb(instances *query.Instances) []*analytics.InstanceInformation { + instanceInformation := make([]*analytics.InstanceInformation, len(instances.Instances)) + for i, instance := range instances.Instances { + domains := instanceDomainToPb(instance) + instanceInformation[i] = &analytics.InstanceInformation{ + Id: instance.ID, + Domains: domains, + CreatedAt: timestamppb.New(instance.CreationDate), + } + } + return instanceInformation +} + +func instanceDomainToPb(instance *query.Instance) []string { + domains := make([]string, len(instance.Domains)) + for i, domain := range instance.Domains { + domains[i] = 
domain.Domain + } + return domains +} + +func resourceCountsToPb(counts []query.ResourceCount) []*analytics.ResourceCount { + resourceCounts := make([]*analytics.ResourceCount, len(counts)) + for i, count := range counts { + resourceCounts[i] = &analytics.ResourceCount{ + InstanceId: count.InstanceID, + ParentType: countParentTypeToPb(count.ParentType), + ParentId: count.ParentID, + ResourceName: count.Resource, + TableName: count.TableName, + UpdatedAt: timestamppb.New(count.UpdatedAt), + Amount: uint32(count.Amount), + } + } + return resourceCounts +} + +func countParentTypeToPb(parentType domain.CountParentType) analytics.CountParentType { + switch parentType { + case domain.CountParentTypeInstance: + return analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE + case domain.CountParentTypeOrganization: + return analytics.CountParentType_COUNT_PARENT_TYPE_ORGANIZATION + default: + return analytics.CountParentType_COUNT_PARENT_TYPE_UNSPECIFIED + } +} + +type TelemetryError struct { + StatusCode int + Body []byte +} + +func (e *TelemetryError) Error() string { + return fmt.Sprintf("telemetry error %d: %s", e.StatusCode, e.Body) +} diff --git a/internal/serviceping/config.go b/internal/serviceping/config.go new file mode 100644 index 0000000000..13f2311324 --- /dev/null +++ b/internal/serviceping/config.go @@ -0,0 +1,18 @@ +package serviceping + +type Config struct { + Enabled bool + Endpoint string + Interval string + MaxAttempts uint8 + Telemetry TelemetryConfig +} + +type TelemetryConfig struct { + ResourceCount ResourceCount +} + +type ResourceCount struct { + Enabled bool + BulkSize int +} diff --git a/internal/serviceping/mock/mock_gen.go b/internal/serviceping/mock/mock_gen.go new file mode 100644 index 0000000000..6b4d2defbe --- /dev/null +++ b/internal/serviceping/mock/mock_gen.go @@ -0,0 +1,5 @@ +package mock + +//go:generate mockgen -package mock -destination queue.mock.go github.com/zitadel/zitadel/internal/serviceping Queue +//go:generate mockgen 
-package mock -destination queries.mock.go github.com/zitadel/zitadel/internal/serviceping Queries +//go:generate mockgen -package mock -destination telemetry.mock.go github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta TelemetryServiceClient diff --git a/internal/serviceping/mock/queries.mock.go b/internal/serviceping/mock/queries.mock.go new file mode 100644 index 0000000000..593c4d5ff7 --- /dev/null +++ b/internal/serviceping/mock/queries.mock.go @@ -0,0 +1,72 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/zitadel/zitadel/internal/serviceping (interfaces: Queries) +// +// Generated by this command: +// +// mockgen -package mock -destination queries.mock.go github.com/zitadel/zitadel/internal/serviceping Queries +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + query "github.com/zitadel/zitadel/internal/query" + gomock "go.uber.org/mock/gomock" +) + +// MockQueries is a mock of Queries interface. +type MockQueries struct { + ctrl *gomock.Controller + recorder *MockQueriesMockRecorder + isgomock struct{} +} + +// MockQueriesMockRecorder is the mock recorder for MockQueries. +type MockQueriesMockRecorder struct { + mock *MockQueries +} + +// NewMockQueries creates a new mock instance. +func NewMockQueries(ctrl *gomock.Controller) *MockQueries { + mock := &MockQueries{ctrl: ctrl} + mock.recorder = &MockQueriesMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueries) EXPECT() *MockQueriesMockRecorder { + return m.recorder +} + +// ListResourceCounts mocks base method. 
+func (m *MockQueries) ListResourceCounts(ctx context.Context, lastID, size int) ([]query.ResourceCount, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListResourceCounts", ctx, lastID, size) + ret0, _ := ret[0].([]query.ResourceCount) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListResourceCounts indicates an expected call of ListResourceCounts. +func (mr *MockQueriesMockRecorder) ListResourceCounts(ctx, lastID, size any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListResourceCounts", reflect.TypeOf((*MockQueries)(nil).ListResourceCounts), ctx, lastID, size) +} + +// SearchInstances mocks base method. +func (m *MockQueries) SearchInstances(ctx context.Context, queries *query.InstanceSearchQueries) (*query.Instances, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SearchInstances", ctx, queries) + ret0, _ := ret[0].(*query.Instances) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SearchInstances indicates an expected call of SearchInstances. +func (mr *MockQueriesMockRecorder) SearchInstances(ctx, queries any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SearchInstances", reflect.TypeOf((*MockQueries)(nil).SearchInstances), ctx, queries) +} diff --git a/internal/serviceping/mock/queue.mock.go b/internal/serviceping/mock/queue.mock.go new file mode 100644 index 0000000000..e984352a8c --- /dev/null +++ b/internal/serviceping/mock/queue.mock.go @@ -0,0 +1,62 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/zitadel/zitadel/internal/serviceping (interfaces: Queue) +// +// Generated by this command: +// +// mockgen -package mock -destination queue.mock.go github.com/zitadel/zitadel/internal/serviceping Queue +// + +// Package mock is a generated GoMock package. 
+package mock + +import ( + context "context" + reflect "reflect" + + river "github.com/riverqueue/river" + queue "github.com/zitadel/zitadel/internal/queue" + gomock "go.uber.org/mock/gomock" +) + +// MockQueue is a mock of Queue interface. +type MockQueue struct { + ctrl *gomock.Controller + recorder *MockQueueMockRecorder + isgomock struct{} +} + +// MockQueueMockRecorder is the mock recorder for MockQueue. +type MockQueueMockRecorder struct { + mock *MockQueue +} + +// NewMockQueue creates a new mock instance. +func NewMockQueue(ctrl *gomock.Controller) *MockQueue { + mock := &MockQueue{ctrl: ctrl} + mock.recorder = &MockQueueMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockQueue) EXPECT() *MockQueueMockRecorder { + return m.recorder +} + +// Insert mocks base method. +func (m *MockQueue) Insert(ctx context.Context, args river.JobArgs, opts ...queue.InsertOpt) error { + m.ctrl.T.Helper() + varargs := []any{ctx, args} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Insert", varargs...) + ret0, _ := ret[0].(error) + return ret0 +} + +// Insert indicates an expected call of Insert. +func (mr *MockQueueMockRecorder) Insert(ctx, args any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, args}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Insert", reflect.TypeOf((*MockQueue)(nil).Insert), varargs...) +} diff --git a/internal/serviceping/mock/telemetry.mock.go b/internal/serviceping/mock/telemetry.mock.go new file mode 100644 index 0000000000..536bf34671 --- /dev/null +++ b/internal/serviceping/mock/telemetry.mock.go @@ -0,0 +1,83 @@ +// Code generated by MockGen. DO NOT EDIT. 
+// Source: github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta (interfaces: TelemetryServiceClient) +// +// Generated by this command: +// +// mockgen -package mock -destination telemetry.mock.go github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta TelemetryServiceClient +// + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + analytics "github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta" + gomock "go.uber.org/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockTelemetryServiceClient is a mock of TelemetryServiceClient interface. +type MockTelemetryServiceClient struct { + ctrl *gomock.Controller + recorder *MockTelemetryServiceClientMockRecorder + isgomock struct{} +} + +// MockTelemetryServiceClientMockRecorder is the mock recorder for MockTelemetryServiceClient. +type MockTelemetryServiceClientMockRecorder struct { + mock *MockTelemetryServiceClient +} + +// NewMockTelemetryServiceClient creates a new mock instance. +func NewMockTelemetryServiceClient(ctrl *gomock.Controller) *MockTelemetryServiceClient { + mock := &MockTelemetryServiceClient{ctrl: ctrl} + mock.recorder = &MockTelemetryServiceClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTelemetryServiceClient) EXPECT() *MockTelemetryServiceClientMockRecorder { + return m.recorder +} + +// ReportBaseInformation mocks base method. +func (m *MockTelemetryServiceClient) ReportBaseInformation(ctx context.Context, in *analytics.ReportBaseInformationRequest, opts ...grpc.CallOption) (*analytics.ReportBaseInformationResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReportBaseInformation", varargs...) 
+ ret0, _ := ret[0].(*analytics.ReportBaseInformationResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportBaseInformation indicates an expected call of ReportBaseInformation. +func (mr *MockTelemetryServiceClientMockRecorder) ReportBaseInformation(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportBaseInformation", reflect.TypeOf((*MockTelemetryServiceClient)(nil).ReportBaseInformation), varargs...) +} + +// ReportResourceCounts mocks base method. +func (m *MockTelemetryServiceClient) ReportResourceCounts(ctx context.Context, in *analytics.ReportResourceCountsRequest, opts ...grpc.CallOption) (*analytics.ReportResourceCountsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ReportResourceCounts", varargs...) + ret0, _ := ret[0].(*analytics.ReportResourceCountsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReportResourceCounts indicates an expected call of ReportResourceCounts. +func (mr *MockTelemetryServiceClientMockRecorder) ReportResourceCounts(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportResourceCounts", reflect.TypeOf((*MockTelemetryServiceClient)(nil).ReportResourceCounts), varargs...) 
+} diff --git a/internal/serviceping/report.go b/internal/serviceping/report.go new file mode 100644 index 0000000000..d31f6a8f74 --- /dev/null +++ b/internal/serviceping/report.go @@ -0,0 +1,17 @@ +package serviceping + +type ReportType uint + +const ( + ReportTypeBaseInformation ReportType = iota + ReportTypeResourceCounts +) + +type ServicePingReport struct { + ReportID string + ReportType ReportType +} + +func (r *ServicePingReport) Kind() string { + return "service_ping_report" +} diff --git a/internal/serviceping/worker.go b/internal/serviceping/worker.go new file mode 100644 index 0000000000..0156373170 --- /dev/null +++ b/internal/serviceping/worker.go @@ -0,0 +1,252 @@ +package serviceping + +import ( + "context" + "errors" + "net/http" + + "github.com/muhlemmer/gu" + "github.com/riverqueue/river" + "github.com/robfig/cron/v3" + "github.com/zitadel/logging" + + "github.com/zitadel/zitadel/cmd/build" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/queue" + "github.com/zitadel/zitadel/internal/v2/system" + analytics "github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta" +) + +const ( + QueueName = "service_ping_report" +) + +var ( + ErrInvalidReportType = errors.New("invalid report type") + + _ river.Worker[*ServicePingReport] = (*Worker)(nil) +) + +type Worker struct { + river.WorkerDefaults[*ServicePingReport] + + reportClient analytics.TelemetryServiceClient + db Queries + queue Queue + + config *Config + systemID string + version string +} + +type Queries interface { + SearchInstances(ctx context.Context, queries *query.InstanceSearchQueries) (*query.Instances, error) + ListResourceCounts(ctx context.Context, lastID int, size int) ([]query.ResourceCount, error) +} + +type Queue interface { + Insert(ctx context.Context, args river.JobArgs, opts ...queue.InsertOpt) error +} + +// Register implements the [queue.Worker] interface. 
+func (w *Worker) Register(workers *river.Workers, queues map[string]river.QueueConfig) { + river.AddWorker[*ServicePingReport](workers, w) + queues[QueueName] = river.QueueConfig{ + MaxWorkers: 1, // for now, we only use a single worker to prevent too many side effects on other queues + } +} + +// Work implements the [river.Worker] interface. +func (w *Worker) Work(ctx context.Context, job *river.Job[*ServicePingReport]) (err error) { + defer func() { + err = w.handleClientError(err) + }() + switch job.Args.ReportType { + case ReportTypeBaseInformation: + reportID, err := w.reportBaseInformation(ctx) + if err != nil { + return err + } + return w.createReportJobs(ctx, reportID) + case ReportTypeResourceCounts: + return w.reportResourceCounts(ctx, job.Args.ReportID) + default: + logging.WithFields("reportType", job.Args.ReportType, "reportID", job.Args.ReportID). + Error("unknown job type") + return river.JobCancel(ErrInvalidReportType) + } +} + +func (w *Worker) reportBaseInformation(ctx context.Context) (string, error) { + instances, err := w.db.SearchInstances(ctx, &query.InstanceSearchQueries{}) + if err != nil { + return "", err + } + instanceInformation := instanceInformationToPb(instances) + resp, err := w.reportClient.ReportBaseInformation(ctx, &analytics.ReportBaseInformationRequest{ + SystemId: w.systemID, + Version: w.version, + Instances: instanceInformation, + }) + if err != nil { + return "", err + } + return resp.GetReportId(), nil +} + +func (w *Worker) reportResourceCounts(ctx context.Context, reportID string) error { + lastID := 0 + // iterate over the resource counts until there are no more counts to report + // or the context gets cancelled + for { + select { + case <-ctx.Done(): + return nil + default: + counts, err := w.db.ListResourceCounts(ctx, lastID, w.config.Telemetry.ResourceCount.BulkSize) + if err != nil { + return err + } + // if there are no counts, we can stop the loop + if len(counts) == 0 { + return nil + } + request := 
&analytics.ReportResourceCountsRequest{ + SystemId: w.systemID, + ResourceCounts: resourceCountsToPb(counts), + } + if reportID != "" { + request.ReportId = gu.Ptr(reportID) + } + resp, err := w.reportClient.ReportResourceCounts(ctx, request) + if err != nil { + return err + } + // in case the resource counts returned by the database are less than the bulk size, + // we can assume that we have reached the end of the resource counts and can stop the loop + if len(counts) < w.config.Telemetry.ResourceCount.BulkSize { + return nil + } + // update the lastID for the next iteration + lastID = counts[len(counts)-1].ID + // In case we get a report ID back from the server (it could be the first call of the report), + // we update it to use it for the next batch. + if resp.GetReportId() != "" && resp.GetReportId() != reportID { + reportID = resp.GetReportId() + } + } + } +} + +func (w *Worker) handleClientError(err error) error { + telemetryError := new(TelemetryError) + if !errors.As(err, &telemetryError) { + // If the error is not a TelemetryError, we can assume that it is a transient error + // and can be retried by the queue. + return err + } + switch telemetryError.StatusCode { + case http.StatusBadRequest, + http.StatusNotFound, + http.StatusNotImplemented, + http.StatusConflict, + http.StatusPreconditionFailed: + // In case of these errors, we can assume that a retry does not make sense, + // so we can cancel the job. + return river.JobCancel(err) + default: + // As of now we assume that all other errors are transient and can be retried. + // So we just return the error, which will be handled by the queue as a failed attempt. + return err + } +} + +func (w *Worker) createReportJobs(ctx context.Context, reportID string) error { + errs := make([]error, 0) + if w.config.Telemetry.ResourceCount.Enabled { + err := w.addReportJob(ctx, reportID, ReportTypeResourceCounts) + if err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) 
+} + +func (w *Worker) addReportJob(ctx context.Context, reportID string, reportType ReportType) error { + job := &ServicePingReport{ + ReportID: reportID, + ReportType: reportType, + } + return w.queue.Insert(ctx, job, + queue.WithQueueName(QueueName), + queue.WithMaxAttempts(w.config.MaxAttempts), + ) +} + +type systemIDReducer struct { + id string +} + +func (s *systemIDReducer) Reduce() error { + return nil +} + +func (s *systemIDReducer) AppendEvents(events ...eventstore.Event) { + for _, event := range events { + if idEvent, ok := event.(*system.IDGeneratedEvent); ok { + s.id = idEvent.ID + } + } +} + +func (s *systemIDReducer) Query() *eventstore.SearchQueryBuilder { + return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes(system.AggregateType). + EventTypes(system.IDGeneratedType). + Builder() +} + +func Register( + ctx context.Context, + q *queue.Queue, + queries *query.Queries, + eventstoreClient *eventstore.Eventstore, + config *Config, +) error { + if !config.Enabled { + return nil + } + systemID := new(systemIDReducer) + err := eventstoreClient.FilterToQueryReducer(ctx, systemID) + if err != nil { + return err + } + q.AddWorkers(&Worker{ + reportClient: NewClient(config), + db: queries, + queue: q, + config: config, + systemID: systemID.id, + version: build.Version(), + }) + return nil +} + +func Start(config *Config, q *queue.Queue) error { + if !config.Enabled { + return nil + } + schedule, err := cron.ParseStandard(config.Interval) + if err != nil { + return err + } + q.AddPeriodicJob( + schedule, + &ServicePingReport{}, + queue.WithQueueName(QueueName), + queue.WithMaxAttempts(config.MaxAttempts), + ) + return nil +} diff --git a/internal/serviceping/worker_test.go b/internal/serviceping/worker_test.go new file mode 100644 index 0000000000..373eee9b6e --- /dev/null +++ b/internal/serviceping/worker_test.go @@ -0,0 +1,1052 @@ +package serviceping + +import ( + "context" + "fmt" + "net/http" + "reflect" + 
"testing" + "time" + + "github.com/muhlemmer/gu" + "github.com/riverqueue/river" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/zitadel/zitadel/internal/domain" + "github.com/zitadel/zitadel/internal/query" + "github.com/zitadel/zitadel/internal/queue" + "github.com/zitadel/zitadel/internal/serviceping/mock" + "github.com/zitadel/zitadel/internal/zerrors" + analytics "github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta" +) + +var ( + testNow = time.Now() + errInsert = fmt.Errorf("insert error") +) + +func TestWorker_reportBaseInformation(t *testing.T) { + type fields struct { + reportClient func(*testing.T) analytics.TelemetryServiceClient + db func(*testing.T) Queries + systemID string + version string + } + type args struct { + ctx context.Context + } + type want struct { + reportID string + err error + } + tests := []struct { + name string + fields fields + args args + want want + }{ + { + name: "database error, error", + fields: fields{ + db: func(t *testing.T) Queries { + ctrl := gomock.NewController(t) + queries := mock.NewMockQueries(ctrl) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + nil, zerrors.ThrowInternal(nil, "id", "db error"), + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + return mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + }, + }, + want: want{ + reportID: "", + err: zerrors.ThrowInternal(nil, "id", "db error"), + }, + }, + { + name: "telemetry client error, error", + fields: fields{ + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: 
[]*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain", "domain2"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + nil, status.Error(codes.Internal, "error"), + ) + return client + }, + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + { + Domain: "domain2", + }, + }, + }, + }, + }, + nil, + ) + return queries + }, + systemID: "system-id", + version: "version", + }, + want: want{ + reportID: "", + err: status.Error(codes.Internal, "error"), + }, + }, + { + name: "report ok, reportID returned", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + { + Domain: "domain2", + }, + }, + }, + }, + }, + nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: []*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain", "domain2"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + &analytics.ReportBaseInformationResponse{ReportId: "report-id"}, nil, + ) + return client + }, + systemID: "system-id", + version: "version", + }, + want: want{ + reportID: "report-id", + err: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + w := &Worker{ + reportClient: tt.fields.reportClient(t), + db: tt.fields.db(t), + systemID: tt.fields.systemID, + version: tt.fields.version, + } + got, err := w.reportBaseInformation(tt.args.ctx) + assert.Equal(t, tt.want.reportID, got) + assert.ErrorIs(t, err, tt.want.err) + }) + } +} + +func TestWorker_reportResourceCounts(t *testing.T) { + type fields struct { + reportClient func(*testing.T) analytics.TelemetryServiceClient + db func(*testing.T) Queries + config *Config + systemID string + } + type args struct { + ctx context.Context + reportID string + } + tests := []struct { + name string + fields fields + args args + wantErr error + }{ + { + name: "database error, error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 1).Return( + nil, zerrors.ThrowInternal(nil, "id", "db error"), + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + return mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 1, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + reportID: "", + }, + wantErr: zerrors.ThrowInternal(nil, "id", "db error"), + }, + { + name: "no resource counts, no error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 1).Return( + []query.ResourceCount{}, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + return mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 1, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + reportID: "", + }, 
+ wantErr: nil, + }, + { + name: "telemetry client error, error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: nil, + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + }, + }).Return( + nil, status.Error(codes.Internal, "error"), + ) + return client + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + reportID: "", + }, + wantErr: status.Error(codes.Internal, "error"), + }, + { + name: "report ok, no additional counts, no error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + 
client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: nil, + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + }, + }).Return( + &analytics.ReportResourceCountsResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + reportID: "", + }, + wantErr: nil, + }, + { + name: "report ok, additional counts, no error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + { + ID: 2, + InstanceID: "instance-id2", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id2", + Resource: "resource", + UpdatedAt: testNow, + Amount: 5, + }, + }, nil, + ) + queries.EXPECT().ListResourceCounts(gomock.Any(), 2, 2).Return( + []query.ResourceCount{ + { + ID: 3, + InstanceID: "instance-id3", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id3", + Resource: "resource", + UpdatedAt: testNow, + Amount: 20, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := 
mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: nil, + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + { + InstanceId: "instance-id2", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id2", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 5, + }, + }, + }).Return( + &analytics.ReportResourceCountsResponse{ + ReportId: "report-id", + }, nil, + ) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: gu.Ptr("report-id"), + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id3", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id3", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 20, + }, + }, + }).Return( + &analytics.ReportResourceCountsResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + reportID: "", + }, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &Worker{ + reportClient: tt.fields.reportClient(t), + db: tt.fields.db(t), + config: tt.fields.config, + systemID: tt.fields.systemID, + } + err := w.reportResourceCounts(tt.args.ctx, tt.args.reportID) + assert.ErrorIs(t, err, tt.wantErr) + }) + } +} + +func TestWorker_Work(t *testing.T) { + type fields 
struct { + WorkerDefaults river.WorkerDefaults[*ServicePingReport] + reportClient func(*testing.T) analytics.TelemetryServiceClient + db func(*testing.T) Queries + queue func(*testing.T) Queue + config *Config + systemID string + version string + } + type args struct { + ctx context.Context + job *river.Job[*ServicePingReport] + } + tests := []struct { + name string + fields fields + args args + wantErr error + }{ + { + name: "unknown report type, cancel job", + fields: fields{ + db: func(t *testing.T) Queries { + return mock.NewMockQueries(gomock.NewController(t)) + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + return mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: 100000, + }, + }, + }, + wantErr: river.JobCancel(ErrInvalidReportType), + }, + { + name: "report base information, database error, retry job", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + nil, zerrors.ThrowInternal(nil, "id", "db error"), + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + return mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: ReportTypeBaseInformation, + }, + }, + }, + wantErr: zerrors.ThrowInternal(nil, "id", "db error"), + }, + { + name: "report base information, config error, cancel job", + fields: fields{ + db: func(t *testing.T) Queries { + queries := 
mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + }, + }, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: []*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + nil, &TelemetryError{StatusCode: http.StatusNotFound, Body: []byte("endpoint not found")}, + ) + return client + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + systemID: "system-id", + version: "version", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: ReportTypeBaseInformation, + }, + }, + }, + wantErr: river.JobCancel(&TelemetryError{StatusCode: http.StatusNotFound, Body: []byte("endpoint not found")}), + }, + { + name: "report base information, no reports enabled, no error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + }, + }, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + 
client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: []*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + &analytics.ReportBaseInformationResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + Enabled: false, + }, + }, + }, + systemID: "system-id", + version: "version", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: ReportTypeBaseInformation, + }, + }, + }, + }, + { + name: "report base information, job creation error, cancel job", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + }, + }, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: []*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + &analytics.ReportBaseInformationResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + queue: func(t *testing.T) Queue { + q := mock.NewMockQueue(gomock.NewController(t)) + q.EXPECT().Insert(gomock.Any(), + &ServicePingReport{ + 
ReportID: "report-id", + ReportType: ReportTypeResourceCounts, + }, + gomock.AssignableToTypeOf(reflect.TypeOf(queue.WithQueueName(QueueName))), + gomock.AssignableToTypeOf(reflect.TypeOf(queue.WithMaxAttempts(5)))). // TODO: better solution + Return(errInsert) + return q + }, + config: &Config{ + MaxAttempts: 5, + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + Enabled: true, + }, + }, + }, + systemID: "system-id", + version: "version", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: ReportTypeBaseInformation, + }, + }, + }, + wantErr: errInsert, + }, + { + name: "report base information, success, no error", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().SearchInstances(gomock.Any(), &query.InstanceSearchQueries{}).Return( + &query.Instances{ + Instances: []*query.Instance{ + { + ID: "id", + CreationDate: testNow, + Domains: []*query.InstanceDomain{ + { + Domain: "domain", + }, + }, + }, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportBaseInformation(gomock.Any(), &analytics.ReportBaseInformationRequest{ + SystemId: "system-id", + Version: "version", + Instances: []*analytics.InstanceInformation{ + { + Id: "id", + Domains: []string{"domain"}, + CreatedAt: timestamppb.New(testNow), + }, + }, + }).Return( + &analytics.ReportBaseInformationResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + queue: func(t *testing.T) Queue { + q := mock.NewMockQueue(gomock.NewController(t)) + q.EXPECT().Insert(gomock.Any(), + &ServicePingReport{ + ReportID: "report-id", + ReportType: ReportTypeResourceCounts, + }, + gomock.AssignableToTypeOf(reflect.TypeOf(queue.WithQueueName(QueueName))), + 
gomock.AssignableToTypeOf(reflect.TypeOf(queue.WithMaxAttempts(5)))). + Return(nil) + return q + }, + config: &Config{ + MaxAttempts: 5, + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + Enabled: true, + }, + }, + }, + systemID: "system-id", + version: "version", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportType: ReportTypeBaseInformation, + }, + }, + }, + }, + { + name: "report resource counts, service unavailable, retry job", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: gu.Ptr("report-id"), + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + }, + }).Return( + nil, status.Error(codes.Unavailable, "service unavailable"), + ) + return client + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ 
+ ReportType: ReportTypeResourceCounts, + ReportID: "report-id", + }, + }, + }, + wantErr: status.Error(codes.Unavailable, "service unavailable"), + }, + { + name: "report resource counts, precondition error, cancel job", + fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: gu.Ptr("report-id"), + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + }, + }).Return( + nil, &TelemetryError{StatusCode: http.StatusPreconditionFailed, Body: []byte("report too old")}, + ) + return client + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportID: "report-id", + ReportType: ReportTypeResourceCounts, + }, + }, + }, + wantErr: river.JobCancel(&TelemetryError{StatusCode: http.StatusPreconditionFailed, Body: []byte("report too old")}), + }, + { + name: "report resource counts, success, no error", + 
fields: fields{ + db: func(t *testing.T) Queries { + queries := mock.NewMockQueries(gomock.NewController(t)) + queries.EXPECT().ListResourceCounts(gomock.Any(), 0, 2).Return( + []query.ResourceCount{ + { + ID: 1, + InstanceID: "instance-id", + TableName: "table_name", + ParentType: domain.CountParentTypeInstance, + ParentID: "instance-id", + Resource: "resource", + UpdatedAt: testNow, + Amount: 10, + }, + }, nil, + ) + return queries + }, + reportClient: func(t *testing.T) analytics.TelemetryServiceClient { + client := mock.NewMockTelemetryServiceClient(gomock.NewController(t)) + client.EXPECT().ReportResourceCounts(gomock.Any(), &analytics.ReportResourceCountsRequest{ + SystemId: "system-id", + ReportId: gu.Ptr("report-id"), + ResourceCounts: []*analytics.ResourceCount{ + { + InstanceId: "instance-id", + TableName: "table_name", + ParentType: analytics.CountParentType_COUNT_PARENT_TYPE_INSTANCE, + ParentId: "instance-id", + ResourceName: "resource", + UpdatedAt: timestamppb.New(testNow), + Amount: 10, + }, + }, + }).Return( + &analytics.ReportResourceCountsResponse{ + ReportId: "report-id", + }, nil, + ) + return client + }, + queue: func(t *testing.T) Queue { + return mock.NewMockQueue(gomock.NewController(t)) + }, + config: &Config{ + Telemetry: TelemetryConfig{ + ResourceCount: ResourceCount{ + BulkSize: 2, + }, + }, + }, + systemID: "system-id", + }, + args: args{ + ctx: context.Background(), + job: &river.Job[*ServicePingReport]{ + Args: &ServicePingReport{ + ReportID: "report-id", + ReportType: ReportTypeResourceCounts, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := &Worker{ + WorkerDefaults: river.WorkerDefaults[*ServicePingReport]{}, + reportClient: tt.fields.reportClient(t), + db: tt.fields.db(t), + queue: tt.fields.queue(t), + config: tt.fields.config, + systemID: tt.fields.systemID, + version: tt.fields.version, + } + err := w.Work(tt.args.ctx, tt.args.job) + assert.ErrorIs(t, err, tt.wantErr) + }) + } 
+} diff --git a/internal/v2/system/event.go b/internal/v2/system/event.go new file mode 100644 index 0000000000..313c0fb293 --- /dev/null +++ b/internal/v2/system/event.go @@ -0,0 +1,44 @@ +package system + +import ( + "context" + + "github.com/zitadel/zitadel/internal/eventstore" +) + +func init() { + eventstore.RegisterFilterEventMapper(AggregateType, IDGeneratedType, eventstore.GenericEventMapper[IDGeneratedEvent]) +} + +const IDGeneratedType = AggregateType + ".id.generated" + +type IDGeneratedEvent struct { + eventstore.BaseEvent `json:"-"` + + ID string `json:"id"` +} + +func (e *IDGeneratedEvent) SetBaseEvent(b *eventstore.BaseEvent) { + e.BaseEvent = *b +} + +func (e *IDGeneratedEvent) Payload() interface{} { + return e +} + +func (e *IDGeneratedEvent) UniqueConstraints() []*eventstore.UniqueConstraint { + return nil +} + +func NewIDGeneratedEvent( + ctx context.Context, + id string, +) *IDGeneratedEvent { + return &IDGeneratedEvent{ + BaseEvent: *eventstore.NewBaseEventForPush( + ctx, + eventstore.NewAggregate(ctx, AggregateOwner, AggregateType, "v1"), + IDGeneratedType), + ID: id, + } +}