fix(mirror): add max auth request age configuration (#9812)

The `auth.auth_requests` table is not cleaned up so long running Zitadel
installations can contain many rows.

The mirror command can take long because a the data are first copied
into memory (or disk) on cockroach and users do not get any output from
mirror. This is unfortunate because people don't know if Zitadel got
stuck.

Enhance logging throughout the projection processes and introduce a
configuration option for the maximum age of authentication requests.

None

closes https://github.com/zitadel/zitadel/issues/9764

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Silvan
2025-04-29 17:29:16 +02:00
committed by adlerhurst
parent 45647239d3
commit ba87ac7dc7
11 changed files with 131 additions and 84 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
_ "embed"
"io"
"strconv"
"time"
"github.com/jackc/pgx/v5/stdlib"
@@ -41,12 +42,16 @@ func copyAuth(ctx context.Context, config *Migration) {
logging.OnError(err).Fatal("unable to connect to destination database")
defer destClient.Close()
copyAuthRequests(ctx, sourceClient, destClient)
copyAuthRequests(ctx, sourceClient, destClient, config.MaxAuthRequestAge)
}
func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
func copyAuthRequests(ctx context.Context, source, dest *database.DB, maxAuthRequestAge time.Duration) {
start := time.Now()
logging.Info("creating index on auth.auth_requests.change_date to speed up copy in source database")
_, err := source.ExecContext(ctx, "CREATE INDEX CONCURRENTLY IF NOT EXISTS auth_requests_change_date ON auth.auth_requests (change_date)")
logging.OnError(err).Fatal("unable to create index on auth.auth_requests.change_date")
sourceConn, err := source.Conn(ctx)
logging.OnError(err).Fatal("unable to acquire connection")
defer sourceConn.Close()
@@ -55,9 +60,9 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
errs := make(chan error, 1)
go func() {
err = sourceConn.Raw(func(driverConn interface{}) error {
err = sourceConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+") TO STDOUT")
_, err := conn.PgConn().CopyTo(ctx, w, "COPY (SELECT id, regexp_replace(request::TEXT, '\\\\u0000', '', 'g')::JSON request, code, request_type, creation_date, change_date, instance_id FROM auth.auth_requests "+instanceClause()+" AND change_date > NOW() - INTERVAL '"+strconv.FormatFloat(maxAuthRequestAge.Seconds(), 'f', -1, 64)+" seconds') TO STDOUT")
w.Close()
return err
})
@@ -69,7 +74,7 @@ func copyAuthRequests(ctx context.Context, source, dest *database.DB) {
defer destConn.Close()
var affected int64
err = destConn.Raw(func(driverConn interface{}) error {
err = destConn.Raw(func(driverConn any) error {
conn := driverConn.(*stdlib.Conn).Conn()
if shouldReplace {