mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-13 19:44:21 +00:00
12be21a3ff
creates events structures for initial projections and read models
757 lines
16 KiB
Go
757 lines
16 KiB
Go
package eventstore
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"slices"
|
|
"time"
|
|
|
|
"github.com/zitadel/zitadel/internal/v2/database"
|
|
)
|
|
|
|
type Querier interface {
|
|
healthier
|
|
Query(ctx context.Context, query *Query) (eventCount int, err error)
|
|
}
|
|
|
|
type Query struct {
|
|
instances *filter[[]string]
|
|
filters []*Filter
|
|
tx *sql.Tx
|
|
pagination *Pagination
|
|
reducer Reducer
|
|
// TODO: await push
|
|
}
|
|
|
|
func (q *Query) Instance() database.Condition {
|
|
return q.instances.condition
|
|
}
|
|
|
|
func (q *Query) Filters() []*Filter {
|
|
return q.filters
|
|
}
|
|
|
|
func (q *Query) Tx() *sql.Tx {
|
|
return q.tx
|
|
}
|
|
|
|
func (q *Query) Pagination() *Pagination {
|
|
q.ensurePagination()
|
|
return q.pagination
|
|
}
|
|
|
|
func (q *Query) Reduce(events ...*StorageEvent) error {
|
|
return q.reducer.Reduce(events...)
|
|
}
|
|
|
|
func NewQuery(instance string, reducer Reducer, opts ...QueryOpt) *Query {
|
|
query := &Query{
|
|
reducer: reducer,
|
|
}
|
|
|
|
for _, opt := range append([]QueryOpt{SetInstance(instance)}, opts...) {
|
|
opt(query)
|
|
}
|
|
|
|
return query
|
|
}
|
|
|
|
type QueryOpt func(q *Query)
|
|
|
|
func SetInstance(instance string) QueryOpt {
|
|
return InstancesEqual(instance)
|
|
}
|
|
|
|
func InstancesEqual(instances ...string) QueryOpt {
|
|
return func(q *Query) {
|
|
var cond database.Condition
|
|
switch len(instances) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextEqual(instances[0])
|
|
default:
|
|
cond = database.NewListEquals(instances...)
|
|
}
|
|
q.instances = &filter[[]string]{
|
|
condition: cond,
|
|
value: &instances,
|
|
}
|
|
}
|
|
}
|
|
|
|
func InstancesContains(instances ...string) QueryOpt {
|
|
return func(f *Query) {
|
|
var cond database.Condition
|
|
switch len(instances) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextEqual(instances[0])
|
|
default:
|
|
cond = database.NewListContains(instances...)
|
|
}
|
|
|
|
f.instances = &filter[[]string]{
|
|
condition: cond,
|
|
value: &instances,
|
|
}
|
|
}
|
|
}
|
|
|
|
func InstancesNotContains(instances ...string) QueryOpt {
|
|
return func(f *Query) {
|
|
var cond database.Condition
|
|
switch len(instances) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextUnequal(instances[0])
|
|
default:
|
|
cond = database.NewListNotContains(instances...)
|
|
}
|
|
f.instances = &filter[[]string]{
|
|
condition: cond,
|
|
value: &instances,
|
|
}
|
|
}
|
|
}
|
|
|
|
func SetQueryTx(tx *sql.Tx) QueryOpt {
|
|
return func(query *Query) {
|
|
query.tx = tx
|
|
}
|
|
}
|
|
|
|
func QueryPagination(opts ...paginationOpt) QueryOpt {
|
|
return func(query *Query) {
|
|
query.ensurePagination()
|
|
|
|
for _, opt := range opts {
|
|
opt(query.pagination)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Query) ensurePagination() {
|
|
if q.pagination != nil {
|
|
return
|
|
}
|
|
q.pagination = new(Pagination)
|
|
}
|
|
|
|
func AppendFilters(filters ...*Filter) QueryOpt {
|
|
return func(query *Query) {
|
|
for _, filter := range filters {
|
|
filter.parent = query
|
|
}
|
|
query.filters = append(query.filters, filters...)
|
|
}
|
|
}
|
|
|
|
func SetFilters(filters ...*Filter) QueryOpt {
|
|
return func(query *Query) {
|
|
for _, filter := range filters {
|
|
filter.parent = query
|
|
}
|
|
query.filters = filters
|
|
}
|
|
}
|
|
|
|
func AppendFilter(opts ...FilterOpt) QueryOpt {
|
|
return AppendFilters(NewFilter(opts...))
|
|
}
|
|
|
|
var ErrFilterMerge = errors.New("merge failed")
|
|
|
|
type FilterCreator func() []*Filter
|
|
|
|
func MergeFilters(filters ...[]*Filter) []*Filter {
|
|
// TODO: improve merge by checking fields of filters and merge filters if possible
|
|
// this will reduce cost of queries which do multiple filters
|
|
return slices.Concat(filters...)
|
|
}
|
|
|
|
type Filter struct {
|
|
parent *Query
|
|
pagination *Pagination
|
|
|
|
aggregateFilters []*AggregateFilter
|
|
}
|
|
|
|
func (f *Filter) Parent() *Query {
|
|
return f.parent
|
|
}
|
|
|
|
func (f *Filter) Pagination() *Pagination {
|
|
if f.pagination == nil {
|
|
return f.parent.Pagination()
|
|
}
|
|
return f.pagination
|
|
}
|
|
|
|
func (f *Filter) AggregateFilters() []*AggregateFilter {
|
|
return f.aggregateFilters
|
|
}
|
|
|
|
func NewFilter(opts ...FilterOpt) *Filter {
|
|
f := new(Filter)
|
|
|
|
for _, opt := range opts {
|
|
opt(f)
|
|
}
|
|
|
|
return f
|
|
}
|
|
|
|
type FilterOpt func(f *Filter)
|
|
|
|
func AppendAggregateFilter(typ string, opts ...AggregateFilterOpt) FilterOpt {
|
|
return AppendAggregateFilters(NewAggregateFilter(typ, opts...))
|
|
}
|
|
|
|
func AppendAggregateFilters(filters ...*AggregateFilter) FilterOpt {
|
|
return func(mf *Filter) {
|
|
mf.aggregateFilters = append(mf.aggregateFilters, filters...)
|
|
}
|
|
}
|
|
|
|
func SetAggregateFilters(filters ...*AggregateFilter) FilterOpt {
|
|
return func(mf *Filter) {
|
|
mf.aggregateFilters = filters
|
|
}
|
|
}
|
|
|
|
func FilterPagination(opts ...paginationOpt) FilterOpt {
|
|
return func(filter *Filter) {
|
|
filter.ensurePagination()
|
|
|
|
for _, opt := range opts {
|
|
opt(filter.pagination)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (f *Filter) ensurePagination() {
|
|
if f.pagination != nil {
|
|
return
|
|
}
|
|
f.pagination = new(Pagination)
|
|
}
|
|
|
|
func NewAggregateFilter(typ string, opts ...AggregateFilterOpt) *AggregateFilter {
|
|
filter := &AggregateFilter{
|
|
typ: typ,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(filter)
|
|
}
|
|
|
|
return filter
|
|
}
|
|
|
|
type AggregateFilter struct {
|
|
typ string
|
|
ids []string
|
|
events []*EventFilter
|
|
}
|
|
|
|
func (f *AggregateFilter) Type() *database.TextFilter[string] {
|
|
return database.NewTextEqual(f.typ)
|
|
}
|
|
|
|
func (f *AggregateFilter) IDs() database.Condition {
|
|
if len(f.ids) == 0 {
|
|
return nil
|
|
}
|
|
if len(f.ids) == 1 {
|
|
return database.NewTextEqual(f.ids[0])
|
|
}
|
|
|
|
return database.NewListContains(f.ids...)
|
|
}
|
|
|
|
func (f *AggregateFilter) Events() []*EventFilter {
|
|
return f.events
|
|
}
|
|
|
|
type AggregateFilterOpt func(f *AggregateFilter)
|
|
|
|
func SetAggregateID(id string) AggregateFilterOpt {
|
|
return func(filter *AggregateFilter) {
|
|
filter.ids = []string{id}
|
|
}
|
|
}
|
|
|
|
func AppendAggregateIDs(ids ...string) AggregateFilterOpt {
|
|
return func(f *AggregateFilter) {
|
|
f.ids = append(f.ids, ids...)
|
|
}
|
|
}
|
|
|
|
// AggregateIDs sets the given ids as search param
|
|
func AggregateIDs(ids ...string) AggregateFilterOpt {
|
|
return func(f *AggregateFilter) {
|
|
f.ids = ids
|
|
}
|
|
}
|
|
|
|
func AppendEvent(opts ...EventFilterOpt) AggregateFilterOpt {
|
|
return AppendEvents(NewEventFilter(opts...))
|
|
}
|
|
|
|
func AppendEvents(events ...*EventFilter) AggregateFilterOpt {
|
|
return func(filter *AggregateFilter) {
|
|
filter.events = append(filter.events, events...)
|
|
}
|
|
}
|
|
|
|
func SetEvents(events ...*EventFilter) AggregateFilterOpt {
|
|
return func(filter *AggregateFilter) {
|
|
filter.events = events
|
|
}
|
|
}
|
|
|
|
func NewEventFilter(opts ...EventFilterOpt) *EventFilter {
|
|
filter := new(EventFilter)
|
|
|
|
for _, opt := range opts {
|
|
opt(filter)
|
|
}
|
|
|
|
return filter
|
|
}
|
|
|
|
type EventFilter struct {
|
|
types []string
|
|
revision *filter[uint16]
|
|
createdAt *filter[time.Time]
|
|
sequence *filter[uint32]
|
|
creators *filter[[]string]
|
|
}
|
|
|
|
type filter[T any] struct {
|
|
condition database.Condition
|
|
// the following fields are considered as one of
|
|
// you can either have value and max or value
|
|
min, max *T
|
|
value *T
|
|
}
|
|
|
|
func (f *EventFilter) Types() database.Condition {
|
|
switch len(f.types) {
|
|
case 0:
|
|
return nil
|
|
case 1:
|
|
return database.NewTextEqual(f.types[0])
|
|
default:
|
|
return database.NewListContains(f.types...)
|
|
}
|
|
}
|
|
|
|
func (f *EventFilter) Revision() database.Condition {
|
|
if f.revision == nil {
|
|
return nil
|
|
}
|
|
return f.revision.condition
|
|
}
|
|
|
|
func (f *EventFilter) CreatedAt() database.Condition {
|
|
if f.createdAt == nil {
|
|
return nil
|
|
}
|
|
return f.createdAt.condition
|
|
}
|
|
|
|
func (f *EventFilter) Sequence() database.Condition {
|
|
if f.sequence == nil {
|
|
return nil
|
|
}
|
|
return f.sequence.condition
|
|
}
|
|
|
|
func (f *EventFilter) Creators() database.Condition {
|
|
if f.creators == nil {
|
|
return nil
|
|
}
|
|
return f.creators.condition
|
|
}
|
|
|
|
type EventFilterOpt func(f *EventFilter)
|
|
|
|
func SetEventType(typ string) EventFilterOpt {
|
|
return func(filter *EventFilter) {
|
|
filter.types = []string{typ}
|
|
}
|
|
}
|
|
|
|
// SetEventTypes overwrites the currently set types
|
|
func SetEventTypes(types ...string) EventFilterOpt {
|
|
return func(filter *EventFilter) {
|
|
filter.types = types
|
|
}
|
|
}
|
|
|
|
// AppendEventTypes appends the types the currently set types
|
|
func AppendEventTypes(types ...string) EventFilterOpt {
|
|
return func(filter *EventFilter) {
|
|
filter.types = append(filter.types, types...)
|
|
}
|
|
}
|
|
|
|
func EventRevisionEquals(revision uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberEquals(revision),
|
|
value: &revision,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventRevisionAtLeast(revision uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberAtLeast(revision),
|
|
value: &revision,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventRevisionGreater(revision uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberGreater(revision),
|
|
value: &revision,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventRevisionAtMost(revision uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberAtMost(revision),
|
|
value: &revision,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventRevisionLess(revision uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberLess(revision),
|
|
value: &revision,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventRevisionBetween(min, max uint16) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.revision = &filter[uint16]{
|
|
condition: database.NewNumberBetween(min, max),
|
|
min: &min,
|
|
max: &max,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtEquals(createdAt time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberEquals(createdAt),
|
|
value: &createdAt,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtAtLeast(createdAt time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberAtLeast(createdAt),
|
|
value: &createdAt,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtGreater(createdAt time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberGreater(createdAt),
|
|
value: &createdAt,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtAtMost(createdAt time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberAtMost(createdAt),
|
|
value: &createdAt,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtLess(createdAt time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberLess(createdAt),
|
|
value: &createdAt,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatedAtBetween(min, max time.Time) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.createdAt = &filter[time.Time]{
|
|
condition: database.NewNumberBetween(min, max),
|
|
min: &min,
|
|
max: &max,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceEquals(sequence uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberEquals(sequence),
|
|
value: &sequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceAtLeast(sequence uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberAtLeast(sequence),
|
|
value: &sequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceGreater(sequence uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberGreater(sequence),
|
|
value: &sequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceAtMost(sequence uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberAtMost(sequence),
|
|
value: &sequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceLess(sequence uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberLess(sequence),
|
|
value: &sequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventSequenceBetween(min, max uint32) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
f.sequence = &filter[uint32]{
|
|
condition: database.NewNumberBetween(min, max),
|
|
min: &min,
|
|
max: &max,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatorsEqual(creators ...string) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
var cond database.Condition
|
|
switch len(creators) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextEqual(creators[0])
|
|
default:
|
|
cond = database.NewListEquals(creators...)
|
|
}
|
|
f.creators = &filter[[]string]{
|
|
condition: cond,
|
|
value: &creators,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatorsContains(creators ...string) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
var cond database.Condition
|
|
switch len(creators) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextEqual(creators[0])
|
|
default:
|
|
cond = database.NewListContains(creators...)
|
|
}
|
|
|
|
f.creators = &filter[[]string]{
|
|
condition: cond,
|
|
value: &creators,
|
|
}
|
|
}
|
|
}
|
|
|
|
func EventCreatorsNotContains(creators ...string) EventFilterOpt {
|
|
return func(f *EventFilter) {
|
|
var cond database.Condition
|
|
switch len(creators) {
|
|
case 0:
|
|
return
|
|
case 1:
|
|
cond = database.NewTextUnequal(creators[0])
|
|
default:
|
|
cond = database.NewListNotContains(creators...)
|
|
}
|
|
f.creators = &filter[[]string]{
|
|
condition: cond,
|
|
value: &creators,
|
|
}
|
|
}
|
|
}
|
|
|
|
func Limit(limit uint32) paginationOpt {
|
|
return func(p *Pagination) {
|
|
p.ensurePagination()
|
|
|
|
p.pagination.Limit = limit
|
|
}
|
|
}
|
|
|
|
func Offset(offset uint32) paginationOpt {
|
|
return func(p *Pagination) {
|
|
p.ensurePagination()
|
|
|
|
p.pagination.Offset = offset
|
|
}
|
|
}
|
|
|
|
type PositionCondition struct {
|
|
min, max *GlobalPosition
|
|
}
|
|
|
|
func (pc *PositionCondition) Max() *GlobalPosition {
|
|
if pc == nil || pc.max == nil {
|
|
return nil
|
|
}
|
|
max := *pc.max
|
|
return &max
|
|
}
|
|
|
|
func (pc *PositionCondition) Min() *GlobalPosition {
|
|
if pc == nil || pc.min == nil {
|
|
return nil
|
|
}
|
|
min := *pc.min
|
|
return &min
|
|
}
|
|
|
|
// PositionGreater prepares the condition as follows
|
|
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
|
|
// if inPositionOrder is NOT set: position >
|
|
func PositionGreater(position float64, inPositionOrder uint32) paginationOpt {
|
|
return func(p *Pagination) {
|
|
p.ensurePosition()
|
|
p.position.min = &GlobalPosition{
|
|
Position: position,
|
|
InPositionOrder: inPositionOrder,
|
|
}
|
|
}
|
|
}
|
|
|
|
// GlobalPositionGreater prepares the condition as follows
|
|
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
|
|
// if inPositionOrder is NOT set: position >
|
|
func GlobalPositionGreater(position *GlobalPosition) paginationOpt {
|
|
return PositionGreater(position.Position, position.InPositionOrder)
|
|
}
|
|
|
|
// PositionLess prepares the condition as follows
|
|
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
|
|
// if inPositionOrder is NOT set: position >
|
|
func PositionLess(position float64, inPositionOrder uint32) paginationOpt {
|
|
return func(p *Pagination) {
|
|
p.ensurePosition()
|
|
p.position.max = &GlobalPosition{
|
|
Position: position,
|
|
InPositionOrder: inPositionOrder,
|
|
}
|
|
}
|
|
}
|
|
|
|
func PositionBetween(min, max *GlobalPosition) paginationOpt {
|
|
return func(p *Pagination) {
|
|
GlobalPositionGreater(min)(p)
|
|
GlobalPositionLess(max)(p)
|
|
}
|
|
}
|
|
|
|
// GlobalPositionLess prepares the condition as follows
|
|
// if inPositionOrder is set: position = AND in_tx_order > OR or position >
|
|
// if inPositionOrder is NOT set: position >
|
|
func GlobalPositionLess(position *GlobalPosition) paginationOpt {
|
|
return PositionLess(position.Position, position.InPositionOrder)
|
|
}
|
|
|
|
type Pagination struct {
|
|
pagination *database.Pagination
|
|
position *PositionCondition
|
|
|
|
desc bool
|
|
}
|
|
|
|
type paginationOpt func(*Pagination)
|
|
|
|
func (p *Pagination) Pagination() *database.Pagination {
|
|
if p == nil {
|
|
return nil
|
|
}
|
|
return p.pagination
|
|
}
|
|
|
|
func (p *Pagination) Position() *PositionCondition {
|
|
if p == nil {
|
|
return nil
|
|
}
|
|
return p.position
|
|
}
|
|
|
|
func (p *Pagination) Desc() bool {
|
|
if p == nil {
|
|
return false
|
|
}
|
|
|
|
return p.desc
|
|
}
|
|
|
|
func (p *Pagination) ensurePagination() {
|
|
if p.pagination != nil {
|
|
return
|
|
}
|
|
p.pagination = new(database.Pagination)
|
|
}
|
|
|
|
func (p *Pagination) ensurePosition() {
|
|
if p.position != nil {
|
|
return
|
|
}
|
|
p.position = new(PositionCondition)
|
|
}
|
|
|
|
func Descending() paginationOpt {
|
|
return func(p *Pagination) {
|
|
p.desc = true
|
|
}
|
|
}
|