feat: metrics (#1024)

* refactor: switch from opencensus to opentelemetry

* tempo works as designed nooooot

* fix: log traceids

* with grafana agent

* fix: http tracing

* fix: cleanup files

* chore: remove todo

* fix: bad test

* fix: ignore methods in grpc interceptors

* fix: remove test log

* clean up

* typo

* fix(config): configure tracing endpoint

* fix(span): add error id to span

* feat: metrics package

* feat: metrics package

* fix: counter

* fix: metric

* try metrics

* fix: coutner metrics

* fix: active sessin counter

* fix: active sessin counter

* fix: change current Sequence table

* fix: change current Sequence table

* fix: current sequences

* fix: spooler div metrics

* fix: console view

* fix: merge master

* fix: Last spool run on search result instead of eventtimestamp

* fix: go mod

* Update console/src/assets/i18n/de.json

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: pr review

* fix: map

* update oidc pkg

* fix: handlers

* fix: value observer

* fix: remove fmt

* fix: handlers

* fix: tests

* fix: handler minimum cycle duration 1s

* fix(spooler): handler channel buffer

* fix interceptors

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
Co-authored-by: Livio Amstutz <livio.a@gmail.com>
This commit is contained in:
Fabi
2020-12-02 08:50:59 +01:00
committed by GitHub
parent 723b6b5189
commit 6b3f5b984c
194 changed files with 2570 additions and 1096 deletions

View File

@@ -65,33 +65,37 @@ func (a *Application) Reduce(event *models.Event) (err error) {
if err != nil {
return err
}
return a.view.DeleteApplication(app.ID, event.Sequence)
return a.view.DeleteApplication(app.ID, event.Sequence, event.CreationDate)
case es_model.ProjectChanged:
apps, err := a.view.ApplicationsByProjectID(event.AggregateID)
if err != nil {
return err
}
if len(apps) == 0 {
return a.view.ProcessedApplicationSequence(event.Sequence)
return a.view.ProcessedApplicationSequence(event.Sequence, event.CreationDate)
}
for _, app := range apps {
if err := app.AppendEvent(event); err != nil {
return err
}
}
return a.view.PutApplications(apps, event.Sequence)
return a.view.PutApplications(apps, event.Sequence, event.CreationDate)
case es_model.ProjectRemoved:
return a.view.DeleteApplicationsByProjectID(event.AggregateID)
default:
return a.view.ProcessedApplicationSequence(event.Sequence)
return a.view.ProcessedApplicationSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return a.view.PutApplication(app)
return a.view.PutApplication(app, event.CreationDate)
}
func (a *Application) OnError(event *models.Event, spoolerError error) error {
logging.LogWithFields("SPOOL-ls9ew", "id", event.AggregateID).WithError(spoolerError).Warn("something went wrong in project app handler")
return spooler.HandleError(event, spoolerError, a.view.GetLatestApplicationFailedEvent, a.view.ProcessedApplicationFailedEvent, a.view.ProcessedApplicationSequence, a.errorCountUntilSkip)
}
func (a *Application) OnSuccess() error {
return spooler.HandleSuccess(a.view.UpdateApplicationSpoolerRunTimestamp)
}

View File

@@ -66,17 +66,21 @@ func (m *IDPConfig) processIdpConfig(providerType iam_model.IDPProviderType, eve
if err != nil {
return err
}
return m.view.DeleteIDPConfig(idp.IDPConfigID, event.Sequence)
return m.view.DeleteIDPConfig(idp.IDPConfigID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedIDPConfigSequence(event.Sequence)
return m.view.ProcessedIDPConfigSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutIDPConfig(idp, idp.Sequence)
return m.view.PutIDPConfig(idp, idp.Sequence, event.CreationDate)
}
func (m *IDPConfig) OnError(event *models.Event, err error) error {
func (i *IDPConfig) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Nxu8s", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler")
return spooler.HandleError(event, err, m.view.GetLatestIDPConfigFailedEvent, m.view.ProcessedIDPConfigFailedEvent, m.view.ProcessedIDPConfigSequence, m.errorCountUntilSkip)
return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip)
}
func (i *IDPConfig) OnSuccess() error {
return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp)
}

View File

@@ -32,7 +32,7 @@ func (m *IDPProvider) ViewModel() string {
}
func (m *IDPProvider) EventQuery() (*models.SearchQuery, error) {
sequence, err := m.view.GetLatestIdpProviderSequence()
sequence, err := m.view.GetLatestIDPProviderSequence()
if err != nil {
return nil, err
}
@@ -64,7 +64,7 @@ func (m *IDPProvider) processIdpProvider(event *models.Event) (err error) {
if err != nil {
return err
}
return m.view.DeleteIdpProvider(event.AggregateID, provider.IDPConfigID, event.Sequence)
return m.view.DeleteIDPProvider(event.AggregateID, provider.IDPConfigID, event.Sequence, event.CreationDate)
case model.IDPConfigChanged, org_es_model.IDPConfigChanged:
esConfig := new(iam_view_model.IDPConfigView)
providerType := iam_model.IDPProviderTypeSystem
@@ -72,7 +72,7 @@ func (m *IDPProvider) processIdpProvider(event *models.Event) (err error) {
providerType = iam_model.IDPProviderTypeOrg
}
esConfig.AppendEvent(providerType, event)
providers, err := m.view.IdpProvidersByIdpConfigID(event.AggregateID, esConfig.IDPConfigID)
providers, err := m.view.IDPProvidersByIdpConfigID(event.AggregateID, esConfig.IDPConfigID)
if err != nil {
return err
}
@@ -88,16 +88,16 @@ func (m *IDPProvider) processIdpProvider(event *models.Event) (err error) {
for _, provider := range providers {
m.fillConfigData(provider, config)
}
return m.view.PutIdpProviders(event.Sequence, providers...)
return m.view.PutIDPProviders(event.Sequence, event.CreationDate, providers...)
case org_es_model.LoginPolicyRemoved:
return m.view.DeleteIdpProvidersByAggregateID(event.AggregateID, event.Sequence)
return m.view.DeleteIDPProvidersByAggregateID(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedIdpProviderSequence(event.Sequence)
return m.view.ProcessedIDPProviderSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutIdpProvider(provider, provider.Sequence)
return m.view.PutIDPProvider(provider, provider.Sequence, event.CreationDate)
}
func (m *IDPProvider) fillData(provider *iam_view_model.IDPProviderView) (err error) {
@@ -123,5 +123,9 @@ func (m *IDPProvider) fillConfigData(provider *iam_view_model.IDPProviderView, c
func (m *IDPProvider) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Msj8c", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp provider handler")
return spooler.HandleError(event, err, m.view.GetLatestIdpProviderFailedEvent, m.view.ProcessedIdpProviderFailedEvent, m.view.ProcessedIdpProviderSequence, m.errorCountUntilSkip)
return spooler.HandleError(event, err, m.view.GetLatestIDPProviderFailedEvent, m.view.ProcessedIDPProviderFailedEvent, m.view.ProcessedIDPProviderSequence, m.errorCountUntilSkip)
}
func (m *IDPProvider) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateIDPProviderSpoolerRunTimestamp)
}

View File

@@ -53,15 +53,19 @@ func (m *LabelPolicy) processLabelPolicy(event *models.Event) (err error) {
}
err = policy.AppendEvent(event)
default:
return m.view.ProcessedLabelPolicySequence(event.Sequence)
return m.view.ProcessedLabelPolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutLabelPolicy(policy, policy.Sequence)
return m.view.PutLabelPolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *LabelPolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-4Djo9", "id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler")
return spooler.HandleError(event, err, m.view.GetLatestLabelPolicyFailedEvent, m.view.ProcessedLabelPolicyFailedEvent, m.view.ProcessedLabelPolicySequence, m.errorCountUntilSkip)
}
func (m *LabelPolicy) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateLabelPolicySpoolerRunTimestamp)
}

