From c2e6fd8f407db5e79a292cc623dcb3c2e49923ed Mon Sep 17 00:00:00 2001 From: Livio Amstutz Date: Tue, 18 Jan 2022 14:15:00 +0100 Subject: [PATCH] fix(query): add user metadata projection (#3021) * fix: add metadata projection * cleanup --- internal/query/projection/projection.go | 1 + internal/query/projection/user_metadata.go | 115 +++++++++++++ .../query/projection/user_metadata_test.go | 157 ++++++++++++++++++ .../cockroach/V1.106__user_metadata.sql | 13 ++ 4 files changed, 286 insertions(+) create mode 100644 internal/query/projection/user_metadata.go create mode 100644 internal/query/projection/user_metadata_test.go create mode 100644 migrations/cockroach/V1.106__user_metadata.sql diff --git a/internal/query/projection/projection.go b/internal/query/projection/projection.go index 83c90021d2..b311ba97d3 100644 --- a/internal/query/projection/projection.go +++ b/internal/query/projection/projection.go @@ -65,6 +65,7 @@ func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, co NewProjectGrantMemberProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["project_grant_members"])) NewAuthNKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["authn_keys"])) NewUserGrantProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_grants"])) + NewUserMetadataProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["user_metadata"])) _, err := NewKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["keys"]), defaults.KeyConfig, keyChan) return err diff --git a/internal/query/projection/user_metadata.go b/internal/query/projection/user_metadata.go new file mode 100644 index 0000000000..36a5250f33 --- /dev/null +++ b/internal/query/projection/user_metadata.go @@ -0,0 +1,115 @@ +package projection + +import ( + "context" + + "github.com/caos/logging" + + "github.com/caos/zitadel/internal/errors" + "github.com/caos/zitadel/internal/eventstore" + "github.com/caos/zitadel/internal/eventstore/handler" + "github.com/caos/zitadel/internal/eventstore/handler/crdb" + "github.com/caos/zitadel/internal/repository/user" +) + +type UserMetadataProjection struct { + crdb.StatementHandler +} + +const UserMetadataProjectionTable = "zitadel.projections.user_metadata" + +func NewUserMetadataProjection(ctx context.Context, config crdb.StatementHandlerConfig) *UserMetadataProjection { + p := &UserMetadataProjection{} + config.ProjectionName = UserMetadataProjectionTable + config.Reducers = p.reducers() + p.StatementHandler = crdb.NewStatementHandler(ctx, config) + return p +} + +func (p *UserMetadataProjection) reducers() []handler.AggregateReducer { + return []handler.AggregateReducer{ + { + Aggregate: user.AggregateType, + EventRedusers: []handler.EventReducer{ + { + Event: user.MetadataSetType, + Reduce: p.reduceMetadataSet, + }, + { + Event: user.MetadataRemovedType, + Reduce: p.reduceMetadataRemoved, + }, + { + Event: user.MetadataRemovedAllType, + Reduce: p.reduceMetadataRemovedAll, + }, + { + Event: user.UserRemovedType, + Reduce: p.reduceMetadataRemovedAll, + }, + }, + }, + } +} + +const ( + UserMetadataColumnUserID = "user_id" + UserMetadataColumnResourceOwner = "resource_owner" + UserMetadataColumnCreationDate = "creation_date" + UserMetadataColumnChangeDate = "change_date" + UserMetadataColumnSequence = "sequence" + UserMetadataColumnKey = "key" + UserMetadataColumnValue = "value" +) + +func (p *UserMetadataProjection) reduceMetadataSet(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.MetadataSetEvent) + if !ok { + logging.LogWithFields("HANDL-Sgn5w", "seq", event.Sequence(), "expectedType", user.MetadataSetType).Error("wrong event type") + return nil, errors.ThrowInvalidArgument(nil, "HANDL-Ghn52", "reduce.wrong.event.type") + } + return crdb.NewUpsertStatement( + e, + []handler.Column{ + handler.NewCol(UserMetadataColumnUserID, e.Aggregate().ID), + handler.NewCol(UserMetadataColumnResourceOwner, e.Aggregate().ResourceOwner), + handler.NewCol(UserMetadataColumnCreationDate, e.CreationDate()), + handler.NewCol(UserMetadataColumnChangeDate, e.CreationDate()), + handler.NewCol(UserMetadataColumnSequence, e.Sequence()), + handler.NewCol(UserMetadataColumnKey, e.Key), + handler.NewCol(UserMetadataColumnValue, e.Value), + }, + ), nil +} + +func (p *UserMetadataProjection) reduceMetadataRemoved(event eventstore.Event) (*handler.Statement, error) { + e, ok := event.(*user.MetadataRemovedEvent) + if !ok { + logging.LogWithFields("HANDL-Dbfg2", "seq", event.Sequence(), "expectedType", user.MetadataRemovedType).Error("wrong event type") + return nil, errors.ThrowInvalidArgument(nil, "HANDL-Bm542", "reduce.wrong.event.type") + } + return crdb.NewDeleteStatement( + e, + []handler.Condition{ + handler.NewCond(UserMetadataColumnUserID, e.Aggregate().ID), + handler.NewCond(UserMetadataColumnKey, e.Key), + }, + ), nil +} + +func (p *UserMetadataProjection) reduceMetadataRemovedAll(event eventstore.Event) (*handler.Statement, error) { + switch event.(type) { + case *user.MetadataRemovedAllEvent, + *user.UserRemovedEvent: + //ok + default: + logging.LogWithFields("HANDL-Dfbh2", "seq", event.Sequence(), "expectedTypes", []eventstore.EventType{user.MetadataRemovedAllType, user.UserRemovedType}).Error("wrong event type") + return nil, errors.ThrowInvalidArgument(nil, "HANDL-Bmnf2", "reduce.wrong.event.type") + } + return crdb.NewDeleteStatement( + event, + []handler.Condition{ + handler.NewCond(UserMetadataColumnUserID, event.Aggregate().ID), + }, + ), nil +} diff --git a/internal/query/projection/user_metadata_test.go b/internal/query/projection/user_metadata_test.go new file mode 100644 index 0000000000..e5c88ac1fc --- /dev/null +++ b/internal/query/projection/user_metadata_test.go @@ -0,0 +1,157 @@ +package projection + +import ( + "testing" + + "github.com/caos/zitadel/internal/errors" + "github.com/caos/zitadel/internal/eventstore" + "github.com/caos/zitadel/internal/eventstore/handler" + "github.com/caos/zitadel/internal/eventstore/repository" + "github.com/caos/zitadel/internal/repository/user" +) + +func TestUserMetadataProjection_reduces(t *testing.T) { + type args struct { + event func(t *testing.T) eventstore.Event + } + tests := []struct { + name string + args args + reduce func(event eventstore.Event) (*handler.Statement, error) + want wantReduce + }{ + { + name: "reduceMetadataSet", + args: args{ + event: getEvent(testEvent( + repository.EventType(user.MetadataSetType), + user.AggregateType, + []byte(`{ + "key": "key", + "value": "dmFsdWU=" + }`), + ), user.MetadataSetEventMapper), + }, + reduce: (&UserMetadataProjection{}).reduceMetadataSet, + want: wantReduce{ + aggregateType: user.AggregateType, + sequence: 15, + previousSequence: 10, + projection: UserMetadataProjectionTable, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "UPSERT INTO zitadel.projections.user_metadata (user_id, resource_owner, creation_date, change_date, sequence, key, value) VALUES ($1, $2, $3, $4, $5, $6, $7)", + expectedArgs: []interface{}{ + "agg-id", + "ro-id", + anyArg{}, + anyArg{}, + uint64(15), + "key", + []byte("value"), + }, + }, + }, + }, + }, + }, + { + name: "reduceMetadataRemoved", + args: args{ + event: getEvent(testEvent( + repository.EventType(user.MetadataRemovedType), + user.AggregateType, + []byte(`{ + "key": "key" + }`), + ), user.MetadataRemovedEventMapper), + }, + reduce: (&UserMetadataProjection{}).reduceMetadataRemoved, + want: wantReduce{ + aggregateType: user.AggregateType, + sequence: 15, + previousSequence: 10, + projection: UserMetadataProjectionTable, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "DELETE FROM zitadel.projections.user_metadata WHERE (user_id = $1) AND (key = $2)", + expectedArgs: []interface{}{ + "agg-id", + "key", + }, + }, + }, + }, + }, + }, + { + name: "reduceMetadataRemovedAll", + args: args{ + event: getEvent(testEvent( + repository.EventType(user.MetadataRemovedAllType), + user.AggregateType, + nil, + ), user.MetadataRemovedAllEventMapper), + }, + reduce: (&UserMetadataProjection{}).reduceMetadataRemovedAll, + want: wantReduce{ + aggregateType: user.AggregateType, + sequence: 15, + previousSequence: 10, + projection: UserMetadataProjectionTable, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "DELETE FROM zitadel.projections.user_metadata WHERE (user_id = $1)", + expectedArgs: []interface{}{ + "agg-id", + }, + }, + }, + }, + }, + }, + { + name: "reduceMetadataRemovedAll (user removed)", + args: args{ + event: getEvent(testEvent( + repository.EventType(user.UserRemovedType), + user.AggregateType, + nil, + ), user.UserRemovedEventMapper), + }, + reduce: (&UserMetadataProjection{}).reduceMetadataRemovedAll, + want: wantReduce{ + aggregateType: user.AggregateType, + sequence: 15, + previousSequence: 10, + projection: UserMetadataProjectionTable, + executer: &testExecuter{ + executions: []execution{ + { + expectedStmt: "DELETE FROM zitadel.projections.user_metadata WHERE (user_id = $1)", + expectedArgs: []interface{}{ + "agg-id", + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + event := baseEvent(t) + got, err := tt.reduce(event) + if _, ok := err.(errors.InvalidArgument); !ok { + t.Errorf("no wrong event mapping: %v, got: %v", err, got) + } + + event = tt.args.event(t) + got, err = tt.reduce(event) + assertReduce(t, got, err, tt.want) + }) + } +} diff --git a/migrations/cockroach/V1.106__user_metadata.sql b/migrations/cockroach/V1.106__user_metadata.sql new file mode 100644 index 0000000000..50263dfd20 --- /dev/null +++ b/migrations/cockroach/V1.106__user_metadata.sql @@ -0,0 +1,13 @@ +CREATE TABLE zitadel.projections.user_metadata ( + user_id STRING NOT NULL + , resource_owner STRING NOT NULL + , creation_date TIMESTAMPTZ NOT NULL + , change_date TIMESTAMPTZ NOT NULL + , sequence INT8 NOT NULL + + , key STRING NOT NULL + , value BYTES + + , PRIMARY KEY (user_id, key) + , INDEX idx_ro (resource_owner) +);