zitadel/internal/eventstore/handler/statement.go
Elio Bischof bb756482c7
feat: push telemetry (#6027)
* document analytics config

* rework configuration and docs

* describe HandleActiveInstances better

* describe active instances on quotas better

* only projected events are considered

* cleanup

* describe changes at runtime

* push milestones

* stop tracking events

* calculate and push 4 in 6 milestones

* reduce milestone pushed

* remove docs

* fix scheduled pseudo event projection

* push 5 in 6 milestones

* push 6 in 6 milestones

* ignore client ids

* fix text array contains

* push human readable milestone type

* statement unit tests

* improve dev and db performance

* organize imports

* cleanup

* organize imports

* test projection

* check rows.Err()

* test search query

* pass linting

* review

* test 4 milestones

* simplify milestone by instance ids query

* use type NamespacedCondition

* cleanup

* lint

* lint

* dont overwrite original error

* no opt-in in examples

* cleanup

* prerelease

* enable request headers

* make limit configurable

* review fixes

* only requeue special handlers secondly

* include integration tests

* Revert "include integration tests"

This reverts commit 96db9504ec.

* pass reducers

* test handlers

* fix unit test

* feat: increment version

* lint

* remove prerelease

* fix integration tests
2023-07-06 08:38:13 +02:00

79 lines
1.8 KiB
Go

package handler
import (
"database/sql"
"encoding/json"
"errors"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
)
var (
ErrNoProjection = errors.New("no projection")
ErrNoValues = errors.New("no values")
ErrNoCondition = errors.New("no condition")
ErrSomeStmtsFailed = errors.New("some statements failed")
)
type Statements []Statement
func (stmts Statements) Len() int { return len(stmts) }
func (stmts Statements) Swap(i, j int) { stmts[i], stmts[j] = stmts[j], stmts[i] }
func (stmts Statements) Less(i, j int) bool { return stmts[i].Sequence < stmts[j].Sequence }
type Statement struct {
AggregateType eventstore.AggregateType
Sequence uint64
PreviousSequence uint64
InstanceID string
Execute func(ex Executer, projectionName string) error
}
func (s *Statement) IsNoop() bool {
return s.Execute == nil
}
type Executer interface {
Exec(string, ...interface{}) (sql.Result, error)
}
type Column struct {
Name string
Value interface{}
ParameterOpt func(string) string
}
func NewCol(name string, value interface{}) Column {
return Column{
Name: name,
Value: value,
}
}
func NewJSONCol(name string, value interface{}) Column {
marshalled, err := json.Marshal(value)
if err != nil {
logging.WithFields("column", name).WithError(err).Panic("unable to marshal column")
}
return NewCol(name, marshalled)
}
type Condition func(param string) (string, interface{})
type NamespacedCondition func(namespace string) Condition
func NewCond(name string, value interface{}) Condition {
return func(param string) (string, interface{}) {
return name + " = " + param, value
}
}
func NewNamespacedCondition(name string, value interface{}) NamespacedCondition {
return func(namespace string) Condition {
return NewCond(namespace+"."+name, value)
}
}