View File

@@ -57,17 +57,21 @@ func (m *LoginPolicy) processLoginPolicy(event *models.Event) (err error) {
}
err = policy.AppendEvent(event)
case model.LoginPolicyRemoved:
return m.view.DeleteLoginPolicy(event.AggregateID, event.Sequence)
return m.view.DeleteLoginPolicy(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedLoginPolicySequence(event.Sequence)
return m.view.ProcessedLoginPolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutLoginPolicy(policy, policy.Sequence)
return m.view.PutLoginPolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *LoginPolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-4Djo9", "id", event.AggregateID).WithError(err).Warn("something went wrong in login policy handler")
return spooler.HandleError(event, err, m.view.GetLatestLoginPolicyFailedEvent, m.view.ProcessedLoginPolicyFailedEvent, m.view.ProcessedLoginPolicySequence, m.errorCountUntilSkip)
}
func (m *LoginPolicy) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateLoginPolicySpoolerRunTimestamp)
}

View File

@@ -48,26 +48,30 @@ func (d *MachineKeys) processMachineKeys(event *models.Event) (err error) {
case model.MachineKeyAdded:
err = key.AppendEvent(event)
if key.ExpirationDate.Before(time.Now()) {
return d.view.ProcessedMachineKeySequence(event.Sequence)
return d.view.ProcessedMachineKeySequence(event.Sequence, event.CreationDate)
}
case model.MachineKeyRemoved:
err = key.SetData(event)
if err != nil {
return err
}
return d.view.DeleteMachineKey(key.ID, event.Sequence)
return d.view.DeleteMachineKey(key.ID, event.Sequence, event.CreationDate)
case model.UserRemoved:
return d.view.DeleteMachineKeysByUserID(event.AggregateID, event.Sequence)
return d.view.DeleteMachineKeysByUserID(event.AggregateID, event.Sequence, event.CreationDate)
default:
return d.view.ProcessedMachineKeySequence(event.Sequence)
return d.view.ProcessedMachineKeySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return d.view.PutMachineKey(key, key.Sequence)
return d.view.PutMachineKey(key, key.Sequence, event.CreationDate)
}
func (d *MachineKeys) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-S9fe", "id", event.AggregateID).WithError(err).Warn("something went wrong in machine key handler")
return spooler.HandleError(event, err, d.view.GetLatestMachineKeyFailedEvent, d.view.ProcessedMachineKeyFailedEvent, d.view.ProcessedMachineKeySequence, d.errorCountUntilSkip)
}
func (d *MachineKeys) OnSuccess() error {
return spooler.HandleSuccess(d.view.UpdateMachineKeySpoolerRunTimestamp)
}

View File

@@ -47,15 +47,19 @@ func (o *Org) Reduce(event *es_models.Event) (err error) {
}
err = org.AppendEvent(event)
default:
return o.view.ProcessedOrgSequence(event.Sequence)
return o.view.ProcessedOrgSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return o.view.PutOrg(org)
return o.view.PutOrg(org, event.CreationDate)
}
func (o *Org) OnError(event *es_models.Event, spoolerErr error) error {
logging.LogWithFields("SPOOL-ls9ew", "id", event.AggregateID).WithError(spoolerErr).Warn("something went wrong in project app handler")
return spooler.HandleError(event, spoolerErr, o.view.GetLatestOrgFailedEvent, o.view.ProcessedOrgFailedEvent, o.view.ProcessedOrgSequence, o.errorCountUntilSkip)
}
func (o *Org) OnSuccess() error {
return spooler.HandleSuccess(o.view.UpdateOrgSpoolerRunTimestamp)
}

View File

@@ -72,7 +72,7 @@ func (d *OrgDomain) processOrgDomain(event *models.Event) (err error) {
for _, existingDomain := range existingDomains {
existingDomain.Primary = false
}
err = d.view.PutOrgDomains(existingDomains, 0)
err = d.view.PutOrgDomains(existingDomains, 0, event.CreationDate)
if err != nil {
return err
}
@@ -82,17 +82,21 @@ func (d *OrgDomain) processOrgDomain(event *models.Event) (err error) {
if err != nil {
return err
}
return d.view.DeleteOrgDomain(event.AggregateID, domain.Domain, event.Sequence)
return d.view.DeleteOrgDomain(event.AggregateID, domain.Domain, event.Sequence, event.CreationDate)
default:
return d.view.ProcessedOrgDomainSequence(event.Sequence)
return d.view.ProcessedOrgDomainSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return d.view.PutOrgDomain(domain, domain.Sequence)
return d.view.PutOrgDomain(domain, domain.Sequence, event.CreationDate)
}
func (d *OrgDomain) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-us4sj", "id", event.AggregateID).WithError(err).Warn("something went wrong in orgdomain handler")
return spooler.HandleError(event, err, d.view.GetLatestOrgDomainFailedEvent, d.view.ProcessedOrgDomainFailedEvent, d.view.ProcessedOrgDomainSequence, d.errorCountUntilSkip)
}
func (o *OrgDomain) OnSuccess() error {
return spooler.HandleSuccess(o.view.UpdateOrgDomainSpoolerRunTimestamp)
}

