mirror of
https://github.com/zitadel/zitadel.git
synced 2025-05-08 00:36:47 +00:00
event data mapping in eventstore v2
This commit is contained in:
parent
64a0859d76
commit
53b02b7f5e
@ -2,6 +2,8 @@ package eventstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/caos/zitadel/internal/errors"
|
"github.com/caos/zitadel/internal/errors"
|
||||||
@ -87,8 +89,10 @@ func (es *Eventstore) aggregatesToEvents(aggregates []aggregater) ([]*repository
|
|||||||
for _, aggregate := range aggregates {
|
for _, aggregate := range aggregates {
|
||||||
var previousEvent *repository.Event
|
var previousEvent *repository.Event
|
||||||
for _, event := range aggregate.Events() {
|
for _, event := range aggregate.Events() {
|
||||||
//TODO: map event.Data() into json
|
data, err := eventData(event)
|
||||||
var data []byte
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
events = append(events, &repository.Event{
|
events = append(events, &repository.Event{
|
||||||
AggregateID: aggregate.ID(),
|
AggregateID: aggregate.ID(),
|
||||||
AggregateType: repository.AggregateType(aggregate.Type()),
|
AggregateType: repository.AggregateType(aggregate.Type()),
|
||||||
@ -168,8 +172,12 @@ func (es *Eventstore) FilterToReadModel(ctx context.Context, searchQuery *Search
|
|||||||
return readModel.Reduce()
|
return readModel.Reduce()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (es *Eventstore) LatestSequence(ctx context.Context, searchQuery *SearchQueryFactory) (uint64, error) {
|
func (es *Eventstore) LatestSequence(ctx context.Context, queryFactory *SearchQueryFactory) (uint64, error) {
|
||||||
return 0, nil
|
query, err := queryFactory.Build()
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return es.repo.LatestSequence(ctx, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
//RegisterPushEventMapper registers a function for mapping an eventstore event to an event
|
//RegisterPushEventMapper registers a function for mapping an eventstore event to an event
|
||||||
@ -203,3 +211,27 @@ func (es *Eventstore) RegisterPushEventMapper(eventType EventType, mapper func(E
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func eventData(event Event) ([]byte, error) {
|
||||||
|
switch data := event.Data().(type) {
|
||||||
|
case nil:
|
||||||
|
return nil, nil
|
||||||
|
case []byte:
|
||||||
|
if json.Valid(data) {
|
||||||
|
return data, nil
|
||||||
|
}
|
||||||
|
return nil, errors.ThrowInvalidArgument(nil, "V2-6SbbS", "data bytes are not json")
|
||||||
|
}
|
||||||
|
dataType := reflect.TypeOf(event.Data())
|
||||||
|
if dataType.Kind() == reflect.Ptr {
|
||||||
|
dataType = dataType.Elem()
|
||||||
|
}
|
||||||
|
if dataType.Kind() == reflect.Struct {
|
||||||
|
dataBytes, err := json.Marshal(event.Data())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.ThrowInvalidArgument(err, "V2-xG87M", "could not marhsal data")
|
||||||
|
}
|
||||||
|
return dataBytes, nil
|
||||||
|
}
|
||||||
|
return nil, errors.ThrowInvalidArgument(nil, "V2-91NRm", "wrong type of event data")
|
||||||
|
}
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
type testEvent struct {
|
type testEvent struct {
|
||||||
description string
|
description string
|
||||||
shouldCheckPrevious bool
|
shouldCheckPrevious bool
|
||||||
|
data func() interface{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *testEvent) CheckPrevious() bool {
|
func (e *testEvent) CheckPrevious() bool {
|
||||||
@ -28,7 +29,7 @@ func (e *testEvent) Type() EventType {
|
|||||||
return "test.event"
|
return "test.event"
|
||||||
}
|
}
|
||||||
func (e *testEvent) Data() interface{} {
|
func (e *testEvent) Data() interface{} {
|
||||||
return nil
|
return e.data()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *testEvent) PreviousSequence() uint64 {
|
func (e *testEvent) PreviousSequence() uint64 {
|
||||||
@ -282,3 +283,134 @@ func Test_eventstore_RegisterFilterEventMapper(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_eventData(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
type res struct {
|
||||||
|
jsonText []byte
|
||||||
|
wantErr bool
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
res res
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "data as json bytes",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return []byte(`{"piff":"paff"}`)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(`{"piff":"paff"}`),
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data as invalid json bytes",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return []byte(`{"piffpaff"}`)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(nil),
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data as struct",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return struct {
|
||||||
|
Piff string `json:"piff"`
|
||||||
|
}{Piff: "paff"}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(`{"piff":"paff"}`),
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "data as ptr to struct",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return &struct {
|
||||||
|
Piff string `json:"piff"`
|
||||||
|
}{Piff: "paff"}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(`{"piff":"paff"}`),
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "no data",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(nil),
|
||||||
|
wantErr: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid because primitive",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
return ""
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(nil),
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "invalid because pointer to primitive",
|
||||||
|
args: args{
|
||||||
|
event: &testEvent{
|
||||||
|
data: func() interface{} {
|
||||||
|
var s string
|
||||||
|
return &s
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
res: res{
|
||||||
|
jsonText: []byte(nil),
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := eventData(tt.args.event)
|
||||||
|
if (err != nil) != tt.res.wantErr {
|
||||||
|
t.Errorf("eventData() error = %v, wantErr %v", err, tt.res.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.res.jsonText) {
|
||||||
|
t.Errorf("eventData() = %v, want %v", string(got), string(tt.res.jsonText))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user