mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:07:30 +00:00
feat: push telemetry (#6027)
* document analytics config
* rework configuration and docs
* describe HandleActiveInstances better
* describe active instances on quotas better
* only projected events are considered
* cleanup
* describe changes at runtime
* push milestones
* stop tracking events
* calculate and push 4 in 6 milestones
* reduce milestone pushed
* remove docs
* fix scheduled pseudo event projection
* push 5 in 6 milestones
* push 6 in 6 milestones
* ignore client ids
* fix text array contains
* push human readable milestone type
* statement unit tests
* improve dev and db performance
* organize imports
* cleanup
* organize imports
* test projection
* check rows.Err()
* test search query
* pass linting
* review
* test 4 milestones
* simplify milestone by instance ids query
* use type NamespacedCondition
* cleanup
* lint
* lint
* dont overwrite original error
* no opt-in in examples
* cleanup
* prerelease
* enable request headers
* make limit configurable
* review fixes
* only requeue special handlers secondly
* include integration tests
* Revert "include integration tests"
This reverts commit 96db9504ec
.
* pass reducers
* test handlers
* fix unit test
* feat: increment version
* lint
* remove prerelease
* fix integration tests
This commit is contained in:
@@ -21,10 +21,8 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel,
|
||||
|
||||
logging.Debug("successfully initialized webhook json channel")
|
||||
return channels.HandleMessageFunc(func(message channels.Message) error {
|
||||
|
||||
requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
msg, ok := message.(*messages.JSON)
|
||||
if !ok {
|
||||
return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON")
|
||||
@@ -33,27 +31,24 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cfg.Headers != nil {
|
||||
req.Header = cfg.Headers
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = resp.Body.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status")
|
||||
}
|
||||
|
||||
logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called")
|
||||
return nil
|
||||
}), nil
|
||||
|
@@ -1,12 +1,14 @@
|
||||
package webhook
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
CallURL string
|
||||
Method string
|
||||
Headers http.Header
|
||||
}
|
||||
|
||||
func (w *Config) Validate() error {
|
||||
|
@@ -4,16 +4,15 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/repository/user"
|
||||
)
|
||||
|
||||
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, eventTypes ...eventstore.EventType) (bool, error) {
|
||||
func (n *NotificationQueries) IsAlreadyHandled(ctx context.Context, event eventstore.Event, data map[string]interface{}, aggregateType eventstore.AggregateType, eventTypes ...eventstore.EventType) (bool, error) {
|
||||
events, err := n.es.Filter(
|
||||
ctx,
|
||||
eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
|
||||
InstanceID(event.Aggregate().InstanceID).
|
||||
AddQuery().
|
||||
AggregateTypes(user.AggregateType).
|
||||
AggregateTypes(aggregateType).
|
||||
AggregateIDs(event.Aggregate().ID).
|
||||
SequenceGreater(event.Sequence()).
|
||||
EventTypes(eventTypes...).
|
||||
|
30
internal/notification/handlers/handlers_integration_test.go
Normal file
30
internal/notification/handlers/handlers_integration_test.go
Normal file
@@ -0,0 +1,30 @@
|
||||
//go:build integration
|
||||
|
||||
package handlers_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/integration"
|
||||
)
|
||||
|
||||
var (
|
||||
CTX context.Context
|
||||
SystemCTX context.Context
|
||||
Tester *integration.Tester
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
os.Exit(func() int {
|
||||
ctx, _, cancel := integration.Contexts(5 * time.Minute)
|
||||
CTX = ctx
|
||||
defer cancel()
|
||||
Tester = integration.NewTester(ctx)
|
||||
SystemCTX = Tester.WithAuthorization(ctx, integration.SystemUser)
|
||||
defer Tester.Done()
|
||||
return m.Run()
|
||||
}())
|
||||
}
|
@@ -68,7 +68,7 @@ func (u *quotaNotifier) reduceNotificationDue(event eventstore.Event) (*handler.
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-DLxdE", "reduce.wrong.event.type %s", quota.NotificationDueEventType)
|
||||
}
|
||||
ctx := HandlerContext(event.Aggregate())
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.NotifiedEventType)
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"dueEventID": e.ID}, quota.AggregateType, quota.NotifiedEventType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
150
internal/notification/handlers/telemetry_pusher.go
Normal file
150
internal/notification/handlers/telemetry_pusher.go
Normal file
@@ -0,0 +1,150 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/zitadel/logging"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/api/call"
|
||||
"github.com/zitadel/zitadel/internal/command"
|
||||
"github.com/zitadel/zitadel/internal/errors"
|
||||
"github.com/zitadel/zitadel/internal/eventstore"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler"
|
||||
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
|
||||
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
|
||||
_ "github.com/zitadel/zitadel/internal/notification/statik"
|
||||
"github.com/zitadel/zitadel/internal/notification/types"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/query/projection"
|
||||
"github.com/zitadel/zitadel/internal/repository/milestone"
|
||||
"github.com/zitadel/zitadel/internal/repository/pseudo"
|
||||
)
|
||||
|
||||
const (
|
||||
TelemetryProjectionTable = "projections.telemetry"
|
||||
)
|
||||
|
||||
type TelemetryPusherConfig struct {
|
||||
Enabled bool
|
||||
Endpoints []string
|
||||
Headers http.Header
|
||||
Limit uint64
|
||||
}
|
||||
|
||||
type telemetryPusher struct {
|
||||
crdb.StatementHandler
|
||||
cfg TelemetryPusherConfig
|
||||
commands *command.Commands
|
||||
queries *NotificationQueries
|
||||
metricSuccessfulDeliveriesJSON string
|
||||
metricFailedDeliveriesJSON string
|
||||
}
|
||||
|
||||
func NewTelemetryPusher(
|
||||
ctx context.Context,
|
||||
telemetryCfg TelemetryPusherConfig,
|
||||
handlerCfg crdb.StatementHandlerConfig,
|
||||
commands *command.Commands,
|
||||
queries *NotificationQueries,
|
||||
metricSuccessfulDeliveriesJSON,
|
||||
metricFailedDeliveriesJSON string,
|
||||
) *telemetryPusher {
|
||||
p := new(telemetryPusher)
|
||||
handlerCfg.ProjectionName = TelemetryProjectionTable
|
||||
handlerCfg.Reducers = p.reducers()
|
||||
p.cfg = telemetryCfg
|
||||
p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg)
|
||||
p.commands = commands
|
||||
p.queries = queries
|
||||
p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON
|
||||
p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON
|
||||
projection.TelemetryPusherProjection = p
|
||||
return p
|
||||
}
|
||||
|
||||
func (t *telemetryPusher) reducers() []handler.AggregateReducer {
|
||||
return []handler.AggregateReducer{{
|
||||
Aggregate: pseudo.AggregateType,
|
||||
EventRedusers: []handler.EventReducer{{
|
||||
Event: pseudo.ScheduledEventType,
|
||||
Reduce: t.pushMilestones,
|
||||
}},
|
||||
}}
|
||||
}
|
||||
|
||||
func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
|
||||
ctx := call.WithTimestamp(context.Background())
|
||||
scheduledEvent, ok := event.(*pseudo.ScheduledEvent)
|
||||
if !ok {
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type())
|
||||
}
|
||||
|
||||
isReached, err := query.NewNotNullQuery(query.MilestoneReachedDateColID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
isNotPushed, err := query.NewIsNullQuery(query.MilestonePushedDateColID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
hasPrimaryDomain, err := query.NewNotNullQuery(query.MilestonePrimaryDomainColID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
unpushedMilestones, err := t.queries.Queries.SearchMilestones(ctx, scheduledEvent.InstanceIDs, &query.MilestonesSearchQueries{
|
||||
SearchRequest: query.SearchRequest{
|
||||
Limit: t.cfg.Limit,
|
||||
SortingColumn: query.MilestoneReachedDateColID,
|
||||
Asc: true,
|
||||
},
|
||||
Queries: []query.SearchQuery{isReached, isNotPushed, hasPrimaryDomain},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var errs int
|
||||
for _, ms := range unpushedMilestones.Milestones {
|
||||
if err = t.pushMilestone(ctx, scheduledEvent, ms); err != nil {
|
||||
errs++
|
||||
logging.Warnf("pushing milestone %+v failed: %s", *ms, err.Error())
|
||||
}
|
||||
}
|
||||
if errs > 0 {
|
||||
return nil, fmt.Errorf("pushing %d of %d milestones failed", errs, unpushedMilestones.Count)
|
||||
}
|
||||
|
||||
return crdb.NewNoOpStatement(scheduledEvent), nil
|
||||
}
|
||||
|
||||
func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.ScheduledEvent, ms *query.Milestone) error {
|
||||
ctx = authz.WithInstanceID(ctx, ms.InstanceID)
|
||||
alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"type": ms.Type.String()}, milestone.AggregateType, milestone.PushedEventType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if alreadyHandled {
|
||||
return nil
|
||||
}
|
||||
for _, endpoint := range t.cfg.Endpoints {
|
||||
if err := types.SendJSON(
|
||||
ctx,
|
||||
webhook.Config{
|
||||
CallURL: endpoint,
|
||||
Method: http.MethodPost,
|
||||
Headers: t.cfg.Headers,
|
||||
},
|
||||
t.queries.GetFileSystemProvider,
|
||||
t.queries.GetLogProvider,
|
||||
ms,
|
||||
event,
|
||||
t.metricSuccessfulDeliveriesJSON,
|
||||
t.metricFailedDeliveriesJSON,
|
||||
).WithoutTemplate(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return t.commands.MilestonePushed(ctx, ms.Type, t.cfg.Endpoints, ms.PrimaryDomain)
|
||||
}
|
@@ -0,0 +1,89 @@
|
||||
//go:build integration
|
||||
|
||||
package handlers_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/zitadel/pkg/grpc/management"
|
||||
"github.com/zitadel/zitadel/pkg/grpc/system"
|
||||
)
|
||||
|
||||
func TestServer_TelemetryPushMilestones(t *testing.T) {
|
||||
bodies := make(chan []byte, 0)
|
||||
mockServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
body, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if r.Header.Get("single-value") != "single-value" {
|
||||
t.Error("single-value header not set")
|
||||
}
|
||||
if reflect.DeepEqual(r.Header.Get("multi-value"), "multi-value-1,multi-value-2") {
|
||||
t.Error("single-value header not set")
|
||||
}
|
||||
bodies <- body
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
listener, err := net.Listen("tcp", "localhost:8081")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mockServer.Listener = listener
|
||||
mockServer.Start()
|
||||
t.Cleanup(mockServer.Close)
|
||||
primaryDomain, instanceID, iamOwnerCtx := Tester.UseIsolatedInstance(CTX, SystemCTX)
|
||||
t.Log("testing against instance with primary domain", primaryDomain)
|
||||
awaitMilestone(t, bodies, primaryDomain, "InstanceCreated")
|
||||
project, err := Tester.Client.Mgmt.AddProject(iamOwnerCtx, &management.AddProjectRequest{Name: "integration"})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
awaitMilestone(t, bodies, primaryDomain, "ProjectCreated")
|
||||
if _, err = Tester.Client.Mgmt.AddOIDCApp(iamOwnerCtx, &management.AddOIDCAppRequest{
|
||||
ProjectId: project.GetId(),
|
||||
Name: "integration",
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
awaitMilestone(t, bodies, primaryDomain, "ApplicationCreated")
|
||||
// TODO: trigger and await milestone AuthenticationSucceededOnInstance
|
||||
// TODO: trigger and await milestone AuthenticationSucceededOnApplication
|
||||
if _, err = Tester.Client.System.RemoveInstance(SystemCTX, &system.RemoveInstanceRequest{InstanceId: instanceID}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
awaitMilestone(t, bodies, primaryDomain, "InstanceDeleted")
|
||||
}
|
||||
|
||||
func awaitMilestone(t *testing.T, bodies chan []byte, primaryDomain, expectMilestoneType string) {
|
||||
for {
|
||||
select {
|
||||
case body := <-bodies:
|
||||
plain := new(bytes.Buffer)
|
||||
if err := json.Indent(plain, body, "", " "); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Log("received milestone", plain.String())
|
||||
milestone := struct {
|
||||
Type string
|
||||
PrimaryDomain string
|
||||
}{}
|
||||
if err := json.Unmarshal(body, &milestone); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
if milestone.Type == expectMilestoneType && milestone.PrimaryDomain == primaryDomain {
|
||||
return
|
||||
}
|
||||
case <-time.After(60 * time.Second):
|
||||
t.Fatalf("timed out waiting for milestone %s in domain %s", expectMilestoneType, primaryDomain)
|
||||
}
|
||||
}
|
||||
}
|
@@ -337,7 +337,7 @@ func (u *userNotifier) reduceDomainClaimed(event eventstore.Event) (*handler.Sta
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Drh5w", "reduce.wrong.event.type %s", user.UserDomainClaimedType)
|
||||
}
|
||||
ctx := HandlerContext(event.Aggregate())
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil,
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType,
|
||||
user.UserDomainClaimedType, user.UserDomainClaimedSentType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -465,7 +465,7 @@ func (u *userNotifier) reducePasswordChanged(event eventstore.Event) (*handler.S
|
||||
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-Yko2z8", "reduce.wrong.event.type %s", user.HumanPasswordChangedType)
|
||||
}
|
||||
ctx := HandlerContext(event.Aggregate())
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.HumanPasswordChangeSentType)
|
||||
alreadyHandled, err := u.queries.IsAlreadyHandled(ctx, event, nil, user.AggregateType, user.HumanPasswordChangeSentType)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -594,5 +594,5 @@ func (u *userNotifier) checkIfCodeAlreadyHandledOrExpired(ctx context.Context, e
|
||||
if event.CreationDate().Add(expiry).Before(time.Now().UTC()) {
|
||||
return true, nil
|
||||
}
|
||||
return u.queries.IsAlreadyHandled(ctx, event, data, eventTypes...)
|
||||
return u.queries.IsAlreadyHandled(ctx, event, data, user.AggregateType, eventTypes...)
|
||||
}
|
@@ -29,6 +29,8 @@ func Start(
|
||||
ctx context.Context,
|
||||
userHandlerCustomConfig projection.CustomConfig,
|
||||
quotaHandlerCustomConfig projection.CustomConfig,
|
||||
telemetryHandlerCustomConfig projection.CustomConfig,
|
||||
telemetryCfg handlers.TelemetryPusherConfig,
|
||||
externalPort uint16,
|
||||
externalSecure bool,
|
||||
commands *command.Commands,
|
||||
@@ -74,4 +76,15 @@ func Start(
|
||||
metricSuccessfulDeliveriesJSON,
|
||||
metricFailedDeliveriesJSON,
|
||||
).Start()
|
||||
if telemetryCfg.Enabled {
|
||||
handlers.NewTelemetryPusher(
|
||||
ctx,
|
||||
telemetryCfg,
|
||||
projection.ApplyCustomConfig(telemetryHandlerCustomConfig),
|
||||
commands,
|
||||
q,
|
||||
metricSuccessfulDeliveriesJSON,
|
||||
metricFailedDeliveriesJSON,
|
||||
).Start()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user