mirror of
https://github.com/zitadel/zitadel.git
synced 2025-02-28 16:27:23 +00:00
feat: add task queue (#9321)
# Which Problems Are Solved To integrate river as a task queue we need to ensure the migrations of river are executed. # How the Problems Are Solved - A new schema was added to the Zitadel database called "queue" - Added a repeatable setup step to Zitadel which executes the [migrations of river](https://riverqueue.com/docs/migrations#go-migration-api). # Additional Changes - Added more hooks to the databases to properly set the schema for the task queue # Additional Context - Closes https://github.com/zitadel/zitadel/issues/9280
This commit is contained in:
parent
0ea42f1ddf
commit
415bc32ed6
28
cmd/setup/48_river_queue_repeatable.go
Normal file
28
cmd/setup/48_river_queue_repeatable.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
package setup
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
|
"github.com/zitadel/zitadel/internal/eventstore"
|
||||||
|
"github.com/zitadel/zitadel/internal/queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RiverMigrateRepeatable struct {
|
||||||
|
client *database.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *RiverMigrateRepeatable) Execute(ctx context.Context, _ eventstore.Event) error {
|
||||||
|
if mig.client.Type() != "postgres" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return queue.New(mig.client).ExecuteMigrations(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mig *RiverMigrateRepeatable) String() string {
|
||||||
|
return "repeatable_migrate_river"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *RiverMigrateRepeatable) Check(lastRun map[string]interface{}) bool {
|
||||||
|
return true
|
||||||
|
}
|
@ -199,6 +199,9 @@ func Setup(ctx context.Context, config *Config, steps *Steps, masterKey string)
|
|||||||
eventstore: eventstoreClient,
|
eventstore: eventstoreClient,
|
||||||
rolePermissionMappings: config.InternalAuthZ.RolePermissionMappings,
|
rolePermissionMappings: config.InternalAuthZ.RolePermissionMappings,
|
||||||
},
|
},
|
||||||
|
&RiverMigrateRepeatable{
|
||||||
|
client: dbClient,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, step := range []migration.Migration{
|
for _, step := range []migration.Migration{
|
||||||
|
18
go.mod
18
go.mod
@ -42,7 +42,7 @@ require (
|
|||||||
github.com/h2non/gock v1.2.0
|
github.com/h2non/gock v1.2.0
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||||
github.com/improbable-eng/grpc-web v0.15.0
|
github.com/improbable-eng/grpc-web v0.15.0
|
||||||
github.com/jackc/pgx/v5 v5.7.0
|
github.com/jackc/pgx/v5 v5.7.2
|
||||||
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52
|
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52
|
||||||
github.com/jinzhu/gorm v1.9.16
|
github.com/jinzhu/gorm v1.9.16
|
||||||
github.com/k3a/html2text v1.2.1
|
github.com/k3a/html2text v1.2.1
|
||||||
@ -63,7 +63,7 @@ require (
|
|||||||
github.com/sony/sonyflake v1.2.0
|
github.com/sony/sonyflake v1.2.0
|
||||||
github.com/spf13/cobra v1.8.1
|
github.com/spf13/cobra v1.8.1
|
||||||
github.com/spf13/viper v1.19.0
|
github.com/spf13/viper v1.19.0
|
||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.10.0
|
||||||
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203
|
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203
|
||||||
github.com/ttacon/libphonenumber v1.2.1
|
github.com/ttacon/libphonenumber v1.2.1
|
||||||
github.com/twilio/twilio-go v1.22.2
|
github.com/twilio/twilio-go v1.22.2
|
||||||
@ -87,7 +87,7 @@ require (
|
|||||||
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
|
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8
|
||||||
golang.org/x/net v0.33.0
|
golang.org/x/net v0.33.0
|
||||||
golang.org/x/oauth2 v0.23.0
|
golang.org/x/oauth2 v0.23.0
|
||||||
golang.org/x/sync v0.10.0
|
golang.org/x/sync v0.11.0
|
||||||
golang.org/x/text v0.21.0
|
golang.org/x/text v0.21.0
|
||||||
google.golang.org/api v0.187.0
|
google.golang.org/api v0.187.0
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd
|
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd
|
||||||
@ -115,7 +115,7 @@ require (
|
|||||||
github.com/google/go-tpm v0.9.0 // indirect
|
github.com/google/go-tpm v0.9.0 // indirect
|
||||||
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect
|
github.com/google/pprof v0.0.0-20240528025155-186aa0362fba // indirect
|
||||||
github.com/google/s2a-go v0.1.7 // indirect
|
github.com/google/s2a-go v0.1.7 // indirect
|
||||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
|
||||||
github.com/lib/pq v1.10.9 // indirect
|
github.com/lib/pq v1.10.9 // indirect
|
||||||
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect
|
github.com/mattermost/xml-roundtrip-validator v0.1.0 // indirect
|
||||||
@ -124,11 +124,20 @@ require (
|
|||||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
github.com/riverqueue/river v0.16.0 // indirect
|
||||||
|
github.com/riverqueue/river/riverdriver v0.16.0 // indirect
|
||||||
|
github.com/riverqueue/river/rivershared v0.16.0 // indirect
|
||||||
|
github.com/riverqueue/river/rivertype v0.16.0 // indirect
|
||||||
github.com/sagikazarmark/locafero v0.4.0 // indirect
|
github.com/sagikazarmark/locafero v0.4.0 // indirect
|
||||||
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
|
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
|
||||||
github.com/sourcegraph/conc v0.3.0 // indirect
|
github.com/sourcegraph/conc v0.3.0 // indirect
|
||||||
|
github.com/tidwall/gjson v1.18.0 // indirect
|
||||||
|
github.com/tidwall/match v1.1.1 // indirect
|
||||||
|
github.com/tidwall/pretty v1.2.1 // indirect
|
||||||
|
github.com/tidwall/sjson v1.2.5 // indirect
|
||||||
github.com/yuin/gopher-lua v1.1.1 // indirect
|
github.com/yuin/gopher-lua v1.1.1 // indirect
|
||||||
github.com/zenazn/goji v1.0.1 // indirect
|
github.com/zenazn/goji v1.0.1 // indirect
|
||||||
|
go.uber.org/goleak v1.3.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/time v0.5.0 // indirect
|
golang.org/x/time v0.5.0 // indirect
|
||||||
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect
|
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect
|
||||||
@ -193,6 +202,7 @@ require (
|
|||||||
github.com/prometheus/client_model v0.6.1 // indirect
|
github.com/prometheus/client_model v0.6.1 // indirect
|
||||||
github.com/prometheus/common v0.55.0 // indirect
|
github.com/prometheus/common v0.55.0 // indirect
|
||||||
github.com/prometheus/procfs v0.15.1 // indirect
|
github.com/prometheus/procfs v0.15.1 // indirect
|
||||||
|
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0
|
||||||
github.com/rs/xid v1.5.0 // indirect
|
github.com/rs/xid v1.5.0 // indirect
|
||||||
github.com/russellhaering/goxmldsig v1.4.0 // indirect
|
github.com/russellhaering/goxmldsig v1.4.0 // indirect
|
||||||
github.com/sirupsen/logrus v1.9.3
|
github.com/sirupsen/logrus v1.9.3
|
||||||
|
28
go.sum
28
go.sum
@ -422,8 +422,12 @@ github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7Ulw
|
|||||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||||
github.com/jackc/pgx/v5 v5.7.0 h1:FG6VLIdzvAPhnYqP14sQ2xhFLkiUQHCs6ySqO91kF4g=
|
github.com/jackc/pgx/v5 v5.7.0 h1:FG6VLIdzvAPhnYqP14sQ2xhFLkiUQHCs6ySqO91kF4g=
|
||||||
github.com/jackc/pgx/v5 v5.7.0/go.mod h1:awP1KNnjylvpxHuHP63gzjhnGkI1iw+PMoIwvoleN/8=
|
github.com/jackc/pgx/v5 v5.7.0/go.mod h1:awP1KNnjylvpxHuHP63gzjhnGkI1iw+PMoIwvoleN/8=
|
||||||
|
github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
|
||||||
|
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
|
||||||
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
|
||||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||||
|
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||||
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ=
|
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52 h1:jny9eqYPwkG8IVy7foUoRjQmFLcArCSz+uPsL6KS0HQ=
|
||||||
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52/go.mod h1:RDZ+4PR3mDOtTpVbI0qBE+rdhmtIrtbssiNn38/1OWA=
|
github.com/jarcoal/jpath v0.0.0-20140328210829-f76b8b2dbf52/go.mod h1:RDZ+4PR3mDOtTpVbI0qBE+rdhmtIrtbssiNn38/1OWA=
|
||||||
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
|
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
|
||||||
@ -640,6 +644,16 @@ github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Ung
|
|||||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||||
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
github.com/redis/go-redis/v9 v9.7.0 h1:HhLSs+B6O021gwzl+locl0zEDnyNkxMtf/Z3NNBMa9E=
|
||||||
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw=
|
||||||
|
github.com/riverqueue/river v0.16.0 h1:YyQrs0kGgjuABwgat02DPUYS0TMyG2ZFlzvf6+fSFaw=
|
||||||
|
github.com/riverqueue/river v0.16.0/go.mod h1:pEZ8Gc15XyFjVY89nJeL256ub5z18XF7ukYn8ktqQrs=
|
||||||
|
github.com/riverqueue/river/riverdriver v0.16.0 h1:y4Df4e1Xk3Id0nnu1VxHJn9118OzmRHcmvOxM/i1Q30=
|
||||||
|
github.com/riverqueue/river/riverdriver v0.16.0/go.mod h1:7Kdf5HQDrLyLUUqPqXobaK+7zbcMctWeAl7yhg4nHes=
|
||||||
|
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0 h1:6HP296OPN+3ORL9qG1f561pldB5eovkLzfkNIQmaTXI=
|
||||||
|
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.16.0/go.mod h1:MAeBNoTQ+CD3nRvV9mF6iCBfsGJTxYHZeZSP4MYoeUE=
|
||||||
|
github.com/riverqueue/river/rivershared v0.16.0 h1:L1lQ3gMwdIsxA6yF0/PwAdsFP0T82yBD1V03q2GuJDU=
|
||||||
|
github.com/riverqueue/river/rivershared v0.16.0/go.mod h1:y5Xu8Shcp44DUNnEQV4c6oWH4m2OTkSMCe6nRrgzT34=
|
||||||
|
github.com/riverqueue/river/rivertype v0.16.0 h1:iDjNtCiUbXwLraqNEyQdH/OD80f1wTo8Ai6WHYCwRxs=
|
||||||
|
github.com/riverqueue/river/rivertype v0.16.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
|
||||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
@ -715,10 +729,22 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
|
|||||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||||
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||||
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203 h1:1SWXcTphBQjYGWRRxLFIAR1LVtQEj4eR7xPtyeOVM/c=
|
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203 h1:1SWXcTphBQjYGWRRxLFIAR1LVtQEj4eR7xPtyeOVM/c=
|
||||||
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203/go.mod h1:0Xw5cYMOYpgaWs+OOSx41ugycl2qvKTi9tlMMcZhFyY=
|
github.com/superseriousbusiness/exifremove v0.0.0-20210330092427-6acd27eac203/go.mod h1:0Xw5cYMOYpgaWs+OOSx41ugycl2qvKTi9tlMMcZhFyY=
|
||||||
|
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
|
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
|
||||||
|
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
|
||||||
|
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
|
||||||
|
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
|
||||||
|
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
|
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
|
||||||
|
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
|
||||||
|
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
|
||||||
|
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
|
||||||
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
|
||||||
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 h1:5u+EJUQiosu3JFX0XS0qTf5FznsMOzTjGqavBGuCbo0=
|
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2 h1:5u+EJUQiosu3JFX0XS0qTf5FznsMOzTjGqavBGuCbo0=
|
||||||
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2/go.mod h1:4kyMkleCiLkgY6z8gK5BkI01ChBtxR0ro3I1ZDcGM3w=
|
github.com/ttacon/builder v0.0.0-20170518171403-c099f663e1c2/go.mod h1:4kyMkleCiLkgY6z8gK5BkI01ChBtxR0ro3I1ZDcGM3w=
|
||||||
@ -900,6 +926,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
|
|||||||
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
|
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
|
||||||
|
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||||
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||||
|
@ -97,6 +97,27 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(connConfig.BeforeAcquire) > 0 {
|
||||||
|
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
|
||||||
|
for _, f := range connConfig.BeforeAcquire {
|
||||||
|
if err := f(ctx, conn); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(connConfig.AfterRelease) > 0 {
|
||||||
|
config.AfterRelease = func(conn *pgx.Conn) bool {
|
||||||
|
for _, f := range connConfig.AfterRelease {
|
||||||
|
if err := f(conn); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if connConfig.MaxOpenConns != 0 {
|
if connConfig.MaxOpenConns != 0 {
|
||||||
config.MaxConns = int32(connConfig.MaxOpenConns)
|
config.MaxConns = int32(connConfig.MaxOpenConns)
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,9 @@ var (
|
|||||||
type ConnectionConfig struct {
|
type ConnectionConfig struct {
|
||||||
MaxOpenConns,
|
MaxOpenConns,
|
||||||
MaxIdleConns uint32
|
MaxIdleConns uint32
|
||||||
AfterConnect []func(ctx context.Context, c *pgx.Conn) error
|
AfterConnect []func(ctx context.Context, c *pgx.Conn) error
|
||||||
|
BeforeAcquire []func(ctx context.Context, c *pgx.Conn) error
|
||||||
|
AfterRelease []func(c *pgx.Conn) error
|
||||||
}
|
}
|
||||||
|
|
||||||
var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error
|
var afterConnectFuncs []func(ctx context.Context, c *pgx.Conn) error
|
||||||
@ -27,6 +29,18 @@ func RegisterAfterConnect(f func(ctx context.Context, c *pgx.Conn) error) {
|
|||||||
afterConnectFuncs = append(afterConnectFuncs, f)
|
afterConnectFuncs = append(afterConnectFuncs, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var beforeAcquireFuncs []func(ctx context.Context, c *pgx.Conn) error
|
||||||
|
|
||||||
|
func RegisterBeforeAcquire(f func(ctx context.Context, c *pgx.Conn) error) {
|
||||||
|
beforeAcquireFuncs = append(beforeAcquireFuncs, f)
|
||||||
|
}
|
||||||
|
|
||||||
|
var afterReleaseFuncs []func(c *pgx.Conn) error
|
||||||
|
|
||||||
|
func RegisterAfterRelease(f func(c *pgx.Conn) error) {
|
||||||
|
afterReleaseFuncs = append(afterReleaseFuncs, f)
|
||||||
|
}
|
||||||
|
|
||||||
func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string) {
|
func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string) {
|
||||||
// T
|
// T
|
||||||
var value T
|
var value T
|
||||||
@ -58,8 +72,10 @@ func RegisterDefaultPgTypeVariants[T any](m *pgtype.Map, name, arrayName string)
|
|||||||
// The pusherRatio and spoolerRatio must be between 0 and 1.
|
// The pusherRatio and spoolerRatio must be between 0 and 1.
|
||||||
func NewConnectionConfig(openConns, idleConns uint32) *ConnectionConfig {
|
func NewConnectionConfig(openConns, idleConns uint32) *ConnectionConfig {
|
||||||
return &ConnectionConfig{
|
return &ConnectionConfig{
|
||||||
MaxOpenConns: openConns,
|
MaxOpenConns: openConns,
|
||||||
MaxIdleConns: idleConns,
|
MaxIdleConns: idleConns,
|
||||||
AfterConnect: afterConnectFuncs,
|
AfterConnect: afterConnectFuncs,
|
||||||
|
BeforeAcquire: beforeAcquireFuncs,
|
||||||
|
AfterRelease: afterReleaseFuncs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -81,13 +81,37 @@ func (c *Config) Connect(useAdmin bool) (*sql.DB, *pgxpool.Pool, error) {
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
|
if len(connConfig.AfterConnect) > 0 {
|
||||||
for _, f := range connConfig.AfterConnect {
|
config.AfterConnect = func(ctx context.Context, conn *pgx.Conn) error {
|
||||||
if err := f(ctx, conn); err != nil {
|
for _, f := range connConfig.AfterConnect {
|
||||||
return err
|
if err := f(ctx, conn); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(connConfig.BeforeAcquire) > 0 {
|
||||||
|
config.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
|
||||||
|
for _, f := range connConfig.BeforeAcquire {
|
||||||
|
if err := f(ctx, conn); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(connConfig.AfterRelease) > 0 {
|
||||||
|
config.AfterRelease = func(conn *pgx.Conn) bool {
|
||||||
|
for _, f := range connConfig.AfterRelease {
|
||||||
|
if err := f(conn); err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if connConfig.MaxOpenConns != 0 {
|
if connConfig.MaxOpenConns != 0 {
|
||||||
|
75
internal/queue/queue.go
Normal file
75
internal/queue/queue.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/jackc/pgx/v5"
|
||||||
|
"github.com/riverqueue/river/riverdriver"
|
||||||
|
"github.com/riverqueue/river/riverdriver/riverpgxv5"
|
||||||
|
"github.com/riverqueue/river/rivermigrate"
|
||||||
|
|
||||||
|
"github.com/zitadel/zitadel/internal/database"
|
||||||
|
"github.com/zitadel/zitadel/internal/database/dialect"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
schema = "queue"
|
||||||
|
applicationName = "zitadel_queue"
|
||||||
|
)
|
||||||
|
|
||||||
|
var conns = &sync.Map{}
|
||||||
|
|
||||||
|
type queueKey struct{}
|
||||||
|
|
||||||
|
func WithQueue(parent context.Context) context.Context {
|
||||||
|
return context.WithValue(parent, queueKey{}, struct{}{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
dialect.RegisterBeforeAcquire(func(ctx context.Context, c *pgx.Conn) error {
|
||||||
|
if _, ok := ctx.Value(queueKey{}).(struct{}); !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := c.Exec(ctx, "SET search_path TO "+schema+"; SET application_name TO "+applicationName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
conns.Store(c, struct{}{})
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
dialect.RegisterAfterRelease(func(c *pgx.Conn) error {
|
||||||
|
_, ok := conns.LoadAndDelete(c)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err := c.Exec(context.Background(), "SET search_path TO DEFAULT; SET application_name TO "+dialect.DefaultAppName)
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Queue abstracts the underlying queuing library
|
||||||
|
// For more information see github.com/riverqueue/river
|
||||||
|
// TODO(adlerhurst): maybe it makes more sense to split the effective queue from the migrator.
|
||||||
|
type Queue struct {
|
||||||
|
driver riverdriver.Driver[pgx.Tx]
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(client *database.DB) *Queue {
|
||||||
|
return &Queue{driver: riverpgxv5.New(client.Pool)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (q *Queue) ExecuteMigrations(ctx context.Context) error {
|
||||||
|
_, err := q.driver.GetExecutor().Exec(ctx, "CREATE SCHEMA IF NOT EXISTS "+schema)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
migrator, err := rivermigrate.New(q.driver, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ctx = WithQueue(ctx)
|
||||||
|
_, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, nil)
|
||||||
|
return err
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user