feat: reset projections and remove failed events (#2770)

* feat: change failed events to new projection

* feat: change failed events to new projection

* feat: change current sequences to new projection

* feat: add tests

* Update internal/api/grpc/admin/failed_event.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* Update internal/api/grpc/admin/view.go

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: truncate

* fix reset

* fix reset

* Rename V1.102__queries.sql to V1.103__queries.sql

* improve current_sequence and truncate view tables

* check sub tables of view are tables

* Update internal/query/current_sequence_test.go

Co-authored-by: Silvan <silvan.reusser@gmail.com>

* fixes and use squirrel

* missing error handling

* lock before reset

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
Fabi
2021-12-16 14:44:26 +01:00
committed by GitHub
parent d2ea9a1b8c
commit a43e1fc34a
15 changed files with 821 additions and 40 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"database/sql"
errs "errors"
"fmt"
"time"
sq "github.com/Masterminds/squirrel"
@@ -12,11 +13,182 @@ import (
"github.com/caos/zitadel/internal/query/projection"
)
const (
lockStmtFormat = "INSERT INTO %[1]s" +
" (locker_id, locked_until, projection_name) VALUES ($1, now()+$2::INTERVAL, $3)" +
" ON CONFLICT (projection_name)" +
" DO UPDATE SET locker_id = $1, locked_until = now()+$2::INTERVAL"
lockerIDReset = "reset"
)
type LatestSequence struct {
Sequence uint64
Timestamp time.Time
}
type CurrentSequences struct {
SearchResponse
CurrentSequences []*CurrentSequence
}
type CurrentSequence struct {
ProjectionName string
CurrentSequence uint64
Timestamp time.Time
}
type CurrentSequencesSearchQueries struct {
SearchRequest
Queries []SearchQuery
}
func (q *CurrentSequencesSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
query = q.SearchRequest.toQuery(query)
for _, q := range q.Queries {
query = q.toQuery(query)
}
return query
}
func (q *Queries) SearchCurrentSequences(ctx context.Context, queries *CurrentSequencesSearchQueries) (failedEvents *CurrentSequences, err error) {
query, scan := prepareCurrentSequencesQuery()
stmt, args, err := queries.toQuery(query).ToSql()
if err != nil {
return nil, errors.ThrowInvalidArgument(err, "QUERY-MmFef", "Errors.Query.InvalidRequest")
}
rows, err := q.client.QueryContext(ctx, stmt, args...)
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-22H8f", "Errors.Internal")
}
return scan(rows)
}
func (q *Queries) latestSequence(ctx context.Context, projections ...table) (*LatestSequence, error) {
query, scan := prepareLatestSequence()
or := make(sq.Or, len(projections))
for i, projection := range projections {
or[i] = sq.Eq{CurrentSequenceColProjectionName.identifier(): projection.name}
}
stmt, args, err := query.
Where(or).
OrderBy(CurrentSequenceColCurrentSequence.identifier()).
ToSql()
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-5CfX9", "Errors.Query.SQLStatement")
}
row := q.client.QueryRowContext(ctx, stmt, args...)
return scan(row)
}
func (q *Queries) ClearCurrentSequence(ctx context.Context, projectionName string) (err error) {
err = q.checkAndLock(ctx, projectionName)
if err != nil {
return err
}
tx, err := q.client.Begin()
if err != nil {
return errors.ThrowInternal(err, "QUERY-9iOpr", "Errors.RemoveFailed")
}
tables, err := tablesForReset(ctx, tx, projectionName)
if err != nil {
return err
}
err = reset(tx, tables, projectionName)
if err != nil {
return err
}
return tx.Commit()
}
func (q *Queries) checkAndLock(ctx context.Context, projectionName string) error {
projectionQuery, args, err := sq.Select("count(*)").
From("[show tables from zitadel.projections]").
Where(
sq.And{
sq.NotEq{"table_name": []string{"locks", "current_sequences", "failed_events"}},
sq.Eq{"concat('zitadel.projections.', table_name)": projectionName},
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return errors.ThrowInternal(err, "QUERY-Dfwf2", "Errors.ProjectionName.Invalid")
}
row := q.client.QueryRowContext(ctx, projectionQuery, args...)
var count int
if err := row.Scan(&count); err != nil || count == 0 {
return errors.ThrowInternal(err, "QUERY-ej8fn", "Errors.ProjectionName.Invalid")
}
lock := fmt.Sprintf(lockStmtFormat, locksTable.identifier())
if err != nil {
return errors.ThrowInternal(err, "QUERY-DVfg3", "Errors.RemoveFailed")
}
//lock for twice the default duration (10s)
res, err := q.client.ExecContext(ctx, lock, lockerIDReset, 20*time.Second, projectionName)
if err != nil {
return errors.ThrowInternal(err, "QUERY-WEfr2", "Errors.RemoveFailed")
}
rows, err := res.RowsAffected()
if err != nil || rows == 0 {
return errors.ThrowInternal(err, "QUERY-Bh3ws", "Errors.RemoveFailed")
}
time.Sleep(7 * time.Second) //more than twice the default lock duration (10s)
return nil
}
func tablesForReset(ctx context.Context, tx *sql.Tx, projectionName string) ([]string, error) {
tablesQuery, args, err := sq.Select("concat('zitadel.projections.', table_name)").
From("[show tables from zitadel.projections]").
Where(
sq.And{
sq.Eq{"type": "table"},
sq.NotEq{"table_name": []string{"locks", "current_sequences", "failed_events"}},
sq.Like{"concat('zitadel.projections.', table_name)": projectionName + "%"},
}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-ASff2", "Errors.ProjectionName.Invalid")
}
var tables []string
rows, err := tx.QueryContext(ctx, tablesQuery, args...)
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-Dgfw", "Errors.ProjectionName.Invalid")
}
for rows.Next() {
var tableName string
if err := rows.Scan(&tableName); err != nil {
return nil, errors.ThrowInternal(err, "QUERY-ej8fn", "Errors.ProjectionName.Invalid")
}
tables = append(tables, tableName)
}
return tables, nil
}
func reset(tx *sql.Tx, tables []string, projectionName string) error {
for _, tableName := range tables {
_, err := tx.Exec(fmt.Sprintf("TRUNCATE %s cascade", tableName))
if err != nil {
return errors.ThrowInternal(err, "QUERY-3n92f", "Errors.RemoveFailed")
}
}
update, args, err := sq.Update(currentSequencesTable.identifier()).
Set(CurrentSequenceColCurrentSequence.name, 0).
Where(sq.Eq{CurrentSequenceColProjectionName.name: projectionName}).
PlaceholderFormat(sq.Dollar).
ToSql()
if err != nil {
return errors.ThrowInternal(err, "QUERY-Ff3tw", "Errors.RemoveFailed")
}
_, err = tx.Exec(update, args...)
if err != nil {
return errors.ThrowInternal(err, "QUERY-NFiws", "Errors.RemoveFailed")
}
return nil
}
func prepareLatestSequence() (sq.SelectBuilder, func(*sql.Row) (*LatestSequence, error)) {
return sq.Select(
CurrentSequenceColCurrentSequence.identifier(),
@@ -38,22 +210,43 @@ func prepareLatestSequence() (sq.SelectBuilder, func(*sql.Row) (*LatestSequence,
}
}
func (q *Queries) latestSequence(ctx context.Context, projections ...table) (*LatestSequence, error) {
query, scan := prepareLatestSequence()
or := make(sq.Or, len(projections))
for i, projection := range projections {
or[i] = sq.Eq{CurrentSequenceColProjectionName.identifier(): projection.name}
}
stmt, args, err := query.
Where(or).
OrderBy(CurrentSequenceColCurrentSequence.identifier()).
ToSql()
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-5CfX9", "Errors.Query.SQLStatement")
}
func prepareCurrentSequencesQuery() (sq.SelectBuilder, func(*sql.Rows) (*CurrentSequences, error)) {
return sq.Select(
"max("+CurrentSequenceColCurrentSequence.identifier()+") as "+CurrentSequenceColCurrentSequence.name,
"max("+CurrentSequenceColTimestamp.identifier()+") as "+CurrentSequenceColTimestamp.name,
CurrentSequenceColProjectionName.identifier(),
countColumn.identifier()).
From(currentSequencesTable.identifier()).
GroupBy(CurrentSequenceColProjectionName.identifier()).
PlaceholderFormat(sq.Dollar),
func(rows *sql.Rows) (*CurrentSequences, error) {
currentSequences := make([]*CurrentSequence, 0)
var count uint64
for rows.Next() {
currentSequence := new(CurrentSequence)
err := rows.Scan(
&currentSequence.CurrentSequence,
&currentSequence.Timestamp,
&currentSequence.ProjectionName,
&count,
)
if err != nil {
return nil, err
}
currentSequences = append(currentSequences, currentSequence)
}
row := q.client.QueryRowContext(ctx, stmt, args...)
return scan(row)
if err := rows.Close(); err != nil {
return nil, errors.ThrowInternal(err, "QUERY-jbJ77", "Errors.Query.CloseRows")
}
return &CurrentSequences{
CurrentSequences: currentSequences,
SearchResponse: SearchResponse{
Count: count,
},
}, nil
}
}
var (
@@ -77,3 +270,21 @@ var (
table: currentSequencesTable,
}
)
var (
locksTable = table{
name: projection.LocksTable,
}
LocksColLockerID = Column{
name: "locker_id",
table: locksTable,
}
LocksColUntil = Column{
name: "locked_until",
table: locksTable,
}
LocksColProjectionName = Column{
name: "projection_name",
table: locksTable,
}
)

View File

@@ -0,0 +1,156 @@
package query
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"regexp"
"testing"
)
func Test_CurrentSequencesPrepares(t *testing.T) {
type want struct {
sqlExpectations sqlExpectation
err checkErr
}
tests := []struct {
name string
prepare interface{}
want want
object interface{}
}{
{
name: "prepareCurrentSequencesQuery no result",
prepare: prepareCurrentSequencesQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT max(projections.current_sequences.current_sequence) as current_sequence,`+
` max(projections.current_sequences.timestamp) as timestamp,`+
` projections.current_sequences.projection_name,`+
` COUNT(*) OVER ()`+
` FROM projections.current_sequences`+
` GROUP BY projections.current_sequences.projection_name`),
nil,
nil,
),
},
object: &CurrentSequences{CurrentSequences: []*CurrentSequence{}},
},
{
name: "prepareCurrentSequencesQuery one result",
prepare: prepareCurrentSequencesQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT max(projections.current_sequences.current_sequence) as current_sequence,`+
` max(projections.current_sequences.timestamp) as timestamp,`+
` projections.current_sequences.projection_name,`+
` COUNT(*) OVER ()`+
` FROM projections.current_sequences`+
` GROUP BY projections.current_sequences.projection_name`),
[]string{
"current_sequence",
"timestamp",
"projection_name",
"count",
},
[][]driver.Value{
{
uint64(20211108),
testNow,
"projection-name",
},
},
),
},
object: &CurrentSequences{
SearchResponse: SearchResponse{
Count: 1,
},
CurrentSequences: []*CurrentSequence{
{
Timestamp: testNow,
CurrentSequence: 20211108,
ProjectionName: "projection-name",
},
},
},
},
{
name: "prepareCurrentSequencesQuery multiple result",
prepare: prepareCurrentSequencesQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT max(projections.current_sequences.current_sequence) as current_sequence,`+
` max(projections.current_sequences.timestamp) as timestamp,`+
` projections.current_sequences.projection_name,`+
` COUNT(*) OVER ()`+
` FROM projections.current_sequences`+
` GROUP BY projections.current_sequences.projection_name`),
[]string{
"current_sequence",
"timestamp",
"projection_name",
"count",
},
[][]driver.Value{
{
uint64(20211108),
testNow,
"projection-name",
},
{
uint64(20211108),
testNow,
"projection-name-2",
},
},
),
},
object: &CurrentSequences{
SearchResponse: SearchResponse{
Count: 2,
},
CurrentSequences: []*CurrentSequence{
{
Timestamp: testNow,
CurrentSequence: 20211108,
ProjectionName: "projection-name",
},
{
Timestamp: testNow,
CurrentSequence: 20211108,
ProjectionName: "projection-name-2",
},
},
},
},
{
name: "prepareCurrentSequencesQuery sql err",
prepare: prepareCurrentSequencesQuery,
want: want{
sqlExpectations: mockQueryErr(
regexp.QuoteMeta(`SELECT max(projections.current_sequences.current_sequence) as current_sequence,`+
` max(projections.current_sequences.timestamp) as timestamp,`+
` projections.current_sequences.projection_name,`+
` COUNT(*) OVER ()`+
` FROM projections.current_sequences`+
` GROUP BY projections.current_sequences.projection_name`),
sql.ErrConnDone,
),
err: func(err error) (error, bool) {
if !errors.Is(err, sql.ErrConnDone) {
return fmt.Errorf("err should be sql.ErrConnDone got: %w", err), false
}
return nil, true
},
},
object: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assertPrepare(t, tt.prepare, tt.object, tt.want.sqlExpectations, tt.want.err)
})
}
}

View File

@@ -0,0 +1,173 @@
package query
import (
"context"
"database/sql"
errs "errors"
sq "github.com/Masterminds/squirrel"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/query/projection"
)
const (
failedEventsColumnProjectionName = "projection_name"
failedEventsColumnFailedSequence = "failed_sequence"
failedEventsColumnFailureCount = "failure_count"
failedEventsColumnError = "error"
)
var (
failedEventsTable = table{
name: projection.FailedEventsTable,
}
FailedEventsColumnProjectionName = Column{
name: failedEventsColumnProjectionName,
table: failedEventsTable,
}
FailedEventsColumnFailedSequence = Column{
name: failedEventsColumnFailedSequence,
table: failedEventsTable,
}
FailedEventsColumnFailureCount = Column{
name: failedEventsColumnFailureCount,
table: failedEventsTable,
}
FailedEventsColumnError = Column{
name: failedEventsColumnError,
table: failedEventsTable,
}
)
type FailedEvents struct {
SearchResponse
FailedEvents []*FailedEvent
}
type FailedEvent struct {
ProjectionName string
FailedSequence uint64
FailureCount uint64
Error string
}
type FailedEventSearchQueries struct {
SearchRequest
Queries []SearchQuery
}
func (q *Queries) SearchFailedEvents(ctx context.Context, queries *FailedEventSearchQueries) (failedEvents *FailedEvents, err error) {
query, scan := prepareFailedEventsQuery()
stmt, args, err := queries.toQuery(query).ToSql()
if err != nil {
return nil, errors.ThrowInvalidArgument(err, "QUERY-n8rjJ", "Errors.Query.InvalidRequest")
}
rows, err := q.client.QueryContext(ctx, stmt, args...)
if err != nil {
return nil, errors.ThrowInternal(err, "QUERY-3j99J", "Errors.Internal")
}
return scan(rows)
}
func (q *Queries) RemoveFailedEvent(ctx context.Context, projectionName string, sequence uint64) (err error) {
stmt, args, err := sq.Delete(projection.FailedEventsTable).
Where(sq.Eq{
failedEventsColumnProjectionName: projectionName,
failedEventsColumnFailedSequence: sequence,
}).ToSql()
if err != nil {
return errors.ThrowInternal(err, "QUERY-DGgh3", "Errors.RemoveFailed")
}
_, err = q.client.Exec(stmt, args...)
if err != nil {
return errors.ThrowInternal(err, "QUERY-0kbFF", "Errors.RemoveFailed")
}
return nil
}
func NewFailedEventProjectionNameSearchQuery(method TextComparison, value string) (SearchQuery, error) {
return NewTextQuery(FailedEventsColumnProjectionName, value, method)
}
func (r *ProjectSearchQueries) AppendProjectionNameQuery(projectionName string) error {
query, err := NewProjectResourceOwnerSearchQuery(projectionName)
if err != nil {
return err
}
r.Queries = append(r.Queries, query)
return nil
}
func (q *FailedEventSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
query = q.SearchRequest.toQuery(query)
for _, q := range q.Queries {
query = q.toQuery(query)
}
return query
}
func prepareFailedEventQuery() (sq.SelectBuilder, func(*sql.Row) (*FailedEvent, error)) {
return sq.Select(
FailedEventsColumnProjectionName.identifier(),
FailedEventsColumnFailedSequence.identifier(),
FailedEventsColumnFailureCount.identifier(),
FailedEventsColumnError.identifier()).
From(failedEventsTable.identifier()).PlaceholderFormat(sq.Dollar),
func(row *sql.Row) (*FailedEvent, error) {
p := new(FailedEvent)
err := row.Scan(
&p.ProjectionName,
&p.FailedSequence,
&p.FailureCount,
&p.Error,
)
if err != nil {
if errs.Is(err, sql.ErrNoRows) {
return nil, errors.ThrowNotFound(err, "QUERY-5N00f", "Errors.FailedEvents.NotFound")
}
return nil, errors.ThrowInternal(err, "QUERY-0oJf3", "Errors.Internal")
}
return p, nil
}
}
func prepareFailedEventsQuery() (sq.SelectBuilder, func(*sql.Rows) (*FailedEvents, error)) {
return sq.Select(
FailedEventsColumnProjectionName.identifier(),
FailedEventsColumnFailedSequence.identifier(),
FailedEventsColumnFailureCount.identifier(),
FailedEventsColumnError.identifier(),
countColumn.identifier()).
From(failedEventsTable.identifier()).PlaceholderFormat(sq.Dollar),
func(rows *sql.Rows) (*FailedEvents, error) {
failedEvents := make([]*FailedEvent, 0)
var count uint64
for rows.Next() {
failedEvent := new(FailedEvent)
err := rows.Scan(
&failedEvent.ProjectionName,
&failedEvent.FailedSequence,
&failedEvent.FailureCount,
&failedEvent.Error,
&count,
)
if err != nil {
return nil, err
}
failedEvents = append(failedEvents, failedEvent)
}
if err := rows.Close(); err != nil {
return nil, errors.ThrowInternal(err, "QUERY-En99f", "Errors.Query.CloseRows")
}
return &FailedEvents{
FailedEvents: failedEvents,
SearchResponse: SearchResponse{
Count: count,
},
}, nil
}
}

View File

@@ -0,0 +1,164 @@
package query
import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"regexp"
"testing"
)
func Test_FailedEventsPrepares(t *testing.T) {
type want struct {
sqlExpectations sqlExpectation
err checkErr
}
tests := []struct {
name string
prepare interface{}
want want
object interface{}
}{
{
name: "prepareFailedEventsQuery no result",
prepare: prepareFailedEventsQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
` projections.failed_events.failed_sequence,`+
` projections.failed_events.failure_count,`+
` projections.failed_events.error,`+
` COUNT(*) OVER ()`+
` FROM projections.failed_events`),
nil,
nil,
),
},
object: &FailedEvents{FailedEvents: []*FailedEvent{}},
},
{
name: "prepareFailedEventsQuery one result",
prepare: prepareFailedEventsQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
` projections.failed_events.failed_sequence,`+
` projections.failed_events.failure_count,`+
` projections.failed_events.error,`+
` COUNT(*) OVER ()`+
` FROM projections.failed_events`),
[]string{
"projection_name",
"failed_sequence",
"failure_count",
"error",
"count",
},
[][]driver.Value{
{
"projection-name",
uint64(20211108),
uint64(2),
"error",
},
},
),
},
object: &FailedEvents{
SearchResponse: SearchResponse{
Count: 1,
},
FailedEvents: []*FailedEvent{
{
ProjectionName: "projection-name",
FailedSequence: 20211108,
FailureCount: 2,
Error: "error",
},
},
},
},
{
name: "prepareFailedEventsQuery multiple result",
prepare: prepareFailedEventsQuery,
want: want{
sqlExpectations: mockQueries(
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
` projections.failed_events.failed_sequence,`+
` projections.failed_events.failure_count,`+
` projections.failed_events.error,`+
` COUNT(*) OVER ()`+
` FROM projections.failed_events`),
[]string{
"projection_name",
"failed_sequence",
"failure_count",
"error",
"count",
},
[][]driver.Value{
{
"projection-name",
uint64(20211108),
2,
"error",
},
{
"projection-name-2",
uint64(20211108),
2,
"error",
},
},
),
},
object: &FailedEvents{
SearchResponse: SearchResponse{
Count: 2,
},
FailedEvents: []*FailedEvent{
{
ProjectionName: "projection-name",
FailedSequence: 20211108,
FailureCount: 2,
Error: "error",
},
{
ProjectionName: "projection-name-2",
FailedSequence: 20211108,
FailureCount: 2,
Error: "error",
},
},
},
},
{
name: "prepareFailedEventsQuery sql err",
prepare: prepareFailedEventsQuery,
want: want{
sqlExpectations: mockQueryErr(
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
` projections.failed_events.failed_sequence,`+
` projections.failed_events.failure_count,`+
` projections.failed_events.error,`+
` COUNT(*) OVER ()`+
` FROM projections.failed_events`),
sql.ErrConnDone,
),
err: func(err error) (error, bool) {
if !errors.Is(err, sql.ErrConnDone) {
return fmt.Errorf("err should be sql.ErrConnDone got: %w", err), false
}
return nil, true
},
},
object: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assertPrepare(t, tt.prepare, tt.object, tt.want.sqlExpectations, tt.want.err)
})
}
}

View File

@@ -0,0 +1,8 @@
package projection
const (
FailedEventsColumnProjectionName = "projection_name"
FailedEventsColumnFailedSequence = "failed_sequence"
FailedEventsColumnFailureCount = "failure_count"
FailedEventsColumnError = "error"
)

View File

@@ -13,8 +13,8 @@ import (
const (
CurrentSeqTable = "projections.current_sequences"
locksTable = "projections.locks"
failedEventsTable = "projections.failed_events"
LocksTable = "projections.locks"
FailedEventsTable = "projections.failed_events"
)
func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, config Config, defaults systemdefaults.SystemDefaults) error {
@@ -28,8 +28,8 @@ func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, co
},
Client: sqlClient,
SequenceTable: CurrentSeqTable,
LockTable: locksTable,
FailedEventsTable: failedEventsTable,
LockTable: LocksTable,
FailedEventsTable: FailedEventsTable,
MaxFailureCount: config.MaxFailureCount,
BulkLimit: config.BulkLimit,
}