package eventstore import ( "context" "time" "github.com/caos/logging" "github.com/golang/protobuf/ptypes" "github.com/caos/zitadel/internal/api/authz" "github.com/caos/zitadel/internal/auth/repository/eventsourcing/view" "github.com/caos/zitadel/internal/config/systemdefaults" "github.com/caos/zitadel/internal/domain" "github.com/caos/zitadel/internal/errors" v1 "github.com/caos/zitadel/internal/eventstore/v1" "github.com/caos/zitadel/internal/eventstore/v1/models" iam_model "github.com/caos/zitadel/internal/iam/repository/view/model" "github.com/caos/zitadel/internal/telemetry/tracing" "github.com/caos/zitadel/internal/user/model" usr_view "github.com/caos/zitadel/internal/user/repository/view" usr_view_model "github.com/caos/zitadel/internal/user/repository/view/model" ) type UserRepo struct { SearchLimit uint64 Eventstore v1.Eventstore View *view.View SystemDefaults systemdefaults.SystemDefaults PrefixAvatarURL string } func (repo *UserRepo) Health(ctx context.Context) error { return repo.Eventstore.Health(ctx) } func (repo *UserRepo) MyUser(ctx context.Context) (*model.UserView, error) { return repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) } func (repo *UserRepo) MyProfile(ctx context.Context) (*model.Profile, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } if user.HumanView == nil { return nil, errors.ThrowPreconditionFailed(nil, "EVENT-H2JIT", "Errors.User.NotHuman") } return user.GetProfile() } func (repo *UserRepo) SearchMyExternalIDPs(ctx context.Context, request *model.ExternalIDPSearchRequest) (*model.ExternalIDPSearchResponse, error) { err := request.EnsureLimit(repo.SearchLimit) if err != nil { return nil, err } sequence, seqErr := repo.View.GetLatestExternalIDPSequence() logging.Log("EVENT-5Jsi8").OnError(seqErr).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Warn("could not read latest user sequence") request.AppendUserQuery(authz.GetCtxData(ctx).UserID) externalIDPS, count, err := repo.View.SearchExternalIDPs(request) if err != nil { return nil, err } result := &model.ExternalIDPSearchResponse{ Offset: request.Offset, Limit: request.Limit, TotalResult: count, Result: usr_view_model.ExternalIDPViewsToModel(externalIDPS), } if seqErr == nil { result.Sequence = sequence.CurrentSequence result.Timestamp = sequence.LastSuccessfulSpoolerRun } return result, nil } func (repo *UserRepo) MyEmail(ctx context.Context) (*model.Email, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } if user.HumanView == nil { return nil, errors.ThrowPreconditionFailed(nil, "EVENT-oGRpc", "Errors.User.NotHuman") } return user.GetEmail() } func (repo *UserRepo) MyPhone(ctx context.Context) (*model.Phone, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } if user.HumanView == nil { return nil, errors.ThrowPreconditionFailed(nil, "EVENT-DTWJb", "Errors.User.NotHuman") } return user.GetPhone() } func (repo *UserRepo) MyAddress(ctx context.Context) (*model.Address, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } if user.HumanView == nil { return nil, errors.ThrowPreconditionFailed(nil, "EVENT-Ok9nI", "Errors.User.NotHuman") } return user.GetAddress() } func (repo *UserRepo) MyUserMFAs(ctx context.Context) ([]*model.MultiFactor, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } mfas := make([]*model.MultiFactor, 0) if user.OTPState != model.MFAStateUnspecified { mfas = append(mfas, &model.MultiFactor{Type: model.MFATypeOTP, State: user.OTPState}) } for _, u2f := range user.U2FTokens { mfas = append(mfas, &model.MultiFactor{Type: model.MFATypeU2F, State: u2f.State, Attribute: u2f.Name, ID: u2f.TokenID}) } return mfas, nil } func (repo *UserRepo) GetMyPasswordless(ctx context.Context) ([]*model.WebAuthNView, error) { user, err := repo.UserByID(ctx, authz.GetCtxData(ctx).UserID) if err != nil { return nil, err } if user.HumanView == nil { return nil, errors.ThrowPreconditionFailed(nil, "USER-9kF98", "Errors.User.NotHuman") } return user.HumanView.PasswordlessTokens, nil } func (repo *UserRepo) UserSessionUserIDsByAgentID(ctx context.Context, agentID string) ([]string, error) { userSessions, err := repo.View.UserSessionsByAgentID(agentID) if err != nil { return nil, err } userIDs := make([]string, 0, len(userSessions)) for _, session := range userSessions { if session.State == int32(domain.UserSessionStateActive) { userIDs = append(userIDs, session.UserID) } } return userIDs, nil } func (repo *UserRepo) UserByID(ctx context.Context, id string) (*model.UserView, error) { user, err := repo.View.UserByID(id) if err != nil { return nil, err } events, err := repo.getUserEvents(ctx, id, user.Sequence) if err != nil { logging.Log("EVENT-PSoc3").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events") return usr_view_model.UserToModel(user, repo.PrefixAvatarURL), nil } userCopy := *user for _, event := range events { if err := userCopy.AppendEvent(event); err != nil { return usr_view_model.UserToModel(user, repo.PrefixAvatarURL), nil } } if userCopy.State == int32(model.UserStateDeleted) { return nil, errors.ThrowNotFound(nil, "EVENT-vZ8us", "Errors.User.NotFound") } return usr_view_model.UserToModel(&userCopy, repo.PrefixAvatarURL), nil } func (repo *UserRepo) UserEventsByID(ctx context.Context, id string, sequence uint64) ([]*models.Event, error) { return repo.getUserEvents(ctx, id, sequence) } func (repo *UserRepo) UserByLoginName(ctx context.Context, loginname string) (*model.UserView, error) { user, err := repo.View.UserByLoginName(loginname) if err != nil { return nil, err } events, err := repo.getUserEvents(ctx, user.ID, user.Sequence) if err != nil { logging.Log("EVENT-PSoc3").WithError(err).WithField("traceID", tracing.TraceIDFromCtx(ctx)).Debug("error retrieving new events") return usr_view_model.UserToModel(user, repo.PrefixAvatarURL), nil } userCopy := *user for _, event := range events { if err := userCopy.AppendEvent(event); err != nil { return usr_view_model.UserToModel(user, repo.PrefixAvatarURL), nil } } if userCopy.State == int32(model.UserStateDeleted) { return nil, errors.ThrowNotFound(nil, "EVENT-vZ8us", "Errors.User.NotFound") } return usr_view_model.UserToModel(&userCopy, repo.PrefixAvatarURL), nil } func (repo *UserRepo) MyUserChanges(ctx context.Context, lastSequence uint64, limit uint64, sortAscending bool, retention time.Duration) (*model.UserChanges, error) { changes, err := repo.getUserChanges(ctx, authz.GetCtxData(ctx).UserID, lastSequence, limit, sortAscending, retention) if err != nil { return nil, err } for _, change := range changes.Changes { change.ModifierName = change.ModifierID change.ModifierLoginName = change.ModifierID user, _ := repo.UserByID(ctx, change.ModifierID) if user != nil { change.ModifierLoginName = user.PreferredLoginName if user.HumanView != nil { change.ModifierName = user.HumanView.DisplayName change.ModifierAvatarURL = user.HumanView.AvatarURL } if user.MachineView != nil { change.ModifierName = user.MachineView.Name } } } return changes, nil } func (repo *UserRepo) SearchUsers(ctx context.Context, request *model.UserSearchRequest) (*model.UserSearchResponse, error) { sequence, sequenceErr := repo.View.GetLatestUserSequence() logging.Log("EVENT-Gdgsw").OnError(sequenceErr).Warn("could not read latest user sequence") users, count, err := repo.View.SearchUsers(request) if err != nil { return nil, err } result := &model.UserSearchResponse{ Offset: request.Offset, Limit: request.Limit, TotalResult: count, Result: usr_view_model.UsersToModel(users, repo.PrefixAvatarURL), } if sequenceErr == nil { result.Sequence = sequence.CurrentSequence result.Timestamp = sequence.LastSuccessfulSpoolerRun } return result, nil } func (r *UserRepo) getUserChanges(ctx context.Context, userID string, lastSequence uint64, limit uint64, sortAscending bool, retention time.Duration) (*model.UserChanges, error) { query := usr_view.ChangesQuery(userID, lastSequence, limit, sortAscending, retention) events, err := r.Eventstore.FilterEvents(ctx, query) if err != nil { logging.Log("EVENT-g9HCv").WithError(err).Warn("eventstore unavailable") return nil, errors.ThrowInternal(err, "EVENT-htuG9", "Errors.Internal") } if len(events) == 0 { return nil, errors.ThrowNotFound(nil, "EVENT-6cAxe", "Errors.User.NoChanges") } result := make([]*model.UserChange, len(events)) for i, event := range events { creationDate, err := ptypes.TimestampProto(event.CreationDate) logging.Log("EVENT-8GTGS").OnError(err).Debug("unable to parse timestamp") change := &model.UserChange{ ChangeDate: creationDate, EventType: event.Type.String(), ModifierID: event.EditorUser, Sequence: event.Sequence, } //TODO: now all types should be unmarshalled, e.g. password // if len(event.Data) != 0 { // user := new(model.User) // err := json.Unmarshal(event.Data, user) // logging.Log("EVENT-Rkg7X").OnError(err).Debug("unable to unmarshal data") // change.Data = user // } result[i] = change if lastSequence < event.Sequence { lastSequence = event.Sequence } } return &model.UserChanges{ Changes: result, LastSequence: lastSequence, }, nil } func (r *UserRepo) getUserEvents(ctx context.Context, userID string, sequence uint64) ([]*models.Event, error) { query, err := usr_view.UserByIDQuery(userID, sequence) if err != nil { return nil, err } return r.Eventstore.FilterEvents(ctx, query) } func (repo *UserRepo) GetMyMetadataByKey(ctx context.Context, key string) (*domain.Metadata, error) { ctxData := authz.GetCtxData(ctx) data, err := repo.View.MetadataByKeyAndResourceOwner(ctxData.UserID, ctxData.ResourceOwner, key) if err != nil { return nil, err } return iam_model.MetadataViewToDomain(data), nil } func (repo *UserRepo) SearchUserMetadata(ctx context.Context, userID string) (*domain.MetadataSearchResponse, error) { req := new(domain.MetadataSearchRequest) return repo.searchUserMetadata(userID, "", req) } func (repo *UserRepo) SearchMyMetadata(ctx context.Context, req *domain.MetadataSearchRequest) (*domain.MetadataSearchResponse, error) { ctxData := authz.GetCtxData(ctx) err := req.EnsureLimit(repo.SearchLimit) if err != nil { return nil, err } return repo.searchUserMetadata(ctxData.UserID, ctxData.ResourceOwner, req) } func (repo *UserRepo) searchUserMetadata(userID, resourceOwner string, req *domain.MetadataSearchRequest) (*domain.MetadataSearchResponse, error) { sequence, sequenceErr := repo.View.GetLatestUserSequence() logging.Log("EVENT-N9fsd").OnError(sequenceErr).Warn("could not read latest user sequence") req.AppendAggregateIDQuery(userID) if resourceOwner != "" { req.AppendResourceOwnerQuery(resourceOwner) } metadata, count, err := repo.View.SearchMetadata(req) if err != nil { return nil, err } result := &domain.MetadataSearchResponse{ Offset: req.Offset, Limit: req.Limit, TotalResult: count, Result: iam_model.MetadataViewsToDomain(metadata), } if sequenceErr == nil { result.Sequence = sequence.CurrentSequence result.Timestamp = sequence.LastSuccessfulSpoolerRun } return result, nil }