mirror of
https://github.com/zitadel/zitadel.git
synced 2024-12-15 12:27:59 +00:00
1d84635836
# Which Problems Are Solved To improve performance a new table and method is implemented on eventstore. The goal of this table is to index searchable fields on command side to use it on command and query side. The table allows to store one primitive value (numeric, text) per row. The eventstore framework is extended by the `Search`-method which allows to search for objects. The `Command`-interface is extended by the `SearchOperations()`-method which does manipulate the the `search`-table. # How the Problems Are Solved This PR adds the capability of improving performance for command and query side by using the `Search`-method of the eventstore instead of using one of the `Filter`-methods. # Open Tasks - [x] Add feature flag - [x] Unit tests - [ ] ~~Benchmarks if needed~~ - [x] Ensure no behavior change - [x] Add setup step to fill table with current data - [x] Add projection which ensures data added between setup and start of the new version are also added to the table # Additional Changes The `Search`-method is currently used by `ProjectGrant`-command side. # Additional Context - Closes https://github.com/zitadel/zitadel/issues/8094
368 lines
9.8 KiB
Go
368 lines
9.8 KiB
Go
package eventstore
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
_ "embed"
|
|
"encoding/json"
|
|
"reflect"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/zitadel/zitadel/internal/api/authz"
|
|
"github.com/zitadel/zitadel/internal/eventstore"
|
|
"github.com/zitadel/zitadel/internal/telemetry/tracing"
|
|
"github.com/zitadel/zitadel/internal/zerrors"
|
|
)
|
|
|
|
type fieldValue struct {
|
|
value []byte
|
|
}
|
|
|
|
func (value *fieldValue) Unmarshal(ptr any) error {
|
|
return json.Unmarshal(value.value, ptr)
|
|
}
|
|
|
|
func (es *Eventstore) FillFields(ctx context.Context, events ...eventstore.FillFieldsEvent) (err error) {
|
|
ctx, span := tracing.NewSpan(ctx)
|
|
defer span.End()
|
|
|
|
tx, err := es.client.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return
|
|
}
|
|
err = tx.Commit()
|
|
}()
|
|
|
|
return handleFieldFillEvents(ctx, tx, events)
|
|
}
|
|
|
|
// Search implements the [eventstore.Search] method
|
|
func (es *Eventstore) Search(ctx context.Context, conditions ...map[eventstore.FieldType]any) (result []*eventstore.SearchResult, err error) {
|
|
ctx, span := tracing.NewSpan(ctx)
|
|
defer span.EndWithError(err)
|
|
|
|
var builder strings.Builder
|
|
args := buildSearchStatement(ctx, &builder, conditions...)
|
|
|
|
err = es.client.QueryContext(
|
|
ctx,
|
|
func(rows *sql.Rows) error {
|
|
for rows.Next() {
|
|
var (
|
|
res eventstore.SearchResult
|
|
value fieldValue
|
|
)
|
|
err = rows.Scan(
|
|
&res.Aggregate.InstanceID,
|
|
&res.Aggregate.ResourceOwner,
|
|
&res.Aggregate.Type,
|
|
&res.Aggregate.ID,
|
|
&res.Object.Type,
|
|
&res.Object.ID,
|
|
&res.Object.Revision,
|
|
&res.FieldName,
|
|
&value.value,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
res.Value = &value
|
|
|
|
result = append(result, &res)
|
|
}
|
|
return nil
|
|
},
|
|
builder.String(),
|
|
args...,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
const searchQueryPrefix = `SELECT instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value FROM eventstore.fields WHERE instance_id = $1`
|
|
|
|
func buildSearchStatement(ctx context.Context, builder *strings.Builder, conditions ...map[eventstore.FieldType]any) []any {
|
|
args := make([]any, 0, len(conditions)*4+1)
|
|
args = append(args, authz.GetInstance(ctx).InstanceID())
|
|
|
|
builder.WriteString(searchQueryPrefix)
|
|
|
|
builder.WriteString(" AND ")
|
|
if len(conditions) > 1 {
|
|
builder.WriteRune('(')
|
|
}
|
|
for i, condition := range conditions {
|
|
if i > 0 {
|
|
builder.WriteString(" OR ")
|
|
}
|
|
if len(condition) > 1 {
|
|
builder.WriteRune('(')
|
|
}
|
|
args = append(args, buildSearchCondition(builder, len(args)+1, condition)...)
|
|
if len(condition) > 1 {
|
|
builder.WriteRune(')')
|
|
}
|
|
}
|
|
if len(conditions) > 1 {
|
|
builder.WriteRune(')')
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
func buildSearchCondition(builder *strings.Builder, index int, conditions map[eventstore.FieldType]any) []any {
|
|
args := make([]any, 0, len(conditions))
|
|
|
|
orderedCondition := make([]eventstore.FieldType, 0, len(conditions))
|
|
for field := range conditions {
|
|
orderedCondition = append(orderedCondition, field)
|
|
}
|
|
slices.Sort(orderedCondition)
|
|
|
|
for _, field := range orderedCondition {
|
|
if len(args) > 0 {
|
|
builder.WriteString(" AND ")
|
|
}
|
|
builder.WriteString(fieldNameByType(field, conditions[field]))
|
|
builder.WriteString(" = $")
|
|
builder.WriteString(strconv.Itoa(index + len(args)))
|
|
args = append(args, conditions[field])
|
|
}
|
|
|
|
return args
|
|
}
|
|
|
|
func handleFieldCommands(ctx context.Context, tx *sql.Tx, commands []eventstore.Command) error {
|
|
for _, command := range commands {
|
|
if len(command.Fields()) > 0 {
|
|
if err := handleFieldOperations(ctx, tx, command.Fields()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleFieldFillEvents(ctx context.Context, tx *sql.Tx, events []eventstore.FillFieldsEvent) error {
|
|
for _, event := range events {
|
|
if len(event.Fields()) > 0 {
|
|
if err := handleFieldOperations(ctx, tx, event.Fields()); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleFieldOperations(ctx context.Context, tx *sql.Tx, operations []*eventstore.FieldOperation) error {
|
|
for _, operation := range operations {
|
|
if operation.Set != nil {
|
|
if err := handleFieldSet(ctx, tx, operation.Set); err != nil {
|
|
return err
|
|
}
|
|
continue
|
|
}
|
|
if operation.Remove != nil {
|
|
if err := handleSearchDelete(ctx, tx, operation.Remove); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func handleFieldSet(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error {
|
|
if len(field.UpsertConflictFields) == 0 {
|
|
return handleSearchInsert(ctx, tx, field)
|
|
}
|
|
return handleSearchUpsert(ctx, tx, field)
|
|
}
|
|
|
|
const (
|
|
insertField = `INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)`
|
|
)
|
|
|
|
func handleSearchInsert(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error {
|
|
value, err := json.Marshal(field.Value.Value)
|
|
if err != nil {
|
|
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
|
|
}
|
|
_, err = tx.ExecContext(
|
|
ctx,
|
|
insertField,
|
|
|
|
field.Aggregate.InstanceID,
|
|
field.Aggregate.ResourceOwner,
|
|
field.Aggregate.Type,
|
|
field.Aggregate.ID,
|
|
field.Object.Type,
|
|
field.Object.ID,
|
|
field.Object.Revision,
|
|
field.FieldName,
|
|
value,
|
|
field.Value.MustBeUnique,
|
|
field.Value.ShouldIndex,
|
|
)
|
|
return err
|
|
}
|
|
|
|
const (
|
|
fieldsUpsertPrefix = `WITH upsert AS (UPDATE eventstore.fields SET (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) = ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) WHERE `
|
|
fieldsUpsertSuffix = ` RETURNING * ) INSERT INTO eventstore.fields (instance_id, resource_owner, aggregate_type, aggregate_id, object_type, object_id, object_revision, field_name, value, value_must_be_unique, should_index) SELECT $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 WHERE NOT EXISTS (SELECT 1 FROM upsert)`
|
|
)
|
|
|
|
func handleSearchUpsert(ctx context.Context, tx *sql.Tx, field *eventstore.Field) error {
|
|
value, err := json.Marshal(field.Value.Value)
|
|
if err != nil {
|
|
return zerrors.ThrowInvalidArgument(err, "V3-fcrW1", "unable to marshal field value")
|
|
}
|
|
|
|
_, err = tx.ExecContext(
|
|
ctx,
|
|
writeUpsertField(field.UpsertConflictFields),
|
|
|
|
field.Aggregate.InstanceID,
|
|
field.Aggregate.ResourceOwner,
|
|
field.Aggregate.Type,
|
|
field.Aggregate.ID,
|
|
field.Object.Type,
|
|
field.Object.ID,
|
|
field.Object.Revision,
|
|
field.FieldName,
|
|
value,
|
|
field.Value.MustBeUnique,
|
|
field.Value.ShouldIndex,
|
|
)
|
|
return err
|
|
}
|
|
|
|
func writeUpsertField(fields []eventstore.FieldType) string {
|
|
var builder strings.Builder
|
|
|
|
builder.WriteString(fieldsUpsertPrefix)
|
|
for i, fieldName := range fields {
|
|
if i > 0 {
|
|
builder.WriteString(" AND ")
|
|
}
|
|
name, index := searchFieldNameAndIndexByTypeForPush(fieldName)
|
|
|
|
builder.WriteString(name)
|
|
builder.WriteString(" = ")
|
|
builder.WriteString(index)
|
|
}
|
|
builder.WriteString(fieldsUpsertSuffix)
|
|
|
|
return builder.String()
|
|
}
|
|
|
|
const removeSearch = `DELETE FROM eventstore.fields WHERE `
|
|
|
|
func handleSearchDelete(ctx context.Context, tx *sql.Tx, clauses map[eventstore.FieldType]any) error {
|
|
if len(clauses) == 0 {
|
|
return zerrors.ThrowInvalidArgument(nil, "V3-oqlBZ", "no conditions")
|
|
}
|
|
stmt, args := writeDeleteField(clauses)
|
|
_, err := tx.ExecContext(ctx, stmt, args...)
|
|
return err
|
|
}
|
|
|
|
func writeDeleteField(clauses map[eventstore.FieldType]any) (string, []any) {
|
|
var (
|
|
builder strings.Builder
|
|
args = make([]any, 0, len(clauses))
|
|
)
|
|
builder.WriteString(removeSearch)
|
|
|
|
orderedCondition := make([]eventstore.FieldType, 0, len(clauses))
|
|
for field := range clauses {
|
|
orderedCondition = append(orderedCondition, field)
|
|
}
|
|
slices.Sort(orderedCondition)
|
|
|
|
for _, fieldName := range orderedCondition {
|
|
if len(args) > 0 {
|
|
builder.WriteString(" AND ")
|
|
}
|
|
builder.WriteString(fieldNameByType(fieldName, clauses[fieldName]))
|
|
|
|
builder.WriteString(" = $")
|
|
builder.WriteString(strconv.Itoa(len(args) + 1))
|
|
|
|
args = append(args, clauses[fieldName])
|
|
}
|
|
|
|
return builder.String(), args
|
|
}
|
|
|
|
func fieldNameByType(typ eventstore.FieldType, value any) string {
|
|
switch typ {
|
|
case eventstore.FieldTypeAggregateID:
|
|
return "aggregate_id"
|
|
case eventstore.FieldTypeAggregateType:
|
|
return "aggregate_type"
|
|
case eventstore.FieldTypeInstanceID:
|
|
return "instance_id"
|
|
case eventstore.FieldTypeResourceOwner:
|
|
return "resource_owner"
|
|
case eventstore.FieldTypeFieldName:
|
|
return "field_name"
|
|
case eventstore.FieldTypeObjectType:
|
|
return "object_type"
|
|
case eventstore.FieldTypeObjectID:
|
|
return "object_id"
|
|
case eventstore.FieldTypeObjectRevision:
|
|
return "object_revision"
|
|
case eventstore.FieldTypeValue:
|
|
return valueColumn(value)
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func searchFieldNameAndIndexByTypeForPush(typ eventstore.FieldType) (string, string) {
|
|
switch typ {
|
|
case eventstore.FieldTypeInstanceID:
|
|
return "instance_id", "$1"
|
|
case eventstore.FieldTypeResourceOwner:
|
|
return "resource_owner", "$2"
|
|
case eventstore.FieldTypeAggregateType:
|
|
return "aggregate_type", "$3"
|
|
case eventstore.FieldTypeAggregateID:
|
|
return "aggregate_id", "$4"
|
|
case eventstore.FieldTypeObjectType:
|
|
return "object_type", "$5"
|
|
case eventstore.FieldTypeObjectID:
|
|
return "object_id", "$6"
|
|
case eventstore.FieldTypeObjectRevision:
|
|
return "object_revision", "$7"
|
|
case eventstore.FieldTypeFieldName:
|
|
return "field_name", "$8"
|
|
case eventstore.FieldTypeValue:
|
|
return "value", "$9"
|
|
}
|
|
return "", ""
|
|
}
|
|
|
|
func valueColumn(value any) string {
|
|
//nolint: exhaustive
|
|
switch reflect.TypeOf(value).Kind() {
|
|
case reflect.Bool:
|
|
return "bool_value"
|
|
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64, reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Float32, reflect.Float64:
|
|
return "number_value"
|
|
case reflect.String:
|
|
return "text_value"
|
|
}
|
|
return ""
|
|
}
|