View File

@@ -53,17 +53,21 @@ func (m *OrgIAMPolicy) processOrgIAMPolicy(event *models.Event) (err error) {
}
err = policy.AppendEvent(event)
case model.OrgIAMPolicyRemoved:
return m.view.DeleteOrgIAMPolicy(event.AggregateID, event.Sequence)
return m.view.DeleteOrgIAMPolicy(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedOrgIAMPolicySequence(event.Sequence)
return m.view.ProcessedOrgIAMPolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutOrgIAMPolicy(policy, policy.Sequence)
return m.view.PutOrgIAMPolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *OrgIAMPolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-3Gf9s", "id", event.AggregateID).WithError(err).Warn("something went wrong in orgIAM policy handler")
return spooler.HandleError(event, err, m.view.GetLatestOrgIAMPolicyFailedEvent, m.view.ProcessedOrgIAMPolicyFailedEvent, m.view.ProcessedOrgIAMPolicySequence, m.errorCountUntilSkip)
}
func (o *OrgIAMPolicy) OnSuccess() error {
return spooler.HandleSuccess(o.view.UpdateOrgIAMPolicySpoolerRunTimestamp)
}

View File

@@ -72,14 +72,14 @@ func (m *OrgMember) processOrgMember(event *models.Event) (err error) {
if err != nil {
return err
}
return m.view.DeleteOrgMember(event.AggregateID, member.UserID, event.Sequence)
return m.view.DeleteOrgMember(event.AggregateID, member.UserID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedOrgMemberSequence(event.Sequence)
return m.view.ProcessedOrgMemberSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutOrgMember(member, member.Sequence)
return m.view.PutOrgMember(member, member.Sequence, event.CreationDate)
}
func (m *OrgMember) processUser(event *models.Event) (err error) {
@@ -94,7 +94,7 @@ func (m *OrgMember) processUser(event *models.Event) (err error) {
return err
}
if len(members) == 0 {
return m.view.ProcessedOrgMemberSequence(event.Sequence)
return m.view.ProcessedOrgMemberSequence(event.Sequence, event.CreationDate)
}
user, err := m.userEvents.UserByID(context.Background(), event.AggregateID)
if err != nil {
@@ -103,11 +103,11 @@ func (m *OrgMember) processUser(event *models.Event) (err error) {
for _, member := range members {
m.fillUserData(member, user)
}
return m.view.PutOrgMembers(members, event.Sequence)
return m.view.PutOrgMembers(members, event.Sequence, event.CreationDate)
case usr_es_model.UserRemoved:
return m.view.DeleteOrgMembersByUserID(event.AggregateID, event.Sequence)
return m.view.DeleteOrgMembersByUserID(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedOrgMemberSequence(event.Sequence)
return m.view.ProcessedOrgMemberSequence(event.Sequence, event.CreationDate)
}
return nil
}
@@ -137,3 +137,7 @@ func (m *OrgMember) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-u73es", "id", event.AggregateID).WithError(err).Warn("something went wrong in orgmember handler")
return spooler.HandleError(event, err, m.view.GetLatestOrgMemberFailedEvent, m.view.ProcessedOrgMemberFailedEvent, m.view.ProcessedOrgMemberSequence, m.errorCountUntilSkip)
}
func (o *OrgMember) OnSuccess() error {
return spooler.HandleSuccess(o.view.UpdateOrgMemberSpoolerRunTimestamp)
}

View File

@@ -53,17 +53,21 @@ func (m *PasswordAgePolicy) processPasswordAgePolicy(event *models.Event) (err e
}
err = policy.AppendEvent(event)
case model.PasswordAgePolicyRemoved:
return m.view.DeletePasswordAgePolicy(event.AggregateID, event.Sequence)
return m.view.DeletePasswordAgePolicy(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedPasswordAgePolicySequence(event.Sequence)
return m.view.ProcessedPasswordAgePolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutPasswordAgePolicy(policy, policy.Sequence)
return m.view.PutPasswordAgePolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *PasswordAgePolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Bs89f", "id", event.AggregateID).WithError(err).Warn("something went wrong in passwordAge policy handler")
return spooler.HandleError(event, err, m.view.GetLatestPasswordAgePolicyFailedEvent, m.view.ProcessedPasswordAgePolicyFailedEvent, m.view.ProcessedPasswordAgePolicySequence, m.errorCountUntilSkip)
}
func (m *PasswordAgePolicy) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdatePasswordAgePolicySpoolerRunTimestamp)
}

View File

