fix: commandside queries (#1313)

* fix: move user by id to query side

* fix: move get passwordless to query side

# Conflicts:
#	internal/user/repository/eventsourcing/eventstore.go

* fix: move get passwordless to query side

* remove user eventstore

* remove unused models

* org changes

* org changes

* fix: move org queries to query side

* fix: remove org eventstore

* fix: remove org eventstore

* fix: remove org eventstore

* remove project from es v1

* project cleanup

* project cleanup

* fix: remove org eventstore

* fix: remove iam eventstore

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2021-02-22 14:08:47 +01:00
committed by GitHub
parent 2ba56595b1
commit 428ef4acdb
106 changed files with 2301 additions and 2799 deletions

View File

@@ -5,18 +5,16 @@ import (
"github.com/caos/zitadel/internal/v2/query"
"github.com/caos/zitadel/internal/iam/model"
iam_event "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
)
type IamRepo struct {
IAMID string
IAMEvents *iam_event.IAMEventstore
IAMID string
IAMV2Query *query.QuerySide
}
func (repo *IamRepo) Health(ctx context.Context) error {
return repo.IAMEvents.Health(ctx)
return nil
}
func (repo *IamRepo) IamByID(ctx context.Context) (*model.IAM, error) {

View File

@@ -2,28 +2,33 @@ package eventstore
import (
"context"
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model"
iam_view "github.com/caos/zitadel/internal/iam/repository/view"
"github.com/caos/zitadel/internal/v2/domain"
"k8s.io/apimachinery/pkg/api/errors"
"strings"
"time"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
usr_view "github.com/caos/zitadel/internal/user/repository/view"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
"github.com/caos/zitadel/internal/crypto"
caos_errs "github.com/caos/zitadel/internal/errors"
iam_event "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
"github.com/caos/zitadel/internal/telemetry/tracing"
usr_model "github.com/caos/zitadel/internal/user/model"
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"github.com/caos/zitadel/internal/user/repository/view/model"
)
type TokenVerifierRepo struct {
TokenVerificationKey [32]byte
IAMID string
IAMEvents *iam_event.IAMEventstore
ProjectEvents *proj_event.ProjectEventstore
UserEvents *usr_event.UserEventstore
Eventstore eventstore.Eventstore
View *view.View
}
@@ -38,7 +43,7 @@ func (repo *TokenVerifierRepo) TokenByID(ctx context.Context, tokenID, userID st
token.UserID = userID
}
events, esErr := repo.UserEvents.UserEventsByID(ctx, userID, token.Sequence)
events, esErr := repo.getUserEvents(ctx, userID, token.Sequence)
if caos_errs.IsNotFound(viewErr) && len(events) == 0 {
return nil, caos_errs.ThrowNotFound(nil, "EVENT-4T90g", "Errors.Token.NotFound")
}
@@ -110,7 +115,7 @@ func (repo *TokenVerifierRepo) VerifierClientID(ctx context.Context, appName str
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
iam, err := repo.IAMEvents.IAMByID(ctx, repo.IAMID)
iam, err := repo.getIAMByID(ctx)
if err != nil {
return "", err
}
@@ -120,3 +125,28 @@ func (repo *TokenVerifierRepo) VerifierClientID(ctx context.Context, appName str
}
return app.OIDCClientID, nil
}
func (r *TokenVerifierRepo) getUserEvents(ctx context.Context, userID string, sequence uint64) ([]*models.Event, error) {
query, err := usr_view.UserByIDQuery(userID, sequence)
if err != nil {
return nil, err
}
return r.Eventstore.FilterEvents(ctx, query)
}
func (u *TokenVerifierRepo) getIAMByID(ctx context.Context) (*iam_model.IAM, error) {
query, err := iam_view.IAMByIDQuery(domain.IAMID, 0)
if err != nil {
return nil, err
}
iam := &iam_es_model.IAM{
ObjectRoot: models.ObjectRoot{
AggregateID: domain.IAMID,
},
}
err = es_sdk.Filter(ctx, u.Eventstore.FilterEvents, iam.AppendEvents, query)
if err != nil && errors.IsNotFound(err) && iam.Sequence == 0 {
return nil, err
}
return iam_es_model.IAMToModel(iam), nil
}

View File

@@ -2,11 +2,17 @@ package eventstore
import (
"context"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model"
iam_view "github.com/caos/zitadel/internal/iam/repository/view"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
caos_errs "github.com/caos/zitadel/internal/errors"
iam_event "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
global_model "github.com/caos/zitadel/internal/model"
user_model "github.com/caos/zitadel/internal/user/model"
user_view_model "github.com/caos/zitadel/internal/user/repository/view/model"
@@ -19,7 +25,7 @@ type UserGrantRepo struct {
IamID string
IamProjectID string
Auth authz.Config
IamEvents *iam_event.IAMEventstore
Eventstore eventstore.Eventstore
}
func (repo *UserGrantRepo) Health() error {
@@ -94,7 +100,7 @@ func (repo *UserGrantRepo) FillIamProjectID(ctx context.Context) error {
if repo.IamProjectID != "" {
return nil
}
iam, err := repo.IamEvents.IAMByID(ctx, repo.IamID)
iam, err := repo.getIAMByID(ctx)
if err != nil {
return err
}
@@ -118,6 +124,23 @@ func (repo *UserGrantRepo) mapRoleToPermission(permissions *grant_model.Permissi
return permissions
}
func (u *UserGrantRepo) getIAMByID(ctx context.Context) (*iam_model.IAM, error) {
query, err := iam_view.IAMByIDQuery(domain.IAMID, 0)
if err != nil {
return nil, err
}
iam := &iam_es_model.IAM{
ObjectRoot: models.ObjectRoot{
AggregateID: domain.IAMID,
},
}
err = es_sdk.Filter(ctx, u.Eventstore.FilterEvents, iam.AppendEvents, query)
if err != nil && errors.IsNotFound(err) && iam.Sequence == 0 {
return nil, err
}
return iam_es_model.IAMToModel(iam), nil
}
func userMembershipToMembership(membership *user_view_model.UserMembershipView) *authz.Membership {
return &authz.Membership{
MemberType: authz.MemberType(membership.MemberType),

View File

@@ -7,8 +7,8 @@ import (
"github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/query"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/project/repository/eventsourcing"
es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
"github.com/caos/zitadel/internal/project/repository/view"
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
)
@@ -61,7 +61,7 @@ func (a *Application) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.ProjectQuery(sequence.CurrentSequence), nil
return view.ProjectQuery(sequence.CurrentSequence), nil
}
func (a *Application) Reduce(event *models.Event) (err error) {

View File

@@ -1,7 +1,6 @@
package handler
import (
"github.com/caos/zitadel/internal/iam/repository/eventsourcing"
"time"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
@@ -9,8 +8,6 @@ import (
"github.com/caos/zitadel/internal/config/types"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/query"
org_events "github.com/caos/zitadel/internal/org/repository/eventsourcing"
project_events "github.com/caos/zitadel/internal/project/repository/eventsourcing"
)
type Configs map[string]*Config
@@ -32,22 +29,13 @@ func (h *handler) Eventstore() eventstore.Eventstore {
return h.es
}
type EventstoreRepos struct {
IAMEvents *eventsourcing.IAMEventstore
OrgEvents *org_events.OrgEventstore
ProjectEvents *project_events.ProjectEventstore
}
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es eventstore.Eventstore, repos EventstoreRepos, systemDefaults sd.SystemDefaults) []query.Handler {
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es eventstore.Eventstore, systemDefaults sd.SystemDefaults) []query.Handler {
return []query.Handler{
newUserGrant(
handler{view, bulkLimit, configs.cycleDuration("UserGrants"), errorCount, es},
repos.IAMEvents,
systemDefaults.IamID),
newUserMembership(
handler{view, bulkLimit, configs.cycleDuration("UserMemberships"), errorCount, es},
repos.OrgEvents,
repos.ProjectEvents),
handler{view, bulkLimit, configs.cycleDuration("UserMemberships"), errorCount, es}),
newApplication(
handler{view, bulkLimit, configs.cycleDuration("Application"), errorCount, es}),
newOrg(

View File

@@ -2,12 +2,12 @@ package handler
import (
"github.com/caos/logging"
"github.com/caos/zitadel/internal/org/repository/view"
"github.com/caos/zitadel/internal/eventstore"
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/query"
"github.com/caos/zitadel/internal/eventstore/spooler"
"github.com/caos/zitadel/internal/org/repository/eventsourcing"
"github.com/caos/zitadel/internal/org/repository/eventsourcing/model"
org_model "github.com/caos/zitadel/internal/org/repository/view/model"
)
@@ -61,7 +61,7 @@ func (o *Org) EventQuery() (*es_models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.OrgQuery(sequence.CurrentSequence), nil
return view.OrgQuery(sequence.CurrentSequence), nil
}
func (o *Org) Reduce(event *es_models.Event) error {

View File

@@ -2,6 +2,9 @@ package handler
import (
"context"
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
iam_model "github.com/caos/zitadel/internal/iam/model"
iam_view "github.com/caos/zitadel/internal/iam/repository/view"
"strings"
"github.com/caos/logging"
@@ -13,7 +16,6 @@ import (
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/query"
"github.com/caos/zitadel/internal/eventstore/spooler"
iam_events "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model"
org_es_model "github.com/caos/zitadel/internal/org/repository/eventsourcing/model"
proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
@@ -27,7 +29,6 @@ const (
type UserGrant struct {
handler
iamEvents *iam_events.IAMEventstore
iamID string
iamProjectID string
subscription *eventstore.Subscription
@@ -35,13 +36,11 @@ type UserGrant struct {
func newUserGrant(
handler handler,
iamEvents *iam_events.IAMEventstore,
iamID string,
) *UserGrant {
h := &UserGrant{
handler: handler,
iamEvents: iamEvents,
iamID: iamID,
handler: handler,
iamID: iamID,
}
h.subscribe()
@@ -258,7 +257,7 @@ func (u *UserGrant) setIamProjectID() error {
if u.iamProjectID != "" {
return nil
}
iam, err := u.iamEvents.IAMByID(context.Background(), u.iamID)
iam, err := u.getIAMByID(context.Background())
if err != nil {
return err
}
@@ -277,3 +276,20 @@ func (u *UserGrant) OnError(event *models.Event, err error) error {
func (u *UserGrant) OnSuccess() error {
return spooler.HandleSuccess(u.view.UpdateUserGrantSpoolerRunTimestamp)
}
func (u *UserGrant) getIAMByID(ctx context.Context) (*iam_model.IAM, error) {
query, err := iam_view.IAMByIDQuery(domain.IAMID, 0)
if err != nil {
return nil, err
}
iam := &iam_es_model.IAM{
ObjectRoot: models.ObjectRoot{
AggregateID: domain.IAMID,
},
}
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, iam.AppendEvents, query)
if err != nil && errors.IsNotFound(err) && iam.Sequence == 0 {
return nil, err
}
return iam_es_model.IAMToModel(iam), nil
}

View File

@@ -4,17 +4,21 @@ import (
"context"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/models"
es_models "github.com/caos/zitadel/internal/eventstore/models"
"github.com/caos/zitadel/internal/eventstore/query"
es_sdk "github.com/caos/zitadel/internal/eventstore/sdk"
"github.com/caos/zitadel/internal/eventstore/spooler"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/eventsourcing/model"
org_model "github.com/caos/zitadel/internal/org/model"
org_event "github.com/caos/zitadel/internal/org/repository/eventsourcing"
org_es_model "github.com/caos/zitadel/internal/org/repository/eventsourcing/model"
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
org_view "github.com/caos/zitadel/internal/org/repository/view"
proj_model "github.com/caos/zitadel/internal/project/model"
proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
proj_view "github.com/caos/zitadel/internal/project/repository/view"
usr_model "github.com/caos/zitadel/internal/user/model"
"github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
usr_es_model "github.com/caos/zitadel/internal/user/repository/view/model"
@@ -26,20 +30,14 @@ const (
type UserMembership struct {
handler
orgEvents *org_event.OrgEventstore
projectEvents *proj_event.ProjectEventstore
subscription *eventstore.Subscription
subscription *eventstore.Subscription
}
func newUserMembership(
handler handler,
orgEvents *org_event.OrgEventstore,
projectEvents *proj_event.ProjectEventstore,
) *UserMembership {
h := &UserMembership{
handler: handler,
orgEvents: orgEvents,
projectEvents: projectEvents,
handler: handler,
}
h.subscribe()
@@ -156,7 +154,7 @@ func (m *UserMembership) processOrg(event *models.Event) (err error) {
}
func (m *UserMembership) fillOrgName(member *usr_es_model.UserMembershipView) (err error) {
org, err := m.orgEvents.OrgByID(context.Background(), org_model.NewOrg(member.ResourceOwner))
org, err := m.getOrgByID(context.Background(), member.ResourceOwner)
if err != nil {
return err
}
@@ -168,7 +166,7 @@ func (m *UserMembership) fillOrgName(member *usr_es_model.UserMembershipView) (e
}
func (m *UserMembership) updateOrgName(event *models.Event) error {
org, err := m.orgEvents.OrgByID(context.Background(), org_model.NewOrg(event.AggregateID))
org, err := m.getOrgByID(context.Background(), event.AggregateID)
if err != nil {
return err
}
@@ -231,7 +229,7 @@ func (m *UserMembership) processProject(event *models.Event) (err error) {
}
func (m *UserMembership) fillProjectDisplayName(member *usr_es_model.UserMembershipView) (err error) {
project, err := m.projectEvents.ProjectByID(context.Background(), member.AggregateID)
project, err := m.getProjectByID(context.Background(), member.AggregateID)
if err != nil {
return err
}
@@ -240,7 +238,7 @@ func (m *UserMembership) fillProjectDisplayName(member *usr_es_model.UserMembers
}
func (m *UserMembership) updateProjectDisplayName(event *models.Event) error {
project, err := m.projectEvents.ProjectByID(context.Background(), event.AggregateID)
project, err := m.getProjectByID(context.Background(), event.AggregateID)
if err != nil {
return err
}
@@ -272,3 +270,42 @@ func (m *UserMembership) OnError(event *models.Event, err error) error {
func (m *UserMembership) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateUserMembershipSpoolerRunTimestamp)
}
func (u *UserMembership) getOrgByID(ctx context.Context, orgID string) (*org_model.Org, error) {
query, err := org_view.OrgByIDQuery(orgID, 0)
if err != nil {
return nil, err
}
var esOrg *org_es_model.Org
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esOrg.AppendEvents, query)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
if esOrg.Sequence == 0 {
return nil, errors.ThrowNotFound(nil, "EVENT-3m9vs", "Errors.Org.NotFound")
}
return org_es_model.OrgToModel(esOrg), nil
}
func (u *UserMembership) getProjectByID(ctx context.Context, projID string) (*proj_model.Project, error) {
query, err := proj_view.ProjectByIDQuery(projID, 0)
if err != nil {
return nil, err
}
esProject := &proj_es_model.Project{
ObjectRoot: models.ObjectRoot{
AggregateID: projID,
},
}
err = es_sdk.Filter(ctx, u.Eventstore().FilterEvents, esProject.AppendEvents, query)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
if esProject.Sequence == 0 {
return nil, errors.ThrowNotFound(nil, "EVENT-Dfrt2", "Errors.Project.NotFound")
}
return proj_es_model.ProjectToModel(esProject), nil
}

View File

@@ -2,24 +2,19 @@ package eventsourcing
import (
"context"
"github.com/caos/zitadel/internal/v2/query"
es_org "github.com/caos/zitadel/internal/org/repository/eventsourcing"
es_user "github.com/caos/zitadel/internal/user/repository/eventsourcing"
"github.com/caos/zitadel/internal/v2/query"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/auth_request/repository/cache"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/eventstore"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/handler"
"github.com/caos/zitadel/internal/authz/repository/eventsourcing/spooler"
authz_view "github.com/caos/zitadel/internal/authz/repository/eventsourcing/view"
sd "github.com/caos/zitadel/internal/config/systemdefaults"
"github.com/caos/zitadel/internal/config/types"
es_int "github.com/caos/zitadel/internal/eventstore"
es_spol "github.com/caos/zitadel/internal/eventstore/spooler"
es_iam "github.com/caos/zitadel/internal/iam/repository/eventsourcing"
"github.com/caos/zitadel/internal/id"
es_proj "github.com/caos/zitadel/internal/project/repository/eventsourcing"
)
type Config struct {
@@ -55,60 +50,30 @@ func Start(conf Config, authZ authz.Config, systemDefaults sd.SystemDefaults) (*
return nil, err
}
iam, err := es_iam.StartIAM(es_iam.IAMConfig{
Eventstore: es,
Cache: conf.Eventstore.Cache,
}, systemDefaults)
if err != nil {
return nil, err
}
org := es_org.StartOrg(es_org.OrgConfig{Eventstore: es, IAMDomain: conf.Domain}, systemDefaults)
project, err := es_proj.StartProject(es_proj.ProjectConfig{
Eventstore: es,
Cache: conf.Eventstore.Cache,
}, systemDefaults)
if err != nil {
return nil, err
}
user, err := es_user.StartUser(
es_user.UserConfig{
Eventstore: es,
Cache: conf.Eventstore.Cache,
},
systemDefaults,
)
if err != nil {
return nil, err
}
iamV2, err := query.StartQuerySide(&query.Config{Eventstore: esV2, SystemDefaults: systemDefaults})
if err != nil {
return nil, err
}
repos := handler.EventstoreRepos{IAMEvents: iam, OrgEvents: org, ProjectEvents: project}
spool := spooler.StartSpooler(conf.Spooler, es, view, sqlClient, repos, systemDefaults)
spool := spooler.StartSpooler(conf.Spooler, es, view, sqlClient, systemDefaults)
return &EsRepository{
spool,
eventstore.UserGrantRepo{
View: view,
IamID: systemDefaults.IamID,
Auth: authZ,
IamEvents: iam,
View: view,
IamID: systemDefaults.IamID,
Auth: authZ,
Eventstore: es,
},
eventstore.IamRepo{
IAMID: systemDefaults.IamID,
IAMEvents: iam,
IAMV2Query: iamV2,
},
eventstore.TokenVerifierRepo{
//TODO: Add Token Verification Key
IAMID: systemDefaults.IamID,
IAMEvents: iam,
ProjectEvents: project,
UserEvents: user,
View: view,
Eventstore: es,
IAMID: systemDefaults.IamID,
View: view,
},
}, nil
}

View File

@@ -19,12 +19,12 @@ type SpoolerConfig struct {
Handlers handler.Configs
}
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, sql *sql.DB, repos handler.EventstoreRepos, systemDefaults sd.SystemDefaults) *spooler.Spooler {
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, sql *sql.DB, systemDefaults sd.SystemDefaults) *spooler.Spooler {
spoolerConfig := spooler.Config{
Eventstore: es,
Locker: &locker{dbClient: sql},
ConcurrentWorkers: c.ConcurrentWorkers,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, repos, systemDefaults),
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults),
}
spool := spoolerConfig.New()
spool.Start()