From b5f97d64b04039e38871dcab41454e19cc4d798b Mon Sep 17 00:00:00 2001 From: Silvan <27845747+adlerhurst@users.noreply.github.com> Date: Tue, 29 Jul 2025 09:09:00 +0200 Subject: [PATCH] chore(queue): use schema config instead of `search_path` and `application_name` to configure the database schema (#10075) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Removes manual schema and application name setup via raw SQL and switches to using River’s built-in schema configuration. # Which Problems Are Solved River provides a configuration flag to set the schema of the queue. Zitadel sets the schema through database statements which is not needed anymore. # How the Problems Are Solved Set the schema in the river configuration and removed old code --- internal/queue/database.go | 39 -------------------------------------- internal/queue/migrate.go | 3 +-- internal/queue/queue.go | 3 +-- 3 files changed, 2 insertions(+), 43 deletions(-) diff --git a/internal/queue/database.go b/internal/queue/database.go index c5eb0b8ca3..93bf5abb9f 100644 --- a/internal/queue/database.go +++ b/internal/queue/database.go @@ -1,45 +1,6 @@ package queue -import ( - "context" - "sync" - - "github.com/jackc/pgx/v5" - - "github.com/zitadel/zitadel/internal/database/dialect" -) - const ( schema = "queue" applicationName = "zitadel_queue" ) - -var conns = &sync.Map{} - -type queueKey struct{} - -func WithQueue(parent context.Context) context.Context { - return context.WithValue(parent, queueKey{}, struct{}{}) -} - -func init() { - dialect.RegisterBeforeAcquire(func(ctx context.Context, c *pgx.Conn) error { - if _, ok := ctx.Value(queueKey{}).(struct{}); !ok { - return nil - } - _, err := c.Exec(ctx, "SET search_path TO "+schema+"; SET application_name TO "+applicationName) - if err != nil { - return err - } - conns.Store(c, struct{}{}) - return nil - }) - dialect.RegisterAfterRelease(func(c *pgx.Conn) error { - _, ok := conns.LoadAndDelete(c) - if !ok { - return nil - } - _, err := c.Exec(context.Background(), "SET search_path TO DEFAULT; SET application_name TO "+dialect.DefaultAppName) - return err - }) -} diff --git a/internal/queue/migrate.go b/internal/queue/migrate.go index e814da3bd3..9af294cbe1 100644 --- a/internal/queue/migrate.go +++ b/internal/queue/migrate.go @@ -27,11 +27,10 @@ func (m *Migrator) Execute(ctx context.Context) error { return err } - migrator, err := rivermigrate.New(m.driver, nil) + migrator, err := rivermigrate.New(m.driver, &rivermigrate.Config{Schema: schema}) if err != nil { return err } - ctx = WithQueue(ctx) _, err = migrator.Migrate(ctx, rivermigrate.DirectionUp, nil) return err diff --git a/internal/queue/queue.go b/internal/queue/queue.go index 44e291bf4d..6db5e0ec2a 100644 --- a/internal/queue/queue.go +++ b/internal/queue/queue.go @@ -41,6 +41,7 @@ func NewQueue(config *Config) (_ *Queue, err error) { Queues: make(map[string]river.QueueConfig), JobTimeout: -1, Middleware: middleware, + Schema: schema, }, }, nil } @@ -56,7 +57,6 @@ func (q *Queue) Start(ctx context.Context) (err error) { if q == nil || !q.shouldStart { return nil } - ctx = WithQueue(ctx) q.client, err = river.NewClient(q.driver, q.config) if err != nil { @@ -112,7 +112,6 @@ func WithQueueName(name string) InsertOpt { func (q *Queue) Insert(ctx context.Context, args river.JobArgs, opts ...InsertOpt) error { options := new(river.InsertOpts) - ctx = WithQueue(ctx) for _, opt := range opts { opt(options) }