enable request headers

This commit is contained in:
Elio Bischof
2023-07-05 11:45:09 +02:00
parent 8d38ee6244
commit a901224bbb
6 changed files with 29 additions and 12 deletions

View File

@@ -28,6 +28,12 @@ Telemetry:
# If you change this configuration at runtime, remaining data that is not successfully delivered to the old endpoints is sent to the new endpoints. # If you change this configuration at runtime, remaining data that is not successfully delivered to the old endpoints is sent to the new endpoints.
Endpoints: Endpoints:
- https://httpbin.org/post - https://httpbin.org/post
# These headers are sent with every request to the configured endpoints.
Headers:
# single-value: "single-value"
# multi-value:
# - "multi-value-1"
# - "multi-value-2"
# Port ZITADEL will listen on # Port ZITADEL will listen on
Port: 8080 Port: 8080

View File

@@ -8,6 +8,11 @@ Telemetry:
Enabled: true Enabled: true
Endpoints: Endpoints:
- http://localhost:8081 - http://localhost:8081
Headers:
single-value: "single-value"
multi-value:
- "multi-value-1"
- "multi-value-2"
FirstInstance: FirstInstance:
Org: Org:

View File

@@ -21,10 +21,8 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel,
logging.Debug("successfully initialized webhook json channel") logging.Debug("successfully initialized webhook json channel")
return channels.HandleMessageFunc(func(message channels.Message) error { return channels.HandleMessageFunc(func(message channels.Message) error {
requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second) requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel() defer cancel()
msg, ok := message.(*messages.JSON) msg, ok := message.(*messages.JSON)
if !ok { if !ok {
return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON") return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON")
@@ -33,27 +31,24 @@ func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel,
if err != nil { if err != nil {
return err return err
} }
req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload)) req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload))
if err != nil { if err != nil {
return err return err
} }
if cfg.Headers != nil {
req.Header = cfg.Headers
}
req.Header.Set("Content-Type", "application/json") req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
} }
if err = resp.Body.Close(); err != nil { if err = resp.Body.Close(); err != nil {
return err return err
} }
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status") return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status")
} }
logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called") logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called")
return nil return nil
}), nil }), nil

View File

@@ -1,12 +1,14 @@
package webhook package webhook
import ( import (
"net/http"
"net/url" "net/url"
) )
type Config struct { type Config struct {
CallURL string CallURL string
Method string Method string
Headers http.Header
} }
func (w *Config) Validate() error { func (w *Config) Validate() error {

View File

@@ -30,15 +30,16 @@ const (
type TelemetryPusherConfig struct { type TelemetryPusherConfig struct {
Enabled bool Enabled bool
Endpoints []string Endpoints []string
Headers http.Header
} }
type telemetryPusher struct { type telemetryPusher struct {
crdb.StatementHandler crdb.StatementHandler
cfg TelemetryPusherConfig
commands *command.Commands commands *command.Commands
queries *NotificationQueries queries *NotificationQueries
metricSuccessfulDeliveriesJSON string metricSuccessfulDeliveriesJSON string
metricFailedDeliveriesJSON string metricFailedDeliveriesJSON string
endpoints []string
} }
func NewTelemetryPusher( func NewTelemetryPusher(
@@ -56,7 +57,7 @@ func NewTelemetryPusher(
if telemetryCfg.Enabled { if telemetryCfg.Enabled {
handlerCfg.Reducers = p.reducers() handlerCfg.Reducers = p.reducers()
} }
p.endpoints = telemetryCfg.Endpoints p.cfg = telemetryCfg
p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg) p.StatementHandler = crdb.NewStatementHandler(ctx, handlerCfg)
p.commands = commands p.commands = commands
p.queries = queries p.queries = queries
@@ -129,12 +130,13 @@ func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.Sched
if alreadyHandled { if alreadyHandled {
return nil return nil
} }
for _, endpoint := range t.endpoints { for _, endpoint := range t.cfg.Endpoints {
if err := types.SendJSON( if err := types.SendJSON(
ctx, ctx,
webhook.Config{ webhook.Config{
CallURL: endpoint, CallURL: endpoint,
Method: http.MethodPost, Method: http.MethodPost,
Headers: t.cfg.Headers,
}, },
t.queries.GetFileSystemProvider, t.queries.GetFileSystemProvider,
t.queries.GetLogProvider, t.queries.GetLogProvider,
@@ -146,5 +148,5 @@ func (t *telemetryPusher) pushMilestone(ctx context.Context, event *pseudo.Sched
return err return err
} }
} }
return t.commands.MilestonePushed(ctx, ms.Type, t.endpoints, ms.PrimaryDomain) return t.commands.MilestonePushed(ctx, ms.Type, t.cfg.Endpoints, ms.PrimaryDomain)
} }

View File

@@ -9,6 +9,7 @@ import (
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"reflect"
"testing" "testing"
"time" "time"
@@ -25,6 +26,12 @@ func TestServer_TelemetryPushMilestones(t *testing.T) {
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
if r.Header.Get("single-value") != "single-value" {
t.Error("single-value header not set")
}
if reflect.DeepEqual(r.Header.Get("multi-value"), "multi-value-1,multi-value-2") {
t.Error("single-value header not set")
}
bodies <- body bodies <- body
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
})) }))