diff --git a/internal/queue/database.go b/internal/queue/database.go index c5eb0b8ca3..93bf5abb9f 100644 --- a/internal/queue/database.go +++ b/internal/queue/database.go @@ -1,45 +1,6 @@ package queue -import ( - "context" - "sync" - - "github.com/jackc/pgx/v5" - - "github.com/zitadel/zitadel/internal/database/dialect" -) - const ( schema = "queue" applicationName = "zitadel_queue" ) - -var conns = &sync.Map{} - -type queueKey struct{} - -func WithQueue(parent context.Context) context.Context { - return context.WithValue(parent, queueKey{}, struct{}{}) -} - -func init() { - dialect.RegisterBeforeAcquire(func(ctx context.Context, c *pgx.Conn) error { - if _, ok := ctx.Value(queueKey{}).(struct{}); !ok { - return nil - } - _, err := c.Exec(ctx, "SET search_path TO "+schema+"; SET application_name TO "+applicationName) - if err != nil { - return err - } - conns.Store(c, struct{}{}) - return nil - }) - dialect.RegisterAfterRelease(func(c *pgx.Conn) error { - _, ok := conns.LoadAndDelete(c) - if !ok { - return nil - } - _, err := c.Exec(context.Background(), "SET search_path TO DEFAULT; SET application_name TO "+dialect.DefaultAppName) - return err - }) -} diff --git a/internal/queue/migrate.go b/internal/queue/migrate.go index e814da3bd3..9af294cbe1 100644 --- a/internal/queue/migrate.go +++ b/internal/queue/migrate.go @@ -27,11 +27,10 @@ func (m *Migrator) Execute(ctx context.Context) error { return err } - migrator, err := rivermigrate.New(m.driver, nil) + migrator, err := rivermigrate.New(m.driver, &rivermigrate.Config{Schema: schema}) if err != nil { return err } - ctx = WithQueue(ctx) _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) return err diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 44e291bf4d..6db5e0ec2a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -41,6 +41,7 @@ func NewQueue(config *Config) (_ *Queue, err error) { Queues: make(map[string]river.QueueConfig), JobTimeout: -1, Middleware: middleware, + Schema: schema, }, }, nil } @@ -56,7 +57,6 @@ func (q *Queue) Start(ctx context.Context) (err error) { if q == nil || !q.shouldStart { return nil } - ctx = WithQueue(ctx) q.client, err = river.NewClient(q.driver, q.config) if err != nil { @@ -112,7 +112,6 @@ func WithQueueName(name string) InsertOpt { func (q *Queue) Insert(ctx context.Context, args river.JobArgs, opts ...InsertOpt) error { options := new(river.InsertOpts) - ctx = WithQueue(ctx) for _, opt := range opts { opt(options) }