//go:build integration

package sink

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/url"
	"sync/atomic"

	"github.com/gorilla/websocket"
	"github.com/zitadel/logging"
)

// Request is a message forwarded from the handler to [Subscription]s.
type Request struct {
	Header http.Header
	Body   json.RawMessage
}

// Subscription is a websocket client to which [Request]s are forwarded by the server.
type Subscription struct {
	conn       *websocket.Conn
	closed     atomic.Bool
	reqChannel chan *Request
}

// Subscribe to a channel.
// The subscription forwards all requests it received on the channel's
// handler, after Subscribe has returned.
// Multiple subscription may be active on a single channel.
// Each request is always forwarded to each Subscription.
// Close must be called to cleanup up the Subscription's channel and go routine.
func Subscribe(ctx context.Context, ch Channel) *Subscription {
	u := url.URL{
		Scheme: "ws",
		Host:   listenAddr,
		Path:   subscribePath(ch),
	}
	conn, resp, err := websocket.DefaultDialer.DialContext(ctx, u.String(), nil)
	if err != nil {
		if resp != nil {
			defer resp.Body.Close()
			body, _ := io.ReadAll(resp.Body)
			err = fmt.Errorf("subscribe: %w, status: %s, body: %s", err, resp.Status, body)
		}
		panic(err)
	}

	sub := &Subscription{
		conn:       conn,
		reqChannel: make(chan *Request, 10),
	}
	go sub.readToChan()
	return sub
}

func (s *Subscription) readToChan() {
	for {
		if s.closed.Load() {
			break
		}
		req := new(Request)
		if err := s.conn.ReadJSON(req); err != nil {
			opErr := new(net.OpError)
			if errors.As(err, &opErr) {
				break
			}
			logging.WithError(err).Error("subscription read")
			break
		}
		s.reqChannel <- req
	}
	close(s.reqChannel)
}

// Recv returns the channel over which [Request]s are send.
func (s *Subscription) Recv() <-chan *Request {
	return s.reqChannel
}

func (s *Subscription) Close() error {
	s.closed.Store(true)
	return s.conn.Close()
}