@@ -19,12 +19,12 @@ const (
passwordComplexityPolicyTable = "management.password_complexity_policies"
)
func (m *PasswordComplexityPolicy) ViewModel() string {
func (p *PasswordComplexityPolicy) ViewModel() string {
return passwordComplexityPolicyTable
}
func (m *PasswordComplexityPolicy) EventQuery() (*models.SearchQuery, error) {
sequence, err := m.view.GetLatestPasswordComplexityPolicySequence()
func (p *PasswordComplexityPolicy) EventQuery() (*models.SearchQuery, error) {
sequence, err := p.view.GetLatestPasswordComplexityPolicySequence()
if err != nil {
return nil, err
}
@@ -33,37 +33,41 @@ func (m *PasswordComplexityPolicy) EventQuery() (*models.SearchQuery, error) {
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (m *PasswordComplexityPolicy) Reduce(event *models.Event) (err error) {
func (p *PasswordComplexityPolicy) Reduce(event *models.Event) (err error) {
switch event.AggregateType {
case model.OrgAggregate, iam_es_model.IAMAggregate:
err = m.processPasswordComplexityPolicy(event)
err = p.processPasswordComplexityPolicy(event)
}
return err
}
func (m *PasswordComplexityPolicy) processPasswordComplexityPolicy(event *models.Event) (err error) {
func (p *PasswordComplexityPolicy) processPasswordComplexityPolicy(event *models.Event) (err error) {
policy := new(iam_model.PasswordComplexityPolicyView)
switch event.Type {
case iam_es_model.PasswordComplexityPolicyAdded, model.PasswordComplexityPolicyAdded:
err = policy.AppendEvent(event)
case iam_es_model.PasswordComplexityPolicyChanged, model.PasswordComplexityPolicyChanged:
policy, err = m.view.PasswordComplexityPolicyByAggregateID(event.AggregateID)
policy, err = p.view.PasswordComplexityPolicyByAggregateID(event.AggregateID)
if err != nil {
return err
}
err = policy.AppendEvent(event)
case model.PasswordComplexityPolicyRemoved:
return m.view.DeletePasswordComplexityPolicy(event.AggregateID, event.Sequence)
return p.view.DeletePasswordComplexityPolicy(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedPasswordComplexityPolicySequence(event.Sequence)
return p.view.ProcessedPasswordComplexityPolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutPasswordComplexityPolicy(policy, policy.Sequence)
return p.view.PutPasswordComplexityPolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *PasswordComplexityPolicy) OnError(event *models.Event, err error) error {
func (p *PasswordComplexityPolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-4Djo9", "id", event.AggregateID).WithError(err).Warn("something went wrong in passwordComplexity policy handler")
return spooler.HandleError(event, err, m.view.GetLatestPasswordComplexityPolicyFailedEvent, m.view.ProcessedPasswordComplexityPolicyFailedEvent, m.view.ProcessedPasswordComplexityPolicySequence, m.errorCountUntilSkip)
return spooler.HandleError(event, err, p.view.GetLatestPasswordComplexityPolicyFailedEvent, p.view.ProcessedPasswordComplexityPolicyFailedEvent, p.view.ProcessedPasswordComplexityPolicySequence, p.errorCountUntilSkip)
}
func (p *PasswordComplexityPolicy) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdatePasswordComplexityPolicySpoolerRunTimestamp)
}

View File

@@ -19,12 +19,12 @@ const (
passwordLockoutPolicyTable = "management.password_lockout_policies"
)
func (m *PasswordLockoutPolicy) ViewModel() string {
func (p *PasswordLockoutPolicy) ViewModel() string {
return passwordLockoutPolicyTable
}
func (m *PasswordLockoutPolicy) EventQuery() (*models.SearchQuery, error) {
sequence, err := m.view.GetLatestPasswordLockoutPolicySequence()
func (p *PasswordLockoutPolicy) EventQuery() (*models.SearchQuery, error) {
sequence, err := p.view.GetLatestPasswordLockoutPolicySequence()
if err != nil {
return nil, err
}
@@ -33,37 +33,41 @@ func (m *PasswordLockoutPolicy) EventQuery() (*models.SearchQuery, error) {
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (m *PasswordLockoutPolicy) Reduce(event *models.Event) (err error) {
func (p *PasswordLockoutPolicy) Reduce(event *models.Event) (err error) {
switch event.AggregateType {
case model.OrgAggregate, iam_es_model.IAMAggregate:
err = m.processPasswordLockoutPolicy(event)
err = p.processPasswordLockoutPolicy(event)
}
return err
}
func (m *PasswordLockoutPolicy) processPasswordLockoutPolicy(event *models.Event) (err error) {
func (p *PasswordLockoutPolicy) processPasswordLockoutPolicy(event *models.Event) (err error) {
policy := new(iam_model.PasswordLockoutPolicyView)
switch event.Type {
case iam_es_model.PasswordLockoutPolicyAdded, model.PasswordLockoutPolicyAdded:
err = policy.AppendEvent(event)
case iam_es_model.PasswordLockoutPolicyChanged, model.PasswordLockoutPolicyChanged:
policy, err = m.view.PasswordLockoutPolicyByAggregateID(event.AggregateID)
policy, err = p.view.PasswordLockoutPolicyByAggregateID(event.AggregateID)
if err != nil {
return err
}
err = policy.AppendEvent(event)
case model.PasswordLockoutPolicyRemoved:
return m.view.DeletePasswordLockoutPolicy(event.AggregateID, event.Sequence)
return p.view.DeletePasswordLockoutPolicy(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedPasswordLockoutPolicySequence(event.Sequence)
return p.view.ProcessedPasswordLockoutPolicySequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutPasswordLockoutPolicy(policy, policy.Sequence)
return p.view.PutPasswordLockoutPolicy(policy, policy.Sequence, event.CreationDate)
}
func (m *PasswordLockoutPolicy) OnError(event *models.Event, err error) error {
func (p *PasswordLockoutPolicy) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Bms8f", "id", event.AggregateID).WithError(err).Warn("something went wrong in passwordLockout policy handler")
return spooler.HandleError(event, err, m.view.GetLatestPasswordLockoutPolicyFailedEvent, m.view.ProcessedPasswordLockoutPolicyFailedEvent, m.view.ProcessedPasswordLockoutPolicySequence, m.errorCountUntilSkip)
return spooler.HandleError(event, err, p.view.GetLatestPasswordLockoutPolicyFailedEvent, p.view.ProcessedPasswordLockoutPolicyFailedEvent, p.view.ProcessedPasswordLockoutPolicySequence, p.errorCountUntilSkip)
}
func (p *PasswordLockoutPolicy) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdatePasswordLockoutPolicySpoolerRunTimestamp)
}

View File

@@ -46,17 +46,21 @@ func (p *Project) Reduce(event *models.Event) (err error) {
}
err = project.AppendEvent(event)
case es_model.ProjectRemoved:
return p.view.DeleteProject(event.AggregateID, event.Sequence)
return p.view.DeleteProject(event.AggregateID, event.Sequence, event.CreationDate)
default:
return p.view.ProcessedProjectSequence(event.Sequence)
return p.view.ProcessedProjectSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return p.view.PutProject(project)
return p.view.PutProject(project, event.CreationDate)
}
func (p *Project) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-dLsop3", "id", event.AggregateID).WithError(err).Warn("something went wrong in projecthandler")
return spooler.HandleError(event, err, p.view.GetLatestProjectFailedEvent, p.view.ProcessedProjectFailedEvent, p.view.ProcessedProjectSequence, p.errorCountUntilSkip)
}
func (p *Project) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateProjectSpoolerRunTimestamp)
}

View File

@@ -2,6 +2,7 @@ package handler
import (
"context"
"time"
"github.com/caos/logging"
@@ -47,7 +48,7 @@ func (p *ProjectGrant) Reduce(event *models.Event) (err error) {
if err != nil {
return err
}
return p.updateExistingProjects(project, event.Sequence)
return p.updateExistingProjects(project, event.Sequence, event.CreationDate)
case es_model.ProjectGrantAdded:
err = grantedProject.AppendEvent(event)
if err != nil {
@@ -85,16 +86,16 @@ func (p *ProjectGrant) Reduce(event *models.Event) (err error) {
if err != nil {
return err
}
return p.view.DeleteProjectGrant(grant.GrantID, event.Sequence)
return p.view.DeleteProjectGrant(grant.GrantID, event.Sequence, event.CreationDate)
case es_model.ProjectRemoved:
return p.view.DeleteProjectGrantsByProjectID(event.AggregateID)
default:
return p.view.ProcessedProjectGrantSequence(event.Sequence)
return p.view.ProcessedProjectGrantSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return p.view.PutProjectGrant(grantedProject)
return p.view.PutProjectGrant(grantedProject, event.CreationDate)
}
func (p *ProjectGrant) fillOrgData(grantedProject *view_model.ProjectGrantView, org, resourceOwner *org_model.Org) {
@@ -106,7 +107,7 @@ func (p *ProjectGrant) getProject(projectID string) (*proj_model.Project, error)
return p.projectEvents.ProjectByID(context.Background(), projectID)
}
func (p *ProjectGrant) updateExistingProjects(project *view_model.ProjectView, sequence uint64) error {
func (p *ProjectGrant) updateExistingProjects(project *view_model.ProjectView, sequence uint64, eventTimestamp time.Time) error {
projectGrants, err := p.view.ProjectGrantsByProjectID(project.ProjectID)
if err != nil {
logging.LogWithFields("SPOOL-los03", "id", project.ProjectID).WithError(err).Warn("could not update existing projects")
@@ -114,10 +115,14 @@ func (p *ProjectGrant) updateExistingProjects(project *view_model.ProjectView, s
for _, existingGrant := range projectGrants {
existingGrant.Name = project.Name
}
return p.view.PutProjectGrants(projectGrants, sequence)
return p.view.PutProjectGrants(projectGrants, sequence, eventTimestamp)
}
func (p *ProjectGrant) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-sQqOg", "id", event.AggregateID).WithError(err).Warn("something went wrong in granted projecthandler")
return spooler.HandleError(event, err, p.view.GetLatestProjectGrantFailedEvent, p.view.ProcessedProjectGrantFailedEvent, p.view.ProcessedProjectGrantSequence, p.errorCountUntilSkip)
}
func (p *ProjectGrant) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateProjectGrantSpoolerRunTimestamp)
}

View File

@@ -72,16 +72,16 @@ func (p *ProjectGrantMember) processProjectGrantMember(event *models.Event) (err
if err != nil {
return err
}
return p.view.DeleteProjectGrantMember(member.GrantID, member.UserID, event.Sequence)
return p.view.DeleteProjectGrantMember(member.GrantID, member.UserID, event.Sequence, event.CreationDate)
case proj_es_model.ProjectRemoved:
return p.view.DeleteProjectGrantMembersByProjectID(event.AggregateID)
default:
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence)
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return p.view.PutProjectGrantMember(member, member.Sequence)
return p.view.PutProjectGrantMember(member, member.Sequence, event.CreationDate)
}
func (p *ProjectGrantMember) processUser(event *models.Event) (err error) {
@@ -96,7 +96,7 @@ func (p *ProjectGrantMember) processUser(event *models.Event) (err error) {
return err
}
if len(members) == 0 {
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence)
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence, event.CreationDate)
}
user, err := p.userEvents.UserByID(context.Background(), event.AggregateID)
if err != nil {
@@ -105,9 +105,9 @@ func (p *ProjectGrantMember) processUser(event *models.Event) (err error) {
for _, member := range members {
p.fillUserData(member, user)
}
return p.view.PutProjectGrantMembers(members, event.Sequence)
return p.view.PutProjectGrantMembers(members, event.Sequence, event.CreationDate)
default:
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence)
return p.view.ProcessedProjectGrantMemberSequence(event.Sequence, event.CreationDate)
}
return nil
}
@@ -138,3 +138,7 @@ func (p *ProjectGrantMember) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-kls93", "id", event.AggregateID).WithError(err).Warn("something went wrong in projectmember handler")
return spooler.HandleError(event, err, p.view.GetLatestProjectGrantMemberFailedEvent, p.view.ProcessedProjectGrantMemberFailedEvent, p.view.ProcessedProjectGrantMemberSequence, p.errorCountUntilSkip)
}
func (p *ProjectGrantMember) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateProjectGrantMemberSpoolerRunTimestamp)
}

View File

@@ -72,16 +72,16 @@ func (p *ProjectMember) processProjectMember(event *models.Event) (err error) {
if err != nil {
return err
}
return p.view.DeleteProjectMember(event.AggregateID, member.UserID, event.Sequence)
return p.view.DeleteProjectMember(event.AggregateID, member.UserID, event.Sequence, event.CreationDate)
case proj_es_model.ProjectRemoved:
return p.view.DeleteProjectMembersByProjectID(event.AggregateID)
default:
return p.view.ProcessedProjectMemberSequence(event.Sequence)
return p.view.ProcessedProjectMemberSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return p.view.PutProjectMember(member, member.Sequence)
return p.view.PutProjectMember(member, member.Sequence, event.CreationDate)
}
func (p *ProjectMember) processUser(event *models.Event) (err error) {
@@ -96,7 +96,7 @@ func (p *ProjectMember) processUser(event *models.Event) (err error) {
return err
}
if len(members) == 0 {
return p.view.ProcessedProjectMemberSequence(event.Sequence)
return p.view.ProcessedProjectMemberSequence(event.Sequence, event.CreationDate)
}
user, err := p.userEvents.UserByID(context.Background(), event.AggregateID)
if err != nil {
@@ -105,9 +105,9 @@ func (p *ProjectMember) processUser(event *models.Event) (err error) {
for _, member := range members {
p.fillUserData(member, user)
}
return p.view.PutProjectMembers(members, event.Sequence)
return p.view.PutProjectMembers(members, event.Sequence, event.CreationDate)
default:
return p.view.ProcessedProjectMemberSequence(event.Sequence)
return p.view.ProcessedProjectMemberSequence(event.Sequence, event.CreationDate)
}
return nil
}
@@ -137,3 +137,7 @@ func (p *ProjectMember) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-u73es", "id", event.AggregateID).WithError(err).Warn("something went wrong in projectmember handler")
return spooler.HandleError(event, err, p.view.GetLatestProjectMemberFailedEvent, p.view.ProcessedProjectMemberFailedEvent, p.view.ProcessedProjectMemberSequence, p.errorCountUntilSkip)
}
func (p *ProjectMember) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateProjectMemberSpoolerRunTimestamp)
}

View File

@@ -52,19 +52,23 @@ func (p *ProjectRole) Reduce(event *models.Event) (err error) {
if err != nil {
return err
}
return p.view.DeleteProjectRole(event.AggregateID, event.ResourceOwner, role.Key, event.Sequence)
return p.view.DeleteProjectRole(event.AggregateID, event.ResourceOwner, role.Key, event.Sequence, event.CreationDate)
case es_model.ProjectRemoved:
return p.view.DeleteProjectRolesByProjectID(event.AggregateID)
default:
return p.view.ProcessedProjectRoleSequence(event.Sequence)
return p.view.ProcessedProjectRoleSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return p.view.PutProjectRole(role)
return p.view.PutProjectRole(role, event.CreationDate)
}
func (p *ProjectRole) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-lso9w", "id", event.AggregateID).WithError(err).Warn("something went wrong in project role handler")
return spooler.HandleError(event, err, p.view.GetLatestProjectRoleFailedEvent, p.view.ProcessedProjectRoleFailedEvent, p.view.ProcessedProjectRoleSequence, p.errorCountUntilSkip)
}
func (p *ProjectRole) OnSuccess() error {
return spooler.HandleSuccess(p.view.UpdateProjectRoleSpoolerRunTimestamp)
}

View File

@@ -109,14 +109,14 @@ func (u *User) ProcessUser(event *models.Event) (err error) {
}
err = u.fillLoginNames(user)
case es_model.UserRemoved:
return u.view.DeleteUser(event.AggregateID, event.Sequence)
return u.view.DeleteUser(event.AggregateID, event.Sequence, event.CreationDate)
default:
return u.view.ProcessedUserSequence(event.Sequence)
return u.view.ProcessedUserSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return u.view.PutUser(user, user.Sequence)
return u.view.PutUser(user, user.Sequence, event.CreationDate)
}
func (u *User) ProcessOrg(event *models.Event) (err error) {
@@ -130,7 +130,7 @@ func (u *User) ProcessOrg(event *models.Event) (err error) {
case org_es_model.OrgDomainPrimarySet:
return u.fillPreferredLoginNamesOnOrgUsers(event)
default:
return u.view.ProcessedUserSequence(event.Sequence)
return u.view.ProcessedUserSequence(event.Sequence, event.CreationDate)
}
}
@@ -153,7 +153,7 @@ func (u *User) fillLoginNamesOnOrgUsers(event *models.Event) error {
for _, user := range users {
user.SetLoginNames(policy, org.Domains)
}
return u.view.PutUsers(users, event.Sequence)
return u.view.PutUsers(users, event.Sequence, event.CreationDate)
}
func (u *User) fillPreferredLoginNamesOnOrgUsers(event *models.Event) error {
@@ -178,7 +178,7 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *models.Event) error {
for _, user := range users {
user.PreferredLoginName = user.GenerateLoginName(org.GetPrimaryDomain().Domain, policy.UserLoginMustBeDomain)
}
return u.view.PutUsers(users, event.Sequence)
return u.view.PutUsers(users, event.Sequence, event.CreationDate)
}
func (u *User) fillLoginNames(user *view_model.UserView) (err error) {
@@ -202,3 +202,7 @@ func (u *User) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-is8wa", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler")
return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip)
}
func (u *User) OnSuccess() error {
return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp)
}

