policies implemented

This commit is contained in:
adlerhurst
2020-11-06 22:09:19 +01:00
parent f7f810caa5
commit 57fc3ddd16
22 changed files with 667 additions and 87 deletions

View File

@@ -0,0 +1,34 @@
package eventstore
func NewAggregate(id string) *Aggregate {
return &Aggregate{
ID: id,
Events: []EventPusher{},
}
}
type Aggregate struct {
PreviousSequence uint64 `json:"-"`
ID string `json:"-"`
Events []EventPusher `json:"-"`
}
//AppendEvents adds all the events to the aggregate.
// The function doesn't compute the new state of the aggregate
func (a *Aggregate) AppendEvents(events ...EventPusher) *Aggregate {
a.Events = append(a.Events, events...)
return a
}
//Reduce must be the last step in the reduce function of the extension
func (a *Aggregate) Reduce() error {
if len(a.Events) == 0 {
return nil
}
a.PreviousSequence = a.Events[len(a.Events)-1].Sequence()
// all events processed and not needed anymore
a.Events = nil
a.Events = []Event{}
return nil
}

View File

@@ -39,9 +39,3 @@ type EventReader interface {
PreviousSequence() uint64
CreationDate() time.Time
}
//Event is the representation of a state change
type Event interface {
EventPusher
EventReader
}

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/api/service"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
@@ -77,10 +78,11 @@ func BaseEventFromRepo(event *repository.Event) *BaseEvent {
}
}
func NewBaseEventForPush(ctx context.Context, service string, typ EventType) *BaseEvent {
func NewBaseEventForPush(ctx context.Context, typ EventType) *BaseEvent {
svcName := service.FromContext(ctx)
return &BaseEvent{
User: authz.GetCtxData(ctx).UserID,
Service: service,
Service: svcName,
EventType: typ,
}
}

View File

@@ -7,6 +7,7 @@ import (
"sync"
"testing"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/v2/repository"
)
@@ -56,7 +57,7 @@ func newTestEvent(description string, data func() interface{}, checkPrevious boo
data: data,
shouldCheckPrevious: checkPrevious,
BaseEvent: *NewBaseEventForPush(
"editorUser",
authz.NewMockContext("resourceOwner", "editorUser"),
"editorService",
"test.event",
),
@@ -87,7 +88,7 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
mapper func(*repository.Event) (EventReader, error)
}
type res struct {
event Event
event EventReader
mapperCount int
}

View File

@@ -18,9 +18,8 @@ import (
// ------------------------------------------------------------
type UserAggregate struct {
eventstore.BaseEvent
eventstore.Aggregate
FirstName string
}
@@ -54,7 +53,7 @@ func NewUserAggregate(id string) *UserAggregate {
}
}
func (rm *UserAggregate) AppendEvents(events ...eventstore.Event) *UserAggregate {
func (rm *UserAggregate) AppendEvents(events ...eventstore.EventReader) *UserAggregate {
rm.Aggregate.AppendEvents(events...)
return rm
}

View File

@@ -2,9 +2,9 @@ package eventstore
import "time"
func NewReadModel(id string) *ReadModel {
func NewReadModel( /*id string*/ ) *ReadModel {
return &ReadModel{
ID: id,
// ID: id,
Events: []EventReader{},
}
}
@@ -12,11 +12,11 @@ func NewReadModel(id string) *ReadModel {
//ReadModel is the minimum representation of a View model.
// it might be saved in a database or in memory
type ReadModel struct {
ProcessedSequence uint64 `json:"-"`
ID string `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
ProcessedSequence uint64 `json:"-"`
// ID string `json:"-"`
CreationDate time.Time `json:"-"`
ChangeDate time.Time `json:"-"`
Events []EventReader `json:"-"`
}
//AppendEvents adds all the events to the read model.
@@ -42,36 +42,3 @@ func (rm *ReadModel) Reduce() error {
rm.Events = []EventReader{}
return nil
}
func NewAggregate(id string) *Aggregate {
return &Aggregate{
ID: id,
Events: []Event{},
}
}
type Aggregate struct {
PreviousSequence uint64 `json:"-"`
ID string `json:"-"`
Events []Event `json:"-"`
}
//AppendEvents adds all the events to the aggregate.
// The function doesn't compute the new state of the aggregate
func (a *Aggregate) AppendEvents(events ...Event) *Aggregate {
a.Events = append(a.Events, events...)
return a
}
//Reduce must be the last step in the reduce function of the extension
func (a *Aggregate) Reduce() error {
if len(a.Events) == 0 {
return nil
}
a.PreviousSequence = a.Events[len(a.Events)-1].Sequence()
// all events processed and not needed anymore
a.Events = nil
a.Events = []Event{}
return nil
}