milestone snapshot

This commit is contained in:
Tim Möhlmann 2024-10-10 14:27:53 +03:00
parent f77c904319
commit f95cb96a98
12 changed files with 400 additions and 144 deletions

View File

@ -170,6 +170,7 @@ func startZitadel(ctx context.Context, config *Config, masterKey string, server
config.Eventstore.Pusher = new_es.NewEventstore(esPusherDBClient)
config.Eventstore.Searcher = new_es.NewEventstore(queryDBClient)
config.Eventstore.Snapshotter = new_es.NewEventstore(queryDBClient)
config.Eventstore.Querier = old_es.NewCRDB(queryDBClient)
eventstoreClient := eventstore.NewEventstore(config.Eventstore)
eventstoreV4 := es_v4.NewEventstoreFromOne(es_v4_pg.New(queryDBClient, &es_v4_pg.Config{

View File

@ -2,23 +2,29 @@ package admin
import (
"context"
"slices"
"github.com/zitadel/zitadel/internal/api/authz"
object_pb "github.com/zitadel/zitadel/internal/api/grpc/object"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/pkg/grpc/admin"
)
func (s *Server) ListMilestones(ctx context.Context, req *admin.ListMilestonesRequest) (*admin.ListMilestonesResponse, error) {
queries, err := listMilestonesToModel(authz.GetInstance(ctx).InstanceID(), req)
reached, err := milestoneQueriesReached(req.GetQueries())
if err != nil {
return nil, err
}
resp, err := s.query.SearchMilestones(ctx, []string{authz.GetInstance(ctx).InstanceID()}, queries)
model, err := s.query.GetMilestones(ctx)
if err != nil {
return nil, err
}
if reached {
model.Milestones = slices.DeleteFunc(model.Milestones, func(milestone *query.Milestone) bool {
return milestone.ReachedDate.IsZero()
})
}
return &admin.ListMilestonesResponse{
Result: milestoneViewsToPb(resp.Milestones),
Details: object_pb.ToListDetails(resp.Count, resp.Sequence, resp.LastRun),
Result: milestoneViewsToPb(model.Milestones),
Details: object_pb.ToListDetails(uint64(len(model.Milestones)), model.ProcessedSequence, model.ChangeDate),
}, nil
}

View File

@ -3,63 +3,22 @@ package admin
import (
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/zitadel/zitadel/internal/api/grpc/object"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/zerrors"
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
milestone_pb "github.com/zitadel/zitadel/pkg/grpc/milestone"
)
func listMilestonesToModel(instanceID string, req *admin_pb.ListMilestonesRequest) (*query.MilestonesSearchQueries, error) {
offset, limit, asc := object.ListQueryToModel(req.Query)
queries, err := milestoneQueriesToModel(req.GetQueries())
instanceIDQuery, err := query.NewTextQuery(query.MilestoneInstanceIDColID, instanceID, query.TextEquals)
if err != nil {
return nil, err
}
queries = append(queries, instanceIDQuery)
return &query.MilestonesSearchQueries{
SearchRequest: query.SearchRequest{
Offset: offset,
Limit: limit,
Asc: asc,
SortingColumn: milestoneFieldNameToSortingColumn(req.SortingColumn),
},
Queries: queries,
}, nil
}
func milestoneQueriesToModel(queries []*milestone_pb.MilestoneQuery) (q []query.SearchQuery, err error) {
q = make([]query.SearchQuery, len(queries))
for i, query := range queries {
q[i], err = milestoneQueryToModel(query)
if err != nil {
return nil, err
func milestoneQueriesReached(queries []*milestone_pb.MilestoneQuery) (reached bool, err error) {
for _, query := range queries {
switch q := query.Query.(type) {
case *milestone_pb.MilestoneQuery_IsReachedQuery:
reached = q.IsReachedQuery.GetReached()
default:
return false, zerrors.ThrowInvalidArgument(nil, "ADMIN-sE7pc", "List.Query.Invalid")
}
}
return q, nil
}
func milestoneQueryToModel(milestoneQuery *milestone_pb.MilestoneQuery) (query.SearchQuery, error) {
switch q := milestoneQuery.Query.(type) {
case *milestone_pb.MilestoneQuery_IsReachedQuery:
if q.IsReachedQuery.GetReached() {
return query.NewNotNullQuery(query.MilestoneReachedDateColID)
}
return query.NewIsNullQuery(query.MilestoneReachedDateColID)
default:
return nil, zerrors.ThrowInvalidArgument(nil, "ADMIN-sE7pc", "List.Query.Invalid")
}
}
func milestoneFieldNameToSortingColumn(field milestone_pb.MilestoneFieldName) query.Column {
switch field {
case milestone_pb.MilestoneFieldName_MILESTONE_FIELD_NAME_REACHED_DATE:
return query.MilestoneReachedDateColID
default:
return query.MilestoneTypeColID
}
return reached, nil
}
func milestoneViewsToPb(milestones []*query.Milestone) []*milestone_pb.Milestone {

View File

@ -8,7 +8,8 @@ type Config struct {
PushTimeout time.Duration
MaxRetries uint32
Pusher Pusher
Querier Querier
Searcher Searcher
Pusher Pusher
Querier Querier
Searcher Searcher
Snapshotter Snapshotter
}

View File

@ -44,7 +44,7 @@ type Snapshot[T any] struct {
Object T
}
// NewSnapshot returns an empty snapshot ready for Get.
// NewSnapshot returns an empty snapshot ready for Populate.
func NewSnapshot[T any](aggregate *Aggregate) *Snapshot[T] {
var object T
return &Snapshot[T]{
@ -56,7 +56,7 @@ func NewSnapshot[T any](aggregate *Aggregate) *Snapshot[T] {
}
}
// SnapshotFromWriteModel returns a populated snapshot ready for Set.
// SnapshotFromWriteModel returns a populated snapshot ready for Store.
func SnapshotFromWriteModel[T any](model *WriteModel, object T) *Snapshot[T] {
return &Snapshot[T]{
SnapshotBase: SnapshotBase{
@ -70,7 +70,7 @@ func SnapshotFromWriteModel[T any](model *WriteModel, object T) *Snapshot[T] {
}
}
// SnapshotFromReadModel returns a populated snapshot ready for Set.
// SnapshotFromReadModel returns a populated snapshot ready for Store.
func SnapshotFromReadModel[T any](model *ReadModel, object T) *Snapshot[T] {
return &Snapshot[T]{
SnapshotBase: SnapshotBase{
@ -84,7 +84,7 @@ func SnapshotFromReadModel[T any](model *ReadModel, object T) *Snapshot[T] {
}
}
func (s *Snapshot[T]) Set(ctx context.Context, repo Snapshotter) (err error) {
func (s *Snapshot[T]) Store(ctx context.Context, repo Snapshotter) (err error) {
payload, err := json.Marshal(s.Object)
if err != nil {
return err
@ -95,7 +95,7 @@ func (s *Snapshot[T]) Set(ctx context.Context, repo Snapshotter) (err error) {
})
}
func (s *Snapshot[T]) Get(ctx context.Context, repo Snapshotter) (err error) {
func (s *Snapshot[T]) Populate(ctx context.Context, repo Snapshotter) (err error) {
data, err := repo.GetSnapshot(ctx, s.InstanceID, NewSnapshotType(s.Object), s.AggregateID)
if err != nil {
return err

View File

@ -9,6 +9,7 @@ import (
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/api/call"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/query/projection"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
@ -136,3 +137,24 @@ func prepareMilestonesQuery(ctx context.Context, db prepareDatabase) (sq.SelectB
}, nil
}
}
func (q *Queries) GetMilestones(ctx context.Context) (_ *MilestoneReadModel, err error) {
model, err := q.GetMilestoneSnapshot(ctx, nil)
if err != nil {
return nil, err
}
if model.allDone() {
return model, nil
}
err = q.eventstore.FilterToQueryReducer(ctx, model)
if err != nil {
return nil, err
}
if model.Position > model.startPosition {
err = eventstore.SnapshotFromReadModel(&model.ReadModel, model).Store(ctx, q.eventstore)
if err != nil {
return nil, err
}
}
return model, nil
}

View File

@ -0,0 +1,231 @@
package query
import (
"cmp"
"context"
"slices"
"strconv"
"github.com/zitadel/zitadel/internal/api/authz"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/milestone"
"github.com/zitadel/zitadel/internal/repository/oidcsession"
"github.com/zitadel/zitadel/internal/repository/project"
)
type MilestoneReadModel struct {
eventstore.ReadModel `json:"-"`
startPosition float64
systemUsers map[string]*authz.SystemAPIUser
Milestones []*Milestone
IgnoredClientIDs []string
}
func NewMilestoneReadModel(ctx context.Context, systemUsers map[string]*authz.SystemAPIUser) *MilestoneReadModel {
instanceID := authz.GetInstance(ctx).InstanceID()
milestones := make([]*Milestone, len(milestone.TypeValues()))
for i, typ := range milestone.TypeValues() {
milestones[i] = &Milestone{
InstanceID: instanceID,
Type: typ,
}
}
return &MilestoneReadModel{
ReadModel: eventstore.ReadModel{
AggregateID: "",
InstanceID: instanceID,
ResourceOwner: instanceID,
},
Milestones: milestones,
}
}
func (m *MilestoneReadModel) Reduce() error {
for _, event := range m.Events {
switch e := event.(type) {
case *instance.InstanceAddedEvent:
m.reduceReached(milestone.InstanceCreated, event)
case *instance.DomainPrimarySetEvent:
for _, milestone := range m.Milestones {
milestone.PrimaryDomain = e.Domain
}
case *project.ProjectAddedEvent:
m.reduceReachedIfUserEvent(milestone.ProjectCreated, event)
case *project.ApplicationAddedEvent:
m.reduceReachedIfUserEvent(milestone.ApplicationCreated, event)
case *project.OIDCConfigAddedEvent:
m.reduceAppConfigAdded(event, e.ClientID)
case *project.APIConfigAddedEvent:
m.reduceAppConfigAdded(event, e.ClientID)
case *oidcsession.AddedEvent:
m.reduceReached(milestone.AuthenticationSucceededOnInstance, event)
// Ignore authentications without session, for example JWT profile,
if e.SessionID == "" {
m.reduceReached(milestone.AuthenticationSucceededOnApplication, event)
}
case *instance.InstanceRemovedEvent:
m.reduceReached(milestone.InstanceDeleted, event)
case *milestone.PushedEvent:
}
}
return m.ReadModel.Reduce()
}
func (m *MilestoneReadModel) Query() *eventstore.SearchQueryBuilder {
var (
instanceEvents []eventstore.EventType
projectEvents []eventstore.EventType
oidcsessionEvent eventstore.EventType
)
if m.notReached(milestone.InstanceCreated) {
instanceEvents = append(instanceEvents, instance.InstanceAddedEventType)
}
if m.emptyDomain() {
instanceEvents = append(instanceEvents, instance.InstanceDomainPrimarySetEventType)
}
if m.notReached(milestone.AuthenticationSucceededOnInstance) {
oidcsessionEvent = oidcsession.AddedType
}
if m.notReached(milestone.ProjectCreated) {
projectEvents = append(projectEvents, project.ProjectAddedType)
}
if m.notReached(milestone.ApplicationCreated) {
projectEvents = append(projectEvents, project.ApplicationAddedType)
}
if m.notReached(milestone.AuthenticationSucceededOnApplication) {
projectEvents = append(projectEvents, project.OIDCConfigAddedType, project.APIConfigAddedType)
oidcsessionEvent = oidcsession.AddedType
}
if m.notReached(milestone.InstanceDeleted) {
instanceEvents = append(instanceEvents, instance.InstanceRemovedEventType)
}
builder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent)
builder.InstanceID(m.InstanceID)
if len(instanceEvents) > 0 {
builder = builder.AddQuery().
AggregateTypes(instance.AggregateType).
EventTypes(instanceEvents...).
PositionAfter(m.Position).
Builder()
}
if len(projectEvents) > 0 {
builder = builder.AddQuery().
AggregateTypes(project.AggregateType).
EventTypes(projectEvents...).
PositionAfter(m.Position).
Builder()
}
if oidcsessionEvent != "" {
builder = builder.AddQuery().
AggregateTypes(oidcsession.AggregateType).
EventTypes(oidcsessionEvent).
PositionAfter(m.Position).
Builder()
}
if m.unPushed() {
builder = builder.AddQuery().
AggregateTypes(milestone.AggregateType).
EventTypes(milestone.PushedEventType).
PositionAfter(m.Position).
Builder()
}
return builder
}
func (m *MilestoneReadModel) reduceReached(typ milestone.Type, event eventstore.Event) {
milestone, ok := m.getMilestone(typ)
if ok && milestone.ReachedDate.IsZero() {
milestone.ReachedDate = event.CreatedAt()
}
}
func (m *MilestoneReadModel) reduceReachedIfUserEvent(typ milestone.Type, event eventstore.Event) {
if !m.isSystemEvent(event) {
m.reduceReached(typ, event)
}
}
func (m *MilestoneReadModel) reduceAppConfigAdded(event eventstore.Event, clientID string) {
if !m.isSystemEvent(event) {
return
}
milestone, ok := m.getMilestone(milestone.AuthenticationSucceededOnApplication)
if ok && milestone.ReachedDate.IsZero() {
m.IgnoredClientIDs = append(m.IgnoredClientIDs, clientID)
}
}
func (m *MilestoneReadModel) isSystemEvent(event eventstore.Event) bool {
if userId, err := strconv.Atoi(event.Creator()); err == nil && userId > 0 {
return false
}
// check if it is a hard coded event creator
for _, creator := range []string{"", "system", "OIDC", "LOGIN", "SYSTEM"} {
if creator == event.Creator() {
return true
}
}
_, ok := m.systemUsers[event.Creator()]
return ok
}
func (m *MilestoneReadModel) reducePushed(e *milestone.PushedEvent) {
milestone, ok := m.getMilestone(e.MilestoneType)
if ok {
milestone.PushedDate = e.CreatedAt()
}
}
func (m *MilestoneReadModel) allDone() bool {
return !slices.ContainsFunc(m.Milestones, func(milestone *Milestone) bool {
// contains unfinished milestone
return milestone.PrimaryDomain == "" ||
milestone.ReachedDate.IsZero() ||
milestone.PushedDate.IsZero()
})
}
func (m *MilestoneReadModel) emptyDomain() bool {
return slices.ContainsFunc(m.Milestones, func(milestone *Milestone) bool {
// contains milestone without domain
return milestone.PrimaryDomain == ""
})
}
func (m *MilestoneReadModel) notReached(typ milestone.Type) bool {
milestone, ok := m.getMilestone(typ)
return ok && milestone.ReachedDate.IsZero()
}
func (m *MilestoneReadModel) unPushed() bool {
return slices.ContainsFunc(m.Milestones, func(milestone *Milestone) bool {
// contains un-pushed milestone
return !milestone.ReachedDate.IsZero() && milestone.PushedDate.IsZero()
})
}
func (m *MilestoneReadModel) getMilestone(typ milestone.Type) (*Milestone, bool) {
i, ok := slices.BinarySearchFunc(m.Milestones, typ, func(m *Milestone, typ milestone.Type) int {
return cmp.Compare(m.Type, typ)
})
if ok {
return m.Milestones[i], ok
}
return nil, ok
}
func (q *Queries) GetMilestoneSnapshot(ctx context.Context, systemUsers map[string]*authz.SystemAPIUser) (*MilestoneReadModel, error) {
model := NewMilestoneReadModel(ctx, systemUsers)
err := eventstore.SnapshotFromReadModel(&model.ReadModel, model).Populate(ctx, q.eventstore)
if err != nil {
return nil, err
}
model.startPosition = model.Position
return model, nil
}

View File

@ -119,7 +119,7 @@ func (p *milestoneProjection) reduceInstanceAdded(event eventstore.Event) (*hand
if err != nil {
return nil, err
}
allTypes := milestone.AllTypes()
allTypes := milestone.TypeValues()
statements := make([]func(eventstore.Event) handler.Exec, 0, len(allTypes))
for _, msType := range allTypes {
createColumns := []handler.Column{

View File

@ -1,59 +1,14 @@
//go:generate stringer -type Type
//go:generate enumer -type Type -json
package milestone
import (
"fmt"
"strings"
)
type Type int
const (
unknown Type = iota
InstanceCreated
InstanceCreated Type = iota + 1
AuthenticationSucceededOnInstance
ProjectCreated
ApplicationCreated
AuthenticationSucceededOnApplication
InstanceDeleted
typesCount
)
func AllTypes() []Type {
types := make([]Type, typesCount-1)
for i := Type(1); i < typesCount; i++ {
types[i-1] = i
}
return types
}
func (t *Type) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%s"`, t.String())), nil
}
func (t *Type) UnmarshalJSON(data []byte) error {
*t = typeFromString(strings.Trim(string(data), `"`))
return nil
}
func typeFromString(t string) Type {
switch t {
case InstanceCreated.String():
return InstanceCreated
case AuthenticationSucceededOnInstance.String():
return AuthenticationSucceededOnInstance
case ProjectCreated.String():
return ProjectCreated
case ApplicationCreated.String():
return ApplicationCreated
case AuthenticationSucceededOnApplication.String():
return AuthenticationSucceededOnApplication
case InstanceDeleted.String():
return InstanceDeleted
default:
return unknown
}
}

View File

@ -0,0 +1,113 @@
// Code generated by "enumer -type Type -json"; DO NOT EDIT.
package milestone
import (
"encoding/json"
"fmt"
"strings"
)
const _TypeName = "InstanceCreatedAuthenticationSucceededOnInstanceProjectCreatedApplicationCreatedAuthenticationSucceededOnApplicationInstanceDeleted"
var _TypeIndex = [...]uint8{0, 15, 48, 62, 80, 116, 131}
const _TypeLowerName = "instancecreatedauthenticationsucceededoninstanceprojectcreatedapplicationcreatedauthenticationsucceededonapplicationinstancedeleted"
func (i Type) String() string {
i -= 1
if i < 0 || i >= Type(len(_TypeIndex)-1) {
return fmt.Sprintf("Type(%d)", i+1)
}
return _TypeName[_TypeIndex[i]:_TypeIndex[i+1]]
}
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
func _TypeNoOp() {
var x [1]struct{}
_ = x[InstanceCreated-(1)]
_ = x[AuthenticationSucceededOnInstance-(2)]
_ = x[ProjectCreated-(3)]
_ = x[ApplicationCreated-(4)]
_ = x[AuthenticationSucceededOnApplication-(5)]
_ = x[InstanceDeleted-(6)]
}
var _TypeValues = []Type{InstanceCreated, AuthenticationSucceededOnInstance, ProjectCreated, ApplicationCreated, AuthenticationSucceededOnApplication, InstanceDeleted}
var _TypeNameToValueMap = map[string]Type{
_TypeName[0:15]: InstanceCreated,
_TypeLowerName[0:15]: InstanceCreated,
_TypeName[15:48]: AuthenticationSucceededOnInstance,
_TypeLowerName[15:48]: AuthenticationSucceededOnInstance,
_TypeName[48:62]: ProjectCreated,
_TypeLowerName[48:62]: ProjectCreated,
_TypeName[62:80]: ApplicationCreated,
_TypeLowerName[62:80]: ApplicationCreated,
_TypeName[80:116]: AuthenticationSucceededOnApplication,
_TypeLowerName[80:116]: AuthenticationSucceededOnApplication,
_TypeName[116:131]: InstanceDeleted,
_TypeLowerName[116:131]: InstanceDeleted,
}
var _TypeNames = []string{
_TypeName[0:15],
_TypeName[15:48],
_TypeName[48:62],
_TypeName[62:80],
_TypeName[80:116],
_TypeName[116:131],
}
// TypeString retrieves an enum value from the enum constants string name.
// Throws an error if the param is not part of the enum.
func TypeString(s string) (Type, error) {
if val, ok := _TypeNameToValueMap[s]; ok {
return val, nil
}
if val, ok := _TypeNameToValueMap[strings.ToLower(s)]; ok {
return val, nil
}
return 0, fmt.Errorf("%s does not belong to Type values", s)
}
// TypeValues returns all values of the enum
func TypeValues() []Type {
return _TypeValues
}
// TypeStrings returns a slice of all String values of the enum
func TypeStrings() []string {
strs := make([]string, len(_TypeNames))
copy(strs, _TypeNames)
return strs
}
// IsAType returns "true" if the value is listed in the enum definition. "false" otherwise
func (i Type) IsAType() bool {
for _, v := range _TypeValues {
if i == v {
return true
}
}
return false
}
// MarshalJSON implements the json.Marshaler interface for Type
func (i Type) MarshalJSON() ([]byte, error) {
return json.Marshal(i.String())
}
// UnmarshalJSON implements the json.Unmarshaler interface for Type
func (i *Type) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return fmt.Errorf("Type should be a string, got %s", data)
}
var err error
*i, err = TypeString(s)
return err
}

View File

@ -1,30 +0,0 @@
// Code generated by "stringer -type Type"; DO NOT EDIT.
package milestone
import "strconv"
func _() {
// An "invalid array index" compiler error signifies that the constant values have changed.
// Re-run the stringer command to generate them again.
var x [1]struct{}
_ = x[unknown-0]
_ = x[InstanceCreated-1]
_ = x[AuthenticationSucceededOnInstance-2]
_ = x[ProjectCreated-3]
_ = x[ApplicationCreated-4]
_ = x[AuthenticationSucceededOnApplication-5]
_ = x[InstanceDeleted-6]
_ = x[typesCount-7]
}
const _Type_name = "unknownInstanceCreatedAuthenticationSucceededOnInstanceProjectCreatedApplicationCreatedAuthenticationSucceededOnApplicationInstanceDeletedtypesCount"
var _Type_index = [...]uint8{0, 7, 22, 55, 69, 87, 123, 138, 148}
func (i Type) String() string {
if i < 0 || i >= Type(len(_Type_index)-1) {
return "Type(" + strconv.FormatInt(int64(i), 10) + ")"
}
return _Type_name[_Type_index[i]:_Type_index[i+1]]
}

View File

@ -9213,10 +9213,8 @@ message ActivateFeatureLoginDefaultOrgResponse {
}
message ListMilestonesRequest {
//list limitations and ordering
zitadel.v1.ListQuery query = 1;
// the field the result is sorted
zitadel.milestone.v1.MilestoneFieldName sorting_column = 2;
// limit, ordering and sorting is not supported anymore
reserved 1, 2;
//criteria the client is looking for
repeated zitadel.milestone.v1.MilestoneQuery queries = 3;
}