zitadel/internal/notification/channels/webhook/channel.go
Elio Bischof cccccd005c
feat: call webhooks at least once (#5454)
* feat: call webhooks at least once

* self review

* feat: improve notification observability

* feat: add notification tracing

* test(e2e): test at-least-once webhook delivery

* fix webhook notifications

* dedicated quota notifications handler

* fix linting

* fix e2e test

* wait less in e2e test

* fix: don't ignore failed events in handlers

* fix: don't ignore failed events in handlers

* faster requeues

* question

* fix retries

* fix retries

* retry

* don't instance ids query

* revert handler_projection

* statements can be nil

* cleanup

* make unit tests pass

* add comments

* add comments

* lint

* spool only active instances

* feat(config): handle inactive instances

* customizable HandleInactiveInstances

* call inactive instances quota webhooks

* test: handling with and w/o inactive instances

* omit retrying noop statements

* docs: describe projection options

* enable global handling of inactive instances

* self review

* requeue quota notifications every 5m

* remove caos_errors reference

* fix comment styles

* make handlers package flat

* fix linting

* fix repeating quota notifications

* test with more usage

* debug log channel init failures
2023-03-28 22:09:06 +00:00

61 lines
1.5 KiB
Go

package webhook
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/notification/channels"
"github.com/zitadel/zitadel/internal/notification/messages"
)
func InitChannel(ctx context.Context, cfg Config) (channels.NotificationChannel, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
logging.Debug("successfully initialized webhook json channel")
return channels.HandleMessageFunc(func(message channels.Message) error {
requestCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
msg, ok := message.(*messages.JSON)
if !ok {
return errors.ThrowInternal(nil, "WEBH-K686U", "message is not JSON")
}
payload, err := msg.GetContent()
if err != nil {
return err
}
req, err := http.NewRequestWithContext(requestCtx, cfg.Method, cfg.CallURL, strings.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if err = resp.Body.Close(); err != nil {
return err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return errors.ThrowUnknown(fmt.Errorf("calling url %s returned %s", cfg.CallURL, resp.Status), "WEBH-LBxU0", "webhook didn't return a success status")
}
logging.WithFields("calling_url", cfg.CallURL, "method", cfg.Method).Debug("webhook called")
return nil
}), nil
}