2024-09-06 15:47:57 +03:00
//go:build integration
package sink
import (
2025-02-20 13:27:20 +01:00
"bytes"
"context"
"encoding/json"
2024-09-06 15:47:57 +03:00
"errors"
"io"
"net/http"
"net/url"
"path"
"sync"
"sync/atomic"
2025-02-20 13:27:20 +01:00
"time"
2024-09-06 15:47:57 +03:00
"github.com/go-chi/chi/v5"
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"github.com/zitadel/logging"
2025-02-20 13:27:20 +01:00
"github.com/zitadel/oidc/v3/pkg/oidc"
"golang.org/x/oauth2"
"golang.org/x/text/language"
crewjam_saml "github.com/crewjam/saml"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/idp/providers/ldap"
openid "github.com/zitadel/zitadel/internal/idp/providers/oidc"
"github.com/zitadel/zitadel/internal/idp/providers/saml"
2024-09-06 15:47:57 +03:00
)
const (
port = "8081"
listenAddr = "127.0.0.1:" + port
host = "localhost:" + port
)
// CallURL returns the full URL to the handler of a [Channel].
func CallURL ( ch Channel ) string {
u := url . URL {
Scheme : "http" ,
Host : host ,
Path : rootPath ( ch ) ,
}
return u . String ( )
}
2025-02-20 13:27:20 +01:00
func SuccessfulOAuthIntent ( instanceID , idpID , idpUserID , userID string ) ( string , string , time . Time , uint64 , error ) {
u := url . URL {
Scheme : "http" ,
Host : host ,
Path : successfulIntentOAuthPath ( ) ,
}
resp , err := callIntent ( u . String ( ) , & SuccessfulIntentRequest {
InstanceID : instanceID ,
IDPID : idpID ,
IDPUserID : idpUserID ,
UserID : userID ,
} )
if err != nil {
return "" , "" , time . Time { } , uint64 ( 0 ) , err
}
return resp . IntentID , resp . Token , resp . ChangeDate , resp . Sequence , nil
}
func SuccessfulSAMLIntent ( instanceID , idpID , idpUserID , userID string ) ( string , string , time . Time , uint64 , error ) {
u := url . URL {
Scheme : "http" ,
Host : host ,
Path : successfulIntentSAMLPath ( ) ,
}
resp , err := callIntent ( u . String ( ) , & SuccessfulIntentRequest {
InstanceID : instanceID ,
IDPID : idpID ,
IDPUserID : idpUserID ,
UserID : userID ,
} )
if err != nil {
return "" , "" , time . Time { } , uint64 ( 0 ) , err
}
return resp . IntentID , resp . Token , resp . ChangeDate , resp . Sequence , nil
}
func SuccessfulLDAPIntent ( instanceID , idpID , idpUserID , userID string ) ( string , string , time . Time , uint64 , error ) {
u := url . URL {
Scheme : "http" ,
Host : host ,
Path : successfulIntentLDAPPath ( ) ,
}
resp , err := callIntent ( u . String ( ) , & SuccessfulIntentRequest {
InstanceID : instanceID ,
IDPID : idpID ,
IDPUserID : idpUserID ,
UserID : userID ,
} )
if err != nil {
return "" , "" , time . Time { } , uint64 ( 0 ) , err
}
return resp . IntentID , resp . Token , resp . ChangeDate , resp . Sequence , nil
}
2024-09-06 15:47:57 +03:00
// StartServer starts a simple HTTP server on localhost:8081
// ZITADEL can use the server to send HTTP requests which can be
// used to validate tests through [Subscribe]rs.
// For each [Channel] a route is registered on http://localhost:8081/<channel_name>.
// The route must be used to send the HTTP request to be validated.
// [CallURL] can be used to obtain the full URL for a given Channel.
//
// This function is only active when the `integration` build tag is enabled
2025-02-20 13:27:20 +01:00
func StartServer ( commands * command . Commands ) ( close func ( ) ) {
2024-09-06 15:47:57 +03:00
router := chi . NewRouter ( )
for _ , ch := range ChannelValues ( ) {
fwd := & forwarder {
channelID : ch ,
subscribers : make ( map [ int64 ] chan <- * Request ) ,
}
router . HandleFunc ( rootPath ( ch ) , fwd . receiveHandler )
router . HandleFunc ( subscribePath ( ch ) , fwd . subscriptionHandler )
2025-02-20 13:27:20 +01:00
router . HandleFunc ( successfulIntentOAuthPath ( ) , successfulIntentHandler ( commands , createSuccessfulOAuthIntent ) )
router . HandleFunc ( successfulIntentSAMLPath ( ) , successfulIntentHandler ( commands , createSuccessfulSAMLIntent ) )
router . HandleFunc ( successfulIntentLDAPPath ( ) , successfulIntentHandler ( commands , createSuccessfulLDAPIntent ) )
2024-09-06 15:47:57 +03:00
}
s := & http . Server {
Addr : listenAddr ,
Handler : router ,
}
logging . WithFields ( "listen_addr" , listenAddr ) . Warn ( "!!!! A sink server is started which may expose sensitive data on a public endpoint. Make sure the `integration` build tag is disabled for production builds. !!!!" )
go func ( ) {
err := s . ListenAndServe ( )
if ! errors . Is ( err , http . ErrServerClosed ) {
logging . WithError ( err ) . Fatal ( "sink server" )
}
} ( )
return func ( ) {
logging . OnError ( s . Close ( ) ) . Error ( "sink server" )
}
}
func rootPath ( c Channel ) string {
return path . Join ( "/" , c . String ( ) )
}
func subscribePath ( c Channel ) string {
return path . Join ( "/" , c . String ( ) , "subscribe" )
}
2025-02-20 13:27:20 +01:00
func intentPath ( ) string {
return path . Join ( "/" , "intent" )
}
func successfulIntentPath ( ) string {
return path . Join ( intentPath ( ) , "/" , "successful" )
}
func successfulIntentOAuthPath ( ) string {
return path . Join ( successfulIntentPath ( ) , "/" , "oauth" )
}
func successfulIntentSAMLPath ( ) string {
return path . Join ( successfulIntentPath ( ) , "/" , "saml" )
}
func successfulIntentLDAPPath ( ) string {
return path . Join ( successfulIntentPath ( ) , "/" , "ldap" )
}
2024-09-06 15:47:57 +03:00
// forwarder handles incoming HTTP requests from ZITADEL and
// forwards them to all subscribed web sockets.
type forwarder struct {
channelID Channel
id atomic . Int64
mtx sync . RWMutex
subscribers map [ int64 ] chan <- * Request
upgrader websocket . Upgrader
}
// receiveHandler receives a simple HTTP for a single [Channel]
// and forwards them on all active subscribers of that Channel.
func ( c * forwarder ) receiveHandler ( w http . ResponseWriter , r * http . Request ) {
req := & Request {
Header : r . Header . Clone ( ) ,
}
var err error
req . Body , err = io . ReadAll ( r . Body )
if err != nil {
http . Error ( w , err . Error ( ) , http . StatusBadRequest )
}
c . mtx . RLock ( )
for _ , reqChan := range c . subscribers {
reqChan <- req
}
c . mtx . RUnlock ( )
w . WriteHeader ( http . StatusOK )
}
// subscriptionHandler upgrades HTTP request to a websocket connection for subscribers.
// All received HTTP requests on a subscriber's channel are send on the websocket to the client.
func ( c * forwarder ) subscriptionHandler ( w http . ResponseWriter , r * http . Request ) {
ws , err := c . upgrader . Upgrade ( w , r , nil )
logging . OnError ( err ) . Error ( "websocket upgrade" )
if err != nil {
return
}
done := readLoop ( ws )
id := c . id . Add ( 1 )
reqChannel := make ( chan * Request , 100 )
c . mtx . Lock ( )
c . subscribers [ id ] = reqChannel
c . mtx . Unlock ( )
logging . WithFields ( "id" , id , "channel" , c . channelID ) . Info ( "websocket opened" )
defer func ( ) {
c . mtx . Lock ( )
delete ( c . subscribers , id )
c . mtx . Unlock ( )
ws . Close ( )
close ( reqChannel )
} ( )
for {
select {
case err := <- done :
logging . WithError ( err ) . WithFields ( logrus . Fields { "id" : id , "channel" : c . channelID } ) . Info ( "websocket closed" )
return
case req := <- reqChannel :
if err := ws . WriteJSON ( req ) ; err != nil {
logging . WithError ( err ) . WithFields ( logrus . Fields { "id" : id , "channel" : c . channelID } ) . Error ( "websocket write json" )
return
}
}
}
}
// readLoop makes sure we can receive close messages
func readLoop ( ws * websocket . Conn ) ( done chan error ) {
done = make ( chan error , 1 )
go func ( done chan <- error ) {
for {
_ , _ , err := ws . NextReader ( )
if err != nil {
done <- err
break
}
}
close ( done )
} ( done )
return done
}
2025-02-20 13:27:20 +01:00
type SuccessfulIntentRequest struct {
InstanceID string ` json:"instance_id" `
IDPID string ` json:"idp_id" `
IDPUserID string ` json:"idp_user_id" `
UserID string ` json:"user_id" `
}
type SuccessfulIntentResponse struct {
IntentID string ` json:"intent_id" `
Token string ` json:"token" `
ChangeDate time . Time ` json:"change_date" `
Sequence uint64 ` json:"sequence" `
}
func callIntent ( url string , req * SuccessfulIntentRequest ) ( * SuccessfulIntentResponse , error ) {
data , err := json . Marshal ( req )
if err != nil {
return nil , err
}
resp , err := http . Post ( url , "application/json" , io . NopCloser ( bytes . NewReader ( data ) ) )
if err != nil {
return nil , err
}
body , err := io . ReadAll ( resp . Body )
if err != nil {
return nil , err
}
defer resp . Body . Close ( )
if resp . StatusCode != http . StatusOK {
return nil , errors . New ( string ( body ) )
}
result := new ( SuccessfulIntentResponse )
if err := json . Unmarshal ( body , result ) ; err != nil {
return nil , err
}
return result , nil
}
func successfulIntentHandler ( cmd * command . Commands , createIntent func ( ctx context . Context , cmd * command . Commands , req * SuccessfulIntentRequest ) ( * SuccessfulIntentResponse , error ) ) func ( w http . ResponseWriter , r * http . Request ) {
return func ( w http . ResponseWriter , r * http . Request ) {
body , err := io . ReadAll ( r . Body )
if err != nil {
http . Error ( w , err . Error ( ) , http . StatusBadRequest )
return
}
req := & SuccessfulIntentRequest { }
if err := json . Unmarshal ( body , req ) ; err != nil {
}
ctx := authz . WithInstanceID ( r . Context ( ) , req . InstanceID )
resp , err := createIntent ( ctx , cmd , req )
if err != nil {
http . Error ( w , err . Error ( ) , http . StatusBadRequest )
return
}
data , err := json . Marshal ( resp )
if err != nil {
http . Error ( w , err . Error ( ) , http . StatusBadRequest )
return
}
w . Write ( data )
return
}
}
func createIntent ( ctx context . Context , cmd * command . Commands , instanceID , idpID string ) ( string , error ) {
2025-02-26 13:20:47 +01:00
writeModel , _ , err := cmd . CreateIntent ( ctx , "" , idpID , "https://example.com/success" , "https://example.com/failure" , instanceID , nil )
2025-02-20 13:27:20 +01:00
if err != nil {
return "" , err
}
return writeModel . AggregateID , nil
}
func createSuccessfulOAuthIntent ( ctx context . Context , cmd * command . Commands , req * SuccessfulIntentRequest ) ( * SuccessfulIntentResponse , error ) {
intentID , err := createIntent ( ctx , cmd , req . InstanceID , req . IDPID )
writeModel , err := cmd . GetIntentWriteModel ( ctx , intentID , req . InstanceID )
idpUser := openid . NewUser (
& oidc . UserInfo {
Subject : req . IDPUserID ,
UserInfoProfile : oidc . UserInfoProfile {
PreferredUsername : "username" ,
} ,
} ,
)
idpSession := & openid . Session {
Tokens : & oidc . Tokens [ * oidc . IDTokenClaims ] {
Token : & oauth2 . Token {
AccessToken : "accessToken" ,
} ,
IDToken : "idToken" ,
} ,
}
token , err := cmd . SucceedIDPIntent ( ctx , writeModel , idpUser , idpSession , req . UserID )
if err != nil {
return nil , err
}
return & SuccessfulIntentResponse {
intentID ,
token ,
writeModel . ChangeDate ,
writeModel . ProcessedSequence ,
} , nil
}
func createSuccessfulSAMLIntent ( ctx context . Context , cmd * command . Commands , req * SuccessfulIntentRequest ) ( * SuccessfulIntentResponse , error ) {
intentID , err := createIntent ( ctx , cmd , req . InstanceID , req . IDPID )
writeModel , err := cmd . GetIntentWriteModel ( ctx , intentID , req . InstanceID )
idpUser := & saml . UserMapper {
ID : req . IDPUserID ,
Attributes : map [ string ] [ ] string { "attribute1" : { "value1" } } ,
}
assertion := & crewjam_saml . Assertion { ID : "id" }
token , err := cmd . SucceedSAMLIDPIntent ( ctx , writeModel , idpUser , req . UserID , assertion )
if err != nil {
return nil , err
}
return & SuccessfulIntentResponse {
intentID ,
token ,
writeModel . ChangeDate ,
writeModel . ProcessedSequence ,
} , nil
}
func createSuccessfulLDAPIntent ( ctx context . Context , cmd * command . Commands , req * SuccessfulIntentRequest ) ( * SuccessfulIntentResponse , error ) {
intentID , err := createIntent ( ctx , cmd , req . InstanceID , req . IDPID )
writeModel , err := cmd . GetIntentWriteModel ( ctx , intentID , req . InstanceID )
username := "username"
lang := language . Make ( "en" )
idpUser := ldap . NewUser (
req . IDPUserID ,
"" ,
"" ,
"" ,
"" ,
username ,
"" ,
false ,
"" ,
false ,
lang ,
"" ,
"" ,
)
attributes := map [ string ] [ ] string { "id" : { req . IDPUserID } , "username" : { username } , "language" : { lang . String ( ) } }
token , err := cmd . SucceedLDAPIDPIntent ( ctx , writeModel , idpUser , req . UserID , attributes )
if err != nil {
return nil , err
}
return & SuccessfulIntentResponse {
intentID ,
token ,
writeModel . ChangeDate ,
writeModel . ProcessedSequence ,
} , nil
}