refactor(v2): init eventstore package (#7806)

* refactor(v2): init database package

* refactor(v2): init eventstore package

* add mock package

* test query constructors

* option-based push, analogous to query
Silvan 2024-04-26 17:05:21 +02:00 committed by GitHub
parent 2254434692
commit 5811a7b6a5
16 changed files with 5681 additions and 0 deletions

@@ -0,0 +1,24 @@
package eventstore
type Aggregate struct {
ID string
Type string
Instance string
Owner string
}
func (agg *Aggregate) Equals(aggregate *Aggregate) bool {
if aggregate.ID != "" && aggregate.ID != agg.ID {
return false
}
if aggregate.Type != "" && aggregate.Type != agg.Type {
return false
}
if aggregate.Instance != "" && aggregate.Instance != agg.Instance {
return false
}
if aggregate.Owner != "" && aggregate.Owner != agg.Owner {
return false
}
return true
}
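Empty fields on the aggregate passed to Equals act as wildcards: only non-empty fields are compared. For illustration (not part of this commit), a minimal sketch written as a Go example test inside the repository (the package is internal, so it is not importable from outside the module); the values are made up:

package eventstore_test

import (
    "fmt"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

func ExampleAggregate_Equals() {
    stored := &eventstore.Aggregate{ID: "123", Type: "user", Instance: "inst-1", Owner: "org-1"}

    // only the non-empty fields of the argument are compared
    fmt.Println(stored.Equals(&eventstore.Aggregate{Type: "user"}))            // ID, Instance and Owner act as wildcards
    fmt.Println(stored.Equals(&eventstore.Aggregate{ID: "123", Type: "user"})) // set fields match
    fmt.Println(stored.Equals(&eventstore.Aggregate{ID: "456"}))               // ID differs
    // Output:
    // true
    // true
    // false
}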

@@ -0,0 +1,29 @@
package eventstore
type CurrentSequence func(current uint32) bool
func CheckSequence(current uint32, check CurrentSequence) bool {
if check == nil {
return true
}
return check(current)
}
// SequenceIgnore doesn't check the current sequence
func SequenceIgnore() CurrentSequence {
return nil
}
// SequenceMatches checks that the current sequence exactly matches the provided sequence
func SequenceMatches(sequence uint32) CurrentSequence {
return func(current uint32) bool {
return current == sequence
}
}
// SequenceAtLeast checks that the current sequence is at least the given sequence (sequence <= current)
func SequenceAtLeast(sequence uint32) CurrentSequence {
return func(current uint32) bool {
return current >= sequence
}
}
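For illustration (not part of this commit), how the constructors combine with CheckSequence; a sketch with made-up values:

package eventstore_test

import (
    "fmt"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

func ExampleCheckSequence() {
    // nil (SequenceIgnore) skips the check entirely
    fmt.Println(eventstore.CheckSequence(5, eventstore.SequenceIgnore()))
    // SequenceMatches requires an exact match
    fmt.Println(eventstore.CheckSequence(5, eventstore.SequenceMatches(5)))
    fmt.Println(eventstore.CheckSequence(5, eventstore.SequenceMatches(4)))
    // SequenceAtLeast requires current >= the given sequence
    fmt.Println(eventstore.CheckSequence(5, eventstore.SequenceAtLeast(3)))
    // Output:
    // true
    // true
    // false
    // true
}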

@@ -0,0 +1,36 @@
package eventstore
import "time"
type Event[P any] struct {
Aggregate Aggregate
CreatedAt time.Time
Creator string
Position GlobalPosition
Revision uint16
Sequence uint32
Type string
Payload P
}
type StoragePayload interface {
Unmarshal(ptr any) error
}
func EventFromStorage[E Event[P], P any](event *Event[StoragePayload]) (*E, error) {
var payload P
if err := event.Payload.Unmarshal(&payload); err != nil {
return nil, err
}
return &E{
Aggregate: event.Aggregate,
CreatedAt: event.CreatedAt,
Creator: event.Creator,
Position: event.Position,
Revision: event.Revision,
Sequence: event.Sequence,
Type: event.Type,
Payload: payload,
}, nil
}
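EventFromStorage converts a raw storage event into one with a typed payload by unmarshalling it. For illustration (not part of this commit), a sketch using a hypothetical jsonPayload type implementing StoragePayload — the postgres package below ships an equivalent unmarshalPayload; names and values are made up:

package eventstore_test

import (
    "encoding/json"
    "fmt"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

// jsonPayload is a hypothetical StoragePayload backed by raw JSON
type jsonPayload []byte

func (p jsonPayload) Unmarshal(ptr any) error {
    return json.Unmarshal(p, ptr)
}

type userAddedPayload struct {
    Username string `json:"username"`
}

func ExampleEventFromStorage() {
    stored := &eventstore.Event[eventstore.StoragePayload]{
        Type:    "user.added",
        Payload: jsonPayload(`{"username":"gigi"}`),
    }

    // the payload type parameter is inferred from the requested event type
    typed, err := eventstore.EventFromStorage[eventstore.Event[userAddedPayload]](stored)
    if err != nil {
        panic(err)
    }
    fmt.Println(typed.Payload.Username)
    // Output:
    // gigi
}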

@@ -0,0 +1,41 @@
package eventstore
import (
"context"
)
func NewEventstore(querier Querier, pusher Pusher) *EventStore {
return &EventStore{
Pusher: pusher,
Querier: querier,
}
}
func NewEventstoreFromOne(o one) *EventStore {
return NewEventstore(o, o)
}
type EventStore struct {
Pusher
Querier
}
type one interface {
Pusher
Querier
}
type healthier interface {
Health(ctx context.Context) error
}
type GlobalPosition struct {
Position float64
InPositionOrder uint32
}
type Reducer interface {
Reduce(events ...*Event[StoragePayload]) error
}
type Reduce func(events ...*Event[StoragePayload]) error
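Query and Push both report events back through the Reducer interface. For illustration (not part of this commit), a minimal collecting reducer; the name is made up:

package eventstore_test

import "github.com/zitadel/zitadel/internal/v2/eventstore"

// eventCollector is a hypothetical Reducer that simply gathers all reduced events
type eventCollector struct {
    events []*eventstore.Event[eventstore.StoragePayload]
}

func (c *eventCollector) Reduce(events ...*eventstore.Event[eventstore.StoragePayload]) error {
    c.events = append(c.events, events...)
    return nil
}

// compile-time check that eventCollector satisfies the interface
var _ eventstore.Reducer = (*eventCollector)(nil)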

@@ -0,0 +1,64 @@
package postgres
import (
"encoding/json"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
func intentToCommands(intent *intent) (commands []*command, err error) {
commands = make([]*command, len(intent.Commands()))
for i, cmd := range intent.Commands() {
var payload unmarshalPayload
if cmd.Payload() != nil {
payload, err = json.Marshal(cmd.Payload())
if err != nil {
logging.WithError(err).Warn("marshal payload failed")
return nil, zerrors.ThrowInternal(err, "POSTG-MInPK", "Errors.Internal")
}
}
commands[i] = &command{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *intent.Aggregate(),
Creator: cmd.Creator(),
Revision: cmd.Revision(),
Type: cmd.Type(),
// always add at least 1 to the currently stored sequence
Sequence: intent.sequence + uint32(i) + 1,
Payload: payload,
},
intent: intent,
uniqueConstraints: cmd.UniqueConstraints(),
}
}
return commands, nil
}
type command struct {
*eventstore.Event[eventstore.StoragePayload]
intent *intent
uniqueConstraints []*eventstore.UniqueConstraint
}
var _ eventstore.StoragePayload = (unmarshalPayload)(nil)
type unmarshalPayload []byte
// Unmarshal implements eventstore.StoragePayload.
func (p unmarshalPayload) Unmarshal(ptr any) error {
if len(p) == 0 {
return nil
}
if err := json.Unmarshal(p, ptr); err != nil {
return zerrors.ThrowInternal(err, "POSTG-u8qVo", "Errors.Internal")
}
return nil
}
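The payload a command carries is the raw JSON produced by intentToCommands; reading it back goes through Unmarshal. For illustration (not part of this commit), a round-trip sketch as a same-package test; names and values are made up:

package postgres

import (
    "encoding/json"
    "testing"
)

func Test_unmarshalPayload_roundtrip(t *testing.T) {
    type payload struct {
        Name string `json:"name"`
    }

    raw, err := json.Marshal(payload{Name: "gigi"})
    if err != nil {
        t.Fatal(err)
    }

    var got payload
    // unmarshalPayload implements eventstore.StoragePayload
    if err := unmarshalPayload(raw).Unmarshal(&got); err != nil {
        t.Fatal(err)
    }
    if got.Name != "gigi" {
        t.Errorf("got %q, want %q", got.Name, "gigi")
    }
}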

@@ -0,0 +1,42 @@
package postgres
import (
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
type intent struct {
*eventstore.PushAggregate
sequence uint32
}
func makeIntents(pushIntent *eventstore.PushIntent) []*intent {
res := make([]*intent, len(pushIntent.Aggregates()))
for i, aggregate := range pushIntent.Aggregates() {
res[i] = &intent{PushAggregate: aggregate}
}
return res
}
func intentByAggregate(intents []*intent, aggregate *eventstore.Aggregate) *intent {
for _, intent := range intents {
if intent.PushAggregate.Aggregate().Equals(aggregate) {
return intent
}
}
logging.WithFields("instance", aggregate.Instance, "owner", aggregate.Owner, "type", aggregate.Type, "id", aggregate.ID).Panic("no intent found")
return nil
}
func checkSequences(intents []*intent) bool {
for _, intent := range intents {
if !eventstore.CheckSequence(intent.sequence, intent.PushAggregate.CurrentSequence()) {
return false
}
}
return true
}

@@ -0,0 +1,122 @@
package postgres
import (
"testing"
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
func Test_checkSequences(t *testing.T) {
type args struct {
intents []*intent
}
tests := []struct {
name string
args args
want bool
}{
{
name: "ignore",
args: args{
intents: []*intent{
{
sequence: 1,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.IgnoreCurrentSequence(),
),
},
},
},
want: true,
},
{
name: "ignores",
args: args{
intents: []*intent{
{
sequence: 1,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.IgnoreCurrentSequence(),
),
},
{
sequence: 1,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
),
},
},
},
want: true,
},
{
name: "matches",
args: args{
intents: []*intent{
{
sequence: 0,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.CurrentSequenceMatches(0),
),
},
},
},
want: true,
},
{
name: "does not match",
args: args{
intents: []*intent{
{
sequence: 1,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.CurrentSequenceMatches(2),
),
},
},
},
want: false,
},
{
name: "at least",
args: args{
intents: []*intent{
{
sequence: 10,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.CurrentSequenceAtLeast(0),
),
},
},
},
want: true,
},
{
name: "at least too low",
args: args{
intents: []*intent{
{
sequence: 1,
PushAggregate: eventstore.NewPushAggregate(
"", "", "",
eventstore.CurrentSequenceAtLeast(2),
),
},
},
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := checkSequences(tt.args.intents); got != tt.want {
t.Errorf("checkSequences() = %v, want %v", got, tt.want)
}
})
}
}

@@ -0,0 +1,245 @@
package postgres
import (
"context"
"database/sql"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/v2/database"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
)
// Push implements eventstore.Pusher.
func (s *Storage) Push(ctx context.Context, intent *eventstore.PushIntent) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
tx := intent.Tx()
if tx == nil {
tx, err = s.client.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable, ReadOnly: false})
if err != nil {
return err
}
defer func() {
err = database.CloseTx(tx, err)
}()
}
// allows smaller wait times on the query side for instances which are not actively writing
if err := setAppName(ctx, tx, "es_pusher_"+intent.Instance()); err != nil {
return err
}
intents, err := lockAggregates(ctx, tx, intent)
if err != nil {
return err
}
if !checkSequences(intents) {
return zerrors.ThrowInvalidArgument(nil, "POSTG-KOM6E", "Errors.Internal.Eventstore.SequenceNotMatched")
}
commands := make([]*command, 0, len(intents))
for _, intent := range intents {
additionalCommands, err := intentToCommands(intent)
if err != nil {
return err
}
commands = append(commands, additionalCommands...)
}
err = uniqueConstraints(ctx, tx, commands)
if err != nil {
return err
}
return push(ctx, tx, intent, commands)
}
// setAppName sets the application name for the current transaction
func setAppName(ctx context.Context, tx *sql.Tx, name string) error {
_, err := tx.ExecContext(ctx, "SET LOCAL application_name TO $1", name)
if err != nil {
logging.WithFields("name", name).WithError(err).Debug("setting app name failed")
return zerrors.ThrowInternal(err, "POSTG-G3OmZ", "Errors.Internal")
}
return nil
}
func lockAggregates(ctx context.Context, tx *sql.Tx, intent *eventstore.PushIntent) (_ []*intent, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var stmt database.Statement
stmt.WriteString("WITH existing AS (")
for i, aggregate := range intent.Aggregates() {
if i > 0 {
stmt.WriteString(" UNION ALL ")
}
stmt.WriteString(`(SELECT instance_id, aggregate_type, aggregate_id, "sequence" FROM eventstore.events2 WHERE instance_id = `)
stmt.WriteArgs(intent.Instance())
stmt.WriteString(` AND aggregate_type = `)
stmt.WriteArgs(aggregate.Type())
stmt.WriteString(` AND aggregate_id = `)
stmt.WriteArgs(aggregate.ID())
stmt.WriteString(` AND owner = `)
stmt.WriteArgs(aggregate.Owner())
stmt.WriteString(` ORDER BY "sequence" DESC LIMIT 1)`)
}
stmt.WriteString(") SELECT e.instance_id, e.owner, e.aggregate_type, e.aggregate_id, e.sequence FROM eventstore.events2 e JOIN existing ON e.instance_id = existing.instance_id AND e.aggregate_type = existing.aggregate_type AND e.aggregate_id = existing.aggregate_id AND e.sequence = existing.sequence FOR UPDATE")
//nolint:rowserrcheck
// rows is checked by database.MapRowsToObject
rows, err := tx.QueryContext(ctx, stmt.String(), stmt.Args()...)
if err != nil {
return nil, err
}
res := makeIntents(intent)
err = database.MapRowsToObject(rows, func(scan func(dest ...any) error) error {
var sequence sql.Null[uint32]
agg := new(eventstore.Aggregate)
err := scan(
&agg.Instance,
&agg.Owner,
&agg.Type,
&agg.ID,
&sequence,
)
if err != nil {
return err
}
intentByAggregate(res, agg).sequence = sequence.V
return nil
})
if err != nil {
return nil, err
}
return res, nil
}
func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands []*command) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var stmt database.Statement
stmt.WriteString(`INSERT INTO eventstore.events2 (instance_id, "owner", aggregate_type, aggregate_id, revision, creator, event_type, payload, "sequence", in_tx_order, created_at, "position") VALUES `)
for i, cmd := range commands {
if i > 0 {
stmt.WriteString(", ")
}
cmd.Position.InPositionOrder = uint32(i)
stmt.WriteString(`(`)
stmt.WriteArgs(
cmd.Aggregate.Instance,
cmd.Aggregate.Owner,
cmd.Aggregate.Type,
cmd.Aggregate.ID,
cmd.Revision,
cmd.Creator,
cmd.Type,
cmd.Payload,
cmd.Sequence,
i,
)
stmt.WriteString(", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())")
stmt.WriteString(`)`)
}
stmt.WriteString(` RETURNING created_at, "position"`)
//nolint:rowserrcheck
// rows is checked by database.MapRowsToObject
rows, err := tx.QueryContext(ctx, stmt.String(), stmt.Args()...)
if err != nil {
return err
}
var i int
return database.MapRowsToObject(rows, func(scan func(dest ...any) error) error {
defer func() { i++ }()
err := scan(
&commands[i].CreatedAt,
&commands[i].Position.Position,
)
if err != nil {
return err
}
return reducer.Reduce(commands[i].Event)
})
}
func uniqueConstraints(ctx context.Context, tx *sql.Tx, commands []*command) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var stmt database.Statement
for _, cmd := range commands {
if len(cmd.uniqueConstraints) == 0 {
continue
}
for _, constraint := range cmd.uniqueConstraints {
stmt.Reset()
instance := cmd.Aggregate.Instance
if constraint.IsGlobal {
instance = ""
}
switch constraint.Action {
case eventstore.UniqueConstraintAdd:
stmt.WriteString(`INSERT INTO eventstore.unique_constraints (instance_id, unique_type, unique_field) VALUES (`)
stmt.WriteArgs(instance, constraint.UniqueType, constraint.UniqueField)
stmt.WriteRune(')')
case eventstore.UniqueConstraintInstanceRemove:
stmt.WriteString(`DELETE FROM eventstore.unique_constraints WHERE instance_id = `)
stmt.WriteArgs(instance)
case eventstore.UniqueConstraintRemove:
stmt.WriteString(`DELETE FROM eventstore.unique_constraints WHERE `)
stmt.WriteString(deleteUniqueConstraintClause)
stmt.AppendArgs(
instance,
constraint.UniqueType,
constraint.UniqueField,
)
}
_, err := tx.ExecContext(ctx, stmt.String(), stmt.Args()...)
if err != nil {
logging.WithFields("action", constraint.Action).Warn("handling of unique constraint failed")
errMessage := constraint.ErrorMessage
if errMessage == "" {
errMessage = "Errors.Internal"
}
return zerrors.ThrowAlreadyExists(err, "POSTG-QzjyP", errMessage)
}
}
}
return nil
}
// the query is this complex because unique constraints were accidentally stored case sensitively
// it first checks for a case sensitive match and afterwards for a case insensitive match
var deleteUniqueConstraintClause = `
(instance_id = $1 AND unique_type = $2 AND unique_field = (
SELECT unique_field from (
SELECT instance_id, unique_type, unique_field
FROM eventstore.unique_constraints
WHERE instance_id = $1 AND unique_type = $2 AND unique_field = $3
UNION ALL
SELECT instance_id, unique_type, unique_field
FROM eventstore.unique_constraints
WHERE instance_id = $1 AND unique_type = $2 AND unique_field = LOWER($3)
) AS case_insensitive_constraints LIMIT 1)
)`

(File diff suppressed because it is too large)

@@ -0,0 +1,289 @@
package postgres
import (
"context"
"database/sql"
"slices"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
"github.com/zitadel/zitadel/internal/v2/database"
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
func (s *Storage) Query(ctx context.Context, query *eventstore.Query) (eventCount int, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
var stmt database.Statement
writeQuery(&stmt, query)
if query.Tx() != nil {
return executeQuery(ctx, query.Tx(), &stmt, query)
}
return executeQuery(ctx, s.client.DB, &stmt, query)
}
func executeQuery(ctx context.Context, tx database.Querier, stmt *database.Statement, reducer eventstore.Reducer) (eventCount int, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
//nolint:rowserrcheck
// rows is checked by database.MapRowsToObject
rows, err := tx.QueryContext(ctx, stmt.String(), stmt.Args()...)
if err != nil {
return 0, err
}
err = database.MapRowsToObject(rows, func(scan func(dest ...any) error) error {
e := new(eventstore.Event[eventstore.StoragePayload])
var payload sql.Null[[]byte]
err := scan(
&e.CreatedAt,
&e.Type,
&e.Sequence,
&e.Position.Position,
&e.Position.InPositionOrder,
&payload,
&e.Creator,
&e.Aggregate.Owner,
&e.Aggregate.Instance,
&e.Aggregate.Type,
&e.Aggregate.ID,
&e.Revision,
)
if err != nil {
return err
}
e.Payload = unmarshalPayload(payload.V)
eventCount++
return reducer.Reduce(e)
})
return eventCount, err
}
var (
selectColumns = `SELECT created_at, event_type, "sequence", "position", in_tx_order, payload, creator, "owner", instance_id, aggregate_type, aggregate_id, revision`
// TODO: condition must know if its args are named parameters or not
// instancePlaceholder = database.Placeholder("@instance_id")
)
func writeQuery(stmt *database.Statement, query *eventstore.Query) {
stmt.WriteString(selectColumns)
// stmt.SetNamedArg(instancePlaceholder, query.Instance())
stmt.WriteString(" FROM (")
writeFilters(stmt, query.Filters())
stmt.WriteRune(')')
writePagination(stmt, query.Pagination())
}
var from = " FROM eventstore.events2"
func writeFilters(stmt *database.Statement, filters []*eventstore.Filter) {
if len(filters) == 0 {
logging.Fatal("query does not contain filters")
}
for i, filter := range filters {
if i > 0 {
stmt.WriteString(" UNION ALL ")
}
stmt.WriteRune('(')
stmt.WriteString(selectColumns)
stmt.WriteString(from)
writeFilter(stmt, filter)
stmt.WriteString(")")
}
}
func writeFilter(stmt *database.Statement, filter *eventstore.Filter) {
stmt.WriteString(" WHERE ")
filter.Parent().Instance().Write(stmt, "instance_id")
writeAggregateFilters(stmt, filter.AggregateFilters())
writePagination(stmt, filter.Pagination())
}
func writePagination(stmt *database.Statement, pagination *eventstore.Pagination) {
writePosition(stmt, pagination.Position())
writeOrdering(stmt, pagination.Desc())
if pagination.Pagination() != nil {
pagination.Pagination().Write(stmt)
}
}
func writePosition(stmt *database.Statement, position *eventstore.PositionCondition) {
if position == nil {
return
}
max := position.Max()
min := position.Min()
stmt.WriteString(" AND ")
if max != nil {
if max.InPositionOrder > 0 {
stmt.WriteString("((")
database.NewNumberEquals(max.Position).Write(stmt, "position")
stmt.WriteString(" AND ")
database.NewNumberLess(max.InPositionOrder).Write(stmt, "in_tx_order")
stmt.WriteRune(')')
stmt.WriteString(" OR ")
}
database.NewNumberLess(max.Position).Write(stmt, "position")
if max.InPositionOrder > 0 {
stmt.WriteRune(')')
}
}
if max != nil && min != nil {
stmt.WriteString(" AND ")
}
if min != nil {
if min.InPositionOrder > 0 {
stmt.WriteString("((")
database.NewNumberEquals(min.Position).Write(stmt, "position")
stmt.WriteString(" AND ")
database.NewNumberGreater(min.InPositionOrder).Write(stmt, "in_tx_order")
stmt.WriteRune(')')
stmt.WriteString(" OR ")
}
database.NewNumberGreater(min.Position).Write(stmt, "position")
if min.InPositionOrder > 0 {
stmt.WriteRune(')')
}
}
}
func writeAggregateFilters(stmt *database.Statement, filters []*eventstore.AggregateFilter) {
if len(filters) == 0 {
return
}
stmt.WriteString(" AND ")
if len(filters) > 1 {
stmt.WriteRune('(')
}
for i, filter := range filters {
if i > 0 {
stmt.WriteString(" OR ")
}
writeAggregateFilter(stmt, filter)
}
if len(filters) > 1 {
stmt.WriteRune(')')
}
}
func writeAggregateFilter(stmt *database.Statement, filter *eventstore.AggregateFilter) {
conditions := definedConditions([]*condition{
{column: "aggregate_type", condition: filter.Type()},
{column: "aggregate_id", condition: filter.IDs()},
})
if len(conditions) > 1 || len(filter.Events()) > 0 {
stmt.WriteRune('(')
}
writeConditions(
stmt,
conditions,
" AND ",
)
writeEventFilters(stmt, filter.Events())
if len(conditions) > 1 || len(filter.Events()) > 0 {
stmt.WriteRune(')')
}
}
func writeEventFilters(stmt *database.Statement, filters []*eventstore.EventFilter) {
if len(filters) == 0 {
return
}
stmt.WriteString(" AND ")
if len(filters) > 1 {
stmt.WriteRune('(')
}
for i, filter := range filters {
if i > 0 {
stmt.WriteString(" OR ")
}
writeEventFilter(stmt, filter)
}
if len(filters) > 1 {
stmt.WriteRune(')')
}
}
func writeEventFilter(stmt *database.Statement, filter *eventstore.EventFilter) {
conditions := definedConditions([]*condition{
{column: "event_type", condition: filter.Types()},
{column: "created_at", condition: filter.CreatedAt()},
{column: "sequence", condition: filter.Sequence()},
{column: "revision", condition: filter.Revision()},
{column: "creator", condition: filter.Creators()},
})
if len(conditions) > 1 {
stmt.WriteRune('(')
}
writeConditions(
stmt,
conditions,
" AND ",
)
if len(conditions) > 1 {
stmt.WriteRune(')')
}
}
type condition struct {
column string
condition database.Condition
}
func writeConditions(stmt *database.Statement, conditions []*condition, sep string) {
var i int
for _, cond := range conditions {
if i > 0 {
stmt.WriteString(sep)
}
cond.condition.Write(stmt, cond.column)
i++
}
}
func definedConditions(conditions []*condition) []*condition {
return slices.DeleteFunc(conditions, func(cond *condition) bool {
return cond.condition == nil
})
}
func writeOrdering(stmt *database.Statement, descending bool) {
stmt.WriteString(" ORDER BY position")
if descending {
stmt.WriteString(" DESC")
}
stmt.WriteString(", in_tx_order")
if descending {
stmt.WriteString(" DESC")
}
}

(File diff suppressed because it is too large)

@@ -0,0 +1,28 @@
package postgres
import (
"context"
"github.com/zitadel/zitadel/internal/database"
"github.com/zitadel/zitadel/internal/v2/eventstore"
)
var (
_ eventstore.Pusher = (*Storage)(nil)
_ eventstore.Querier = (*Storage)(nil)
)
type Storage struct {
client *database.DB
}
func New(client *database.DB) *Storage {
return &Storage{
client: client,
}
}
// Health implements eventstore.Pusher.
func (s *Storage) Health(ctx context.Context) error {
return s.client.PingContext(ctx)
}
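Storage implements both Pusher and Querier, so it can be handed to the eventstore as a single dependency. For illustration (not part of this commit), a wiring sketch; the postgres import path is assumed from this commit's layout, the function name is made up, and the code only compiles inside the repository since the packages are internal:

package example

import (
    "github.com/zitadel/zitadel/internal/database"
    "github.com/zitadel/zitadel/internal/v2/eventstore"
    "github.com/zitadel/zitadel/internal/v2/eventstore/postgres"
)

// newEventStore wires the postgres storage into the v2 eventstore
func newEventStore(dbClient *database.DB) *eventstore.EventStore {
    storage := postgres.New(dbClient)
    // the storage acts as Pusher and Querier at once
    return eventstore.NewEventstoreFromOne(storage)
}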

@@ -0,0 +1,190 @@
package eventstore
import (
"context"
"database/sql"
)
type Pusher interface {
healthier
// Push writes the intents to the storage.
// If an intent implements [PushReducerIntent], [PushReducerIntent.Reduce] is called after
// the intent was stored.
Push(ctx context.Context, intent *PushIntent) error
}
func NewPushIntent(instance string, opts ...PushOpt) *PushIntent {
intent := &PushIntent{
instance: instance,
}
for _, opt := range opts {
opt(intent)
}
return intent
}
type PushIntent struct {
instance string
reducer Reducer
tx *sql.Tx
aggregates []*PushAggregate
}
func (pi *PushIntent) Instance() string {
return pi.instance
}
func (pi *PushIntent) Reduce(events ...*Event[StoragePayload]) error {
if pi.reducer == nil {
return nil
}
return pi.reducer.Reduce(events...)
}
func (pi *PushIntent) Tx() *sql.Tx {
return pi.tx
}
func (pi *PushIntent) Aggregates() []*PushAggregate {
return pi.aggregates
}
type PushOpt func(pi *PushIntent)
func PushReducer(reducer Reducer) PushOpt {
return func(pi *PushIntent) {
pi.reducer = reducer
}
}
func PushTx(tx *sql.Tx) PushOpt {
return func(pi *PushIntent) {
pi.tx = tx
}
}
func AppendAggregate(owner, typ, id string, opts ...PushAggregateOpt) PushOpt {
return AppendAggregates(NewPushAggregate(owner, typ, id, opts...))
}
func AppendAggregates(aggregates ...*PushAggregate) PushOpt {
return func(pi *PushIntent) {
for _, aggregate := range aggregates {
aggregate.parent = pi
}
pi.aggregates = append(pi.aggregates, aggregates...)
}
}
type PushAggregate struct {
parent *PushIntent
// typ of the aggregate
typ string
// id of the aggregate
id string
// owner of the aggregate
owner string
// commands is an ordered list of changes on the aggregate
commands []Command
// currentSequence checks the current state of the aggregate.
// The following values check the current sequence of the aggregate as described:
// * nil or [SequenceIgnore]: the current sequence is not checked
// * [SequenceMatches]: the current sequence must exactly match
// * [SequenceAtLeast]: the current sequence must be >= the given sequence
currentSequence CurrentSequence
}
func NewPushAggregate(owner, typ, id string, opts ...PushAggregateOpt) *PushAggregate {
pa := &PushAggregate{
typ: typ,
id: id,
owner: owner,
}
for _, opt := range opts {
opt(pa)
}
return pa
}
func (pa *PushAggregate) Type() string {
return pa.typ
}
func (pa *PushAggregate) ID() string {
return pa.id
}
func (pa *PushAggregate) Owner() string {
return pa.owner
}
func (pa *PushAggregate) Commands() []Command {
return pa.commands
}
func (pa *PushAggregate) Aggregate() *Aggregate {
return &Aggregate{
ID: pa.id,
Type: pa.typ,
Owner: pa.owner,
Instance: pa.parent.instance,
}
}
func (pa *PushAggregate) CurrentSequence() CurrentSequence {
return pa.currentSequence
}
type PushAggregateOpt func(pa *PushAggregate)
func SetCurrentSequence(currentSequence CurrentSequence) PushAggregateOpt {
return func(pa *PushAggregate) {
pa.currentSequence = currentSequence
}
}
func IgnoreCurrentSequence() PushAggregateOpt {
return func(pa *PushAggregate) {
pa.currentSequence = SequenceIgnore()
}
}
func CurrentSequenceMatches(sequence uint32) PushAggregateOpt {
return func(pa *PushAggregate) {
pa.currentSequence = SequenceMatches(sequence)
}
}
func CurrentSequenceAtLeast(sequence uint32) PushAggregateOpt {
return func(pa *PushAggregate) {
pa.currentSequence = SequenceAtLeast(sequence)
}
}
func AppendCommands(commands ...Command) PushAggregateOpt {
return func(pa *PushAggregate) {
pa.commands = append(pa.commands, commands...)
}
}
type Command interface {
// Creator is the id of the user which created the action
Creator() string
// Type describes the action; it is written in past tense (e.g. user.created)
Type() string
// Revision of the action
Revision() uint16
// Payload returns the payload of the event. It represents the fields changed by the event
// valid types are:
// * nil: no payload
// * struct: which can be marshalled to json
// * pointer to struct: which can be marshalled to json
// * []byte: json marshalled data
Payload() any
// UniqueConstraints should be added for unique attributes of an event; if nil, constraints are not checked
UniqueConstraints() []*UniqueConstraint
}
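Putting the push options together: a command implementation plus an intent with a current-sequence guard. For illustration (not part of this commit); the command type, error key, and values are made up:

package eventstore_test

import (
    "context"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

// userAddedCommand is a hypothetical Command implementation
type userAddedCommand struct {
    username string
}

func (c *userAddedCommand) Creator() string  { return "user-console" }
func (c *userAddedCommand) Type() string     { return "user.added" }
func (c *userAddedCommand) Revision() uint16 { return 1 }
func (c *userAddedCommand) Payload() any {
    return struct {
        Username string `json:"username"`
    }{Username: c.username}
}
func (c *userAddedCommand) UniqueConstraints() []*eventstore.UniqueConstraint {
    return []*eventstore.UniqueConstraint{
        eventstore.NewAddEventUniqueConstraint("usernames", c.username, "Errors.User.AlreadyExists"),
    }
}

func pushUser(ctx context.Context, pusher eventstore.Pusher) error {
    intent := eventstore.NewPushIntent(
        "instance-1",
        // owner, type, id of the aggregate
        eventstore.AppendAggregate("org-1", "user", "user-1",
            // only push if no event was stored for the aggregate yet
            eventstore.CurrentSequenceMatches(0),
            eventstore.AppendCommands(&userAddedCommand{username: "gigi"}),
        ),
    )
    return pusher.Push(ctx, intent)
}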

@@ -0,0 +1,756 @@
package eventstore
import (
"context"
"database/sql"
"errors"
"slices"
"time"
"github.com/zitadel/zitadel/internal/v2/database"
)
type Querier interface {
healthier
Query(ctx context.Context, query *Query) (eventCount int, err error)
}
type Query struct {
instances *filter[[]string]
filters []*Filter
tx *sql.Tx
pagination *Pagination
reducer Reducer
// TODO: await push
}
func (q *Query) Instance() database.Condition {
return q.instances.condition
}
func (q *Query) Filters() []*Filter {
return q.filters
}
func (q *Query) Tx() *sql.Tx {
return q.tx
}
func (q *Query) Pagination() *Pagination {
q.ensurePagination()
return q.pagination
}
func (q *Query) Reduce(events ...*Event[StoragePayload]) error {
return q.reducer.Reduce(events...)
}
func NewQuery(instance string, reducer Reducer, opts ...QueryOpt) *Query {
query := &Query{
reducer: reducer,
}
for _, opt := range append([]QueryOpt{SetInstance(instance)}, opts...) {
opt(query)
}
return query
}
type QueryOpt func(q *Query)
func SetInstance(instance string) QueryOpt {
return InstancesEqual(instance)
}
func InstancesEqual(instances ...string) QueryOpt {
return func(q *Query) {
var cond database.Condition
switch len(instances) {
case 0:
return
case 1:
cond = database.NewTextEqual(instances[0])
default:
cond = database.NewListEquals(instances...)
}
q.instances = &filter[[]string]{
condition: cond,
value: &instances,
}
}
}
func InstancesContains(instances ...string) QueryOpt {
return func(f *Query) {
var cond database.Condition
switch len(instances) {
case 0:
return
case 1:
cond = database.NewTextEqual(instances[0])
default:
cond = database.NewListContains(instances...)
}
f.instances = &filter[[]string]{
condition: cond,
value: &instances,
}
}
}
func InstancesNotContains(instances ...string) QueryOpt {
return func(f *Query) {
var cond database.Condition
switch len(instances) {
case 0:
return
case 1:
cond = database.NewTextUnequal(instances[0])
default:
cond = database.NewListNotContains(instances...)
}
f.instances = &filter[[]string]{
condition: cond,
value: &instances,
}
}
}
func SetQueryTx(tx *sql.Tx) QueryOpt {
return func(query *Query) {
query.tx = tx
}
}
func QueryPagination(opts ...paginationOpt) QueryOpt {
return func(query *Query) {
query.ensurePagination()
for _, opt := range opts {
opt(query.pagination)
}
}
}
func (q *Query) ensurePagination() {
if q.pagination != nil {
return
}
q.pagination = new(Pagination)
}
func AppendFilters(filters ...*Filter) QueryOpt {
return func(query *Query) {
for _, filter := range filters {
filter.parent = query
}
query.filters = append(query.filters, filters...)
}
}
func SetFilters(filters ...*Filter) QueryOpt {
return func(query *Query) {
for _, filter := range filters {
filter.parent = query
}
query.filters = filters
}
}
func AppendFilter(opts ...FilterOpt) QueryOpt {
return AppendFilters(NewFilter(opts...))
}
var ErrFilterMerge = errors.New("merge failed")
type FilterCreator func() []*Filter
func MergeFilters(filters ...[]*Filter) []*Filter {
// TODO: improve merge by checking the fields of the filters and merging them where possible;
// this will reduce the cost of queries which use multiple filters
return slices.Concat(filters...)
}
type Filter struct {
parent *Query
pagination *Pagination
aggregateFilters []*AggregateFilter
}
func (f *Filter) Parent() *Query {
return f.parent
}
func (f *Filter) Pagination() *Pagination {
if f.pagination == nil {
return f.parent.Pagination()
}
return f.pagination
}
func (f *Filter) AggregateFilters() []*AggregateFilter {
return f.aggregateFilters
}
func NewFilter(opts ...FilterOpt) *Filter {
f := new(Filter)
for _, opt := range opts {
opt(f)
}
return f
}
type FilterOpt func(f *Filter)
func AppendAggregateFilter(typ string, opts ...AggregateFilterOpt) FilterOpt {
return AppendAggregateFilters(NewAggregateFilter(typ, opts...))
}
func AppendAggregateFilters(filters ...*AggregateFilter) FilterOpt {
return func(mf *Filter) {
mf.aggregateFilters = append(mf.aggregateFilters, filters...)
}
}
func SetAggregateFilters(filters ...*AggregateFilter) FilterOpt {
return func(mf *Filter) {
mf.aggregateFilters = filters
}
}
func FilterPagination(opts ...paginationOpt) FilterOpt {
return func(filter *Filter) {
filter.ensurePagination()
for _, opt := range opts {
opt(filter.pagination)
}
}
}
func (f *Filter) ensurePagination() {
if f.pagination != nil {
return
}
f.pagination = new(Pagination)
}
func NewAggregateFilter(typ string, opts ...AggregateFilterOpt) *AggregateFilter {
filter := &AggregateFilter{
typ: typ,
}
for _, opt := range opts {
opt(filter)
}
return filter
}
type AggregateFilter struct {
typ string
ids []string
events []*EventFilter
}
func (f *AggregateFilter) Type() *database.TextFilter[string] {
return database.NewTextEqual(f.typ)
}
func (f *AggregateFilter) IDs() database.Condition {
if len(f.ids) == 0 {
return nil
}
if len(f.ids) == 1 {
return database.NewTextEqual(f.ids[0])
}
return database.NewListContains(f.ids...)
}
func (f *AggregateFilter) Events() []*EventFilter {
return f.events
}
type AggregateFilterOpt func(f *AggregateFilter)
func SetAggregateID(id string) AggregateFilterOpt {
return func(filter *AggregateFilter) {
filter.ids = []string{id}
}
}
func AppendAggregateIDs(ids ...string) AggregateFilterOpt {
return func(f *AggregateFilter) {
f.ids = append(f.ids, ids...)
}
}
// AggregateIDs sets the given ids as search param
func AggregateIDs(ids ...string) AggregateFilterOpt {
return func(f *AggregateFilter) {
f.ids = ids
}
}
func AppendEvent(opts ...EventFilterOpt) AggregateFilterOpt {
return AppendEvents(NewEventFilter(opts...))
}
func AppendEvents(events ...*EventFilter) AggregateFilterOpt {
return func(filter *AggregateFilter) {
filter.events = append(filter.events, events...)
}
}
func SetEvents(events ...*EventFilter) AggregateFilterOpt {
return func(filter *AggregateFilter) {
filter.events = events
}
}
func NewEventFilter(opts ...EventFilterOpt) *EventFilter {
filter := new(EventFilter)
for _, opt := range opts {
opt(filter)
}
return filter
}
type EventFilter struct {
types []string
revision *filter[uint16]
createdAt *filter[time.Time]
sequence *filter[uint32]
creators *filter[[]string]
}
type filter[T any] struct {
condition database.Condition
// the following fields are mutually exclusive:
// either min and max are set, or value is set
min, max *T
value *T
}
func (f *EventFilter) Types() database.Condition {
switch len(f.types) {
case 0:
return nil
case 1:
return database.NewTextEqual(f.types[0])
default:
return database.NewListContains(f.types...)
}
}
func (f *EventFilter) Revision() database.Condition {
if f.revision == nil {
return nil
}
return f.revision.condition
}
func (f *EventFilter) CreatedAt() database.Condition {
if f.createdAt == nil {
return nil
}
return f.createdAt.condition
}
func (f *EventFilter) Sequence() database.Condition {
if f.sequence == nil {
return nil
}
return f.sequence.condition
}
func (f *EventFilter) Creators() database.Condition {
if f.creators == nil {
return nil
}
return f.creators.condition
}
type EventFilterOpt func(f *EventFilter)
func SetEventType(typ string) EventFilterOpt {
return func(filter *EventFilter) {
filter.types = []string{typ}
}
}
// SetEventTypes overwrites the currently set types
func SetEventTypes(types ...string) EventFilterOpt {
return func(filter *EventFilter) {
filter.types = types
}
}
// AppendEventTypes appends the given types to the currently set types
func AppendEventTypes(types ...string) EventFilterOpt {
return func(filter *EventFilter) {
filter.types = append(filter.types, types...)
}
}
func EventRevisionEquals(revision uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberEquals(revision),
value: &revision,
}
}
}
func EventRevisionAtLeast(revision uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberAtLeast(revision),
value: &revision,
}
}
}
func EventRevisionGreater(revision uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberGreater(revision),
value: &revision,
}
}
}
func EventRevisionAtMost(revision uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberAtMost(revision),
value: &revision,
}
}
}
func EventRevisionLess(revision uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberLess(revision),
value: &revision,
}
}
}
func EventRevisionBetween(min, max uint16) EventFilterOpt {
return func(f *EventFilter) {
f.revision = &filter[uint16]{
condition: database.NewNumberBetween(min, max),
min: &min,
max: &max,
}
}
}
func EventCreatedAtEquals(createdAt time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberEquals(createdAt),
value: &createdAt,
}
}
}
func EventCreatedAtAtLeast(createdAt time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberAtLeast(createdAt),
value: &createdAt,
}
}
}
func EventCreatedAtGreater(createdAt time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberGreater(createdAt),
value: &createdAt,
}
}
}
func EventCreatedAtAtMost(createdAt time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberAtMost(createdAt),
value: &createdAt,
}
}
}
func EventCreatedAtLess(createdAt time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberLess(createdAt),
value: &createdAt,
}
}
}
func EventCreatedAtBetween(min, max time.Time) EventFilterOpt {
return func(f *EventFilter) {
f.createdAt = &filter[time.Time]{
condition: database.NewNumberBetween(min, max),
min: &min,
max: &max,
}
}
}
func EventSequenceEquals(sequence uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberEquals(sequence),
value: &sequence,
}
}
}
func EventSequenceAtLeast(sequence uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberAtLeast(sequence),
value: &sequence,
}
}
}
func EventSequenceGreater(sequence uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberGreater(sequence),
value: &sequence,
}
}
}
func EventSequenceAtMost(sequence uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberAtMost(sequence),
value: &sequence,
}
}
}
func EventSequenceLess(sequence uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberLess(sequence),
value: &sequence,
}
}
}
func EventSequenceBetween(min, max uint32) EventFilterOpt {
return func(f *EventFilter) {
f.sequence = &filter[uint32]{
condition: database.NewNumberBetween(min, max),
min: &min,
max: &max,
}
}
}
func EventCreatorsEqual(creators ...string) EventFilterOpt {
return func(f *EventFilter) {
var cond database.Condition
switch len(creators) {
case 0:
return
case 1:
cond = database.NewTextEqual(creators[0])
default:
cond = database.NewListEquals(creators...)
}
f.creators = &filter[[]string]{
condition: cond,
value: &creators,
}
}
}
func EventCreatorsContains(creators ...string) EventFilterOpt {
return func(f *EventFilter) {
var cond database.Condition
switch len(creators) {
case 0:
return
case 1:
cond = database.NewTextEqual(creators[0])
default:
cond = database.NewListContains(creators...)
}
f.creators = &filter[[]string]{
condition: cond,
value: &creators,
}
}
}
func EventCreatorsNotContains(creators ...string) EventFilterOpt {
return func(f *EventFilter) {
var cond database.Condition
switch len(creators) {
case 0:
return
case 1:
cond = database.NewTextUnequal(creators[0])
default:
cond = database.NewListNotContains(creators...)
}
f.creators = &filter[[]string]{
condition: cond,
value: &creators,
}
}
}
func Limit(limit uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePagination()
p.pagination.Limit = limit
}
}
func Offset(offset uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePagination()
p.pagination.Offset = offset
}
}
type PositionCondition struct {
min, max *GlobalPosition
}
func (pc *PositionCondition) Max() *GlobalPosition {
if pc == nil || pc.max == nil {
return nil
}
max := *pc.max
return &max
}
func (pc *PositionCondition) Min() *GlobalPosition {
if pc == nil || pc.min == nil {
return nil
}
min := *pc.min
return &min
}
// PositionGreater prepares the condition as follows:
// if inPositionOrder is set: (position = AND in_tx_order >) OR position >
// if inPositionOrder is NOT set: position >
func PositionGreater(position float64, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePosition()
p.position.min = &GlobalPosition{
Position: position,
InPositionOrder: inPositionOrder,
}
}
}
// GlobalPositionGreater prepares the condition as follows:
// if inPositionOrder is set: (position = AND in_tx_order >) OR position >
// if inPositionOrder is NOT set: position >
func GlobalPositionGreater(position *GlobalPosition) paginationOpt {
return PositionGreater(position.Position, position.InPositionOrder)
}
// PositionLess prepares the condition as follows:
// if inPositionOrder is set: (position = AND in_tx_order <) OR position <
// if inPositionOrder is NOT set: position <
func PositionLess(position float64, inPositionOrder uint32) paginationOpt {
return func(p *Pagination) {
p.ensurePosition()
p.position.max = &GlobalPosition{
Position: position,
InPositionOrder: inPositionOrder,
}
}
}
func PositionBetween(min, max *GlobalPosition) paginationOpt {
return func(p *Pagination) {
GlobalPositionGreater(min)(p)
GlobalPositionLess(max)(p)
}
}
// GlobalPositionLess prepares the condition as follows:
// if inPositionOrder is set: (position = AND in_tx_order <) OR position <
// if inPositionOrder is NOT set: position <
func GlobalPositionLess(position *GlobalPosition) paginationOpt {
return PositionLess(position.Position, position.InPositionOrder)
}
type Pagination struct {
pagination *database.Pagination
position *PositionCondition
desc bool
}
type paginationOpt func(*Pagination)
func (p *Pagination) Pagination() *database.Pagination {
if p == nil {
return nil
}
return p.pagination
}
func (p *Pagination) Position() *PositionCondition {
if p == nil {
return nil
}
return p.position
}
func (p *Pagination) Desc() bool {
if p == nil {
return false
}
return p.desc
}
func (p *Pagination) ensurePagination() {
if p.pagination != nil {
return
}
p.pagination = new(database.Pagination)
}
func (p *Pagination) ensurePosition() {
if p.position != nil {
return
}
p.position = new(PositionCondition)
}
func Descending() paginationOpt {
return func(p *Pagination) {
p.desc = true
}
}
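A sketch of building a query with the option constructors: selected event types of one user aggregate, newest first, limited to 100. For illustration (not part of this commit); a Reducer such as the collector sketched above is assumed, and names and values are made up:

package eventstore_test

import (
    "context"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

func queryUserEvents(ctx context.Context, querier eventstore.Querier, reducer eventstore.Reducer) (int, error) {
    query := eventstore.NewQuery(
        "instance-1",
        reducer,
        eventstore.AppendFilter(
            eventstore.AppendAggregateFilter(
                "user",
                eventstore.SetAggregateID("user-1"),
                eventstore.AppendEvent(
                    eventstore.SetEventTypes("user.added", "user.changed"),
                ),
            ),
        ),
        eventstore.QueryPagination(
            eventstore.Descending(),
            eventstore.Limit(100),
        ),
    )
    // the reducer receives every matching event; the returned count is the number of reduced events
    return querier.Query(ctx, query)
}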

(File diff suppressed because it is too large)

@@ -0,0 +1,80 @@
package eventstore
type UniqueConstraint struct {
// UniqueType is the table name for the unique constraint
UniqueType string
// UniqueField is the unique key
UniqueField string
// Action defines if unique constraint should be added or removed
Action UniqueConstraintAction
// ErrorMessage defines the translation file key for the error message
ErrorMessage string
// IsGlobal defines if the unique constraint is globally unique or just within a single instance
IsGlobal bool
}
type UniqueConstraintAction int8
const (
UniqueConstraintAdd UniqueConstraintAction = iota
UniqueConstraintRemove
UniqueConstraintInstanceRemove
uniqueConstraintActionCount
)
func (f UniqueConstraintAction) Valid() bool {
return f >= 0 && f < uniqueConstraintActionCount
}
func NewAddEventUniqueConstraint(
uniqueType,
uniqueField,
errMessage string) *UniqueConstraint {
return &UniqueConstraint{
UniqueType: uniqueType,
UniqueField: uniqueField,
ErrorMessage: errMessage,
Action: UniqueConstraintAdd,
}
}
func NewRemoveUniqueConstraint(
uniqueType,
uniqueField string) *UniqueConstraint {
return &UniqueConstraint{
UniqueType: uniqueType,
UniqueField: uniqueField,
Action: UniqueConstraintRemove,
}
}
func NewRemoveInstanceUniqueConstraints() *UniqueConstraint {
return &UniqueConstraint{
Action: UniqueConstraintInstanceRemove,
}
}
func NewAddGlobalUniqueConstraint(
uniqueType,
uniqueField,
errMessage string) *UniqueConstraint {
return &UniqueConstraint{
UniqueType: uniqueType,
UniqueField: uniqueField,
ErrorMessage: errMessage,
IsGlobal: true,
Action: UniqueConstraintAdd,
}
}
func NewRemoveGlobalUniqueConstraint(
uniqueType,
uniqueField string) *UniqueConstraint {
return &UniqueConstraint{
UniqueType: uniqueType,
UniqueField: uniqueField,
IsGlobal: true,
Action: UniqueConstraintRemove,
}
}
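The removal and global constructors cover the other actions handled by the postgres uniqueConstraints function above. For illustration (not part of this commit); the constraint types, fields, and error key are made up:

package eventstore_test

import (
    "fmt"

    "github.com/zitadel/zitadel/internal/v2/eventstore"
)

func ExampleUniqueConstraintAction_Valid() {
    // remove a single constraint, e.g. when a username is released
    release := eventstore.NewRemoveUniqueConstraint("usernames", "gigi")
    // remove every constraint of an instance, e.g. when the instance is deleted
    wipe := eventstore.NewRemoveInstanceUniqueConstraints()
    // a globally unique value is stored without an instance id
    global := eventstore.NewAddGlobalUniqueConstraint("domains", "example.com", "Errors.AlreadyExists")

    fmt.Println(release.Action.Valid(), wipe.Action.Valid(), global.IsGlobal)
    // Output:
    // true true true
}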