Files
zitadel/internal/notification/projections.go
Silvan dafde7468d fix(eventstore): use decimal for position (#9881)
# Which Problems Are Solved

The float64 type, which was used for the event.Position field, is [not precise in
Go and gets rounded](https://github.com/golang/go/issues/47300). This
can lead to imprecise position tracking of events, and therefore of
projections, especially on CockroachDB, as the position used there is a
large number.

Example of an imprecise position:
exact: 1725257931223002628
float64: 1725257931223002624.000000

# How the Problems Are Solved

The float64 was replaced by
[github.com/jackc/pgx-shopspring-decimal](https://github.com/jackc/pgx-shopspring-decimal).

# Additional Changes

Rename `latestSequence`-queries to `latestPosition`

# Additional Context

closes https://github.com/zitadel/zitadel/issues/8863
2025-05-14 10:14:08 +00:00

97 lines
3.6 KiB
Go

package notification
import (
"context"
"fmt"
"time"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/command"
"github.com/zitadel/zitadel/internal/crypto"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
"github.com/zitadel/zitadel/internal/notification/handlers"
_ "github.com/zitadel/zitadel/internal/notification/statik"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/query/projection"
)
// projections holds the notification handlers appended by Register, in the
// order they were registered. Start, SetCurrentState, ProjectInstance and
// Projections all operate on this slice.
var projections []*handler.Handler
// Register builds the notification handlers and appends them to the
// package-level projections slice.
//
// The user, quota and back-channel logout notifiers are always registered, in
// that order. The telemetry pusher is appended last, and only when
// telemetryCfg.Enabled is true. All handlers share one set of notification
// queries and one set of channels created from them.
func Register(
	ctx context.Context,
	userHandlerCustomConfig, quotaHandlerCustomConfig, telemetryHandlerCustomConfig, backChannelLogoutHandlerCustomConfig projection.CustomConfig,
	telemetryCfg handlers.TelemetryPusherConfig,
	externalDomain string,
	externalPort uint16,
	externalSecure bool,
	commands *command.Commands,
	queries *query.Queries,
	es *eventstore.Eventstore,
	otpEmailTmpl, fileSystemPath string,
	userEncryption, smtpEncryption, smsEncryption, keysEncryptionAlg crypto.EncryptionAlgorithm,
	tokenLifetime time.Duration,
) {
	notificationQueries := handlers.NewNotificationQueries(
		queries,
		es,
		externalDomain,
		externalPort,
		externalSecure,
		fileSystemPath,
		userEncryption,
		smtpEncryption,
		smsEncryption,
	)
	channels := newChannels(notificationQueries)

	// add registers a single handler; handlers are appended one at a time so
	// the registration order matches the construction order.
	add := func(h *handler.Handler) {
		projections = append(projections, h)
	}

	add(handlers.NewUserNotifier(
		ctx,
		projection.ApplyCustomConfig(userHandlerCustomConfig),
		commands,
		notificationQueries,
		channels,
		otpEmailTmpl,
	))
	add(handlers.NewQuotaNotifier(
		ctx,
		projection.ApplyCustomConfig(quotaHandlerCustomConfig),
		commands,
		notificationQueries,
		channels,
	))
	add(handlers.NewBackChannelLogoutNotifier(
		ctx,
		projection.ApplyCustomConfig(backChannelLogoutHandlerCustomConfig),
		commands,
		notificationQueries,
		es,
		keysEncryptionAlg,
		channels,
		tokenLifetime,
	))

	// The telemetry pusher is optional.
	if !telemetryCfg.Enabled {
		return
	}
	add(handlers.NewTelemetryPusher(
		ctx,
		telemetryCfg,
		projection.ApplyCustomConfig(telemetryHandlerCustomConfig),
		commands,
		notificationQueries,
		channels,
	))
}
// Start calls Start with ctx on every registered notification handler, in
// registration order.
func Start(ctx context.Context) {
	for _, notifier := range projections {
		notifier.Start(ctx)
	}
}
// SetCurrentState triggers every registered notification projection with the
// latest eventstore position of the instance found in ctx as minimum position.
//
// It returns nil without querying the eventstore when no projections are
// registered. An error from the position lookup or from any Trigger call is
// returned immediately, and the remaining projections are not triggered.
func SetCurrentState(ctx context.Context, es *eventstore.Eventstore) error {
	if len(projections) == 0 {
		return nil
	}

	latestPositionQuery := eventstore.NewSearchQueryBuilder(eventstore.ColumnsMaxPosition).
		InstanceID(authz.GetInstance(ctx).InstanceID()).
		OrderDesc().
		Limit(1)
	position, err := es.LatestPosition(ctx, latestPositionQuery)
	if err != nil {
		return err
	}

	for i, notifier := range projections {
		name := notifier.ProjectionName()
		instanceID := authz.GetInstance(ctx).InstanceID()
		// The index is the zero-based loop position over the total count.
		index := fmt.Sprintf("%d/%d", i, len(projections))

		logging.WithFields("name", name, "instance", instanceID, "index", index).Info("set current state of notification projection")
		if _, err = notifier.Trigger(ctx, handler.WithMinPosition(position)); err != nil {
			return err
		}
		logging.WithFields("name", name, "instance", instanceID, "index", index).Info("current state of notification projection set")
	}
	return nil
}
// ProjectInstance triggers every registered notification projection, in
// registration order, logging before and after each one with the instance ID
// taken from ctx.
//
// The first error returned by a Trigger call aborts the loop and is returned
// unchanged.
func ProjectInstance(ctx context.Context) error {
	for i, notifier := range projections {
		name := notifier.ProjectionName()
		instanceID := authz.GetInstance(ctx).InstanceID()
		// The index is the zero-based loop position over the total count.
		index := fmt.Sprintf("%d/%d", i, len(projections))

		logging.WithFields("name", name, "instance", instanceID, "index", index).Info("starting notification projection")
		if _, err := notifier.Trigger(ctx); err != nil {
			return err
		}
		logging.WithFields("name", name, "instance", instanceID, "index", index).Info("notification projection done")
	}
	return nil
}
// Projections returns the package-level slice of registered notification
// handlers. The slice itself is returned, not a copy.
func Projections() []*handler.Handler {
	return projections
}