feat: sequence and timestamp on searchrequests (#468)

* feat: reread events

* feat: sequence and timestamo on search requests

* feat: sequence and timestamo on search requests

* fix: better naming

* fix: log errors

* fix: read sequence before search request
This commit is contained in:
Fabi 2020-07-15 13:24:36 +02:00 committed by GitHub
parent 87155f8c9e
commit 423b86a03b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
102 changed files with 7389 additions and 6302 deletions

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
admin_view "github.com/caos/zitadel/internal/admin/repository/eventsourcing/view"
"github.com/caos/zitadel/internal/config/systemdefaults"
iam_es_model "github.com/caos/zitadel/internal/iam/repository/view/model"
@ -44,16 +45,23 @@ func (repo *IamRepository) RemoveIamMember(ctx context.Context, userID string) e
func (repo *IamRepository) SearchIamMembers(ctx context.Context, request *iam_model.IamMemberSearchRequest) (*iam_model.IamMemberSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestIamMemberSequence()
logging.Log("EVENT-Slkci").OnError(err).Warn("could not read latest iam sequence")
members, count, err := repo.View.SearchIamMembers(request)
if err != nil {
return nil, err
}
return &iam_model.IamMemberSearchResponse{
result := &iam_model.IamMemberSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: iam_es_model.IamMembersToModel(members),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *IamRepository) GetIamMemberRoles() []string {

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
admin_model "github.com/caos/zitadel/internal/admin/model"
admin_view "github.com/caos/zitadel/internal/admin/repository/eventsourcing/view"
@ -72,16 +73,23 @@ func (repo *OrgRepo) OrgByID(ctx context.Context, id string) (*org_model.Org, er
func (repo *OrgRepo) SearchOrgs(ctx context.Context, query *org_model.OrgSearchRequest) (*org_model.OrgSearchResult, error) {
query.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestOrgSequence()
logging.Log("EVENT-LXo9w").OnError(err).Warn("could not read latest iam sequence")
orgs, count, err := repo.View.SearchOrgs(query)
if err != nil {
return nil, err
}
return &org_model.OrgSearchResult{
result := &org_model.OrgSearchResult{
Offset: query.Offset,
Limit: query.Limit,
TotalResult: uint64(count),
Result: model.OrgsToModel(orgs),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *OrgRepo) IsOrgUnique(ctx context.Context, name, domain string) (isUnique bool, err error) {

View File

@ -37,7 +37,7 @@ func (m *IamMember) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(model.IamAggregate, usr_es_model.UserAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (m *IamMember) Reduce(event *models.Event) (err error) {

View File

@ -30,7 +30,7 @@ func (o *Org) EventQuery() (*es_models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.OrgQuery(sequence), nil
return eventsourcing.OrgQuery(sequence.CurrentSequence), nil
}
func (o *Org) Reduce(event *es_models.Event) error {

View File

@ -40,7 +40,7 @@ func (v *View) DeleteIamMember(iamID, userID string, eventSequence uint64) error
return v.ProcessedIamMemberSequence(eventSequence)
}
func (v *View) GetLatestIamMemberSequence() (uint64, error) {
func (v *View) GetLatestIamMemberSequence() (*global_view.CurrentSequence, error) {
return v.latestSequence(iamMemberTable)
}

View File

@ -35,7 +35,7 @@ func (v *View) ProcessedOrgFailedEvent(failedEvent *repository.FailedEvent) erro
return v.saveFailedEvent(failedEvent)
}
func (v *View) GetLatestOrgSequence() (uint64, error) {
func (v *View) GetLatestOrgSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(orgTable)
}

View File

@ -12,7 +12,7 @@ func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
func (v *View) latestSequence(viewName string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@ -1,8 +1,10 @@
package admin
import (
"github.com/caos/logging"
view_model "github.com/caos/zitadel/internal/view/model"
"github.com/caos/zitadel/pkg/grpc/admin"
"github.com/golang/protobuf/ptypes"
)
func viewsFromModel(views []*view_model.View) []*admin.View {
@ -24,10 +26,14 @@ func failedEventsFromModel(failedEvents []*view_model.FailedEvent) []*admin.Fail
}
func viewFromModel(view *view_model.View) *admin.View {
timestamp, err := ptypes.TimestampProto(view.CurrentTimestamp)
logging.Log("GRPC-KSo03").OnError(err).Debug("unable to parse timestamp")
return &admin.View{
Database: view.Database,
ViewName: view.ViewName,
Sequence: view.CurrentSequence,
Database: view.Database,
ViewName: view.ViewName,
ProcessedSequence: view.CurrentSequence,
ViewTimestamp: timestamp,
}
}

View File

@ -103,11 +103,15 @@ func iamMemberSearchMethodToModel(key admin.SearchMethod) model.SearchMethod {
}
func iamMemberSearchResponseFromModel(resp *iam_model.IamMemberSearchResponse) *admin.IamMemberSearchResponse {
timestamp, err := ptypes.TimestampProto(resp.Timestamp)
logging.Log("GRPC-5shu8").OnError(err).Debug("date parse failed")
return &admin.IamMemberSearchResponse{
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: iamMembersFromView(resp.Result),
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: iamMembersFromView(resp.Result),
ProcessedSequence: resp.Sequence,
ViewTimestamp: timestamp,
}
}
func iamMembersFromView(viewMembers []*iam_model.IamMemberView) []*admin.IamMemberView {

View File

@ -21,12 +21,7 @@ func (s *Server) SearchOrgs(ctx context.Context, request *admin.OrgSearchRequest
if err != nil {
return nil, err
}
return &admin.OrgSearchResponse{
Result: orgViewsFromModel(result.Result),
Limit: request.Limit,
Offset: request.Offset,
TotalResult: result.TotalResult,
}, nil
return orgSearchResponseFromModel(result), nil
}
func (s *Server) IsOrgUnique(ctx context.Context, request *admin.UniqueOrgRequest) (org *admin.UniqueOrgResponse, err error) {

View File

@ -72,6 +72,19 @@ func setUpOrgResponseFromModel(setUp *admin_model.SetupOrg) *admin.OrgSetUpRespo
}
}
func orgSearchResponseFromModel(request *org_model.OrgSearchResult) *admin.OrgSearchResponse {
timestamp, err := ptypes.TimestampProto(request.Timestamp)
logging.Log("GRPC-shu7s").OnError(err).Debug("unable to get timestamp from time")
return &admin.OrgSearchResponse{
Result: orgViewsFromModel(request.Result),
Limit: request.Limit,
Offset: request.Offset,
TotalResult: request.TotalResult,
ProcessedSequence: request.Sequence,
ViewTimestamp: timestamp,
}
}
func orgViewsFromModel(orgs []*org_model.OrgView) []*admin.Org {
result := make([]*admin.Org, len(orgs))
for i, org := range orgs {

View File

@ -1,8 +1,10 @@
package auth
import (
"github.com/caos/logging"
grant_model "github.com/caos/zitadel/internal/usergrant/model"
"github.com/caos/zitadel/pkg/grpc/auth"
"github.com/golang/protobuf/ptypes"
)
func userGrantSearchRequestsToModel(request *auth.UserGrantSearchRequest) *grant_model.UserGrantSearchRequest {
@ -76,11 +78,16 @@ func myProjectOrgSearchKeyToModel(key auth.MyProjectOrgSearchKey) grant_model.Us
}
func userGrantSearchResponseFromModel(response *grant_model.UserGrantSearchResponse) *auth.UserGrantSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-Lsp0d").OnError(err).Debug("unable to parse timestamp")
return &auth.UserGrantSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userGrantViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userGrantViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -146,11 +146,15 @@ func applicationSearchKeyToModel(key management.ApplicationSearchKey) proj_model
}
func applicationSearchResponseFromModel(response *proj_model.ApplicationSearchResponse) *management.ApplicationSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-Lp06f").OnError(err).Debug("unable to parse timestamp")
return &management.ApplicationSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: applicationViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: applicationViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -138,11 +138,15 @@ func orgDomainSearchKeyToModel(key management.OrgDomainSearchKey) org_model.OrgD
}
func orgDomainSearchResponseFromModel(resp *org_model.OrgDomainSearchResponse) *management.OrgDomainSearchResponse {
timestamp, err := ptypes.TimestampProto(resp.Timestamp)
logging.Log("GRPC-Mxi9w").OnError(err).Debug("unable to get timestamp from time")
return &management.OrgDomainSearchResponse{
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: orgDomainsFromModel(resp.Result),
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: orgDomainsFromModel(resp.Result),
ProcessedSequence: resp.Sequence,
ViewTimestamp: timestamp,
}
}
func orgDomainsFromModel(viewDomains []*org_model.OrgDomainView) []*management.OrgDomainView {

View File

@ -104,11 +104,15 @@ func orgMemberSearchMethodToModel(key management.SearchMethod) model.SearchMetho
}
func orgMemberSearchResponseFromModel(resp *org_model.OrgMemberSearchResponse) *management.OrgMemberSearchResponse {
timestamp, err := ptypes.TimestampProto(resp.Timestamp)
logging.Log("GRPC-Swmr6").OnError(err).Debug("date parse failed")
return &management.OrgMemberSearchResponse{
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: orgMembersFromView(resp.Result),
Limit: resp.Limit,
Offset: resp.Offset,
TotalResult: resp.TotalResult,
Result: orgMembersFromView(resp.Result),
ProcessedSequence: resp.Sequence,
ViewTimestamp: timestamp,
}
}
func orgMembersFromView(viewMembers []*org_model.OrgMemberView) []*management.OrgMemberView {

View File

@ -105,8 +105,7 @@ func (s *Server) RemoveProjectRole(ctx context.Context, in *management.ProjectRo
func (s *Server) SearchProjectRoles(ctx context.Context, in *management.ProjectRoleSearchRequest) (*management.ProjectRoleSearchResponse, error) {
request := projectRoleSearchRequestsToModel(in)
request.AppendMyOrgQuery(authz.GetCtxData(ctx).OrgID)
request.AppendProjectQuery(in.ProjectId)
response, err := s.project.SearchProjectRoles(ctx, request)
response, err := s.project.SearchProjectRoles(ctx, in.ProjectId, request)
if err != nil {
return nil, err
}

View File

@ -32,11 +32,15 @@ func projectFromModel(project *proj_model.Project) *management.Project {
}
func projectSearchResponseFromModel(response *proj_model.ProjectViewSearchResponse) *management.ProjectSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-iejs3").OnError(err).Debug("unable to parse timestamp")
return &management.ProjectSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}
@ -67,11 +71,16 @@ func projectViewFromModel(project *proj_model.ProjectView) *management.ProjectVi
}
func projectRoleSearchResponseFromModel(response *proj_model.ProjectRoleSearchResponse) *management.ProjectRoleSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-Lps0c").OnError(err).Debug("unable to parse timestamp")
return &management.ProjectRoleSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectRoleViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectRoleViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -91,11 +91,15 @@ func projectGrantViewSearchKeyToModel(key management.ProjectGrantSearchKey) proj
}
func projectGrantSearchResponseFromModel(response *proj_model.ProjectGrantViewSearchResponse) *management.ProjectGrantSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-MCjs7").OnError(err).Debug("unable to parse timestamp")
return &management.ProjectGrantSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectGrantsFromGrantedProjectModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectGrantsFromGrantedProjectModel(response.Result),
ViewTimestamp: timestamp,
ProcessedSequence: response.Sequence,
}
}

View File

@ -87,11 +87,15 @@ func projectGrantMemberSearchKeyToModel(key management.ProjectGrantMemberSearchK
}
func projectGrantMemberSearchResponseFromModel(response *proj_model.ProjectGrantMemberSearchResponse) *management.ProjectGrantMemberSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-MSn6g").OnError(err).Debug("unable to parse timestamp")
return &management.ProjectGrantMemberSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectGrantMemberViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectGrantMemberViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -45,11 +45,11 @@ func projectMemberChangeToModel(member *management.ProjectMemberChange) *proj_mo
}
}
func projectMemberSearchRequestsToModel(role *management.ProjectMemberSearchRequest) *proj_model.ProjectMemberSearchRequest {
func projectMemberSearchRequestsToModel(member *management.ProjectMemberSearchRequest) *proj_model.ProjectMemberSearchRequest {
return &proj_model.ProjectMemberSearchRequest{
Offset: role.Offset,
Limit: role.Limit,
Queries: projectMemberSearchQueriesToModel(role.Queries),
Offset: member.Offset,
Limit: member.Limit,
Queries: projectMemberSearchQueriesToModel(member.Queries),
}
}
@ -85,11 +85,15 @@ func projectMemberSearchKeyToModel(key management.ProjectMemberSearchKey) proj_m
}
func projectMemberSearchResponseFromModel(response *proj_model.ProjectMemberSearchResponse) *management.ProjectMemberSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-LSo9j").OnError(err).Debug("unable to parse timestamp")
return &management.ProjectMemberSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectMemberViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: projectMemberViewsFromModel(response.Result),
ViewTimestamp: timestamp,
ProcessedSequence: response.Sequence,
}
}

View File

@ -335,11 +335,15 @@ func updateAddressToModel(address *management.UpdateUserAddressRequest) *usr_mod
}
func userSearchResponseFromModel(response *usr_model.UserSearchResponse) *management.UserSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-aBezr").OnError(err).Debug("unable to parse timestamp")
return &management.UserSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -131,11 +131,15 @@ func userGrantSearchKeyToModel(key management.UserGrantSearchKey) grant_model.Us
}
func userGrantSearchResponseFromModel(response *grant_model.UserGrantSearchResponse) *management.UserGrantSearchResponse {
timestamp, err := ptypes.TimestampProto(response.Timestamp)
logging.Log("GRPC-Wd7hs").OnError(err).Debug("unable to parse timestamp")
return &management.UserGrantSearchResponse{
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userGrantViewsFromModel(response.Result),
Offset: response.Offset,
Limit: response.Limit,
TotalResult: response.TotalResult,
Result: userGrantViewsFromModel(response.Result),
ProcessedSequence: response.Sequence,
ViewTimestamp: timestamp,
}
}

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
auth_view "github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
org_model "github.com/caos/zitadel/internal/org/model"
org_es "github.com/caos/zitadel/internal/org/repository/eventsourcing"
@ -16,14 +17,21 @@ type OrgRepository struct {
func (repo *OrgRepository) SearchOrgs(ctx context.Context, request *org_model.OrgSearchRequest) (*org_model.OrgSearchResult, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestOrgSequence()
logging.Log("EVENT-7Udhz").OnError(err).Warn("could not read latest org sequence")
members, count, err := repo.View.SearchOrgs(request)
if err != nil {
return nil, err
}
return &org_model.OrgSearchResult{
result := &org_model.OrgSearchResult{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.OrgsToModel(members),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/api/authz"
"github.com/caos/zitadel/internal/auth/repository/eventsourcing/view"
@ -24,17 +25,24 @@ type UserGrantRepo struct {
func (repo *UserGrantRepo) SearchMyUserGrants(ctx context.Context, request *grant_model.UserGrantSearchRequest) (*grant_model.UserGrantSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestUserGrantSequence()
logging.Log("EVENT-Hd7s3").OnError(err).Warn("could not read latest user grant sequence")
request.Queries = append(request.Queries, &grant_model.UserGrantSearchQuery{Key: grant_model.UserGrantSearchKeyUserID, Method: global_model.SearchMethodEquals, Value: authz.GetCtxData(ctx).UserID})
grants, count, err := repo.View.SearchUserGrants(request)
if err != nil {
return nil, err
}
return &grant_model.UserGrantSearchResponse{
result := &grant_model.UserGrantSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.UserGrantsToModel(grants),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *UserGrantRepo) SearchMyProjectOrgs(ctx context.Context, request *grant_model.UserGrantSearchRequest) (*grant_model.ProjectOrgSearchResponse, error) {

View File

@ -31,7 +31,7 @@ func (p *Application) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.ProjectQuery(sequence), nil
return eventsourcing.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *Application) Reduce(event *models.Event) (err error) {

View File

@ -32,7 +32,7 @@ func (k *Key) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.KeyPairQuery(sequence), nil
return eventsourcing.KeyPairQuery(sequence.CurrentSequence), nil
}
func (k *Key) Reduce(event *models.Event) error {

View File

@ -29,7 +29,7 @@ func (o *Org) EventQuery() (*es_models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.OrgQuery(sequence), nil
return eventsourcing.OrgQuery(sequence.CurrentSequence), nil
}
func (o *Org) Reduce(event *es_models.Event) error {

View File

@ -41,7 +41,7 @@ func (u *Token) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(user_es_model.UserAggregate, project_es_model.ProjectAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *Token) Reduce(event *models.Event) (err error) {

View File

@ -41,7 +41,7 @@ func (p *User) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(es_model.UserAggregate, org_es_model.OrgAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *User) Reduce(event *models.Event) (err error) {

View File

@ -61,7 +61,7 @@ func (u *UserGrant) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(grant_es_model.UserGrantAggregate, iam_es_model.IamAggregate, org_es_model.OrgAggregate, usr_es_model.UserAggregate, proj_es_model.ProjectAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *UserGrant) Reduce(event *models.Event) (err error) {

View File

@ -36,7 +36,7 @@ func (u *UserSession) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.UserQuery(sequence), nil
return eventsourcing.UserQuery(sequence.CurrentSequence), nil
}
func (u *UserSession) Reduce(event *models.Event) (err error) {

View File

@ -2,13 +2,12 @@ package view
import (
"context"
"github.com/caos/zitadel/internal/view/repository"
"github.com/caos/zitadel/internal/errors"
global_model "github.com/caos/zitadel/internal/model"
proj_model "github.com/caos/zitadel/internal/project/model"
"github.com/caos/zitadel/internal/project/repository/view"
"github.com/caos/zitadel/internal/project/repository/view/model"
"github.com/caos/zitadel/internal/view/repository"
)
const (
@ -39,7 +38,7 @@ func (v *View) DeleteApplication(appID string, eventSequence uint64) error {
return v.ProcessedApplicationSequence(eventSequence)
}
func (v *View) GetLatestApplicationSequence() (uint64, error) {
func (v *View) GetLatestApplicationSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(applicationTable)
}

View File

@ -55,7 +55,7 @@ func (v *View) DeleteKeyPair(keyID string, eventSequence uint64) error {
return v.ProcessedKeySequence(eventSequence)
}
func (v *View) GetLatestKeySequence() (uint64, error) {
func (v *View) GetLatestKeySequence() (*repository.CurrentSequence, error) {
return v.latestSequence(keyTable)
}

View File

@ -35,7 +35,7 @@ func (v *View) ProcessedOrgFailedEvent(failedEvent *repository.FailedEvent) erro
return v.saveFailedEvent(failedEvent)
}
func (v *View) GetLatestOrgSequence() (uint64, error) {
func (v *View) GetLatestOrgSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(orgTable)
}

View File

@ -12,6 +12,6 @@ func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
func (v *View) latestSequence(viewName string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@ -82,7 +82,7 @@ func (v *View) DeleteApplicationTokens(eventSequence uint64, ids ...string) erro
return v.ProcessedTokenSequence(eventSequence)
}
func (v *View) GetLatestTokenSequence() (uint64, error) {
func (v *View) GetLatestTokenSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(tokenTable)
}

View File

@ -58,7 +58,7 @@ func (v *View) DeleteUser(userID string, eventSequence uint64) error {
return v.ProcessedUserSequence(eventSequence)
}
func (v *View) GetLatestUserSequence() (uint64, error) {
func (v *View) GetLatestUserSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userTable)
}

View File

@ -51,7 +51,7 @@ func (v *View) DeleteUserGrant(grantID string, eventSequence uint64) error {
return v.ProcessedUserGrantSequence(eventSequence)
}
func (v *View) GetLatestUserGrantSequence() (uint64, error) {
func (v *View) GetLatestUserGrantSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userGrantTable)
}

View File

@ -38,7 +38,7 @@ func (v *View) DeleteUserSessions(userID string, eventSequence uint64) error {
return v.ProcessedUserSessionSequence(eventSequence)
}
func (v *View) GetLatestUserSessionSequence() (uint64, error) {
func (v *View) GetLatestUserSessionSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userSessionTable)
}

View File

@ -29,7 +29,7 @@ func (p *Application) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.ProjectQuery(sequence), nil
return eventsourcing.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *Application) Reduce(event *models.Event) (err error) {

View File

@ -50,7 +50,7 @@ func (u *UserGrant) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(iam_es_model.IamAggregate, org_es_model.OrgAggregate, proj_es_model.ProjectAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *UserGrant) Reduce(event *models.Event) (err error) {

View File

@ -43,7 +43,7 @@ func (v *View) DeleteApplication(appID string, eventSequence uint64) error {
return v.ProcessedApplicationSequence(eventSequence)
}
func (v *View) GetLatestApplicationSequence() (uint64, error) {
func (v *View) GetLatestApplicationSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(applicationTable)
}

View File

@ -12,6 +12,6 @@ func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
func (v *View) latestSequence(viewName string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@ -42,7 +42,7 @@ func (v *View) DeleteSessionTokens(agentID, userID string, eventSequence uint64)
return v.ProcessedTokenSequence(eventSequence)
}
func (v *View) GetLatestTokenSequence() (uint64, error) {
func (v *View) GetLatestTokenSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(tokenTable)
}

View File

@ -47,7 +47,7 @@ func (v *View) DeleteUserGrant(grantID string, eventSequence uint64) error {
return v.ProcessedUserGrantSequence(eventSequence)
}
func (v *View) GetLatestUserGrantSequence() (uint64, error) {
func (v *View) GetLatestUserGrantSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userGrantTable)
}

View File

@ -49,6 +49,8 @@ type IamMemberSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*IamMemberView
Sequence uint64
Timestamp time.Time
}
func (r *IamMemberSearchRequest) EnsureLimit(limit uint64) {

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
"strings"
"github.com/caos/zitadel/internal/api/authz"
@ -57,16 +58,23 @@ func (repo *OrgRepository) GetMyOrgIamPolicy(ctx context.Context) (*org_model.Or
func (repo *OrgRepository) SearchMyOrgDomains(ctx context.Context, request *org_model.OrgDomainSearchRequest) (*org_model.OrgDomainSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
request.Queries = append(request.Queries, &org_model.OrgDomainSearchQuery{Key: org_model.OrgDomainSearchKeyOrgID, Method: global_model.SearchMethodEquals, Value: authz.GetCtxData(ctx).OrgID})
sequence, err := repo.View.GetLatestOrgDomainSequence()
logging.Log("EVENT-SLowp").OnError(err).Warn("could not read latest org domain sequence")
domains, count, err := repo.View.SearchOrgDomains(request)
if err != nil {
return nil, err
}
return &org_model.OrgDomainSearchResponse{
result := &org_model.OrgDomainSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.OrgDomainsToModel(domains),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *OrgRepository) AddMyOrgDomain(ctx context.Context, domain *org_model.OrgDomain) (*org_model.OrgDomain, error) {
@ -120,16 +128,23 @@ func (repo *OrgRepository) RemoveMyOrgMember(ctx context.Context, userID string)
func (repo *OrgRepository) SearchMyOrgMembers(ctx context.Context, request *org_model.OrgMemberSearchRequest) (*org_model.OrgMemberSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
request.Queries[len(request.Queries)-1] = &org_model.OrgMemberSearchQuery{Key: org_model.OrgMemberSearchKeyOrgID, Method: global_model.SearchMethodEquals, Value: authz.GetCtxData(ctx).OrgID}
sequence, err := repo.View.GetLatestOrgMemberSequence()
logging.Log("EVENT-Smu3d").OnError(err).Warn("could not read latest org member sequence")
members, count, err := repo.View.SearchOrgMembers(request)
if err != nil {
return nil, err
}
return &org_model.OrgMemberSearchResponse{
result := &org_model.OrgMemberSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.OrgMembersToModel(members),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *OrgRepository) GetOrgMemberRoles() []string {

View File

@ -89,16 +89,23 @@ func (repo *ProjectRepo) SearchProjects(ctx context.Context, request *proj_model
request.Queries = append(request.Queries, &proj_model.ProjectViewSearchQuery{Key: proj_model.ProjectViewSearchKeyProjectID, Method: global_model.SearchMethodIsOneOf, Value: ids})
}
sequence, err := repo.View.GetLatestProjectSequence()
logging.Log("EVENT-Edc56").OnError(err).Warn("could not read latest project sequence")
projects, count, err := repo.View.SearchProjects(request)
if err != nil {
return nil, err
}
return &proj_model.ProjectViewSearchResponse{
result := &proj_model.ProjectViewSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ProjectsToModel(projects),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) ProjectGrantViewByID(ctx context.Context, grantID string) (project *proj_model.ProjectGrantView, err error) {
@ -132,16 +139,23 @@ func (repo *ProjectRepo) RemoveProjectMember(ctx context.Context, projectID, use
func (repo *ProjectRepo) SearchProjectMembers(ctx context.Context, request *proj_model.ProjectMemberSearchRequest) (*proj_model.ProjectMemberSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestProjectMemberSequence()
logging.Log("EVENT-3dgt6").OnError(err).Warn("could not read latest project member sequence")
members, count, err := repo.View.SearchProjectMembers(request)
if err != nil {
return nil, err
}
return &proj_model.ProjectMemberSearchResponse{
result := &proj_model.ProjectMemberSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ProjectMembersToModel(members),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) AddProjectRole(ctx context.Context, role *proj_model.ProjectRole) (*proj_model.ProjectRole, error) {
@ -194,18 +208,27 @@ func (repo *ProjectRepo) RemoveProjectRole(ctx context.Context, projectID, key s
return nil
}
func (repo *ProjectRepo) SearchProjectRoles(ctx context.Context, request *proj_model.ProjectRoleSearchRequest) (*proj_model.ProjectRoleSearchResponse, error) {
func (repo *ProjectRepo) SearchProjectRoles(ctx context.Context, projectID string, request *proj_model.ProjectRoleSearchRequest) (*proj_model.ProjectRoleSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
request.AppendProjectQuery(projectID)
sequence, err := repo.View.GetLatestProjectRoleSequence()
logging.Log("LSp0d-47suf").OnError(err).Warn("could not read latest project role sequence")
roles, count, err := repo.View.SearchProjectRoles(request)
if err != nil {
return nil, err
}
return &proj_model.ProjectRoleSearchResponse{
result := &proj_model.ProjectRoleSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ProjectRolesToModel(roles),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) ProjectChanges(ctx context.Context, id string, lastSequence uint64, limit uint64, sortAscending bool) (*proj_model.ProjectChanges, error) {
@ -254,16 +277,23 @@ func (repo *ProjectRepo) RemoveApplication(ctx context.Context, projectID, appID
func (repo *ProjectRepo) SearchApplications(ctx context.Context, request *proj_model.ApplicationSearchRequest) (*proj_model.ApplicationSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestApplicationSequence()
logging.Log("EVENT-SKe8s").OnError(err).Warn("could not read latest application sequence")
apps, count, err := repo.View.SearchApplications(request)
if err != nil {
return nil, err
}
return &proj_model.ApplicationSearchResponse{
result := &proj_model.ApplicationSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ApplicationViewsToModel(apps),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) ApplicationChanges(ctx context.Context, id string, appId string, lastSequence uint64, limit uint64, sortAscending bool) (*proj_model.ApplicationChanges, error) {
@ -299,16 +329,23 @@ func (repo *ProjectRepo) ProjectGrantByID(ctx context.Context, grantID string) (
func (repo *ProjectRepo) SearchProjectGrants(ctx context.Context, request *proj_model.ProjectGrantViewSearchRequest) (*proj_model.ProjectGrantViewSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestProjectGrantSequence()
logging.Log("EVENT-Skw9f").OnError(err).Warn("could not read latest project grant sequence")
projects, count, err := repo.View.SearchProjectGrants(request)
if err != nil {
return nil, err
}
return &proj_model.ProjectGrantViewSearchResponse{
result := &proj_model.ProjectGrantViewSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ProjectGrantsToModel(projects),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) AddProjectGrant(ctx context.Context, grant *proj_model.ProjectGrant) (*proj_model.ProjectGrant, error) {
@ -430,16 +467,23 @@ func (repo *ProjectRepo) RemoveProjectGrantMember(ctx context.Context, projectID
func (repo *ProjectRepo) SearchProjectGrantMembers(ctx context.Context, request *proj_model.ProjectGrantMemberSearchRequest) (*proj_model.ProjectGrantMemberSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestProjectGrantMemberSequence()
logging.Log("EVENT-Du8sk").OnError(err).Warn("could not read latest project grant sequence")
members, count, err := repo.View.SearchProjectGrantMembers(request)
if err != nil {
return nil, err
}
return &proj_model.ProjectGrantMemberSearchResponse{
result := &proj_model.ProjectGrantMemberSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.ProjectGrantMembersToModel(members),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *ProjectRepo) GetProjectMemberRoles() []string {

View File

@ -87,16 +87,23 @@ func (repo *UserRepo) UnlockUser(ctx context.Context, id string) (*usr_model.Use
func (repo *UserRepo) SearchUsers(ctx context.Context, request *usr_model.UserSearchRequest) (*usr_model.UserSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestUserSequence()
logging.Log("EVENT-Lcn7d").OnError(err).Warn("could not read latest user sequence")
projects, count, err := repo.View.SearchUsers(request)
if err != nil {
return nil, err
}
return &usr_model.UserSearchResponse{
result := &usr_model.UserSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.UsersToModel(projects),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}
func (repo *UserRepo) UserChanges(ctx context.Context, id string, lastSequence uint64, limit uint64, sortAscending bool) (*usr_model.UserChanges, error) {

View File

@ -2,6 +2,7 @@ package eventstore
import (
"context"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/management/repository/eventsourcing/view"
grant_model "github.com/caos/zitadel/internal/usergrant/model"
grant_event "github.com/caos/zitadel/internal/usergrant/repository/eventsourcing"
@ -56,14 +57,21 @@ func (repo *UserGrantRepo) BulkRemoveUserGrant(ctx context.Context, grantIDs ...
func (repo *UserGrantRepo) SearchUserGrants(ctx context.Context, request *grant_model.UserGrantSearchRequest) (*grant_model.UserGrantSearchResponse, error) {
request.EnsureLimit(repo.SearchLimit)
sequence, err := repo.View.GetLatestUserGrantSequence()
logging.Log("EVENT-5Viwf").OnError(err).Warn("could not read latest user grant sequence")
grants, count, err := repo.View.SearchUserGrants(request)
if err != nil {
return nil, err
}
return &grant_model.UserGrantSearchResponse{
result := &grant_model.UserGrantSearchResponse{
Offset: request.Offset,
Limit: request.Limit,
TotalResult: uint64(count),
Result: model.UserGrantsToModel(grants),
}, nil
}
if err == nil {
result.Sequence = sequence.CurrentSequence
result.Timestamp = sequence.CurrentTimestamp
}
return result, nil
}

View File

@ -31,7 +31,7 @@ func (p *Application) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.ProjectQuery(sequence), nil
return eventsourcing.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *Application) Reduce(event *models.Event) (err error) {

View File

@ -30,7 +30,7 @@ func (o *Org) EventQuery() (*es_models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.OrgQuery(sequence), nil
return eventsourcing.OrgQuery(sequence.CurrentSequence), nil
}
func (o *Org) Reduce(event *es_models.Event) error {

View File

@ -32,7 +32,7 @@ func (d *OrgDomain) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(model.OrgAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (d *OrgDomain) Reduce(event *models.Event) (err error) {

View File

@ -37,7 +37,7 @@ func (m *OrgMember) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(model.OrgAggregate, usr_es_model.UserAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (m *OrgMember) Reduce(event *models.Event) (err error) {

View File

@ -33,7 +33,7 @@ func (p *Project) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return proj_event.ProjectQuery(sequence), nil
return proj_event.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *Project) Reduce(event *models.Event) (err error) {

View File

@ -39,7 +39,7 @@ func (p *ProjectGrant) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return proj_event.ProjectQuery(sequence), nil
return proj_event.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *ProjectGrant) Reduce(event *models.Event) (err error) {
@ -128,4 +128,3 @@ func (p *ProjectGrant) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-is8wa", "id", event.AggregateID).WithError(err).Warn("something went wrong in granted projecthandler")
return spooler.HandleError(event, err, p.view.GetLatestProjectGrantFailedEvent, p.view.ProcessedProjectGrantFailedEvent, p.view.ProcessedProjectGrantSequence, p.errorCountUntilSkip)
}

View File

@ -36,7 +36,7 @@ func (p *ProjectGrantMember) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(proj_es_model.ProjectAggregate, usr_es_model.UserAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (p *ProjectGrantMember) Reduce(event *models.Event) (err error) {

View File

@ -36,7 +36,7 @@ func (p *ProjectMember) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(proj_es_model.ProjectAggregate, usr_es_model.UserAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (p *ProjectMember) Reduce(event *models.Event) (err error) {

View File

@ -31,7 +31,7 @@ func (p *ProjectRole) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.ProjectQuery(sequence), nil
return eventsourcing.ProjectQuery(sequence.CurrentSequence), nil
}
func (p *ProjectRole) Reduce(event *models.Event) (err error) {

View File

@ -41,7 +41,7 @@ func (p *User) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(es_model.UserAggregate, org_es_model.OrgAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *User) Reduce(event *models.Event) (err error) {

View File

@ -48,7 +48,7 @@ func (u *UserGrant) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(grant_es_model.UserGrantAggregate, usr_es_model.UserAggregate, proj_es_model.ProjectAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *UserGrant) Reduce(event *models.Event) (err error) {

View File

@ -35,7 +35,7 @@ func (v *View) DeleteApplication(appID string, eventSequence uint64) error {
return v.ProcessedApplicationSequence(eventSequence)
}
func (v *View) GetLatestApplicationSequence() (uint64, error) {
func (v *View) GetLatestApplicationSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(applicationTable)
}

View File

@ -30,7 +30,7 @@ func (v *View) ProcessedOrgFailedEvent(failedEvent *repository.FailedEvent) erro
return v.saveFailedEvent(failedEvent)
}
func (v *View) GetLatestOrgSequence() (uint64, error) {
func (v *View) GetLatestOrgSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(orgTable)
}

View File

@ -46,7 +46,7 @@ func (v *View) DeleteOrgDomain(orgID, domain string, eventSequence uint64) error
return v.ProcessedOrgDomainSequence(eventSequence)
}
func (v *View) GetLatestOrgDomainSequence() (uint64, error) {
func (v *View) GetLatestOrgDomainSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(orgDomainTable)
}

View File

@ -39,7 +39,7 @@ func (v *View) DeleteOrgMember(orgID, userID string, eventSequence uint64) error
return v.ProcessedOrgMemberSequence(eventSequence)
}
func (v *View) GetLatestOrgMemberSequence() (uint64, error) {
func (v *View) GetLatestOrgMemberSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(orgMemberTable)
}

View File

@ -35,7 +35,7 @@ func (v *View) DeleteProject(projectID string, eventSequence uint64) error {
return v.ProcessedProjectSequence(eventSequence)
}
func (v *View) GetLatestProjectSequence() (uint64, error) {
func (v *View) GetLatestProjectSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(projectTable)
}

View File

@ -47,7 +47,7 @@ func (v *View) DeleteProjectGrant(grantID string, eventSequence uint64) error {
return v.ProcessedProjectGrantSequence(eventSequence)
}
func (v *View) GetLatestProjectGrantSequence() (uint64, error) {
func (v *View) GetLatestProjectGrantSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(grantedProjectTable)
}

View File

@ -39,7 +39,7 @@ func (v *View) DeleteProjectGrantMember(grantID, userID string, eventSequence ui
return v.ProcessedProjectGrantMemberSequence(eventSequence)
}
func (v *View) GetLatestProjectGrantMemberSequence() (uint64, error) {
func (v *View) GetLatestProjectGrantMemberSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(projectGrantMemberTable)
}

View File

@ -39,7 +39,7 @@ func (v *View) DeleteProjectMember(projectID, userID string, eventSequence uint6
return v.ProcessedProjectMemberSequence(eventSequence)
}
func (v *View) GetLatestProjectMemberSequence() (uint64, error) {
func (v *View) GetLatestProjectMemberSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(projectMemberTable)
}

View File

@ -43,7 +43,7 @@ func (v *View) DeleteProjectRole(projectID, orgID, key string, eventSequence uin
return v.ProcessedProjectRoleSequence(eventSequence)
}
func (v *View) GetLatestProjectRoleSequence() (uint64, error) {
func (v *View) GetLatestProjectRoleSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(projectRoleTable)
}

View File

@ -12,6 +12,6 @@ func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
func (v *View) latestSequence(viewName string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@ -54,7 +54,7 @@ func (v *View) DeleteUser(userID string, eventSequence uint64) error {
return v.ProcessedUserSequence(eventSequence)
}
func (v *View) GetLatestUserSequence() (uint64, error) {
func (v *View) GetLatestUserSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userTable)
}

View File

@ -55,7 +55,7 @@ func (v *View) DeleteUserGrant(grantID string, eventSequence uint64) error {
return v.ProcessedUserGrantSequence(eventSequence)
}
func (v *View) GetLatestUserGrantSequence() (uint64, error) {
func (v *View) GetLatestUserGrantSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(userGrantTable)
}

View File

@ -26,7 +26,7 @@ type ProjectRepository interface {
AddProjectRole(ctx context.Context, role *model.ProjectRole) (*model.ProjectRole, error)
ChangeProjectRole(ctx context.Context, role *model.ProjectRole) (*model.ProjectRole, error)
RemoveProjectRole(ctx context.Context, projectID, key string) error
SearchProjectRoles(ctx context.Context, request *model.ProjectRoleSearchRequest) (*model.ProjectRoleSearchResponse, error)
SearchProjectRoles(ctx context.Context, projectId string, request *model.ProjectRoleSearchRequest) (*model.ProjectRoleSearchResponse, error)
ProjectChanges(ctx context.Context, id string, lastSequence uint64, limit uint64, sortAscending bool) (*model.ProjectChanges, error)
BulkAddProjectRole(ctx context.Context, role []*model.ProjectRole) error

View File

@ -46,7 +46,7 @@ func (n *Notification) EventQuery() (*models.SearchQuery, error) {
if err != nil {
return nil, err
}
return eventsourcing.UserQuery(sequence), nil
return eventsourcing.UserQuery(sequence.CurrentSequence), nil
}
func (n *Notification) Reduce(event *models.Event) (err error) {

View File

@ -40,7 +40,7 @@ func (p *NotifyUser) EventQuery() (*models.SearchQuery, error) {
}
return es_models.NewSearchQuery().
AggregateTypeFilter(es_model.UserAggregate, org_es_model.OrgAggregate).
LatestSequenceFilter(sequence), nil
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (u *NotifyUser) Reduce(event *models.Event) (err error) {

View File

@ -8,7 +8,7 @@ const (
notificationTable = "notification.notifications"
)
func (v *View) GetLatestNotificationSequence() (uint64, error) {
func (v *View) GetLatestNotificationSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(notificationTable)
}

View File

@ -37,7 +37,7 @@ func (v *View) DeleteNotifyUser(userID string, eventSequence uint64) error {
return v.ProcessedNotifyUserSequence(eventSequence)
}
func (v *View) GetLatestNotifyUserSequence() (uint64, error) {
func (v *View) GetLatestNotifyUserSequence() (*repository.CurrentSequence, error) {
return v.latestSequence(notifyUserTable)
}

View File

@ -12,6 +12,6 @@ func (v *View) saveCurrentSequence(viewName string, sequence uint64) error {
return repository.SaveCurrentSequence(v.Db, sequencesTable, viewName, sequence)
}
func (v *View) latestSequence(viewName string) (uint64, error) {
func (v *View) latestSequence(viewName string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName)
}

View File

@ -43,6 +43,8 @@ type OrgDomainSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*OrgDomainView
Sequence uint64
Timestamp time.Time
}
func (r *OrgDomainSearchRequest) EnsureLimit(limit uint64) {

View File

@ -50,6 +50,8 @@ type OrgMemberSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*OrgMemberView
Sequence uint64
Timestamp time.Time
}
func (r *OrgMemberSearchRequest) EnsureLimit(limit uint64) {

View File

@ -48,6 +48,8 @@ type OrgSearchResult struct {
Limit uint64
TotalResult uint64
Result []*OrgView
Sequence uint64
Timestamp time.Time
}
func (r *OrgSearchRequest) EnsureLimit(limit uint64) {

View File

@ -54,6 +54,8 @@ type ApplicationSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ApplicationView
Sequence uint64
Timestamp time.Time
}
func (r *ApplicationSearchRequest) EnsureLimit(limit uint64) {

View File

@ -50,6 +50,8 @@ type ProjectGrantMemberSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ProjectGrantMemberView
Sequence uint64
Timestamp time.Time
}
func (r *ProjectGrantMemberSearchRequest) EnsureLimit(limit uint64) {

View File

@ -52,6 +52,8 @@ type ProjectGrantViewSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ProjectGrantView
Sequence uint64
Timestamp time.Time
}
func (r *ProjectGrantViewSearchRequest) AppendMyOrgQuery(orgID string) {

View File

@ -49,6 +49,8 @@ type ProjectMemberSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ProjectMemberView
Sequence uint64
Timestamp time.Time
}
func (r *ProjectMemberSearchRequest) EnsureLimit(limit uint64) {

View File

@ -46,6 +46,8 @@ type ProjectRoleSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ProjectRoleView
Sequence uint64
Timestamp time.Time
}
func (r *ProjectRoleSearchRequest) AppendMyOrgQuery(orgID string) {

View File

@ -43,6 +43,8 @@ type ProjectViewSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*ProjectView
Sequence uint64
Timestamp time.Time
}
func (r *ProjectViewSearchRequest) AppendMyResourceOwnerQuery(orgID string) {

View File

@ -79,6 +79,8 @@ type UserSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*UserView
Sequence uint64
Timestamp time.Time
}
func (r *UserSearchRequest) EnsureLimit(limit uint64) {

View File

@ -61,6 +61,8 @@ type UserGrantSearchResponse struct {
Limit uint64
TotalResult uint64
Result []*UserGrantView
Sequence uint64
Timestamp time.Time
}
func (r *UserGrantSearchRequest) EnsureLimit(limit uint64) {

View File

@ -1,7 +1,12 @@
package model
import (
"time"
)
type View struct {
Database string
ViewName string
CurrentSequence uint64
Database string
ViewName string
CurrentSequence uint64
CurrentTimestamp time.Time
}

View File

@ -5,15 +5,13 @@ import (
"github.com/caos/zitadel/internal/view/model"
"github.com/jinzhu/gorm"
"strings"
"time"
)
type actualSequece struct {
ActualSequence uint64 `gorm:"column:current_sequence"`
}
type CurrentSequence struct {
ViewName string `gorm:"column:view_name;primary_key"`
CurrentSequence uint64 `gorm:"column:current_sequence"`
ViewName string `gorm:"column:view_name;primary_key"`
CurrentSequence uint64 `gorm:"column:current_sequence"`
CurrentTimestamp time.Time `gorm:"column:timestamp"`
}
type SequenceSearchKey int32
@ -37,15 +35,16 @@ func (key sequenceSearchKey) ToColumnName() string {
func CurrentSequenceToModel(sequence *CurrentSequence) *model.View {
dbView := strings.Split(sequence.ViewName, ".")
return &model.View{
Database: dbView[0],
ViewName: dbView[1],
CurrentSequence: sequence.CurrentSequence,
Database: dbView[0],
ViewName: dbView[1],
CurrentSequence: sequence.CurrentSequence,
CurrentTimestamp: sequence.CurrentTimestamp,
}
}
func SaveCurrentSequence(db *gorm.DB, table, viewName string, sequence uint64) error {
save := PrepareSave(table)
err := save(db, &CurrentSequence{viewName, sequence})
err := save(db, &CurrentSequence{viewName, sequence, time.Now()})
if err != nil {
return caos_errs.ThrowInternal(err, "VIEW-5kOhP", "unable to updated processed sequence")
@ -53,19 +52,19 @@ func SaveCurrentSequence(db *gorm.DB, table, viewName string, sequence uint64) e
return nil
}
func LatestSequence(db *gorm.DB, table, viewName string) (uint64, error) {
sequence := new(actualSequece)
func LatestSequence(db *gorm.DB, table, viewName string) (*CurrentSequence, error) {
sequence := new(CurrentSequence)
query := PrepareGetByKey(table, sequenceSearchKey(SequenceSearchKeyViewName), viewName)
err := query(db, sequence)
if err == nil {
return sequence.ActualSequence, nil
return sequence, nil
}
if caos_errs.IsNotFound(err) {
return 0, nil
return sequence, nil
}
return 0, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName)
return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName)
}
func AllCurrentSequences(db *gorm.DB, table string) ([]*CurrentSequence, error) {

View File

@ -0,0 +1,9 @@
BEGIN;
ALTER TABLE authz.current_sequences ADD COLUMN timestamp TIMESTAMPTZ;
ALTER TABLE auth.current_sequences ADD COLUMN timestamp TIMESTAMPTZ;
ALTER TABLE management.current_sequences ADD COLUMN timestamp TIMESTAMPTZ;
ALTER TABLE notification.current_sequences ADD COLUMN timestamp TIMESTAMPTZ;
ALTER TABLE adminapi.current_sequences ADD COLUMN timestamp TIMESTAMPTZ;
COMMIT;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -449,7 +449,7 @@
"200": {
"description": "A successful response.",
"schema": {
"type": "object"
"$ref": "#/definitions/protobufStruct"
}
}
},
@ -506,6 +506,19 @@
}
},
"definitions": {
"protobufListValue": {
"type": "object",
"properties": {
"values": {
"type": "array",
"items": {
"$ref": "#/definitions/protobufValue"
},
"description": "Repeated field of dynamically typed values."
}
},
"description": "`ListValue` is a wrapper around a repeated field of values.\n\nThe JSON representation for `ListValue` is JSON array."
},
"protobufNullValue": {
"type": "string",
"enum": [
@ -514,6 +527,51 @@
"default": "NULL_VALUE",
"description": "`NullValue` is a singleton enumeration to represent the null value for the\n`Value` type union.\n\n The JSON representation for `NullValue` is JSON `null`.\n\n - NULL_VALUE: Null value."
},
"protobufStruct": {
"type": "object",
"properties": {
"fields": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/protobufValue"
},
"description": "Unordered map of dynamically typed values."
}
},
"description": "`Struct` represents a structured data value, consisting of fields\nwhich map to dynamically typed values. In some languages, `Struct`\nmight be supported by a native representation. For example, in\nscripting languages like JS a struct is represented as an\nobject. The details of that representation are described together\nwith the proto support for the language.\n\nThe JSON representation for `Struct` is JSON object."
},
"protobufValue": {
"type": "object",
"properties": {
"null_value": {
"$ref": "#/definitions/protobufNullValue",
"description": "Represents a null value."
},
"number_value": {
"type": "number",
"format": "double",
"description": "Represents a double value."
},
"string_value": {
"type": "string",
"description": "Represents a string value."
},
"bool_value": {
"type": "boolean",
"format": "boolean",
"description": "Represents a boolean value."
},
"struct_value": {
"$ref": "#/definitions/protobufStruct",
"description": "Represents a structured value."
},
"list_value": {
"$ref": "#/definitions/protobufListValue",
"description": "Represents a repeated `Value`."
}
},
"description": "`Value` represents a dynamically typed value which can be either\nnull, a number, a string, a boolean, a recursive struct value, or a\nlist of values. A producer of value is expected to set one of that\nvariants, absence of any variant indicates an error.\n\nThe JSON representation for `Value` is JSON value."
},
"v1AddIamMemberRequest": {
"type": "object",
"properties": {
@ -752,6 +810,14 @@
"items": {
"$ref": "#/definitions/v1IamMemberView"
}
},
"processed_sequence": {
"type": "string",
"format": "uint64"
},
"view_timestamp": {
"type": "string",
"format": "date-time"
}
}
},
@ -943,6 +1009,14 @@
"items": {
"$ref": "#/definitions/v1Org"
}
},
"processed_sequence": {
"type": "string",
"format": "uint64"
},
"view_timestamp": {
"type": "string",
"format": "date-time"
}
}
},
@ -1098,9 +1172,13 @@
"view_name": {
"type": "string"
},
"sequence": {
"processed_sequence": {
"type": "string",
"format": "uint64"
},
"view_timestamp": {
"type": "string",
"format": "date-time"
}
}
},

View File

@ -290,6 +290,8 @@ message OrgSearchResponse {
uint64 limit = 2;
uint64 total_result = 3;
repeated Org result = 4;
uint64 processed_sequence = 5;
google.protobuf.Timestamp view_timestamp = 6;
}
enum OrgSearchMethod {
@ -424,6 +426,8 @@ message IamMemberSearchResponse {
uint64 limit = 2;
uint64 total_result = 3;
repeated IamMemberView result = 4;
uint64 processed_sequence = 5;
google.protobuf.Timestamp view_timestamp = 6;
}
message IamMemberView {
@ -502,7 +506,8 @@ message Views {
message View {
string database = 1;
string view_name = 2;
uint64 sequence = 3;
uint64 processed_sequence = 3;
google.protobuf.Timestamp view_timestamp = 4;
}

File diff suppressed because it is too large Load Diff

View File

@ -1230,6 +1230,14 @@
"items": {
"$ref": "#/definitions/v1UserGrantView"
}
},
"processed_sequence": {
"type": "string",
"format": "uint64"
},
"view_timestamp": {
"type": "string",
"format": "date-time"
}
}
},

View File

@ -602,6 +602,8 @@ message UserGrantSearchResponse {
uint64 limit = 2;
uint64 total_result = 3;
repeated UserGrantView result = 4;
uint64 processed_sequence = 5;
google.protobuf.Timestamp view_timestamp = 6;
}
message UserGrantView {

Some files were not shown because too many files have changed in this diff Show More