refactor(v2): init events (#7823)

creates events structures for initial projections and read models
This commit is contained in:
Silvan
2024-05-23 06:36:08 +02:00
committed by GitHub
parent f37113194d
commit 12be21a3ff
56 changed files with 1952 additions and 316 deletions

View File

@@ -2,8 +2,8 @@ package postgres
import (
"encoding/json"
"github.com/zitadel/logging"
"reflect"
"time"
"github.com/zitadel/zitadel/internal/v2/eventstore"
"github.com/zitadel/zitadel/internal/zerrors"
@@ -13,52 +13,52 @@ func intentToCommands(intent *intent) (commands []*command, err error) {
commands = make([]*command, len(intent.Commands()))
for i, cmd := range intent.Commands() {
var payload unmarshalPayload
if cmd.Payload() != nil {
payload, err = json.Marshal(cmd.Payload())
if err != nil {
logging.WithError(err).Warn("marshal payload failed")
return nil, zerrors.ThrowInternal(err, "POSTG-MInPK", "Errors.Internal")
}
payload, err := marshalPayload(cmd.Payload)
if err != nil {
return nil, zerrors.ThrowInternal(err, "POSTG-MInPK", "Errors.Internal")
}
commands[i] = &command{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *intent.Aggregate(),
Creator: cmd.Creator(),
Revision: cmd.Revision(),
Type: cmd.Type(),
// always add at least 1 to the currently stored sequence
Sequence: intent.sequence + uint32(i) + 1,
Payload: payload,
},
intent: intent,
uniqueConstraints: cmd.UniqueConstraints(),
Command: cmd,
intent: intent,
sequence: intent.nextSequence(),
payload: payload,
}
}
return commands, nil
}
func marshalPayload(payload any) ([]byte, error) {
if reflect.ValueOf(payload).IsZero() {
return nil, nil
}
return json.Marshal(payload)
}
type command struct {
*eventstore.Event[eventstore.StoragePayload]
eventstore.Command
intent *intent
uniqueConstraints []*eventstore.UniqueConstraint
intent *intent
payload []byte
position eventstore.GlobalPosition
createdAt time.Time
sequence uint32
}
var _ eventstore.StoragePayload = (unmarshalPayload)(nil)
type unmarshalPayload []byte
// Unmarshal implements eventstore.StoragePayload.
func (p unmarshalPayload) Unmarshal(ptr any) error {
if len(p) == 0 {
return nil
func (cmd *command) toEvent() *eventstore.StorageEvent {
return &eventstore.StorageEvent{
Action: eventstore.Action[eventstore.Unmarshal]{
Creator: cmd.Creator,
Type: cmd.Type,
Revision: cmd.Revision,
Payload: func(ptr any) error {
return json.Unmarshal(cmd.payload, ptr)
},
},
Aggregate: *cmd.intent.Aggregate(),
Sequence: cmd.intent.sequence,
Position: cmd.position,
CreatedAt: cmd.createdAt,
}
if err := json.Unmarshal(p, ptr); err != nil {
return zerrors.ThrowInternal(err, "POSTG-u8qVo", "Errors.Internal")
}
return nil
}

View File

@@ -12,6 +12,11 @@ type intent struct {
sequence uint32
}
func (i *intent) nextSequence() uint32 {
i.sequence++
return i.sequence
}
func makeIntents(pushIntent *eventstore.PushIntent) []*intent {
res := make([]*intent, len(pushIntent.Aggregates()))

View File

@@ -140,19 +140,19 @@ func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands
stmt.WriteString(", ")
}
cmd.Position.InPositionOrder = uint32(i)
cmd.position.InPositionOrder = uint32(i)
stmt.WriteString(`(`)
stmt.WriteArgs(
cmd.Aggregate.Instance,
cmd.Aggregate.Owner,
cmd.Aggregate.Type,
cmd.Aggregate.ID,
cmd.intent.Aggregate().Instance,
cmd.intent.Aggregate().Owner,
cmd.intent.Aggregate().Type,
cmd.intent.Aggregate().ID,
cmd.Revision,
cmd.Creator,
cmd.Type,
cmd.Payload,
cmd.Sequence,
i,
cmd.payload,
cmd.sequence,
cmd.position.InPositionOrder,
)
stmt.WriteString(", statement_timestamp(), EXTRACT(EPOCH FROM clock_timestamp())")
stmt.WriteString(`)`)
@@ -171,13 +171,13 @@ func push(ctx context.Context, tx *sql.Tx, reducer eventstore.Reducer, commands
defer func() { i++ }()
err := scan(
&commands[i].CreatedAt,
&commands[i].Position.Position,
&commands[i].createdAt,
&commands[i].position.Position,
)
if err != nil {
return err
}
return reducer.Reduce(commands[i].Event)
return reducer.Reduce(commands[i].toEvent())
})
}
@@ -188,12 +188,13 @@ func uniqueConstraints(ctx context.Context, tx *sql.Tx, commands []*command) (er
var stmt database.Statement
for _, cmd := range commands {
if len(cmd.uniqueConstraints) == 0 {
if len(cmd.UniqueConstraints) == 0 {
continue
}
for _, constraint := range cmd.uniqueConstraints {
for _, constraint := range cmd.UniqueConstraints {
stmt.Reset()
instance := cmd.Aggregate.Instance
instance := cmd.intent.PushAggregate.Aggregate().Instance
if constraint.IsGlobal {
instance = ""
}

View File

@@ -47,13 +47,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
},
},
},
},
@@ -72,13 +75,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddGlobalUniqueConstraint("test", "id", "error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddGlobalUniqueConstraint("test", "id", "error"),
},
},
},
},
@@ -97,14 +103,17 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
},
},
},
},
@@ -128,23 +137,29 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "error"),
},
},
},
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id2", "error"),
},
},
},
},
@@ -168,13 +183,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
},
},
},
@@ -193,23 +211,29 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
},
},
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveInstanceUniqueConstraints(),
},
},
},
},
@@ -233,13 +257,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
},
},
},
},
@@ -258,13 +285,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveGlobalUniqueConstraint("test", "id"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveGlobalUniqueConstraint("test", "id"),
},
},
},
},
@@ -283,14 +313,17 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
eventstore.NewRemoveUniqueConstraint("test", "id2"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
eventstore.NewRemoveUniqueConstraint("test", "id2"),
},
},
},
},
@@ -314,23 +347,29 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id"),
},
},
},
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id2"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewRemoveUniqueConstraint("test", "id2"),
},
},
},
},
@@ -354,13 +393,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", ""),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", ""),
},
},
},
},
@@ -385,13 +427,16 @@ func Test_uniqueConstraints(t *testing.T) {
args: args{
commands: []*command{
{
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
Instance: "instance",
},
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("", "", ""),
).Aggregates()[0],
},
uniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "My.Error"),
Command: eventstore.Command{
UniqueConstraints: []*eventstore.UniqueConstraint{
eventstore.NewAddEventUniqueConstraint("test", "id", "My.Error"),
},
},
},
},
@@ -741,16 +786,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
},
expectations: []mock.Expectation{
@@ -764,9 +807,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -798,16 +841,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
{
intent: &intent{
@@ -816,16 +857,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type2",
Sequence: 2,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type2",
},
},
sequence: 2,
},
},
expectations: []mock.Expectation{
@@ -839,9 +878,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
"instance",
"owner",
"testType",
@@ -849,9 +888,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type2",
nil,
mock.NilArg,
uint32(2),
1,
uint32(1),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -887,36 +926,30 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
{
intent: &intent{
PushAggregate: eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
eventstore.AppendAggregate("owner", "type2", "id2"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: eventstore.Aggregate{
ID: "id2",
Type: "type2",
Instance: "instance",
Owner: "owner",
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type2",
},
Creator: "gigi",
Revision: 1,
Type: "test.type2",
Sequence: 10,
},
sequence: 10,
},
},
expectations: []mock.Expectation{
@@ -930,9 +963,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
"instance",
"owner",
"type2",
@@ -940,9 +973,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type2",
nil,
mock.NilArg,
uint32(10),
1,
uint32(1),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -978,17 +1011,15 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Payload: unmarshalPayload(`{"name": "gigi"}`),
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
payload: []byte(`{"name": "gigi"}`),
},
},
expectations: []mock.Expectation{
@@ -1002,9 +1033,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
unmarshalPayload(`{"name": "gigi"}`),
[]byte(`{"name": "gigi"}`),
uint32(1),
0,
uint32(0),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -1036,16 +1067,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
},
expectations: []mock.Expectation{
@@ -1059,9 +1088,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -1094,16 +1123,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
{
intent: &intent{
@@ -1112,16 +1139,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type2",
Sequence: 2,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type2",
},
},
sequence: 2,
},
},
expectations: []mock.Expectation{
@@ -1135,9 +1160,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
"instance",
"owner",
"testType",
@@ -1145,9 +1170,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type2",
nil,
mock.NilArg,
uint32(2),
1,
uint32(1),
),
mock.WithQueryResult(
[]string{"created_at", "position"},
@@ -1189,16 +1214,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type",
Sequence: 1,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type",
},
},
sequence: 1,
},
{
intent: &intent{
@@ -1207,16 +1230,14 @@ func Test_push(t *testing.T) {
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0],
},
Event: &eventstore.Event[eventstore.StoragePayload]{
Aggregate: *eventstore.NewPushIntent(
"instance",
eventstore.AppendAggregate("owner", "testType", "testID"),
).Aggregates()[0].Aggregate(),
Creator: "gigi",
Revision: 1,
Type: "test.type2",
Sequence: 2,
Command: eventstore.Command{
Action: eventstore.Action[any]{
Creator: "gigi",
Revision: 1,
Type: "test.type2",
},
},
sequence: 2,
},
},
expectations: []mock.Expectation{
@@ -1230,9 +1251,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type",
nil,
mock.NilArg,
uint32(1),
0,
uint32(0),
"instance",
"owner",
"testType",
@@ -1240,9 +1261,9 @@ func Test_push(t *testing.T) {
uint16(1),
"gigi",
"test.type2",
nil,
mock.NilArg,
uint32(2),
1,
uint32(1),
),
mock.WithQueryResult(
[]string{"created_at", "position"},

View File

@@ -3,6 +3,7 @@ package postgres
import (
"context"
"database/sql"
"encoding/json"
"slices"
"github.com/zitadel/logging"
@@ -38,7 +39,7 @@ func executeQuery(ctx context.Context, tx database.Querier, stmt *database.State
}
err = database.MapRowsToObject(rows, func(scan func(dest ...any) error) error {
e := new(eventstore.Event[eventstore.StoragePayload])
e := new(eventstore.StorageEvent)
var payload sql.Null[[]byte]
@@ -59,7 +60,12 @@ func executeQuery(ctx context.Context, tx database.Querier, stmt *database.State
if err != nil {
return err
}
e.Payload = unmarshalPayload(payload.V)
e.Payload = func(ptr any) error {
if len(payload.V) == 0 {
return nil
}
return json.Unmarshal(payload.V, ptr)
}
eventCount++
return reducer.Reduce(e)

View File

@@ -1143,7 +1143,7 @@ type testReducer struct {
}
// Reduce implements eventstore.Reducer.
func (r *testReducer) Reduce(events ...*eventstore.Event[eventstore.StoragePayload]) error {
func (r *testReducer) Reduce(events ...*eventstore.StorageEvent) error {
if r == nil {
return nil
}