mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:17:32 +00:00
start implementing new eventstore on iam
This commit is contained in:
@@ -4,6 +4,8 @@ import (
|
||||
"github.com/caos/zitadel/internal/cache/config"
|
||||
"github.com/caos/zitadel/internal/eventstore/internal/repository/sql"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
es_v2 "github.com/caos/zitadel/internal/eventstore/v2"
|
||||
sql_v2 "github.com/caos/zitadel/internal/eventstore/v2/repository/sql"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
@@ -13,7 +15,7 @@ type Config struct {
|
||||
}
|
||||
|
||||
func Start(conf Config) (Eventstore, error) {
|
||||
repo, err := sql.Start(conf.Repository)
|
||||
repo, sqlClient, err := sql.Start(conf.Repository)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -21,5 +23,6 @@ func Start(conf Config) (Eventstore, error) {
|
||||
return &eventstore{
|
||||
repo: repo,
|
||||
aggregateCreator: models.NewAggregateCreator(conf.ServiceName),
|
||||
esV2: es_v2.NewEventstore(sql_v2.NewCRDB(sqlClient)),
|
||||
}, nil
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore/internal/repository"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
es_v2 "github.com/caos/zitadel/internal/eventstore/v2"
|
||||
)
|
||||
|
||||
type Eventstore interface {
|
||||
@@ -14,6 +15,7 @@ type Eventstore interface {
|
||||
PushAggregates(ctx context.Context, aggregates ...*models.Aggregate) error
|
||||
FilterEvents(ctx context.Context, searchQuery *models.SearchQuery) (events []*models.Event, err error)
|
||||
LatestSequence(ctx context.Context, searchQuery *models.SearchQueryFactory) (uint64, error)
|
||||
V2() *es_v2.Eventstore
|
||||
}
|
||||
|
||||
var _ Eventstore = (*eventstore)(nil)
|
||||
@@ -21,6 +23,8 @@ var _ Eventstore = (*eventstore)(nil)
|
||||
type eventstore struct {
|
||||
repo repository.Repository
|
||||
aggregateCreator *models.AggregateCreator
|
||||
|
||||
esV2 *es_v2.Eventstore
|
||||
}
|
||||
|
||||
func (es *eventstore) AggregateCreator() *models.AggregateCreator {
|
||||
@@ -62,3 +66,7 @@ func (es *eventstore) LatestSequence(ctx context.Context, queryFactory *models.S
|
||||
func (es *eventstore) Health(ctx context.Context) error {
|
||||
return es.repo.Health(ctx)
|
||||
}
|
||||
|
||||
func (es *eventstore) V2() *es_v2.Eventstore {
|
||||
return es.esV2
|
||||
}
|
||||
|
@@ -1,6 +1,8 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
_ "github.com/lib/pq"
|
||||
|
||||
"github.com/caos/zitadel/internal/config/types"
|
||||
@@ -11,12 +13,12 @@ type Config struct {
|
||||
SQL types.SQL
|
||||
}
|
||||
|
||||
func Start(conf Config) (*SQL, error) {
|
||||
func Start(conf Config) (*SQL, *sql.DB, error) {
|
||||
client, err := conf.SQL.Start()
|
||||
if err != nil {
|
||||
return nil, errors.ThrowPreconditionFailed(err, "SQL-9qBtr", "unable to open database connection")
|
||||
return nil, nil, errors.ThrowPreconditionFailed(err, "SQL-9qBtr", "unable to open database connection")
|
||||
}
|
||||
return &SQL{
|
||||
client: client,
|
||||
}, nil
|
||||
}, client, nil
|
||||
}
|
||||
|
@@ -124,7 +124,9 @@ func (es *Eventstore) mapEvents(events []*repository.Event) (mappedEvents []Even
|
||||
for i, event := range events {
|
||||
interceptors, ok := es.eventInterceptors[EventType(event.Type)]
|
||||
if !ok || interceptors.eventMapper == nil {
|
||||
return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
|
||||
mappedEvents[i] = BaseEventFromRepo(event)
|
||||
continue
|
||||
// return nil, errors.ThrowPreconditionFailed(nil, "V2-usujB", "event mapper not defined")
|
||||
}
|
||||
mappedEvents[i], err = interceptors.eventMapper(event)
|
||||
if err != nil {
|
||||
|
@@ -17,6 +17,7 @@ type ReadModel struct {
|
||||
CreationDate time.Time `json:"-"`
|
||||
ChangeDate time.Time `json:"-"`
|
||||
Events []EventReader `json:"-"`
|
||||
ResourceOwner string `json:"-"`
|
||||
}
|
||||
|
||||
//AppendEvents adds all the events to the read model.
|
||||
@@ -36,6 +37,9 @@ func (rm *ReadModel) Reduce() error {
|
||||
if rm.AggregateID == "" {
|
||||
rm.AggregateID = rm.Events[0].AggregateID()
|
||||
}
|
||||
if rm.ResourceOwner == "" {
|
||||
rm.ResourceOwner = rm.Events[0].ResourceOwner()
|
||||
}
|
||||
|
||||
if rm.CreationDate.IsZero() {
|
||||
rm.CreationDate = rm.Events[0].CreationDate()
|
||||
|
Reference in New Issue
Block a user