View File

@@ -30,12 +30,12 @@ const (
externalIDPTable = "management.user_external_idps"
)
func (m *ExternalIDP) ViewModel() string {
func (i *ExternalIDP) ViewModel() string {
return externalIDPTable
}
func (m *ExternalIDP) EventQuery() (*models.SearchQuery, error) {
sequence, err := m.view.GetLatestExternalIDPSequence()
func (i *ExternalIDP) EventQuery() (*models.SearchQuery, error) {
sequence, err := i.view.GetLatestExternalIDPSequence()
if err != nil {
return nil, err
}
@@ -44,17 +44,17 @@ func (m *ExternalIDP) EventQuery() (*models.SearchQuery, error) {
LatestSequenceFilter(sequence.CurrentSequence), nil
}
func (m *ExternalIDP) Reduce(event *models.Event) (err error) {
func (i *ExternalIDP) Reduce(event *models.Event) (err error) {
switch event.AggregateType {
case model.UserAggregate:
err = m.processUser(event)
err = i.processUser(event)
case iam_es_model.IAMAggregate, org_es_model.OrgAggregate:
err = m.processIdpConfig(event)
err = i.processIdpConfig(event)
}
return err
}
func (m *ExternalIDP) processUser(event *models.Event) (err error) {
func (i *ExternalIDP) processUser(event *models.Event) (err error) {
externalIDP := new(usr_view_model.ExternalIDPView)
switch event.Type {
case model.HumanExternalIDPAdded:
@@ -62,25 +62,25 @@ func (m *ExternalIDP) processUser(event *models.Event) (err error) {
if err != nil {
return err
}
err = m.fillData(externalIDP)
err = i.fillData(externalIDP)
case model.HumanExternalIDPRemoved, model.HumanExternalIDPCascadeRemoved:
err = externalIDP.SetData(event)
if err != nil {
return err
}
return m.view.DeleteExternalIDP(externalIDP.ExternalUserID, externalIDP.IDPConfigID, event.Sequence)
return i.view.DeleteExternalIDP(externalIDP.ExternalUserID, externalIDP.IDPConfigID, event.Sequence, event.CreationDate)
case model.UserRemoved:
return m.view.DeleteExternalIDPsByUserID(event.AggregateID, event.Sequence)
return i.view.DeleteExternalIDPsByUserID(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedExternalIDPSequence(event.Sequence)
return i.view.ProcessedExternalIDPSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutExternalIDP(externalIDP, externalIDP.Sequence)
return i.view.PutExternalIDP(externalIDP, externalIDP.Sequence, event.CreationDate)
}
func (m *ExternalIDP) processIdpConfig(event *models.Event) (err error) {
func (i *ExternalIDP) processIdpConfig(event *models.Event) (err error) {
switch event.Type {
case iam_es_model.IDPConfigChanged, org_es_model.IDPConfigChanged:
configView := new(iam_view_model.IDPConfigView)
@@ -90,45 +90,49 @@ func (m *ExternalIDP) processIdpConfig(event *models.Event) (err error) {
} else {
configView.AppendEvent(iam_model.IDPProviderTypeOrg, event)
}
exterinalIDPs, err := m.view.ExternalIDPsByIDPConfigID(configView.IDPConfigID)
exterinalIDPs, err := i.view.ExternalIDPsByIDPConfigID(configView.IDPConfigID)
if err != nil {
return err
}
if event.AggregateType == iam_es_model.IAMAggregate {
config, err = m.iamEvents.GetIDPConfig(context.Background(), event.AggregateID, configView.IDPConfigID)
config, err = i.iamEvents.GetIDPConfig(context.Background(), event.AggregateID, configView.IDPConfigID)
} else {
config, err = m.orgEvents.GetIDPConfig(context.Background(), event.AggregateID, configView.IDPConfigID)
config, err = i.orgEvents.GetIDPConfig(context.Background(), event.AggregateID, configView.IDPConfigID)
}
if err != nil {
return err
}
for _, provider := range exterinalIDPs {
m.fillConfigData(provider, config)
i.fillConfigData(provider, config)
}
return m.view.PutExternalIDPs(event.Sequence, exterinalIDPs...)
return i.view.PutExternalIDPs(event.Sequence, event.CreationDate, exterinalIDPs...)
default:
return m.view.ProcessedExternalIDPSequence(event.Sequence)
return i.view.ProcessedExternalIDPSequence(event.Sequence, event.CreationDate)
}
return nil
}
func (m *ExternalIDP) fillData(externalIDP *usr_view_model.ExternalIDPView) error {
config, err := m.orgEvents.GetIDPConfig(context.Background(), externalIDP.ResourceOwner, externalIDP.IDPConfigID)
func (i *ExternalIDP) fillData(externalIDP *usr_view_model.ExternalIDPView) error {
config, err := i.orgEvents.GetIDPConfig(context.Background(), externalIDP.ResourceOwner, externalIDP.IDPConfigID)
if caos_errs.IsNotFound(err) {
config, err = m.iamEvents.GetIDPConfig(context.Background(), m.systemDefaults.IamID, externalIDP.IDPConfigID)
config, err = i.iamEvents.GetIDPConfig(context.Background(), i.systemDefaults.IamID, externalIDP.IDPConfigID)
}
if err != nil {
return err
}
m.fillConfigData(externalIDP, config)
i.fillConfigData(externalIDP, config)
return nil
}
func (m *ExternalIDP) fillConfigData(externalIDP *usr_view_model.ExternalIDPView, config *iam_model.IDPConfig) {
func (i *ExternalIDP) fillConfigData(externalIDP *usr_view_model.ExternalIDPView, config *iam_model.IDPConfig) {
externalIDP.IDPName = config.Name
}
func (m *ExternalIDP) OnError(event *models.Event, err error) error {
func (i *ExternalIDP) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-4Rsu8", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp provider handler")
return spooler.HandleError(event, err, m.view.GetLatestExternalIDPFailedEvent, m.view.ProcessedExternalIDPFailedEvent, m.view.ProcessedExternalIDPSequence, m.errorCountUntilSkip)
return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip)
}
func (i *ExternalIDP) OnSuccess() error {
return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp)
}

View File

@@ -79,14 +79,14 @@ func (u *UserGrant) processUserGrant(event *models.Event) (err error) {
}
err = grant.AppendEvent(event)
case grant_es_model.UserGrantRemoved, grant_es_model.UserGrantCascadeRemoved:
return u.view.DeleteUserGrant(event.AggregateID, event.Sequence)
return u.view.DeleteUserGrant(event.AggregateID, event.Sequence, event.CreationDate)
default:
return u.view.ProcessedUserGrantSequence(event.Sequence)
return u.view.ProcessedUserGrantSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return u.view.PutUserGrant(grant, grant.Sequence)
return u.view.PutUserGrant(grant, grant.Sequence, event.CreationDate)
}
func (u *UserGrant) processUser(event *models.Event) (err error) {
@@ -101,7 +101,7 @@ func (u *UserGrant) processUser(event *models.Event) (err error) {
return err
}
if len(grants) == 0 {
return u.view.ProcessedUserGrantSequence(event.Sequence)
return u.view.ProcessedUserGrantSequence(event.Sequence, event.CreationDate)
}
user, err := u.userEvents.UserByID(context.Background(), event.AggregateID)
if err != nil {
@@ -110,9 +110,9 @@ func (u *UserGrant) processUser(event *models.Event) (err error) {
for _, grant := range grants {
u.fillUserData(grant, user)
}
return u.view.PutUserGrants(grants, event.Sequence)
return u.view.PutUserGrants(grants, event.Sequence, event.CreationDate)
default:
return u.view.ProcessedUserGrantSequence(event.Sequence)
return u.view.ProcessedUserGrantSequence(event.Sequence, event.CreationDate)
}
return nil
}
@@ -125,7 +125,7 @@ func (u *UserGrant) processProject(event *models.Event) (err error) {
return err
}
if len(grants) == 0 {
return u.view.ProcessedUserGrantSequence(event.Sequence)
return u.view.ProcessedUserGrantSequence(event.Sequence, event.CreationDate)
}
project, err := u.projectEvents.ProjectByID(context.Background(), event.AggregateID)
if err != nil {
@@ -134,9 +134,9 @@ func (u *UserGrant) processProject(event *models.Event) (err error) {
for _, grant := range grants {
u.fillProjectData(grant, project)
}
return u.view.PutUserGrants(grants, event.Sequence)
return u.view.PutUserGrants(grants, event.Sequence, event.CreationDate)
default:
return u.view.ProcessedUserGrantSequence(event.Sequence)
return u.view.ProcessedUserGrantSequence(event.Sequence, event.CreationDate)
}
return nil
}
@@ -193,3 +193,7 @@ func (u *UserGrant) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-8is4s", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler")
return spooler.HandleError(event, err, u.view.GetLatestUserGrantFailedEvent, u.view.ProcessedUserGrantFailedEvent, u.view.ProcessedUserGrantSequence, u.errorCountUntilSkip)
}
func (u *UserGrant) OnSuccess() error {
return spooler.HandleSuccess(u.view.UpdateUserGrantSpoolerRunTimestamp)
}

View File

@@ -73,14 +73,14 @@ func (m *UserMembership) processIam(event *models.Event) (err error) {
}
err = member.AppendEvent(event)
case iam_es_model.IAMMemberRemoved:
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeIam, event.Sequence)
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeIam, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedUserMembershipSequence(event.Sequence)
return m.view.ProcessedUserMembershipSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutUserMembership(member, event.Sequence)
return m.view.PutUserMembership(member, event.Sequence, event.CreationDate)
}
func (m *UserMembership) fillIamDisplayName(member *usr_es_model.UserMembershipView) {
@@ -103,16 +103,16 @@ func (m *UserMembership) processOrg(event *models.Event) (err error) {
}
err = member.AppendEvent(event)
case org_es_model.OrgMemberRemoved:
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeOrganisation, event.Sequence)
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeOrganisation, event.Sequence, event.CreationDate)
case org_es_model.OrgChanged:
return m.updateOrgDisplayName(event)
default:
return m.view.ProcessedUserMembershipSequence(event.Sequence)
return m.view.ProcessedUserMembershipSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutUserMembership(member, event.Sequence)
return m.view.PutUserMembership(member, event.Sequence, event.CreationDate)
}
func (m *UserMembership) fillOrgDisplayName(member *usr_es_model.UserMembershipView) (err error) {
@@ -137,7 +137,7 @@ func (m *UserMembership) updateOrgDisplayName(event *models.Event) error {
for _, membership := range memberships {
membership.DisplayName = org.Name
}
return m.view.BulkPutUserMemberships(memberships, event.Sequence)
return m.view.BulkPutUserMemberships(memberships, event.Sequence, event.CreationDate)
}
func (m *UserMembership) processProject(event *models.Event) (err error) {
@@ -156,7 +156,7 @@ func (m *UserMembership) processProject(event *models.Event) (err error) {
}
err = member.AppendEvent(event)
case proj_es_model.ProjectMemberRemoved:
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeProject, event.Sequence)
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, event.AggregateID, usr_model.MemberTypeProject, event.Sequence, event.CreationDate)
case proj_es_model.ProjectGrantMemberChanged:
member, err = m.view.UserMembershipByIDs(member.UserID, event.AggregateID, member.ObjectID, usr_model.MemberTypeProjectGrant)
if err != nil {
@@ -164,20 +164,20 @@ func (m *UserMembership) processProject(event *models.Event) (err error) {
}
err = member.AppendEvent(event)
case proj_es_model.ProjectGrantMemberRemoved:
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, member.ObjectID, usr_model.MemberTypeProjectGrant, event.Sequence)
return m.view.DeleteUserMembership(member.UserID, event.AggregateID, member.ObjectID, usr_model.MemberTypeProjectGrant, event.Sequence, event.CreationDate)
case proj_es_model.ProjectChanged:
return m.updateProjectDisplayName(event)
case proj_es_model.ProjectRemoved:
return m.view.DeleteUserMembershipsByAggregateID(event.AggregateID, event.Sequence)
return m.view.DeleteUserMembershipsByAggregateID(event.AggregateID, event.Sequence, event.CreationDate)
case proj_es_model.ProjectGrantRemoved:
return m.view.DeleteUserMembershipsByAggregateIDAndObjectID(event.AggregateID, member.ObjectID, event.Sequence)
return m.view.DeleteUserMembershipsByAggregateIDAndObjectID(event.AggregateID, member.ObjectID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedUserMembershipSequence(event.Sequence)
return m.view.ProcessedUserMembershipSequence(event.Sequence, event.CreationDate)
}
if err != nil {
return err
}
return m.view.PutUserMembership(member, event.Sequence)
return m.view.PutUserMembership(member, event.Sequence, event.CreationDate)
}
func (m *UserMembership) fillProjectDisplayName(member *usr_es_model.UserMembershipView) (err error) {
@@ -202,15 +202,15 @@ func (m *UserMembership) updateProjectDisplayName(event *models.Event) error {
for _, membership := range memberships {
membership.DisplayName = project.Name
}
return m.view.BulkPutUserMemberships(memberships, event.Sequence)
return m.view.BulkPutUserMemberships(memberships, event.Sequence, event.CreationDate)
}
func (m *UserMembership) processUser(event *models.Event) (err error) {
switch event.Type {
case model.UserRemoved:
return m.view.DeleteUserMembershipsByUserID(event.AggregateID, event.Sequence)
return m.view.DeleteUserMembershipsByUserID(event.AggregateID, event.Sequence, event.CreationDate)
default:
return m.view.ProcessedUserMembershipSequence(event.Sequence)
return m.view.ProcessedUserMembershipSequence(event.Sequence, event.CreationDate)
}
}
@@ -218,3 +218,7 @@ func (m *UserMembership) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Fwer2", "id", event.AggregateID).WithError(err).Warn("something went wrong in user membership handler")
return spooler.HandleError(event, err, m.view.GetLatestUserMembershipFailedEvent, m.view.ProcessedUserMembershipFailedEvent, m.view.ProcessedUserMembershipSequence, m.errorCountUntilSkip)
}
func (m *UserMembership) OnSuccess() error {
return spooler.HandleSuccess(m.view.UpdateUserMembershipSpoolerRunTimestamp)
}