Merge branch 'main' into next

This commit is contained in:
adlerhurst
2024-05-30 11:42:47 +02:00
73 changed files with 2235 additions and 134 deletions

View File

@@ -41,14 +41,24 @@ func Register(ctx context.Context, config Config, view *view.View, static static
))
}
func Projections() []*handler2.Handler {
return projections
}
func Start(ctx context.Context) {
for _, projection := range projections {
projection.Start(ctx)
}
}
func Projections() []*handler2.Handler {
return projections
func ProjectInstance(ctx context.Context) error {
for _, projection := range projections {
_, err := projection.Trigger(ctx)
if err != nil {
return err
}
}
return nil
}
func (config Config) overwrite(viewModel string) handler2.Config {

View File

@@ -63,6 +63,16 @@ func Projections() []*handler2.Handler {
return projections
}
func ProjectInstance(ctx context.Context) error {
for _, projection := range projections {
_, err := projection.Trigger(ctx)
if err != nil {
return err
}
}
return nil
}
func (config Config) overwrite(viewModel string) handler2.Config {
c := handler2.Config{
Client: config.Client,

View File

@@ -171,7 +171,7 @@ func BytesToPrivateKey(priv []byte) (*rsa.PrivateKey, error) {
var ErrEmpty = errors.New("cannot decode, empty data")
func BytesToPublicKey(pub []byte) (*rsa.PublicKey, error) {
if pub == nil {
if len(pub) == 0 {
return nil, ErrEmpty
}
block, _ := pem.Decode(pub)

View File

@@ -14,7 +14,7 @@ import (
)
func init() {
config := &Config{}
config := new(Config)
dialect.Register(config, config, true)
}
@@ -49,11 +49,12 @@ func (c *Config) MatchName(name string) bool {
return false
}
func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) {
connector := new(Config)
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: c,
Result: connector,
})
if err != nil {
return nil, err
@@ -65,7 +66,7 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
}
}
return c, nil
return connector, nil
}
func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) {

View File

@@ -75,7 +75,7 @@ func (db *DB) QueryRow(scan func(*sql.Row) error, query string, args ...any) (er
func (db *DB) QueryRowContext(ctx context.Context, scan func(row *sql.Row) error, query string, args ...any) (err error) {
ctx, spanBeginTx := tracing.NewNamedSpan(ctx, "db.BeginTx")
tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
tx, err := db.BeginTx(ctx, &sql.TxOptions{ReadOnly: true, Isolation: sql.LevelReadCommitted})
spanBeginTx.EndWithError(err)
if err != nil {
return err

View File

@@ -14,7 +14,7 @@ import (
)
func init() {
config := &Config{}
config := new(Config)
dialect.Register(config, config, false)
}
@@ -50,11 +50,12 @@ func (c *Config) MatchName(name string) bool {
return false
}
func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
func (_ *Config) Decode(configs []interface{}) (dialect.Connector, error) {
connector := new(Config)
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
WeaklyTypedInput: true,
Result: c,
Result: connector,
})
if err != nil {
return nil, err
@@ -66,7 +67,7 @@ func (c *Config) Decode(configs []interface{}) (dialect.Connector, error) {
}
}
return c, nil
return connector, nil
}
func (c *Config) Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose dialect.DBPurpose) (*sql.DB, error) {

View File

@@ -259,9 +259,6 @@ func (h *Handler) triggerInstances(ctx context.Context, instances []string, trig
for ; err != nil; _, err = h.Trigger(instanceCtx, triggerOpts...) {
time.Sleep(h.retryFailedAfter)
h.log().WithField("instance", instance).OnError(err).Debug("trigger failed")
if err == nil {
break
}
}
}
}

View File

@@ -44,6 +44,16 @@ func Start(ctx context.Context) {
}
}
func ProjectInstance(ctx context.Context) error {
for _, projection := range projections {
_, err := projection.Trigger(ctx)
if err != nil {
return err
}
}
return nil
}
func Projections() []*handler.Handler {
return projections
}

View File

@@ -181,6 +181,16 @@ func Start(ctx context.Context) {
}
}
func ProjectInstance(ctx context.Context) error {
for _, projection := range projections {
_, err := projection.Trigger(ctx)
if err != nil {
return err
}
}
return nil
}
func ApplyCustomConfig(customConfig CustomConfig) handler.Config {
return applyCustomConfig(projectionConfig, customConfig)
}

View File

@@ -20,7 +20,6 @@ import (
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
es_v4 "github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/eventstore/postgres"
)
type Queries struct {
@@ -46,6 +45,7 @@ type Queries struct {
func StartQueries(
ctx context.Context,
es *eventstore.Eventstore,
esV4 es_v4.Querier,
querySqlClient, projectionSqlClient *database.DB,
projections projection.Config,
defaults sd.SystemDefaults,
@@ -59,7 +59,7 @@ func StartQueries(
) (repo *Queries, err error) {
repo = &Queries{
eventstore: es,
eventStoreV4: postgres.New(querySqlClient),
eventStoreV4: esV4,
client: querySqlClient,
DefaultLanguage: language.Und,
LoginTranslationFileContents: make(map[string][]byte),

View File

@@ -29,14 +29,14 @@ func intentToCommands(intent *intent) (commands []*command, err error) {
}
func marshalPayload(payload any) ([]byte, error) {
if reflect.ValueOf(payload).IsZero() {
if payload == nil || reflect.ValueOf(payload).IsZero() {
return nil, nil
}
return json.Marshal(payload)
}
type command struct {
eventstore.Command
*eventstore.Command
intent *intent

View File

@@ -3,7 +3,9 @@ package postgres
import (
"context"
"database/sql"
"fmt"
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
@@ -28,40 +30,54 @@ func (s *Storage) Push(ctx context.Context, intent *eventstore.PushIntent) (err
}()
}
// allows smaller wait times on query side for instances which are not actively writing
if err := setAppName(ctx, tx, "es_pusher_"+intent.Instance()); err != nil {
return err
}
var retryCount uint32
return crdb.Execute(func() (err error) {
defer func() {
if err == nil {
return
}
if retryCount < s.config.MaxRetries {
retryCount++
return
}
logging.WithFields("retry_count", retryCount).WithError(err).Debug("max retry count reached")
err = zerrors.ThrowInternal(err, "POSTG-VJfJz", "Errors.Internal")
}()
// allows smaller wait times on query side for instances which are not actively writing
if err := setAppName(ctx, tx, "es_pusher_"+intent.Instance()); err != nil {
return err
}
intents, err := lockAggregates(ctx, tx, intent)
if err != nil {
return err
}
if !checkSequences(intents) {
return zerrors.ThrowInvalidArgument(nil, "POSTG-KOM6E", "Errors.Internal.Eventstore.SequenceNotMatched")
}
commands := make([]*command, 0, len(intents))
for _, intent := range intents {
additionalCommands, err := intentToCommands(intent)
intents, err := lockAggregates(ctx, tx, intent)
if err != nil {
return err
}
commands = append(commands, additionalCommands...)
}
err = uniqueConstraints(ctx, tx, commands)
if err != nil {
return err
}
if !checkSequences(intents) {
return zerrors.ThrowInvalidArgument(nil, "POSTG-KOM6E", "Errors.Internal.Eventstore.SequenceNotMatched")
}
return push(ctx, tx, intent, commands)
commands := make([]*command, 0, len(intents))
for _, intent := range intents {
additionalCommands, err := intentToCommands(intent)
if err != nil {
return err
}
commands = append(commands, additionalCommands...)
}
err = uniqueConstraints(ctx, tx, commands)
if err != nil {
return err
}
return push(ctx, tx, intent, commands)
})
}
// setAppName for the the current transaction
func setAppName(ctx context.Context, tx *sql.Tx, name string) error {
_, err := tx.ExecContext(ctx, "SET LOCAL application_name TO $1", name)
_, err := tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL application_name TO '%s'", name))
if err != nil {
logging.WithFields("name", name).WithError(err).Debug("setting app name failed")
return zerrors.ThrowInternal(err, "POSTG-G3OmZ", "Errors.Internal")
@@ -154,7 +170,8 @@ func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands
cmd.sequence,
cmd.position.InPositionOrder,
)
stmt.WriteString(", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())")
stmt.WriteString(pushPositionStmt)
stmt.WriteString(`)`)
}
stmt.WriteString(` RETURNING created_at, "position"`)

View File

@@ -36,7 +36,9 @@ func Test_uniqueConstraints(t *testing.T) {
name: "command without constraints",
args: args{
commands: []*command{
{},
{
Command: &eventstore.Command{},
},
},
expectations: []mock.Expectation{},
},
@@ -53,7 +55,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
},
@@ -81,7 +83,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddGlobalUniqueConstraint("test", "id", "error"),
},
@@ -109,7 +111,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
@@ -143,7 +145,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
},
@@ -156,7 +158,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
},
@@ -189,7 +191,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
@@ -217,7 +219,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
@@ -230,7 +232,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
@@ -263,7 +265,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
},
@@ -291,7 +293,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveGlobalUniqueConstraint("test", "id"),
},
@@ -319,7 +321,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
eventstore.NewRemoveUniqueConstraint("test", "id2"),
@@ -353,7 +355,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
},
@@ -366,7 +368,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id2"),
},
@@ -399,7 +401,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", ""),
},
@@ -433,7 +435,7 @@ func Test_uniqueConstraints(t *testing.T) {
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "My.Error"),
},
@@ -786,7 +788,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -841,7 +843,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -857,7 +859,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -926,7 +928,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -942,7 +944,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "type2", "id2"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1011,7 +1013,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1067,7 +1069,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1123,7 +1125,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1139,7 +1141,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1214,7 +1216,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1230,7 +1232,7 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Command: eventstore.Command{
Command: &eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
@@ -1286,6 +1288,7 @@ func Test_push(t *testing.T) {
},
},
}
initPushStmt("postgres")
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
dbMock := mock.NewSQLMock(t, append([]mock.Expectation{mock.ExpectBegin(nil)}, tt.args.expectations...)...)

