diff --git a/cmd/setup/48_river_queue_repeatable.go b/cmd/setup/48_river_queue_repeatable.go new file mode 100644 index 0000000000..e88293256b --- /dev/null +++ b/cmd/setup/48_river_queue_repeatable.go @@ -0,0 +1,28 @@ +package setup + +import ( + "context" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/queue" +) + +type RiverMigrateRepeatable struct { + client *database.DB +} + +func (mig *RiverMigrateRepeatable) Execute(ctx context.Context, _ eventstore.Event) error { + if mig.client.Type() != "postgres" { + return nil + } + return queue.New(mig.client).ExecuteMigrations(ctx) +} + +func (mig *RiverMigrateRepeatable) String() string { + return "repeatable_migrate_river" +} + +func (f *RiverMigrateRepeatable) Check(lastRun map[string]interface{}) bool { + return true +} diff --git a/cmd/setup/setup.go b/cmd/setup/setup.go index 33298033a9..b78d1fc9cf 100644 --- a/cmd/setup/setup.go +++ b/cmd/setup/setup.go @@ -199,6 +199,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string) eventstore: eventstoreClient, rolePermissionMappings: config.InternalAuthZ.RolePermissionMappings, }, + &RiverMigrateRepeatable{ + client: dbClient, + }, } for _, step := range []migration.Migration{ diff --git a/go.mod b/go.mod index b35bf04216..f316c3e866 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/h2non/gock v1.2.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/improbable-eng/grpc-web v0.15.0 - github.com/jackc/pgx/v5 v5.7.0 + github.com/jackc/pgx/v5 v5.7.2 github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 github.com/jinzhu/gorm v1.9.16 github.com/k3a/html2text v1.2.1 @@ -63,7 +63,7 @@ require ( github.com/sony/sonyflake v1.2.0 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203 github.com/ttacon/libphonenumber v1.2.1 github.com/twilio/twilio-go v1.22.2 @@ -87,7 +87,7 @@ require ( golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 golang.org/x/net v0.33.0 golang.org/x/oauth2 v0.23.0 - golang.org/x/sync v0.10.0 + golang.org/x/sync v0.11.0 golang.org/x/text v0.21.0 google.golang.org/api v0.187.0 google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd @@ -115,7 +115,7 @@ require ( github.com/google/go-tpm v0.9.0 // indirect github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect github.com/google/s2a-go v0.1.7 // indirect - github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/lib/pq v1.10.9 // indirect github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect @@ -124,11 +124,20 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/pkg/errors v0.9.1 // indirect + github.com/riverqueue/river v0.16.0 // indirect + github.com/riverqueue/river/riverdriver v0.16.0 // indirect + github.com/riverqueue/river/rivershared v0.16.0 // indirect + github.com/riverqueue/river/rivertype v0.16.0 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/yuin/gopher-lua v1.1.1 // indirect github.com/zenazn/goji v1.0.1 // indirect + go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/time v0.5.0 // indirect google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect @@ -193,6 +202,7 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0 github.com/rs/xid v1.5.0 // indirect github.com/russellhaering/goxmldsig v1.4.0 // indirect github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 0745f5aca7..709d6b7a14 100644 --- a/go.sum +++ b/go.sum @@ -422,8 +422,12 @@ github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7Ulw github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgx/v5 v5.7.0 h1:FG6VLIdzvAPhnYqP14sQ2xhFLkiUQHCs6ySqO91kF4g= github.com/jackc/pgx/v5 v5.7.0/go.mod h1:awP1KNnjylvpxHuHP63gzjhnGkI1iw+PMoIwvoleN/8= +github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI= +github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ= github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52/go.mod h1:RDZ+4PR3mDOtTpVbI0qBE+rdhmtIrtbssiNn38/1OWA= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= @@ -640,6 +644,16 @@ github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Ung github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E= github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/riverqueue/river v0.16.0 h1:YyQrs0kGgjuABwgat02DPUYS0TMyG2ZFlzvf6+fSFaw= +github.com/riverqueue/river v0.16.0/go.mod h1:pEZ8Gc15XyFjVY89nJeL256ub5z18XF7ukYn8ktqQrs= +github.com/riverqueue/river/riverdriver v0.16.0 h1:y4Df4e1Xk3Id0nnu1VxHJn9118OzmRHcmvOxM/i1Q30= +github.com/riverqueue/river/riverdriver v0.16.0/go.mod h1:7Kdf5HQDrLyLUUqPqXobaK+7zbcMctWeAl7yhg4nHes= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0 h1:6HP296OPN+3ORL9qG1f561pldB5eovkLzfkNIQmaTXI= +github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0/go.mod h1:MAeBNoTQ+CD3nRvV9mF6iCBfsGJTxYHZeZSP4MYoeUE= +github.com/riverqueue/river/rivershared v0.16.0 h1:L1lQ3gMwdIsxA6yF0/PwAdsFP0T82yBD1V03q2GuJDU= +github.com/riverqueue/river/rivershared v0.16.0/go.mod h1:y5Xu8Shcp44DUNnEQV4c6oWH4m2OTkSMCe6nRrgzT34= +github.com/riverqueue/river/rivertype v0.16.0 h1:iDjNtCiUbXwLraqNEyQdH/OD80f1wTo8Ai6WHYCwRxs= +github.com/riverqueue/river/rivertype v0.16.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -715,10 +729,22 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203 h1:1SWXcTphBQjYGWRRxLFIAR1LVtQEj4eR7xPtyeOVM/c= github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203/go.mod h1:0Xw5cYMOYpgaWs+OOSx41ugycl2qvKTi9tlMMcZhFyY= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 h1:5u+EJUQiosu3JFX0XS0qTf5FznsMOzTjGqavBGuCbo0= github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2/go.mod h1:4kyMkleCiLkgY6z8gK5BkI01ChBtxR0ro3I1ZDcGM3w= @@ -900,6 +926,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/database/cockroach/crdb.go b/internal/database/cockroach/crdb.go index 48e912b5f5..a5b3208a86 100644 --- a/internal/database/cockroach/crdb.go +++ b/internal/database/cockroach/crdb.go @@ -97,6 +97,27 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { } } + if len(connConfig.BeforeAcquire) > 0 { + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + for _, f := range connConfig.BeforeAcquire { + if err := f(ctx, conn); err != nil { + return false + } + } + return true + } + } + if len(connConfig.AfterRelease) > 0 { + config.AfterRelease = func(conn *pgx.Conn) bool { + for _, f := range connConfig.AfterRelease { + if err := f(conn); err != nil { + return false + } + } + return true + } + } + if connConfig.MaxOpenConns != 0 { config.MaxConns = int32(connConfig.MaxOpenConns) } diff --git a/internal/database/dialect/connections.go b/internal/database/dialect/connections.go index 13a4d657c3..11b2681fea 100644 --- a/internal/database/dialect/connections.go +++ b/internal/database/dialect/connections.go @@ -18,7 +18,9 @@ var ( type ConnectionConfig struct { MaxOpenConns, MaxIdleConns uint32 - AfterConnect []func(ctx context.Context, c *pgx.Conn) error + AfterConnect []func(ctx context.Context, c *pgx.Conn) error + BeforeAcquire []func(ctx context.Context, c *pgx.Conn) error + AfterRelease []func(c *pgx.Conn) error } var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error @@ -27,6 +29,18 @@ func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) { afterConnectFuncs = append(afterConnectFuncs, f) } +var beforeAcquireFuncs []func(ctx context.Context, c *pgx.Conn) error + +func RegisterBeforeAcquire(f func(ctx context.Context, c *pgx.Conn) error) { + beforeAcquireFuncs = append(beforeAcquireFuncs, f) +} + +var afterReleaseFuncs []func(c *pgx.Conn) error + +func RegisterAfterRelease(f func(c *pgx.Conn) error) { + afterReleaseFuncs = append(afterReleaseFuncs, f) +} + func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string) { // T var value T @@ -58,8 +72,10 @@ func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string) // The pusherRatio and spoolerRatio must be between 0 and 1. func NewConnectionConfig(openConns, idleConns uint32) *ConnectionConfig { return &ConnectionConfig{ - MaxOpenConns: openConns, - MaxIdleConns: idleConns, - AfterConnect: afterConnectFuncs, + MaxOpenConns: openConns, + MaxIdleConns: idleConns, + AfterConnect: afterConnectFuncs, + BeforeAcquire: beforeAcquireFuncs, + AfterRelease: afterReleaseFuncs, } } diff --git a/internal/database/postgres/pg.go b/internal/database/postgres/pg.go index 5f4d9a6c9b..c847cc0a58 100644 --- a/internal/database/postgres/pg.go +++ b/internal/database/postgres/pg.go @@ -81,13 +81,37 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) { return nil, nil, err } - config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { - for _, f := range connConfig.AfterConnect { - if err := f(ctx, conn); err != nil { - return err + if len(connConfig.AfterConnect) > 0 { + config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error { + for _, f := range connConfig.AfterConnect { + if err := f(ctx, conn); err != nil { + return err + } } + return nil + } + } + + if len(connConfig.BeforeAcquire) > 0 { + config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool { + for _, f := range connConfig.BeforeAcquire { + if err := f(ctx, conn); err != nil { + return false + } + } + return true + } + } + + if len(connConfig.AfterRelease) > 0 { + config.AfterRelease = func(conn *pgx.Conn) bool { + for _, f := range connConfig.AfterRelease { + if err := f(conn); err != nil { + return false + } + } + return true } - return nil } if connConfig.MaxOpenConns != 0 { diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000000..265988e9ef --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,75 @@ +package queue + +import ( + "context" + "sync" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" + "github.com/riverqueue/river/rivermigrate" + + "github.com/zitadel/zitadel/internal/database" + "github.com/zitadel/zitadel/internal/database/dialect" +) + +const ( + schema = "queue" + applicationName = "zitadel_queue" +) + +var conns = &sync.Map{} + +type queueKey struct{} + +func WithQueue(parent context.Context) context.Context { + return context.WithValue(parent, queueKey{}, struct{}{}) +} + +func init() { + dialect.RegisterBeforeAcquire(func(ctx context.Context, c *pgx.Conn) error { + if _, ok := ctx.Value(queueKey{}).(struct{}); !ok { + return nil + } + _, err := c.Exec(ctx, "SET search_path TO "+schema+"; SET application_name TO "+applicationName) + if err != nil { + return err + } + conns.Store(c, struct{}{}) + return nil + }) + dialect.RegisterAfterRelease(func(c *pgx.Conn) error { + _, ok := conns.LoadAndDelete(c) + if !ok { + return nil + } + _, err := c.Exec(context.Background(), "SET search_path TO DEFAULT; SET application_name TO "+dialect.DefaultAppName) + return err + }) +} + +// Queue abstracts the underlying queuing library +// For more information see github.com/riverqueue/river +// TODO(adlerhurst): maybe it makes more sense to split the effective queue from the migrator. +type Queue struct { + driver riverdriver.Driver[pgx.Tx] +} + +func New(client *database.DB) *Queue { + return &Queue{driver: riverpgxv5.New(client.Pool)} +} + +func (q *Queue) ExecuteMigrations(ctx context.Context) error { + _, err := q.driver.GetExecutor().Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schema) + if err != nil { + return err + } + + migrator, err := rivermigrate.New(q.driver, nil) + if err != nil { + return err + } + ctx = WithQueue(ctx) + _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) + return err +}