zitadel/internal/auth/repository/eventsourcing/handler/handler.go
Silvan 82e232af72 fix(mirror): add max auth request age configuration (#9812)
# Which Problems Are Solved

The `auth.auth_requests` table is not cleaned up, so long-running Zitadel
installations can accumulate many rows.

The mirror command can take a long time because the data is first copied
into memory (or onto disk) on CockroachDB, and users do not get any output
from mirror. This is unfortunate because people don't know whether Zitadel
got stuck.

# How the Problems Are Solved

Enhance logging throughout the projection processes and introduce a
configuration option for the maximum age of authentication requests.
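
To make the new option concrete, here is a minimal sketch of how an age-based cleanup could be applied. The `MaxAuthRequestAge` duration option, the `Config` struct, the `change_date` column, and the use of `database/sql` are all assumptions for illustration; the actual field name, config location, and query in this PR may differ:

```go
// Hypothetical illustration only: the real option introduced by this PR
// may be named and located differently in the Zitadel configuration.
package cleanup

import (
	"context"
	"database/sql"
	"time"
)

// Config carries the assumed cleanup option. A MaxAuthRequestAge of zero
// keeps the previous behavior: no cleanup.
type Config struct {
	MaxAuthRequestAge time.Duration
}

// DeleteOldAuthRequests removes auth requests that have not changed for
// longer than the configured maximum age. The column name is an assumption.
func DeleteOldAuthRequests(ctx context.Context, db *sql.DB, cfg Config) (int64, error) {
	if cfg.MaxAuthRequestAge <= 0 {
		return 0, nil // cleanup disabled
	}
	res, err := db.ExecContext(ctx,
		"DELETE FROM auth.auth_requests WHERE change_date < $1",
		time.Now().Add(-cfg.MaxAuthRequestAge),
	)
	if err != nil {
		return 0, err
	}
	return res.RowsAffected()
}
```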

# Additional Changes

None

# Additional Context

closes https://github.com/zitadel/zitadel/issues/9764

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
(cherry picked from commit 181186e477)
2025-04-30 15:23:23 +02:00


package handler

import (
	"context"
	"fmt"
	"time"

	"github.com/zitadel/logging"

	"github.com/zitadel/zitadel/internal/api/authz"
	"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
	"github.com/zitadel/zitadel/internal/database"
	"github.com/zitadel/zitadel/internal/eventstore"
	"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
	handler2 "github.com/zitadel/zitadel/internal/eventstore/handler/v2"
	"github.com/zitadel/zitadel/internal/id"
	query2 "github.com/zitadel/zitadel/internal/query"
)

// Config holds the shared dependencies and tuning options for all auth
// projection handlers.
type Config struct {
	Client     *database.DB
	Eventstore *eventstore.Eventstore

	BulkLimit             uint64
	FailureCountUntilSkip uint64
	TransactionDuration   time.Duration
	Handlers              map[string]*ConfigOverwrites

	ActiveInstancer interface {
		ActiveInstances() []string
	}
}

// ConfigOverwrites allows individual handlers to deviate from the defaults.
type ConfigOverwrites struct {
	MinimumCycleDuration time.Duration
}

var projections []*handler.Handler

// Register creates all auth projection handlers and collects them in
// projections.
func Register(ctx context.Context, configs Config, view *view.View, queries *query2.Queries) {
	// make sure the slice does not contain old values
	projections = nil

	projections = append(projections, newUser(ctx,
		configs.overwrite("User"),
		view,
		queries,
	))
	projections = append(projections, newUserSession(ctx,
		configs.overwrite("UserSession"),
		view,
		queries,
		id.SonyFlakeGenerator(),
	))
	projections = append(projections, newToken(ctx,
		configs.overwrite("Token"),
		view,
	))
	projections = append(projections, newRefreshToken(ctx,
		configs.overwrite("RefreshToken"),
		view,
	))
}

// Start begins event processing for every registered projection.
func Start(ctx context.Context) {
	for _, projection := range projections {
		projection.Start(ctx)
	}
}

// Projections returns all registered auth projection handlers.
func Projections() []*handler2.Handler {
	return projections
}

// ProjectInstance triggers every registered projection once for the instance
// in ctx, logging progress before and after each trigger.
func ProjectInstance(ctx context.Context) error {
	for i, projection := range projections {
		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("starting auth projection")
		_, err := projection.Trigger(ctx)
		if err != nil {
			return err
		}
		logging.WithFields("name", projection.ProjectionName(), "instance", authz.GetInstance(ctx).InstanceID(), "index", fmt.Sprintf("%d/%d", i, len(projections))).Info("auth projection done")
	}
	return nil
}

// overwrite builds the handler configuration for a view model, applying any
// per-handler overwrites configured in Handlers.
func (config Config) overwrite(viewModel string) handler2.Config {
	c := handler2.Config{
		Client:              config.Client,
		Eventstore:          config.Eventstore,
		BulkLimit:           uint16(config.BulkLimit),
		RequeueEvery:        3 * time.Minute,
		MaxFailureCount:     uint8(config.FailureCountUntilSkip),
		TransactionDuration: config.TransactionDuration,
		ActiveInstancer:     config.ActiveInstancer,
	}
	overwrite, ok := config.Handlers[viewModel]
	if !ok {
		return c
	}
	if overwrite.MinimumCycleDuration > 0 {
		c.RequeueEvery = overwrite.MinimumCycleDuration
	}
	return c
}
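
For context, a hedged sketch of how a caller might wire these package-level functions together; `dbClient`, `es`, `authView`, and `queries` are assumed to be initialized elsewhere, and the values shown are illustrative, not Zitadel's defaults:

```go
cfg := handler.Config{
	Client:                dbClient,
	Eventstore:            es,
	BulkLimit:             100,
	FailureCountUntilSkip: 5,
	TransactionDuration:   10 * time.Second,
	Handlers: map[string]*handler.ConfigOverwrites{
		// trigger the UserSession projection more often than the
		// 3-minute default set in overwrite
		"UserSession": {MinimumCycleDuration: 30 * time.Second},
	},
}
handler.Register(ctx, cfg, authView, queries) // build the projection list
handler.Start(ctx)                            // subscribe each projection to new events
```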