mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 21:47:32 +00:00
feat: implement service ping (#10080)
This PR is still WIP and needs changes to at least the tests. # Which Problems Are Solved To be able to report analytical / telemetry data from deployed Zitadel systems back to a central endpoint, we designed a "service ping" functionality. See also https://github.com/zitadel/zitadel/issues/9706. This PR adds the first implementation to allow collection base data as well as report amount of resources such as organizations, users per organization and more. # How the Problems Are Solved - Added a worker to handle the different `ReportType` variations. - Schedule a periodic job to start a `ServicePingReport` - Configuration added to allow customization of what data will be reported - Setup step to generate and store a `systemID` # Additional Changes None # Additional Context relates to #9869
This commit is contained in:
252
internal/serviceping/worker.go
Normal file
252
internal/serviceping/worker.go
Normal file
@@ -0,0 +1,252 @@
|
||||
package serviceping
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/muhlemmer/gu"
|
||||
"github.com/riverqueue/river"
|
||||
"github.com/robfig/cron/v3"
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/cmd/build"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/queue"
|
||||
"github.com/zitadel/zitadel/internal/v2/system"
|
||||
analytics "github.com/zitadel/zitadel/pkg/grpc/analytics/v2beta"
|
||||
)
|
||||
|
||||
const (
|
||||
QueueName = "service_ping_report"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidReportType = errors.New("invalid report type")
|
||||
|
||||
_ river.Worker[*ServicePingReport] = (*Worker)(nil)
|
||||
)
|
||||
|
||||
type Worker struct {
|
||||
river.WorkerDefaults[*ServicePingReport]
|
||||
|
||||
reportClient analytics.TelemetryServiceClient
|
||||
db Queries
|
||||
queue Queue
|
||||
|
||||
config *Config
|
||||
systemID string
|
||||
version string
|
||||
}
|
||||
|
||||
type Queries interface {
|
||||
SearchInstances(ctx context.Context, queries *query.InstanceSearchQueries) (*query.Instances, error)
|
||||
ListResourceCounts(ctx context.Context, lastID int, size int) ([]query.ResourceCount, error)
|
||||
}
|
||||
|
||||
type Queue interface {
|
||||
Insert(ctx context.Context, args river.JobArgs, opts ...queue.InsertOpt) error
|
||||
}
|
||||
|
||||
// Register implements the [queue.Worker] interface.
|
||||
func (w *Worker) Register(workers *river.Workers, queues map[string]river.QueueConfig) {
|
||||
river.AddWorker[*ServicePingReport](workers, w)
|
||||
queues[QueueName] = river.QueueConfig{
|
||||
MaxWorkers: 1, // for now, we only use a single worker to prevent too much side effects on other queues
|
||||
}
|
||||
}
|
||||
|
||||
// Work implements the [river.Worker] interface.
|
||||
func (w *Worker) Work(ctx context.Context, job *river.Job[*ServicePingReport]) (err error) {
|
||||
defer func() {
|
||||
err = w.handleClientError(err)
|
||||
}()
|
||||
switch job.Args.ReportType {
|
||||
case ReportTypeBaseInformation:
|
||||
reportID, err := w.reportBaseInformation(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return w.createReportJobs(ctx, reportID)
|
||||
case ReportTypeResourceCounts:
|
||||
return w.reportResourceCounts(ctx, job.Args.ReportID)
|
||||
default:
|
||||
logging.WithFields("reportType", job.Args.ReportType, "reportID", job.Args.ReportID).
|
||||
Error("unknown job type")
|
||||
return river.JobCancel(ErrInvalidReportType)
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) reportBaseInformation(ctx context.Context) (string, error) {
|
||||
instances, err := w.db.SearchInstances(ctx, &query.InstanceSearchQueries{})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
instanceInformation := instanceInformationToPb(instances)
|
||||
resp, err := w.reportClient.ReportBaseInformation(ctx, &analytics.ReportBaseInformationRequest{
|
||||
SystemId: w.systemID,
|
||||
Version: w.version,
|
||||
Instances: instanceInformation,
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return resp.GetReportId(), nil
|
||||
}
|
||||
|
||||
func (w *Worker) reportResourceCounts(ctx context.Context, reportID string) error {
|
||||
lastID := 0
|
||||
// iterate over the resource counts until there are no more counts to report
|
||||
// or the context gets cancelled
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
default:
|
||||
counts, err := w.db.ListResourceCounts(ctx, lastID, w.config.Telemetry.ResourceCount.BulkSize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// if there are no counts, we can stop the loop
|
||||
if len(counts) == 0 {
|
||||
return nil
|
||||
}
|
||||
request := &analytics.ReportResourceCountsRequest{
|
||||
SystemId: w.systemID,
|
||||
ResourceCounts: resourceCountsToPb(counts),
|
||||
}
|
||||
if reportID != "" {
|
||||
request.ReportId = gu.Ptr(reportID)
|
||||
}
|
||||
resp, err := w.reportClient.ReportResourceCounts(ctx, request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// in case the resource counts returned by the database are less than the bulk size,
|
||||
// we can assume that we have reached the end of the resource counts and can stop the loop
|
||||
if len(counts) < w.config.Telemetry.ResourceCount.BulkSize {
|
||||
return nil
|
||||
}
|
||||
// update the lastID for the next iteration
|
||||
lastID = counts[len(counts)-1].ID
|
||||
// In case we get a report ID back from the server (it could be the first call of the report),
|
||||
// we update it to use it for the next batch.
|
||||
if resp.GetReportId() != "" && resp.GetReportId() != reportID {
|
||||
reportID = resp.GetReportId()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) handleClientError(err error) error {
|
||||
telemetryError := new(TelemetryError)
|
||||
if !errors.As(err, &telemetryError) {
|
||||
// If the error is not a TelemetryError, we can assume that it is a transient error
|
||||
// and can be retried by the queue.
|
||||
return err
|
||||
}
|
||||
switch telemetryError.StatusCode {
|
||||
case http.StatusBadRequest,
|
||||
http.StatusNotFound,
|
||||
http.StatusNotImplemented,
|
||||
http.StatusConflict,
|
||||
http.StatusPreconditionFailed:
|
||||
// In case of these errors, we can assume that a retry does not make sense,
|
||||
// so we can cancel the job.
|
||||
return river.JobCancel(err)
|
||||
default:
|
||||
// As of now we assume that all other errors are transient and can be retried.
|
||||
// So we just return the error, which will be handled by the queue as a failed attempt.
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (w *Worker) createReportJobs(ctx context.Context, reportID string) error {
|
||||
errs := make([]error, 0)
|
||||
if w.config.Telemetry.ResourceCount.Enabled {
|
||||
err := w.addReportJob(ctx, reportID, ReportTypeResourceCounts)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
func (w *Worker) addReportJob(ctx context.Context, reportID string, reportType ReportType) error {
|
||||
job := &ServicePingReport{
|
||||
ReportID: reportID,
|
||||
ReportType: reportType,
|
||||
}
|
||||
return w.queue.Insert(ctx, job,
|
||||
queue.WithQueueName(QueueName),
|
||||
queue.WithMaxAttempts(w.config.MaxAttempts),
|
||||
)
|
||||
}
|
||||
|
||||
type systemIDReducer struct {
|
||||
id string
|
||||
}
|
||||
|
||||
func (s *systemIDReducer) Reduce() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *systemIDReducer) AppendEvents(events ...eventstore.Event) {
|
||||
for _, event := range events {
|
||||
if idEvent, ok := event.(*system.IDGeneratedEvent); ok {
|
||||
s.id = idEvent.ID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *systemIDReducer) Query() *eventstore.SearchQueryBuilder {
|
||||
return eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
AddQuery().
|
||||
AggregateTypes(system.AggregateType).
|
||||
EventTypes(system.IDGeneratedType).
|
||||
Builder()
|
||||
}
|
||||
|
||||
func Register(
|
||||
ctx context.Context,
|
||||
q *queue.Queue,
|
||||
queries *query.Queries,
|
||||
eventstoreClient *eventstore.Eventstore,
|
||||
config *Config,
|
||||
) error {
|
||||
if !config.Enabled {
|
||||
return nil
|
||||
}
|
||||
systemID := new(systemIDReducer)
|
||||
err := eventstoreClient.FilterToQueryReducer(ctx, systemID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.AddWorkers(&Worker{
|
||||
reportClient: NewClient(config),
|
||||
db: queries,
|
||||
queue: q,
|
||||
config: config,
|
||||
systemID: systemID.id,
|
||||
version: build.Version(),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func Start(config *Config, q *queue.Queue) error {
|
||||
if !config.Enabled {
|
||||
return nil
|
||||
}
|
||||
schedule, err := cron.ParseStandard(config.Interval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.AddPeriodicJob(
|
||||
schedule,
|
||||
&ServicePingReport{},
|
||||
queue.WithQueueName(QueueName),
|
||||
queue.WithMaxAttempts(config.MaxAttempts),
|
||||
)
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user