View File

@@ -194,6 +194,7 @@ func writeAggregateFilters(stmt *database.Statement, filters []*eventstore.Aggre
func writeAggregateFilter(stmt *database.Statement, filter *eventstore.AggregateFilter) {
conditions := definedConditions([]*condition{
{column: "owner", condition: filter.Owners()},
{column: "aggregate_type", condition: filter.Type()},
{column: "aggregate_id", condition: filter.IDs()},
})

View File

@@ -3,6 +3,8 @@ package postgres
import (
"context"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
@@ -10,15 +12,35 @@ import (
var (
_ eventstore.Pusher = (*Storage)(nil)
_ eventstore.Querier = (*Storage)(nil)
pushPositionStmt string
)
type Storage struct {
client *database.DB
config *Config
}
func New(client *database.DB) *Storage {
type Config struct {
MaxRetries uint32
}
func New(client *database.DB, config *Config) *Storage {
initPushStmt(client.Type())
return &Storage{
client: client,
config: config,
}
}
func initPushStmt(typ string) {
switch typ {
case "cockroach":
pushPositionStmt = ", hlc_to_timestamp(cluster_logical_timestamp()), cluster_logical_timestamp()"
case "postgres":
pushPositionStmt = ", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())"
default:
logging.WithFields("database_type", typ).Panic("position statement for type not implemented")
}
}

View File

@@ -87,7 +87,7 @@ type PushAggregate struct {
// owner of the aggregate
owner string
// Commands is an ordered list of changes on the aggregate
commands []Command
commands []*Command
// CurrentSequence checks the current state of the aggregate.
// The following types match the current sequence of the aggregate as described:
// * nil or [SequenceIgnore]: Not relevant to add the commands
@@ -122,7 +122,7 @@ func (pa *PushAggregate) Owner() string {
return pa.owner
}
func (pa *PushAggregate) Commands() []Command {
func (pa *PushAggregate) Commands() []*Command {
return pa.commands
}
@@ -165,7 +165,7 @@ func CurrentSequenceAtLeast(sequence uint32) PushAggregateOpt {
}
}
func AppendCommands(commands ...Command) PushAggregateOpt {
func AppendCommands(commands ...*Command) PushAggregateOpt {
return func(pa *PushAggregate) {
pa.commands = append(pa.commands, commands...)
}

View File

@@ -255,6 +255,7 @@ func NewAggregateFilter(typ string, opts ...AggregateFilterOpt) *AggregateFilter
type AggregateFilter struct {
typ string
ids []string
owners *filter[[]string]
events []*EventFilter
}
@@ -273,6 +274,13 @@ func (f *AggregateFilter) IDs() database.Condition {
return database.NewListContains(f.ids...)
}
func (f *AggregateFilter) Owners() database.Condition {
if f.owners == nil {
return nil
}
return f.owners.condition
}
func (f *AggregateFilter) Events() []*EventFilter {
return f.events
}
@@ -298,6 +306,61 @@ func AggregateIDs(ids ...string) AggregateFilterOpt {
}
}
func AggregateOwnersEqual(owners ...string) AggregateFilterOpt {
return func(f *AggregateFilter) {
var cond database.Condition
switch len(owners) {
case 0:
return
case 1:
cond = database.NewTextEqual(owners[0])
default:
cond = database.NewListEquals(owners...)
}
f.owners = &filter[[]string]{
condition: cond,
value: &owners,
}
}
}
func AggregateOwnersContains(owners ...string) AggregateFilterOpt {
return func(f *AggregateFilter) {
var cond database.Condition
switch len(owners) {
case 0:
return
case 1:
cond = database.NewTextEqual(owners[0])
default:
cond = database.NewListContains(owners...)
}
f.owners = &filter[[]string]{
condition: cond,
value: &owners,
}
}
}
func AggregateOwnersNotContains(owners ...string) AggregateFilterOpt {
return func(f *AggregateFilter) {
var cond database.Condition
switch len(owners) {
case 0:
return
case 1:
cond = database.NewTextUnequal(owners[0])
default:
cond = database.NewListNotContains(owners...)
}
f.owners = &filter[[]string]{
condition: cond,
value: &owners,
}
}
}
func AppendEvent(opts ...EventFilterOpt) AggregateFilterOpt {
return AppendEvents(NewEventFilter(opts...))
}

View File

@@ -0,0 +1,15 @@
package projection
import (
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
type HighestPosition eventstore.GlobalPosition
var _ eventstore.Reducer = (*HighestPosition)(nil)
// Reduce implements eventstore.Reducer.
func (h *HighestPosition) Reduce(events ...*eventstore.StorageEvent) error {
*h = HighestPosition(events[len(events)-1].Position)
return nil
}

View File

@@ -0,0 +1,72 @@
package readmodel
import (
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/v2/system"
"github.com/zitadel/zitadel/internal/v2/system/mirror"
)
type LastSuccessfulMirror struct {
ID string
Position float64
source string
}
func NewLastSuccessfulMirror(source string) *LastSuccessfulMirror {
return &LastSuccessfulMirror{
source: source,
}
}
var _ eventstore.Reducer = (*LastSuccessfulMirror)(nil)
func (p *LastSuccessfulMirror) Filter() *eventstore.Filter {
return eventstore.NewFilter(
eventstore.AppendAggregateFilter(
system.AggregateType,
eventstore.AggregateOwnersEqual(system.AggregateOwner),
eventstore.AppendEvent(
eventstore.SetEventTypes(
mirror.SucceededType,
),
eventstore.EventCreatorsEqual(mirror.Creator),
),
),
eventstore.FilterPagination(
eventstore.Descending(),
),
)
}
// Reduce implements eventstore.Reducer.
func (h *LastSuccessfulMirror) Reduce(events ...*eventstore.StorageEvent) (err error) {
for _, event := range events {
if event.Type == mirror.SucceededType {
err = h.reduceSucceeded(event)
}
if err != nil {
return err
}
}
return nil
}
func (h *LastSuccessfulMirror) reduceSucceeded(event *eventstore.StorageEvent) error {
// if position is set we skip all older events
if h.Position > 0 {
return nil
}
succeededEvent, err := mirror.SucceededEventFromStorage(event)
if err != nil {
return err
}
if h.source != succeededEvent.Payload.Source {
return nil
}
h.Position = succeededEvent.Payload.Position
return nil
}

View File

@@ -0,0 +1,8 @@
package system
const (
AggregateType = "system"
AggregateOwner = "SYSTEM"
AggregateInstance = ""
EventTypePrefix = AggregateType + "."
)

View File

@@ -0,0 +1,8 @@
package mirror
import "github.com/zitadel/zitadel/internal/v2/system"
const (
Creator = "MIRROR"
eventTypePrefix = system.EventTypePrefix + "mirror."
)

View File

@@ -0,0 +1,52 @@
package mirror
import (
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
type failedPayload struct {
Cause string `json:"cause"`
// Source is the name of the database data are mirrored to
Source string `json:"source"`
}
const FailedType = eventTypePrefix + "failed"
type FailedEvent eventstore.Event[failedPayload]
var _ eventstore.TypeChecker = (*FailedEvent)(nil)
func (e *FailedEvent) ActionType() string {
return FailedType
}
func FailedEventFromStorage(event *eventstore.StorageEvent) (e *FailedEvent, _ error) {
if event.Type != e.ActionType() {
return nil, zerrors.ThrowInvalidArgument(nil, "MIRRO-bwB9l", "Errors.Invalid.Event.Type")
}
payload, err := eventstore.UnmarshalPayload[failedPayload](event.Payload)
if err != nil {
return nil, err
}
return &FailedEvent{
StorageEvent: event,
Payload: payload,
}, nil
}
func NewFailedCommand(source string, cause error) *eventstore.Command {
return &eventstore.Command{
Action: eventstore.Action[any]{
Creator: Creator,
Type: FailedType,
Payload: failedPayload{
Cause: cause.Error(),
Source: source,
},
Revision: 1,
},
}
}

View File

@@ -0,0 +1,68 @@
package mirror
import (
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
type startedPayload struct {
// Destination is the name of the database data are mirrored to
Destination string `json:"destination"`
// Either Instances or System needs to be set
Instances []string `json:"instances,omitempty"`
System bool `json:"system,omitempty"`
}
const StartedType = eventTypePrefix + "started"
type StartedEvent eventstore.Event[startedPayload]
var _ eventstore.TypeChecker = (*StartedEvent)(nil)
func (e *StartedEvent) ActionType() string {
return StartedType
}
func StartedEventFromStorage(event *eventstore.StorageEvent) (e *StartedEvent, _ error) {
if event.Type != e.ActionType() {
return nil, zerrors.ThrowInvalidArgument(nil, "MIRRO-bwB9l", "Errors.Invalid.Event.Type")
}
payload, err := eventstore.UnmarshalPayload[startedPayload](event.Payload)
if err != nil {
return nil, err
}
return &StartedEvent{
StorageEvent: event,
Payload: payload,
}, nil
}
func NewStartedSystemCommand(destination string) *eventstore.Command {
return newStartedCommand(&startedPayload{
Destination: destination,
System: true,
})
}
func NewStartedInstancesCommand(destination string, instances []string) (*eventstore.Command, error) {
if len(instances) == 0 {
return nil, zerrors.ThrowInvalidArgument(nil, "MIRRO-8YkrE", "Errors.Mirror.NoInstances")
}
return newStartedCommand(&startedPayload{
Destination: destination,
Instances: instances,
}), nil
}
func newStartedCommand(payload *startedPayload) *eventstore.Command {
return &eventstore.Command{
Action: eventstore.Action[any]{
Creator: Creator,
Type: StartedType,
Revision: 1,
Payload: *payload,
},
}
}

View File

@@ -0,0 +1,53 @@
package mirror
import (
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
type succeededPayload struct {
// Source is the name of the database data are mirrored from
Source string `json:"source"`
// Position until data will be mirrored
Position float64 `json:"position"`
}
const SucceededType = eventTypePrefix + "succeeded"
type SucceededEvent eventstore.Event[succeededPayload]
var _ eventstore.TypeChecker = (*SucceededEvent)(nil)
func (e *SucceededEvent) ActionType() string {
return SucceededType
}
func SucceededEventFromStorage(event *eventstore.StorageEvent) (e *SucceededEvent, _ error) {
if event.Type != e.ActionType() {
return nil, zerrors.ThrowInvalidArgument(nil, "MIRRO-xh5IW", "Errors.Invalid.Event.Type")
}
payload, err := eventstore.UnmarshalPayload[succeededPayload](event.Payload)
if err != nil {
return nil, err
}
return &SucceededEvent{
StorageEvent: event,
Payload: payload,
}, nil
}
func NewSucceededCommand(source string, position float64) *eventstore.Command {
return &eventstore.Command{
Action: eventstore.Action[any]{
Creator: Creator,
Type: SucceededType,
Revision: 1,
Payload: succeededPayload{
Source: source,
Position: position,
},
},
}
}