mirror of
https://github.com/zitadel/zitadel.git
synced 2025-10-18 02:42:44 +00:00
feat: project view (#90)
* init for views (spooler, handler) * init for views (spooler, handler) * start view in management * granted project * implement granted project view * search granted projects * fix search column * update all projects on project change * search roles * filter org * project members * project grant members * fix tests * application view * project grant search * mock * test appendevents * test appendevents * Update internal/view/query.go Co-authored-by: Livio Amstutz <livio.a@gmail.com> * Update internal/eventstore/spooler/spooler.go Co-authored-by: Livio Amstutz <livio.a@gmail.com> * Update internal/view/query.go Co-authored-by: Livio Amstutz <livio.a@gmail.com> * merge request changes * Update internal/project/repository/view/model/application.go Co-authored-by: Livio Amstutz <livio.a@gmail.com> * merge request changes * Project view sql (#92) * sql and configs * error handling * sql start in eventstore * on error handling, config * read user on members * Update internal/project/repository/view/application_view.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/application_query.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_grant_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_grant_member_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_grant_member_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_member_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_member_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/granted_project.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * return caos errors * Update internal/project/repository/view/model/granted_project_query.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_grant_member.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_grant_member_query.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_member.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_member_query.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_role.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update internal/project/repository/view/model/project_role_query.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/application_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/application_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update pkg/management/api/grpc/project_converter.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * converter fix Co-authored-by: Livio Amstutz <livio.a@gmail.com> Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
@@ -1,15 +1,18 @@
|
||||
package eventsourcing
|
||||
package eventstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
)
|
||||
|
||||
type ProjectRepo struct {
|
||||
SearchLimit uint64
|
||||
ProjectEvents *proj_event.ProjectEventstore
|
||||
//view *view.View
|
||||
View *view.View
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) ProjectByID(ctx context.Context, id string) (project *proj_model.Project, err error) {
|
||||
@@ -33,6 +36,20 @@ func (repo *ProjectRepo) ReactivateProject(ctx context.Context, id string) (*pro
|
||||
return repo.ProjectEvents.ReactivateProject(ctx, id)
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) SearchGrantedProjects(ctx context.Context, request *proj_model.GrantedProjectSearchRequest) (*proj_model.GrantedProjectSearchResponse, error) {
|
||||
request.EnsureLimit(repo.SearchLimit)
|
||||
projects, count, err := repo.View.SearchGrantedProjects(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &proj_model.GrantedProjectSearchResponse{
|
||||
Offset: request.Offset,
|
||||
Limit: request.Limit,
|
||||
TotalResult: uint64(count),
|
||||
Result: model.GrantedProjectsToModel(projects),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) ProjectMemberByID(ctx context.Context, projectID, userID string) (member *proj_model.ProjectMember, err error) {
|
||||
member = proj_model.NewProjectMember(projectID, userID)
|
||||
return repo.ProjectEvents.ProjectMemberByIDs(ctx, member)
|
||||
@@ -51,6 +68,20 @@ func (repo *ProjectRepo) RemoveProjectMember(ctx context.Context, projectID, use
|
||||
return repo.ProjectEvents.RemoveProjectMember(ctx, member)
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) SearchProjectMembers(ctx context.Context, request *proj_model.ProjectMemberSearchRequest) (*proj_model.ProjectMemberSearchResponse, error) {
|
||||
request.EnsureLimit(repo.SearchLimit)
|
||||
members, count, err := repo.View.SearchProjectMembers(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &proj_model.ProjectMemberSearchResponse{
|
||||
Offset: request.Offset,
|
||||
Limit: request.Limit,
|
||||
TotalResult: uint64(count),
|
||||
Result: model.ProjectMembersToModel(members),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) AddProjectRole(ctx context.Context, member *proj_model.ProjectRole) (*proj_model.ProjectRole, error) {
|
||||
return repo.ProjectEvents.AddProjectRole(ctx, member)
|
||||
}
|
||||
@@ -64,6 +95,20 @@ func (repo *ProjectRepo) RemoveProjectRole(ctx context.Context, projectID, key s
|
||||
return repo.ProjectEvents.RemoveProjectRole(ctx, member)
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) SearchProjectRoles(ctx context.Context, request *proj_model.ProjectRoleSearchRequest) (*proj_model.ProjectRoleSearchResponse, error) {
|
||||
request.EnsureLimit(repo.SearchLimit)
|
||||
roles, count, err := repo.View.SearchProjectRoles(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &proj_model.ProjectRoleSearchResponse{
|
||||
Offset: request.Offset,
|
||||
Limit: request.Limit,
|
||||
TotalResult: uint64(count),
|
||||
Result: model.ProjectRolesToModel(roles),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) ApplicationByID(ctx context.Context, projectID, appID string) (app *proj_model.Application, err error) {
|
||||
return repo.ProjectEvents.ApplicationByIDs(ctx, projectID, appID)
|
||||
}
|
||||
@@ -89,6 +134,20 @@ func (repo *ProjectRepo) RemoveApplication(ctx context.Context, projectID, appID
|
||||
return repo.ProjectEvents.RemoveApplication(ctx, app)
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) SearchApplications(ctx context.Context, request *proj_model.ApplicationSearchRequest) (*proj_model.ApplicationSearchResponse, error) {
|
||||
request.EnsureLimit(repo.SearchLimit)
|
||||
apps, count, err := repo.View.SearchApplications(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &proj_model.ApplicationSearchResponse{
|
||||
Offset: request.Offset,
|
||||
Limit: request.Limit,
|
||||
TotalResult: uint64(count),
|
||||
Result: model.ApplicationViewsToModel(apps),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) ChangeOIDCConfig(ctx context.Context, config *proj_model.OIDCConfig) (*proj_model.OIDCConfig, error) {
|
||||
return repo.ProjectEvents.ChangeOIDCConfig(ctx, config)
|
||||
}
|
||||
@@ -139,3 +198,17 @@ func (repo *ProjectRepo) RemoveProjectGrantMember(ctx context.Context, projectID
|
||||
member := proj_model.NewProjectGrantMember(projectID, grantID, userID)
|
||||
return repo.ProjectEvents.RemoveProjectGrantMember(ctx, member)
|
||||
}
|
||||
|
||||
func (repo *ProjectRepo) SearchProjectGrantMembers(ctx context.Context, request *proj_model.ProjectGrantMemberSearchRequest) (*proj_model.ProjectGrantMemberSearchResponse, error) {
|
||||
request.EnsureLimit(repo.SearchLimit)
|
||||
members, count, err := repo.View.SearchProjectGrantMembers(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &proj_model.ProjectGrantMemberSearchResponse{
|
||||
Offset: request.Offset,
|
||||
Limit: request.Limit,
|
||||
TotalResult: uint64(count),
|
||||
Result: model.ProjectGrantMembersToModel(members),
|
||||
}, nil
|
||||
}
|
@@ -1,4 +1,4 @@
|
||||
package eventsourcing
|
||||
package eventstore
|
||||
|
||||
import (
|
||||
"context"
|
@@ -1,4 +1,4 @@
|
||||
package eventsourcing
|
||||
package eventstore
|
||||
|
||||
import (
|
||||
"context"
|
@@ -0,0 +1,74 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
"github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
|
||||
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Application struct {
|
||||
handler
|
||||
projectEvents *proj_event.ProjectEventstore
|
||||
}
|
||||
|
||||
const (
|
||||
applicationTable = "management.applications"
|
||||
)
|
||||
|
||||
func (p *Application) MinimumCycleDuration() time.Duration { return p.cycleDuration }
|
||||
|
||||
func (p *Application) ViewModel() string {
|
||||
return applicationTable
|
||||
}
|
||||
|
||||
func (p *Application) EventQuery() (*models.SearchQuery, error) {
|
||||
sequence, err := p.view.GetLatestApplicationSequence()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eventsourcing.ProjectQuery(sequence), nil
|
||||
}
|
||||
|
||||
func (p *Application) Process(event *models.Event) (err error) {
|
||||
app := new(view_model.ApplicationView)
|
||||
switch event.Type {
|
||||
case es_model.ApplicationAdded:
|
||||
app.AppendEvent(event)
|
||||
case es_model.ApplicationChanged,
|
||||
es_model.OIDCConfigAdded,
|
||||
es_model.OIDCConfigChanged,
|
||||
es_model.ApplicationDeactivated,
|
||||
es_model.ApplicationReactivated:
|
||||
err := app.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
app, err = p.view.ApplicationByID(app.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
app.AppendEvent(event)
|
||||
case es_model.ApplicationRemoved:
|
||||
err := app.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.DeleteApplication(app.ID, event.Sequence)
|
||||
default:
|
||||
return p.view.ProcessedApplicationSequence(event.Sequence)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.PutApplication(app)
|
||||
}
|
||||
|
||||
func (p *Application) OnError(event *models.Event, soolerError error) error {
|
||||
logging.LogWithFields("SPOOL-ls9ew", "id", event.AggregateID).WithError(soolerError).Warn("something went wrong in project app handler")
|
||||
return spooler.HandleError(event, p.view.GetLatestApplicationFailedEvent, p.view.ProcessedApplicationFailedEvent, p.view.ProcessedApplicationSequence, p.errorCountUntilSkip)
|
||||
}
|
@@ -0,0 +1,125 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/caos/logging"
|
||||
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
"github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
|
||||
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
)
|
||||
|
||||
type GrantedProject struct {
|
||||
handler
|
||||
eventstore eventstore.Eventstore
|
||||
projectEvents *proj_event.ProjectEventstore
|
||||
}
|
||||
|
||||
const (
|
||||
grantedProjectTable = "management.granted_projects"
|
||||
)
|
||||
|
||||
func (p *GrantedProject) MinimumCycleDuration() time.Duration { return p.cycleDuration }
|
||||
|
||||
func (p *GrantedProject) ViewModel() string {
|
||||
return grantedProjectTable
|
||||
}
|
||||
|
||||
func (p *GrantedProject) EventQuery() (*models.SearchQuery, error) {
|
||||
sequence, err := p.view.GetLatestGrantedProjectSequence()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eventsourcing.ProjectQuery(sequence), nil
|
||||
}
|
||||
|
||||
func (p *GrantedProject) Process(event *models.Event) (err error) {
|
||||
grantedProject := new(view_model.GrantedProjectView)
|
||||
switch event.Type {
|
||||
case es_model.ProjectAdded:
|
||||
grantedProject.AppendEvent(event)
|
||||
case es_model.ProjectChanged:
|
||||
grantedProject, err = p.view.GrantedProjectByIDs(event.AggregateID, event.ResourceOwner)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = grantedProject.AppendEvent(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.updateExistingProjects(grantedProject)
|
||||
case es_model.ProjectDeactivated, es_model.ProjectReactivated:
|
||||
grantedProject, err = p.view.GrantedProjectByIDs(event.AggregateID, event.ResourceOwner)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = grantedProject.AppendEvent(event)
|
||||
case es_model.ProjectGrantAdded:
|
||||
err = grantedProject.AppendEvent(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
project, err := p.getProject(grantedProject.ProjectID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
grantedProject.Name = project.Name
|
||||
//TODO: read org
|
||||
case es_model.ProjectGrantChanged:
|
||||
grant := new(view_model.ProjectGrant)
|
||||
err := grant.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
grantedProject, err = p.view.GrantedProjectByIDs(event.AggregateID, grant.GrantedOrgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = grantedProject.AppendEvent(event)
|
||||
case es_model.ProjectGrantRemoved:
|
||||
grant := new(view_model.ProjectGrant)
|
||||
err := grant.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.DeleteGrantedProject(event.AggregateID, grant.GrantedOrgID, event.Sequence)
|
||||
default:
|
||||
return p.view.ProcessedGrantedProjectSequence(event.Sequence)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.PutGrantedProject(grantedProject)
|
||||
}
|
||||
|
||||
func (p *GrantedProject) getOrg(orgID string) {
|
||||
//TODO: Get Org
|
||||
}
|
||||
|
||||
func (p *GrantedProject) getProject(projectID string) (*model.Project, error) {
|
||||
return p.projectEvents.ProjectByID(context.Background(), projectID)
|
||||
}
|
||||
|
||||
func (p *GrantedProject) updateExistingProjects(project *view_model.GrantedProjectView) {
|
||||
projects, err := p.view.GrantedProjectsByID(project.ProjectID)
|
||||
if err != nil {
|
||||
logging.LogWithFields("SPOOL-los03", "id", project.ProjectID).WithError(err).Warn("could not update existing projects")
|
||||
}
|
||||
for _, existing := range projects {
|
||||
existing.Name = project.Name
|
||||
err := p.view.PutGrantedProject(existing)
|
||||
logging.LogWithFields("SPOOL-sjwi3", "id", existing.ProjectID).WithError(err).Warn("could not update existing project")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *GrantedProject) OnError(event *models.Event, err error) error {
|
||||
logging.LogWithFields("SPOOL-is8wa", "id", event.AggregateID).WithError(err).Warn("something went wrong in granted projecthandler")
|
||||
return spooler.HandleError(event, p.view.GetLatestGrantedProjectFailedEvent, p.view.ProcessedGrantedProjectFailedEvent, p.view.ProcessedGrantedProjectSequence, p.errorCountUntilSkip)
|
||||
}
|
@@ -0,0 +1,44 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/view"
|
||||
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Configs map[string]*Config
|
||||
|
||||
type Config struct {
|
||||
MinimumCycleDurationMillisecond int
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
view *view.View
|
||||
bulkLimit uint64
|
||||
cycleDuration time.Duration
|
||||
errorCountUntilSkip uint64
|
||||
}
|
||||
|
||||
type EventstoreRepos struct {
|
||||
ProjectEvents *proj_event.ProjectEventstore
|
||||
}
|
||||
|
||||
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, eventstore eventstore.Eventstore, repos EventstoreRepos) []spooler.Handler {
|
||||
return []spooler.Handler{
|
||||
&GrantedProject{handler: handler{view, bulkLimit, configs.cycleDuration("GrantedProject"), errorCount}, eventstore: eventstore, projectEvents: repos.ProjectEvents},
|
||||
&ProjectRole{handler: handler{view, bulkLimit, configs.cycleDuration("ProjectRole"), errorCount}, projectEvents: repos.ProjectEvents},
|
||||
&ProjectMember{handler: handler{view, bulkLimit, configs.cycleDuration("ProjectMember"), errorCount}},
|
||||
&ProjectGrantMember{handler: handler{view, bulkLimit, configs.cycleDuration("ProjectGrantMember"), errorCount}},
|
||||
&Application{handler: handler{view, bulkLimit, configs.cycleDuration("Application"), errorCount}},
|
||||
}
|
||||
}
|
||||
|
||||
func (configs Configs) cycleDuration(viewModel string) time.Duration {
|
||||
c, ok := configs[viewModel]
|
||||
if !ok {
|
||||
return 1 * time.Second
|
||||
}
|
||||
return time.Duration(c.MinimumCycleDurationMillisecond) * time.Millisecond
|
||||
}
|
@@ -0,0 +1,127 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
|
||||
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
usr_model "github.com/caos/zitadel/internal/user/model"
|
||||
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
|
||||
usr_es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ProjectGrantMember struct {
|
||||
handler
|
||||
userEvents *usr_event.UserEventstore
|
||||
}
|
||||
|
||||
const (
|
||||
projectGrantMemberTable = "management.project_grant_members"
|
||||
)
|
||||
|
||||
func (p *ProjectGrantMember) MinimumCycleDuration() time.Duration { return p.cycleDuration }
|
||||
|
||||
func (p *ProjectGrantMember) ViewModel() string {
|
||||
return projectGrantMemberTable
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) EventQuery() (*models.SearchQuery, error) {
|
||||
sequence, err := p.view.GetLatestProjectMemberSequence()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return es_models.NewSearchQuery().
|
||||
AggregateTypeFilter(proj_es_model.ProjectAggregate, usr_es_model.UserAggregate).
|
||||
LatestSequenceFilter(sequence), nil
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) Process(event *models.Event) (err error) {
|
||||
switch event.AggregateType {
|
||||
case proj_es_model.ProjectAggregate:
|
||||
err = p.processProjectGrantMember(event)
|
||||
case usr_es_model.UserAggregate:
|
||||
err = p.processUser(event)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) processProjectGrantMember(event *models.Event) (err error) {
|
||||
member := new(view_model.ProjectGrantMemberView)
|
||||
switch event.Type {
|
||||
case proj_es_model.ProjectGrantMemberAdded:
|
||||
member.AppendEvent(event)
|
||||
p.fillData(member)
|
||||
case proj_es_model.ProjectGrantMemberChanged:
|
||||
err := member.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
member, err = p.view.ProjectGrantMemberByIDs(member.GrantID, member.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
member.AppendEvent(event)
|
||||
case proj_es_model.ProjectGrantMemberRemoved:
|
||||
err := member.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.DeleteProjectGrantMember(event.AggregateID, member.UserID, event.Sequence)
|
||||
default:
|
||||
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.PutProjectGrantMember(member, member.Sequence)
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) processUser(event *models.Event) (err error) {
|
||||
switch event.Type {
|
||||
case usr_es_model.UserProfileChanged,
|
||||
usr_es_model.UserEmailChanged:
|
||||
members, err := p.view.ProjectGrantMembersByUserID(event.AggregateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user, err := p.userEvents.UserByID(context.Background(), event.AggregateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, member := range members {
|
||||
p.fillUserData(member, user)
|
||||
err = p.view.PutProjectGrantMember(member, event.Sequence)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) fillData(member *view_model.ProjectGrantMemberView) (err error) {
|
||||
user, err := p.userEvents.UserByID(context.Background(), member.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.fillUserData(member, user)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) fillUserData(member *view_model.ProjectGrantMemberView, user *usr_model.User) {
|
||||
member.UserName = user.UserName
|
||||
member.FirstName = user.FirstName
|
||||
member.LastName = user.LastName
|
||||
member.Email = user.EmailAddress
|
||||
}
|
||||
|
||||
func (p *ProjectGrantMember) OnError(event *models.Event, err error) error {
|
||||
logging.LogWithFields("SPOOL-kls93", "id", event.AggregateID).WithError(err).Warn("something went wrong in projectmember handler")
|
||||
return spooler.HandleError(event, p.view.GetLatestProjectGrantMemberFailedEvent, p.view.ProcessedProjectGrantMemberFailedEvent, p.view.ProcessedProjectGrantMemberSequence, p.errorCountUntilSkip)
|
||||
}
|
@@ -0,0 +1,126 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
es_models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
proj_es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
|
||||
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
usr_model "github.com/caos/zitadel/internal/user/model"
|
||||
usr_event "github.com/caos/zitadel/internal/user/repository/eventsourcing"
|
||||
usr_es_model "github.com/caos/zitadel/internal/user/repository/eventsourcing/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ProjectMember struct {
|
||||
handler
|
||||
userEvents *usr_event.UserEventstore
|
||||
}
|
||||
|
||||
const (
|
||||
projectMemberTable = "management.project_members"
|
||||
)
|
||||
|
||||
func (p *ProjectMember) MinimumCycleDuration() time.Duration { return p.cycleDuration }
|
||||
|
||||
func (p *ProjectMember) ViewModel() string {
|
||||
return projectMemberTable
|
||||
}
|
||||
|
||||
func (p *ProjectMember) EventQuery() (*models.SearchQuery, error) {
|
||||
sequence, err := p.view.GetLatestProjectMemberSequence()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return es_models.NewSearchQuery().
|
||||
AggregateTypeFilter(proj_es_model.ProjectAggregate, usr_es_model.UserAggregate).
|
||||
LatestSequenceFilter(sequence), nil
|
||||
}
|
||||
|
||||
func (p *ProjectMember) Process(event *models.Event) (err error) {
|
||||
switch event.AggregateType {
|
||||
case proj_es_model.ProjectAggregate:
|
||||
err = p.processProjectMember(event)
|
||||
case usr_es_model.UserAggregate:
|
||||
err = p.processUser(event)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (p *ProjectMember) processProjectMember(event *models.Event) (err error) {
|
||||
member := new(view_model.ProjectMemberView)
|
||||
switch event.Type {
|
||||
case proj_es_model.ProjectMemberAdded:
|
||||
member.AppendEvent(event)
|
||||
p.fillData(member)
|
||||
case proj_es_model.ProjectMemberChanged:
|
||||
err := member.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
member, err = p.view.ProjectMemberByIDs(event.AggregateID, member.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
member.AppendEvent(event)
|
||||
case proj_es_model.ProjectMemberRemoved:
|
||||
err := member.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.DeleteProjectMember(event.AggregateID, member.UserID, event.Sequence)
|
||||
default:
|
||||
return p.view.ProcessedProjectMemberSequence(event.Sequence)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.PutProjectMember(member, member.Sequence)
|
||||
}
|
||||
|
||||
func (p *ProjectMember) processUser(event *models.Event) (err error) {
|
||||
switch event.Type {
|
||||
case usr_es_model.UserProfileChanged,
|
||||
usr_es_model.UserEmailChanged:
|
||||
members, err := p.view.ProjectMembersByUserID(event.AggregateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
user, err := p.userEvents.UserByID(context.Background(), event.AggregateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, member := range members {
|
||||
p.fillUserData(member, user)
|
||||
err = p.view.PutProjectMember(member, event.Sequence)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
return p.view.ProcessedProjectMemberSequence(event.Sequence)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectMember) fillData(member *view_model.ProjectMemberView) (err error) {
|
||||
user, err := p.userEvents.UserByID(context.Background(), member.UserID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.fillUserData(member, user)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectMember) fillUserData(member *view_model.ProjectMemberView, user *usr_model.User) {
|
||||
member.UserName = user.UserName
|
||||
member.FirstName = user.FirstName
|
||||
member.LastName = user.LastName
|
||||
member.Email = user.EmailAddress
|
||||
}
|
||||
func (p *ProjectMember) OnError(event *models.Event, err error) error {
|
||||
logging.LogWithFields("SPOOL-u73es", "id", event.AggregateID).WithError(err).Warn("something went wrong in projectmember handler")
|
||||
return spooler.HandleError(event, p.view.GetLatestProjectMemberFailedEvent, p.view.ProcessedProjectMemberFailedEvent, p.view.ProcessedProjectMemberSequence, p.errorCountUntilSkip)
|
||||
}
|
@@ -0,0 +1,152 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
proj_event "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
es_model "github.com/caos/zitadel/internal/project/repository/eventsourcing/model"
|
||||
view_model "github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
"time"
|
||||
)
|
||||
|
||||
type ProjectRole struct {
|
||||
handler
|
||||
projectEvents *proj_event.ProjectEventstore
|
||||
}
|
||||
|
||||
const (
|
||||
projectRoleTable = "management.project_roles"
|
||||
)
|
||||
|
||||
func (p *ProjectRole) MinimumCycleDuration() time.Duration { return p.cycleDuration }
|
||||
|
||||
func (p *ProjectRole) ViewModel() string {
|
||||
return projectRoleTable
|
||||
}
|
||||
|
||||
func (p *ProjectRole) EventQuery() (*models.SearchQuery, error) {
|
||||
sequence, err := p.view.GetLatestProjectRoleSequence()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return eventsourcing.ProjectQuery(sequence), nil
|
||||
}
|
||||
|
||||
func (p *ProjectRole) Process(event *models.Event) (err error) {
|
||||
role := new(view_model.ProjectRoleView)
|
||||
switch event.Type {
|
||||
case es_model.ProjectRoleAdded:
|
||||
role.AppendEvent(event)
|
||||
case es_model.ProjectRoleChanged:
|
||||
err := role.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
role, err = p.view.ProjectRoleByIDs(event.AggregateID, event.ResourceOwner, role.Key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
role.AppendEvent(event)
|
||||
case es_model.ProjectRoleRemoved:
|
||||
err := role.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = p.removeRoleFromAllResourceowners(event, role)
|
||||
case es_model.ProjectGrantAdded:
|
||||
return p.addGrantRoles(event)
|
||||
case es_model.ProjectGrantChanged:
|
||||
err = p.removeRolesFromResourceowner(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.addGrantRoles(event)
|
||||
case es_model.ProjectGrantRemoved:
|
||||
return p.removeRolesFromResourceowner(event)
|
||||
default:
|
||||
return p.view.ProcessedProjectRoleSequence(event.Sequence)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return p.view.PutProjectRole(role)
|
||||
}
|
||||
|
||||
func (p *ProjectRole) removeRoleFromAllResourceowners(event *models.Event, role *view_model.ProjectRoleView) error {
|
||||
roles, err := p.view.ResourceOwnerProjectRolesByKey(event.AggregateID, event.ResourceOwner, role.Key)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-slo03", "aggregateID", event.AggregateID, "ResourceOwner", event.ResourceOwner, "Key", role.Key).WithError(err).Warn("could not read roles to remove")
|
||||
return err
|
||||
}
|
||||
for _, r := range roles {
|
||||
err = p.view.DeleteProjectRole(r.ProjectID, r.OrgID, r.Key, event.Sequence)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-kloa2", "aggregateID", event.AggregateID, "ResourceOwner", event.ResourceOwner, "OrgID", r.OrgID, "Key", role.Key).WithError(err).Warn("could not remove role")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectRole) removeRolesFromResourceowner(event *models.Event) error {
|
||||
roles, err := p.view.ResourceOwnerProjectRoles(event.AggregateID, event.ResourceOwner)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-slo03", "aggregateID", event.AggregateID, "ResourceOwner", event.ResourceOwner, "Key").WithError(err).Warn("could not read roles to remove")
|
||||
return err
|
||||
}
|
||||
for _, r := range roles {
|
||||
err = p.view.DeleteProjectRole(r.ProjectID, r.OrgID, r.Key, event.Sequence)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-kloa2", "aggregateID", event.AggregateID, "ResourceOwner", event.ResourceOwner, "OrgID", r.OrgID).WithError(err).Warn("could not remove role")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectRole) addGrantRoles(event *models.Event) error {
|
||||
project, err := p.projectEvents.ProjectByID(context.Background(), event.AggregateID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
grant := new(view_model.ProjectGrant)
|
||||
err = grant.SetData(event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, roleKey := range grant.RoleKeys {
|
||||
role := getRoleFromProject(roleKey, project)
|
||||
projectRole := &view_model.ProjectRoleView{
|
||||
OrgID: grant.GrantedOrgID,
|
||||
ProjectID: event.AggregateID,
|
||||
Key: roleKey,
|
||||
DisplayName: role.DisplayName,
|
||||
Group: role.Group,
|
||||
ResourceOwner: event.ResourceOwner,
|
||||
CreationDate: event.CreationDate,
|
||||
Sequence: event.Sequence,
|
||||
}
|
||||
err := p.view.PutProjectRole(projectRole)
|
||||
logging.LogWithFields("HANDL-sj3TG", "eventID", event.ID).OnError(err).Warn("could not save project role")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRoleFromProject(roleKey string, project *proj_model.Project) *proj_model.ProjectRole {
|
||||
for _, role := range project.Roles {
|
||||
if roleKey == role.Key {
|
||||
return role
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ProjectRole) OnError(event *models.Event, err error) error {
|
||||
logging.LogWithFields("SPOOL-lso9w", "id", event.AggregateID).WithError(err).Warn("something went wrong in project role handler")
|
||||
return spooler.HandleError(event, p.view.GetLatestProjectRoleFailedEvent, p.view.ProcessedProjectRoleFailedEvent, p.view.ProcessedProjectRoleSequence, p.errorCountUntilSkip)
|
||||
}
|
@@ -3,24 +3,30 @@ package eventsourcing
|
||||
import (
|
||||
"context"
|
||||
sd "github.com/caos/zitadel/internal/config/systemdefaults"
|
||||
|
||||
"github.com/caos/zitadel/internal/config/types"
|
||||
es_int "github.com/caos/zitadel/internal/eventstore"
|
||||
es_spol "github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/eventstore"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/handler"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/spooler"
|
||||
mgmt_view "github.com/caos/zitadel/internal/management/repository/eventsourcing/view"
|
||||
es_proj "github.com/caos/zitadel/internal/project/repository/eventsourcing"
|
||||
es_usr "github.com/caos/zitadel/internal/user/repository/eventsourcing"
|
||||
es_grant "github.com/caos/zitadel/internal/usergrant/repository/eventsourcing"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Eventstore es_int.Config
|
||||
//View view.ViewConfig
|
||||
//Spooler spooler.SpoolerConfig
|
||||
SearchLimit uint64
|
||||
Eventstore es_int.Config
|
||||
View types.SQL
|
||||
Spooler spooler.SpoolerConfig
|
||||
}
|
||||
|
||||
type EsRepository struct {
|
||||
//spooler *es_spooler.Spooler
|
||||
ProjectRepo
|
||||
UserRepo
|
||||
UserGrantRepo
|
||||
spooler *es_spol.Spooler
|
||||
eventstore.ProjectRepo
|
||||
eventstore.UserRepo
|
||||
eventstore.UserGrantRepo
|
||||
}
|
||||
|
||||
func Start(conf Config, systemDefaults sd.SystemDefaults) (*EsRepository, error) {
|
||||
@@ -29,15 +35,14 @@ func Start(conf Config, systemDefaults sd.SystemDefaults) (*EsRepository, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//view, sql, err := mgmt_view.StartView(conf.View)
|
||||
//if err != nil {
|
||||
// return nil, err
|
||||
//}
|
||||
|
||||
//conf.Spooler.View = view
|
||||
//conf.Spooler.EsClient = es.Client
|
||||
//conf.Spooler.SQL = sql
|
||||
//spool := spooler.StartSpooler(conf.Spooler)
|
||||
sqlClient, err := conf.View.Start()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
view, err := mgmt_view.StartView(sqlClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
project, err := es_proj.StartProject(es_proj.ProjectConfig{
|
||||
Eventstore: es,
|
||||
@@ -60,10 +65,14 @@ func Start(conf Config, systemDefaults sd.SystemDefaults) (*EsRepository, error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
eventstoreRepos := handler.EventstoreRepos{ProjectEvents: project}
|
||||
spool := spooler.StartSpooler(conf.Spooler, es, view, sqlClient, eventstoreRepos)
|
||||
|
||||
return &EsRepository{
|
||||
ProjectRepo{project},
|
||||
UserRepo{user},
|
||||
UserGrantRepo{usergrant},
|
||||
spool,
|
||||
eventstore.ProjectRepo{conf.SearchLimit, project, view},
|
||||
eventstore.UserRepo{user},
|
||||
eventstore.UserGrantRepo{usergrant},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
46
internal/management/repository/eventsourcing/spooler/lock.go
Normal file
46
internal/management/repository/eventsourcing/spooler/lock.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
caos_errs "github.com/caos/zitadel/internal/errors"
|
||||
"time"
|
||||
|
||||
"github.com/cockroachdb/cockroach-go/crdb"
|
||||
)
|
||||
|
||||
const (
|
||||
lockTable = "management.locks"
|
||||
lockedUntilKey = "locked_until"
|
||||
lockerIDKey = "locker_id"
|
||||
objectTypeKey = "object_type"
|
||||
)
|
||||
|
||||
type locker struct {
|
||||
dbClient *sql.DB
|
||||
}
|
||||
|
||||
type lock struct {
|
||||
LockerID string `gorm:"column:locker_id;primary_key"`
|
||||
LockedUntil time.Time `gorm:"column:locked_until"`
|
||||
ViewName string `gorm:"column:object_type;primary_key"`
|
||||
}
|
||||
|
||||
func (l *locker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
|
||||
return crdb.ExecuteTx(context.Background(), l.dbClient, nil, func(tx *sql.Tx) error {
|
||||
query := fmt.Sprintf("INSERT INTO %s (%s, %s, %s) VALUES ($1, $2, now()+$3) ON CONFLICT (%s) DO UPDATE SET %s = now()+$4, %s = $5 WHERE (locks.%s < now() OR locks.%s = $6) AND locks.%s = $7",
|
||||
lockTable, objectTypeKey, lockerIDKey, lockedUntilKey, objectTypeKey, lockedUntilKey, lockerIDKey, lockedUntilKey, lockerIDKey, objectTypeKey)
|
||||
|
||||
rs, err := tx.Exec(query, viewModel, lockerID, waitTime.Seconds(), waitTime.Seconds(), lockerID, lockerID, viewModel)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
if rows, _ := rs.RowsAffected(); rows == 0 {
|
||||
tx.Rollback()
|
||||
return caos_errs.ThrowAlreadyExists(nil, "SPOOL-lso0e", "view already locked")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
@@ -0,0 +1,127 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/DATA-DOG/go-sqlmock"
|
||||
)
|
||||
|
||||
type dbMock struct {
|
||||
db *sql.DB
|
||||
mock sqlmock.Sqlmock
|
||||
}
|
||||
|
||||
func mockDB(t *testing.T) *dbMock {
|
||||
mockDB := dbMock{}
|
||||
var err error
|
||||
mockDB.db, mockDB.mock, err = sqlmock.New()
|
||||
if err != nil {
|
||||
t.Fatalf("error occured while creating stub db %v", err)
|
||||
}
|
||||
|
||||
mockDB.mock.MatchExpectationsInOrder(true)
|
||||
|
||||
return &mockDB
|
||||
}
|
||||
|
||||
func (db *dbMock) expectCommit() *dbMock {
|
||||
db.mock.ExpectCommit()
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *dbMock) expectRollback() *dbMock {
|
||||
db.mock.ExpectRollback()
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *dbMock) expectBegin() *dbMock {
|
||||
db.mock.ExpectBegin()
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *dbMock) expectSavepoint() *dbMock {
|
||||
db.mock.ExpectExec("SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *dbMock) expectReleaseSavepoint() *dbMock {
|
||||
db.mock.ExpectExec("RELEASE SAVEPOINT").WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func (db *dbMock) expectRenew(lockerID, view string, affectedRows int64) *dbMock {
|
||||
query := db.mock.
|
||||
ExpectExec(`INSERT INTO management\.locks \(object_type, locker_id, locked_until\) VALUES \(\$1, \$2, now\(\)\+\$3\) ON CONFLICT \(object_type\) DO UPDATE SET locked_until = now\(\)\+\$4, locker_id = \$5 WHERE \(locks\.locked_until < now\(\) OR locks\.locker_id = \$6\) AND locks\.object_type = \$7`).
|
||||
WithArgs(view, lockerID, sqlmock.AnyArg(), sqlmock.AnyArg(), lockerID, lockerID, view).
|
||||
WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
|
||||
if affectedRows == 0 {
|
||||
query.WillReturnResult(sqlmock.NewResult(0, 0))
|
||||
} else {
|
||||
query.WillReturnResult(sqlmock.NewResult(1, affectedRows))
|
||||
}
|
||||
|
||||
return db
|
||||
}
|
||||
|
||||
func Test_locker_Renew(t *testing.T) {
|
||||
type fields struct {
|
||||
db *dbMock
|
||||
}
|
||||
type args struct {
|
||||
lockerID string
|
||||
viewModel string
|
||||
waitTime time.Duration
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "renew succeeded",
|
||||
fields: fields{
|
||||
db: mockDB(t).
|
||||
expectBegin().
|
||||
expectSavepoint().
|
||||
expectRenew("locker", "view", 1).
|
||||
expectReleaseSavepoint().
|
||||
expectCommit(),
|
||||
},
|
||||
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "renew now rows updated",
|
||||
fields: fields{
|
||||
db: mockDB(t).
|
||||
expectBegin().
|
||||
expectSavepoint().
|
||||
expectRenew("locker", "view", 0).
|
||||
expectRollback(),
|
||||
},
|
||||
args: args{lockerID: "locker", viewModel: "view", waitTime: 1 * time.Second},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
l := &locker{
|
||||
dbClient: tt.fields.db.db,
|
||||
}
|
||||
if err := l.Renew(tt.args.lockerID, tt.args.viewModel, tt.args.waitTime); (err != nil) != tt.wantErr {
|
||||
t.Errorf("locker.Renew() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
if err := tt.fields.db.mock.ExpectationsWereMet(); err != nil {
|
||||
t.Errorf("not all database expectations met: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@@ -0,0 +1,28 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/handler"
|
||||
"github.com/caos/zitadel/internal/management/repository/eventsourcing/view"
|
||||
)
|
||||
|
||||
type SpoolerConfig struct {
|
||||
BulkLimit uint64
|
||||
FailureCountUntilSkip uint64
|
||||
ConcurrentTasks int
|
||||
Handlers handler.Configs
|
||||
}
|
||||
|
||||
func StartSpooler(c SpoolerConfig, es eventstore.Eventstore, view *view.View, sql *sql.DB, eventstoreRepos handler.EventstoreRepos) *spooler.Spooler {
|
||||
spoolerConfig := spooler.Config{
|
||||
Eventstore: es,
|
||||
Locker: &locker{dbClient: sql},
|
||||
ConcurrentTasks: c.ConcurrentTasks,
|
||||
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, eventstoreRepos),
|
||||
}
|
||||
spool := spoolerConfig.New()
|
||||
spool.Start()
|
||||
return spool
|
||||
}
|
@@ -0,0 +1,52 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
global_view "github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
applicationTable = "management.applications"
|
||||
)
|
||||
|
||||
func (v *View) ApplicationByID(appID string) (*model.ApplicationView, error) {
|
||||
return view.ApplicationByID(v.Db, applicationTable, appID)
|
||||
}
|
||||
|
||||
func (v *View) SearchApplications(request *proj_model.ApplicationSearchRequest) ([]*model.ApplicationView, int, error) {
|
||||
return view.SearchApplications(v.Db, applicationTable, request)
|
||||
}
|
||||
|
||||
func (v *View) PutApplication(project *model.ApplicationView) error {
|
||||
err := view.PutApplication(v.Db, applicationTable, project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return v.ProcessedApplicationSequence(project.Sequence)
|
||||
}
|
||||
|
||||
func (v *View) DeleteApplication(appID string, eventSequence uint64) error {
|
||||
err := view.DeleteApplication(v.Db, applicationTable, appID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.ProcessedApplicationSequence(eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestApplicationSequence() (uint64, error) {
|
||||
return v.latestSequence(applicationTable)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedApplicationSequence(eventSequence uint64) error {
|
||||
return v.saveCurrentSequence(applicationTable, eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestApplicationFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
|
||||
return v.latestFailedEvent(applicationTable, sequence)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedApplicationFailedEvent(failedEvent *global_view.FailedEvent) error {
|
||||
return v.saveFailedEvent(failedEvent)
|
||||
}
|
@@ -0,0 +1,17 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
errTable = "management.failed_event"
|
||||
)
|
||||
|
||||
func (v *View) saveFailedEvent(failedEvent *view.FailedEvent) error {
|
||||
return view.SaveFailedEvent(v.Db, errTable, failedEvent)
|
||||
}
|
||||
|
||||
func (v *View) latestFailedEvent(viewName string, sequence uint64) (*view.FailedEvent, error) {
|
||||
return view.LatestFailedEvent(v.Db, errTable, viewName, sequence)
|
||||
}
|
@@ -0,0 +1,56 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
global_view "github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
grantedProjectTable = "management.granted_projects"
|
||||
)
|
||||
|
||||
func (v *View) GrantedProjectByIDs(projectID, orgID string) (*model.GrantedProjectView, error) {
|
||||
return view.GrantedProjectByIDs(v.Db, grantedProjectTable, projectID, orgID)
|
||||
}
|
||||
|
||||
func (v *View) GrantedProjectsByID(projectID string) ([]*model.GrantedProjectView, error) {
|
||||
return view.GrantedProjectsByID(v.Db, grantedProjectTable, projectID)
|
||||
}
|
||||
|
||||
func (v *View) SearchGrantedProjects(request *proj_model.GrantedProjectSearchRequest) ([]*model.GrantedProjectView, int, error) {
|
||||
return view.SearchGrantedProjects(v.Db, grantedProjectTable, request)
|
||||
}
|
||||
|
||||
func (v *View) PutGrantedProject(project *model.GrantedProjectView) error {
|
||||
err := view.PutGrantedProject(v.Db, grantedProjectTable, project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return v.ProcessedGrantedProjectSequence(project.Sequence)
|
||||
}
|
||||
|
||||
func (v *View) DeleteGrantedProject(projectID, orgID string, eventSequence uint64) error {
|
||||
err := view.DeleteGrantedProject(v.Db, grantedProjectTable, projectID, orgID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.ProcessedGrantedProjectSequence(eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestGrantedProjectSequence() (uint64, error) {
|
||||
return v.latestSequence(grantedProjectTable)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedGrantedProjectSequence(eventSequence uint64) error {
|
||||
return v.saveCurrentSequence(grantedProjectTable, eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestGrantedProjectFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
|
||||
return v.latestFailedEvent(grantedProjectTable, sequence)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedGrantedProjectFailedEvent(failedEvent *global_view.FailedEvent) error {
|
||||
return v.saveFailedEvent(failedEvent)
|
||||
}
|
@@ -0,0 +1,56 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
global_view "github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
projectGrantMemberTable = "management.project_grant_members"
|
||||
)
|
||||
|
||||
func (v *View) ProjectGrantMemberByIDs(projectID, userID string) (*model.ProjectGrantMemberView, error) {
|
||||
return view.ProjectGrantMemberByIDs(v.Db, projectGrantMemberTable, projectID, userID)
|
||||
}
|
||||
|
||||
func (v *View) SearchProjectGrantMembers(request *proj_model.ProjectGrantMemberSearchRequest) ([]*model.ProjectGrantMemberView, int, error) {
|
||||
return view.SearchProjectGrantMembers(v.Db, projectGrantMemberTable, request)
|
||||
}
|
||||
|
||||
func (v *View) ProjectGrantMembersByUserID(userID string) ([]*model.ProjectGrantMemberView, error) {
|
||||
return view.ProjectGrantMembersByUserID(v.Db, projectGrantMemberTable, userID)
|
||||
}
|
||||
|
||||
func (v *View) PutProjectGrantMember(project *model.ProjectGrantMemberView, sequence uint64) error {
|
||||
err := view.PutProjectGrantMember(v.Db, projectGrantMemberTable, project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return v.ProcessedProjectGrantMemberSequence(sequence)
|
||||
}
|
||||
|
||||
func (v *View) DeleteProjectGrantMember(projectID, userID string, eventSequence uint64) error {
|
||||
err := view.DeleteProjectGrantMember(v.Db, projectGrantMemberTable, projectID, userID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.ProcessedProjectGrantMemberSequence(eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectGrantMemberSequence() (uint64, error) {
|
||||
return v.latestSequence(projectGrantMemberTable)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectGrantMemberSequence(eventSequence uint64) error {
|
||||
return v.saveCurrentSequence(projectGrantMemberTable, eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectGrantMemberFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
|
||||
return v.latestFailedEvent(projectGrantMemberTable, sequence)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectGrantMemberFailedEvent(failedEvent *global_view.FailedEvent) error {
|
||||
return v.saveFailedEvent(failedEvent)
|
||||
}
|
@@ -0,0 +1,56 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
global_view "github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
projectMemberTable = "management.project_members"
|
||||
)
|
||||
|
||||
func (v *View) ProjectMemberByIDs(projectID, userID string) (*model.ProjectMemberView, error) {
|
||||
return view.ProjectMemberByIDs(v.Db, projectMemberTable, projectID, userID)
|
||||
}
|
||||
|
||||
func (v *View) SearchProjectMembers(request *proj_model.ProjectMemberSearchRequest) ([]*model.ProjectMemberView, int, error) {
|
||||
return view.SearchProjectMembers(v.Db, projectMemberTable, request)
|
||||
}
|
||||
|
||||
func (v *View) ProjectMembersByUserID(userID string) ([]*model.ProjectMemberView, error) {
|
||||
return view.ProjectMembersByUserID(v.Db, projectMemberTable, userID)
|
||||
}
|
||||
|
||||
func (v *View) PutProjectMember(project *model.ProjectMemberView, sequence uint64) error {
|
||||
err := view.PutProjectMember(v.Db, projectMemberTable, project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return v.ProcessedProjectMemberSequence(sequence)
|
||||
}
|
||||
|
||||
func (v *View) DeleteProjectMember(projectID, userID string, eventSequence uint64) error {
|
||||
err := view.DeleteProjectMember(v.Db, projectMemberTable, projectID, userID)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.ProcessedProjectMemberSequence(eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectMemberSequence() (uint64, error) {
|
||||
return v.latestSequence(projectMemberTable)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectMemberSequence(eventSequence uint64) error {
|
||||
return v.saveCurrentSequence(projectMemberTable, eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectMemberFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
|
||||
return v.latestFailedEvent(projectMemberTable, sequence)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectMemberFailedEvent(failedEvent *global_view.FailedEvent) error {
|
||||
return v.saveFailedEvent(failedEvent)
|
||||
}
|
@@ -0,0 +1,60 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
proj_model "github.com/caos/zitadel/internal/project/model"
|
||||
"github.com/caos/zitadel/internal/project/repository/view"
|
||||
"github.com/caos/zitadel/internal/project/repository/view/model"
|
||||
global_view "github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
projectRoleTable = "management.project_roles"
|
||||
)
|
||||
|
||||
func (v *View) ProjectRoleByIDs(projectID, orgID, key string) (*model.ProjectRoleView, error) {
|
||||
return view.ProjectRoleByIDs(v.Db, projectRoleTable, projectID, orgID, key)
|
||||
}
|
||||
|
||||
func (v *View) ResourceOwnerProjectRolesByKey(projectID, resourceowner, key string) ([]*model.ProjectRoleView, error) {
|
||||
return view.ResourceOwnerProjectRolesByKey(v.Db, projectRoleTable, projectID, resourceowner, key)
|
||||
}
|
||||
|
||||
func (v *View) ResourceOwnerProjectRoles(projectID, resourceowner string) ([]*model.ProjectRoleView, error) {
|
||||
return view.ResourceOwnerProjectRoles(v.Db, projectRoleTable, projectID, resourceowner)
|
||||
}
|
||||
|
||||
func (v *View) SearchProjectRoles(request *proj_model.ProjectRoleSearchRequest) ([]*model.ProjectRoleView, int, error) {
|
||||
return view.SearchProjectRoles(v.Db, projectRoleTable, request)
|
||||
}
|
||||
|
||||
func (v *View) PutProjectRole(project *model.ProjectRoleView) error {
|
||||
err := view.PutProjectRole(v.Db, projectRoleTable, project)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return v.ProcessedProjectRoleSequence(project.Sequence)
|
||||
}
|
||||
|
||||
func (v *View) DeleteProjectRole(projectID, orgID, key string, eventSequence uint64) error {
|
||||
err := view.DeleteProjectRole(v.Db, projectRoleTable, projectID, orgID, key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return v.ProcessedProjectRoleSequence(eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectRoleSequence() (uint64, error) {
|
||||
return v.latestSequence(projectRoleTable)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectRoleSequence(eventSequence uint64) error {
|
||||
return v.saveCurrentSequence(projectRoleTable, eventSequence)
|
||||
}
|
||||
|
||||
func (v *View) GetLatestProjectRoleFailedEvent(sequence uint64) (*global_view.FailedEvent, error) {
|
||||
return v.latestFailedEvent(projectRoleTable, sequence)
|
||||
}
|
||||
|
||||
func (v *View) ProcessedProjectRoleFailedEvent(failedEvent *global_view.FailedEvent) error {
|
||||
return v.saveFailedEvent(failedEvent)
|
||||
}
|
@@ -0,0 +1,17 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/view"
|
||||
)
|
||||
|
||||
const (
|
||||
sequencesTable = "management.current_sequences"
|
||||
)
|
||||
|
||||
func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
|
||||
return view.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
|
||||
}
|
||||
|
||||
func (v *View) latestSequence(viewName string) (uint64, error) {
|
||||
return view.LatestSequence(v.Db, sequencesTable, viewName)
|
||||
}
|
25
internal/management/repository/eventsourcing/view/view.go
Normal file
25
internal/management/repository/eventsourcing/view/view.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package view
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
)
|
||||
|
||||
type View struct {
|
||||
Db *gorm.DB
|
||||
}
|
||||
|
||||
func StartView(sqlClient *sql.DB) (*View, error) {
|
||||
gorm, err := gorm.Open("postgres", sqlClient)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &View{
|
||||
Db: gorm,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (v *View) Health() (err error) {
|
||||
return v.Db.DB().Ping()
|
||||
}
|
Reference in New Issue
Block a user