zitadel/internal/notification/handlers/telemetry_pusher.go

151 lines
4.5 KiB
Go
Raw Normal View History

2023-06-19 12:15:30 +02:00
package handlers
import (
"context"
2023-06-28 08:19:34 +02:00
"fmt"
2023-06-19 12:15:30 +02:00
"net/http"
2023-06-28 08:19:34 +02:00
"github.com/zitadel/logging"
2023-06-28 17:43:19 +02:00
"github.com/zitadel/zitadel/internal/api/authz"
2023-06-28 08:19:34 +02:00
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/command"
2023-06-28 17:43:19 +02:00
"github.com/zitadel/zitadel/internal/errors"
2023-06-19 12:15:30 +02:00
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/crdb"
"github.com/zitadel/zitadel/internal/notification/channels/webhook"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/notification/types"
2023-06-28 17:43:19 +02:00
"github.com/zitadel/zitadel/internal/query"
2023-06-19 12:15:30 +02:00
"github.com/zitadel/zitadel/internal/query/projection"
2023-06-30 02:11:17 +02:00
"github.com/zitadel/zitadel/internal/repository/milestone"
2023-06-28 17:43:19 +02:00
"github.com/zitadel/zitadel/internal/repository/pseudo"
2023-06-19 12:15:30 +02:00
)
const (
TelemetryProjectionTable = "projections.telemetry"
)
type TelemetryPusherConfig struct {
Enabled bool
Endpoints []string
}
type telemetryPusher struct {
crdb.StatementHandler
commands *command.Commands
queries *NotificationQueries
metricSuccessfulDeliveriesJSON string
metricFailedDeliveriesJSON string
2023-06-21 11:40:16 +02:00
endpoints []string
2023-06-19 12:15:30 +02:00
}
func NewTelemetryPusher(
ctx context.Context,
telemetryCfg TelemetryPusherConfig,
handlerCfg crdb.StatementHandlerConfig,
commands *command.Commands,
queries *NotificationQueries,
metricSuccessfulDeliveriesJSON,
metricFailedDeliveriesJSON string,
) *telemetryPusher {
p := new(telemetryPusher)
handlerCfg.ProjectionName = TelemetryProjectionTable
2023-06-21 11:40:16 +02:00
handlerCfg.Reducers = []handler.AggregateReducer{{}}
if telemetryCfg.Enabled {
handlerCfg.Reducers = p.reducers()
}
p.endpoints = telemetryCfg.Endpoints
2023-06-19 12:15:30 +02:00
p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg)
p.commands = commands
p.queries = queries
p.metricSuccessfulDeliveriesJSON = metricSuccessfulDeliveriesJSON
p.metricFailedDeliveriesJSON = metricFailedDeliveriesJSON
projection.TelemetryPusherProjection = p
return p
}
func (t *telemetryPusher) reducers() []handler.AggregateReducer {
2023-06-28 08:19:34 +02:00
return []handler.AggregateReducer{{
Aggregate: pseudo.AggregateType,
EventRedusers: []handler.EventReducer{{
2023-06-28 11:35:22 +02:00
Event: pseudo.ScheduledEventType,
2023-06-28 08:19:34 +02:00
Reduce: t.pushMilestones,
}},
}}
2023-06-19 12:15:30 +02:00
}
2023-06-28 08:19:34 +02:00
func (t *telemetryPusher) pushMilestones(event eventstore.Event) (*handler.Statement, error) {
ctx := call.WithTimestamp(context.Background())
2023-06-28 11:35:22 +02:00
scheduledEvent, ok := event.(*pseudo.ScheduledEvent)
2023-06-19 12:15:30 +02:00
if !ok {
2023-06-28 08:19:34 +02:00
return nil, errors.ThrowInvalidArgumentf(nil, "HANDL-lDTs5", "reduce.wrong.event.type %s", event.Type())
2023-06-19 12:15:30 +02:00
}
2023-06-28 08:19:34 +02:00
isReached, err := query.NewNotNullQuery(query.MilestoneReachedDateColID)
2023-06-19 12:15:30 +02:00
if err != nil {
return nil, err
}
2023-06-28 08:19:34 +02:00
isNotPushed, err := query.NewIsNullQuery(query.MilestonePushedDateColID)
if err != nil {
return nil, err
}
hasPrimaryDomain, err := query.NewNotNullQuery(query.MilestonePrimaryDomainColID)
if err != nil {
return nil, err
2023-06-19 12:15:30 +02:00
}
2023-06-28 11:35:22 +02:00
unpushedMilestones, err := t.queries.Queries.SearchMilestones(ctx, scheduledEvent.InstanceIDs, &query.MilestonesSearchQueries{
2023-06-28 08:19:34 +02:00
SearchRequest: query.SearchRequest{
2023-06-28 11:35:22 +02:00
Limit: 100,
2023-06-28 08:19:34 +02:00
SortingColumn: query.MilestoneReachedDateColID,
Asc: true,
},
Queries: []query.SearchQuery{isReached, isNotPushed, hasPrimaryDomain},
})
if err != nil {
return nil, err
}
var errs int
for _, ms := range unpushedMilestones.Milestones {
2023-06-28 11:35:22 +02:00
if err = t.pushMilestone(ctx, scheduledEvent, ms); err != nil {
2023-06-28 08:19:34 +02:00
errs++
logging.Warnf("pushing milestone %+v failed: %s", *ms, err.Error())
}
}
if errs > 0 {
return nil, fmt.Errorf("pushing %d of %d milestones failed", errs, unpushedMilestones.Count)
}
2023-06-28 11:35:22 +02:00
return crdb.NewNoOpStatement(scheduledEvent), nil
2023-06-28 08:19:34 +02:00
}
2023-06-28 11:35:22 +02:00
func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.ScheduledEvent, ms *query.Milestone) error {
2023-06-28 17:43:19 +02:00
ctx = authz.WithInstanceID(ctx, ms.InstanceID)
2023-06-30 02:53:47 +02:00
alreadyHandled, err := t.queries.IsAlreadyHandled(ctx, event, map[string]interface{}{"type": ms.Type.String()}, milestone.AggregateType, milestone.PushedEventType)
2023-06-30 02:11:17 +02:00
if err != nil {
return err
}
if alreadyHandled {
return nil
}
2023-06-21 11:40:16 +02:00
for _, endpoint := range t.endpoints {
2023-06-28 08:19:34 +02:00
if err := types.SendJSON(
2023-06-19 12:15:30 +02:00
ctx,
webhook.Config{
CallURL: endpoint,
Method: http.MethodPost,
},
t.queries.GetFileSystemProvider,
t.queries.GetLogProvider,
2023-06-28 08:19:34 +02:00
ms,
2023-06-28 11:35:22 +02:00
event,
2023-06-19 12:15:30 +02:00
t.metricSuccessfulDeliveriesJSON,
t.metricFailedDeliveriesJSON,
).WithoutTemplate(); err != nil {
2023-06-28 08:19:34 +02:00
return err
2023-06-19 12:15:30 +02:00
}
}
2023-06-28 17:43:19 +02:00
return t.commands.MilestonePushed(ctx, ms.Type, t.endpoints, ms.PrimaryDomain)
2023-06-19 12:15:30 +02:00
}