mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-14 02:27:34 +00:00
feat: save last occurrence of failed events and fix instance filtering (#4710)
* fix: filter failed events and current sequence correctly * fix failed events sorting column * feat: save last occurrence of failed event * fix failedEvents query and update sql statements * change sql statement to only create index * fix linting * fix linting * Update internal/query/failed_events.go Co-authored-by: Silvan <silvan.reusser@gmail.com> * update job name on test-docs to match the one from test-code Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
@@ -7,8 +7,8 @@ import (
|
||||
)
|
||||
|
||||
type AdministratorRepository interface {
|
||||
GetFailedEvents(context.Context) ([]*model.FailedEvent, error)
|
||||
GetFailedEvents(ctx context.Context, instanceID string) ([]*model.FailedEvent, error)
|
||||
RemoveFailedEvent(context.Context, *model.FailedEvent) error
|
||||
GetViews() ([]*model.View, error)
|
||||
GetViews(instanceID string) ([]*model.View, error)
|
||||
ClearView(ctx context.Context, db, viewName string) error
|
||||
}
|
||||
|
@@ -14,10 +14,10 @@ type AdministratorRepo struct {
|
||||
View *view.View
|
||||
}
|
||||
|
||||
func (repo *AdministratorRepo) GetFailedEvents(ctx context.Context) ([]*view_model.FailedEvent, error) {
|
||||
func (repo *AdministratorRepo) GetFailedEvents(ctx context.Context, instanceID string) ([]*view_model.FailedEvent, error) {
|
||||
allFailedEvents := make([]*view_model.FailedEvent, 0)
|
||||
for _, db := range dbList {
|
||||
failedEvents, err := repo.View.AllFailedEvents(db)
|
||||
failedEvents, err := repo.View.AllFailedEvents(db, instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -32,10 +32,10 @@ func (repo *AdministratorRepo) RemoveFailedEvent(ctx context.Context, failedEven
|
||||
return repo.View.RemoveFailedEvent(failedEvent.Database, repository.FailedEventFromModel(failedEvent))
|
||||
}
|
||||
|
||||
func (repo *AdministratorRepo) GetViews() ([]*view_model.View, error) {
|
||||
func (repo *AdministratorRepo) GetViews(instanceID string) ([]*view_model.View, error) {
|
||||
views := make([]*view_model.View, 0)
|
||||
for _, db := range dbList {
|
||||
sequences, err := repo.View.AllCurrentSequences(db)
|
||||
sequences, err := repo.View.AllCurrentSequences(db, instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -21,6 +21,6 @@ func (v *View) latestFailedEvent(viewName, instanceID string, sequence uint64) (
|
||||
return repository.LatestFailedEvent(v.Db, errTable, viewName, instanceID, sequence)
|
||||
}
|
||||
|
||||
func (v *View) AllFailedEvents(db string) ([]*repository.FailedEvent, error) {
|
||||
return repository.AllFailedEvents(v.Db, db+"."+errColumn)
|
||||
func (v *View) AllFailedEvents(db, instanceID string) ([]*repository.FailedEvent, error) {
|
||||
return repository.AllFailedEvents(v.Db, db+"."+errColumn, instanceID)
|
||||
}
|
||||
|
@@ -23,8 +23,8 @@ func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repos
|
||||
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...)
|
||||
}
|
||||
|
||||
func (v *View) AllCurrentSequences(db string) ([]*repository.CurrentSequence, error) {
|
||||
return repository.AllCurrentSequences(v.Db, db+".current_sequences")
|
||||
func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) {
|
||||
return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID)
|
||||
}
|
||||
|
||||
func (v *View) updateSpoolerRunSequence(viewName string) error {
|
||||
|
@@ -3,18 +3,25 @@ package admin
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
|
||||
)
|
||||
|
||||
func (s *Server) ListFailedEvents(ctx context.Context, req *admin_pb.ListFailedEventsRequest) (*admin_pb.ListFailedEventsResponse, error) {
|
||||
failedEventsOld, err := s.administrator.GetFailedEvents(ctx)
|
||||
func (s *Server) ListFailedEvents(ctx context.Context, _ *admin_pb.ListFailedEventsRequest) (*admin_pb.ListFailedEventsResponse, error) {
|
||||
instanceID := authz.GetInstance(ctx).InstanceID()
|
||||
failedEventsOld, err := s.administrator.GetFailedEvents(ctx, instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
convertedOld := FailedEventsViewToPb(failedEventsOld)
|
||||
|
||||
failedEvents, err := s.query.SearchFailedEvents(ctx, new(query.FailedEventSearchQueries))
|
||||
instanceIDQuery, err := query.NewFailedEventInstanceIDSearchQuery(instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
failedEvents, err := s.query.SearchFailedEvents(ctx, &query.FailedEventSearchQueries{
|
||||
Queries: []query.SearchQuery{instanceIDQuery},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -25,9 +32,9 @@ func (s *Server) ListFailedEvents(ctx context.Context, req *admin_pb.ListFailedE
|
||||
func (s *Server) RemoveFailedEvent(ctx context.Context, req *admin_pb.RemoveFailedEventRequest) (*admin_pb.RemoveFailedEventResponse, error) {
|
||||
var err error
|
||||
if req.Database != s.database {
|
||||
err = s.administrator.RemoveFailedEvent(ctx, RemoveFailedEventRequestToModel(req))
|
||||
err = s.administrator.RemoveFailedEvent(ctx, RemoveFailedEventRequestToModel(ctx, req))
|
||||
} else {
|
||||
err = s.query.RemoveFailedEvent(ctx, req.ViewName, req.FailedSequence)
|
||||
err = s.query.RemoveFailedEvent(ctx, req.ViewName, authz.GetInstance(ctx).InstanceID(), req.FailedSequence)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -1,6 +1,11 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/view/model"
|
||||
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
|
||||
@@ -15,12 +20,17 @@ func FailedEventsViewToPb(failedEvents []*model.FailedEvent) []*admin_pb.FailedE
|
||||
}
|
||||
|
||||
func FailedEventViewToPb(failedEvent *model.FailedEvent) *admin_pb.FailedEvent {
|
||||
var lastFailed *timestamppb.Timestamp
|
||||
if !failedEvent.LastFailed.IsZero() {
|
||||
lastFailed = timestamppb.New(failedEvent.LastFailed)
|
||||
}
|
||||
return &admin_pb.FailedEvent{
|
||||
Database: failedEvent.Database,
|
||||
ViewName: failedEvent.ViewName,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
ErrorMessage: failedEvent.ErrMsg,
|
||||
LastFailed: lastFailed,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,19 +43,25 @@ func FailedEventsToPb(database string, failedEvents *query.FailedEvents) []*admi
|
||||
}
|
||||
|
||||
func FailedEventToPb(database string, failedEvent *query.FailedEvent) *admin_pb.FailedEvent {
|
||||
var lastFailed *timestamppb.Timestamp
|
||||
if !failedEvent.LastFailed.IsZero() {
|
||||
lastFailed = timestamppb.New(failedEvent.LastFailed)
|
||||
}
|
||||
return &admin_pb.FailedEvent{
|
||||
Database: database,
|
||||
ViewName: failedEvent.ProjectionName,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
ErrorMessage: failedEvent.Error,
|
||||
LastFailed: lastFailed,
|
||||
}
|
||||
}
|
||||
|
||||
func RemoveFailedEventRequestToModel(req *admin_pb.RemoveFailedEventRequest) *model.FailedEvent {
|
||||
func RemoveFailedEventRequestToModel(ctx context.Context, req *admin_pb.RemoveFailedEventRequest) *model.FailedEvent {
|
||||
return &model.FailedEvent{
|
||||
Database: req.Database,
|
||||
ViewName: req.ViewName,
|
||||
FailedSequence: req.FailedSequence,
|
||||
InstanceID: authz.GetInstance(ctx).InstanceID(),
|
||||
}
|
||||
}
|
||||
|
@@ -1,8 +1,11 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/test"
|
||||
"github.com/zitadel/zitadel/internal/view/model"
|
||||
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
|
||||
@@ -25,6 +28,7 @@ func TestFailedEventsToPbFields(t *testing.T) {
|
||||
ViewName: "users",
|
||||
FailedSequence: 456,
|
||||
FailureCount: 5,
|
||||
LastFailed: time.Now(),
|
||||
ErrMsg: "some error",
|
||||
},
|
||||
},
|
||||
@@ -57,6 +61,7 @@ func TestFailedEventToPbFields(t *testing.T) {
|
||||
ViewName: "users",
|
||||
FailedSequence: 456,
|
||||
FailureCount: 5,
|
||||
LastFailed: time.Now(),
|
||||
ErrMsg: "some error",
|
||||
},
|
||||
},
|
||||
@@ -70,6 +75,7 @@ func TestFailedEventToPbFields(t *testing.T) {
|
||||
|
||||
func TestRemoveFailedEventRequestToModelFields(t *testing.T) {
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
req *admin_pb.RemoveFailedEventRequest
|
||||
}
|
||||
tests := []struct {
|
||||
@@ -79,6 +85,7 @@ func TestRemoveFailedEventRequestToModelFields(t *testing.T) {
|
||||
{
|
||||
"all fields",
|
||||
args{
|
||||
ctx: authz.WithInstanceID(context.Background(), "instanceID"),
|
||||
req: &admin_pb.RemoveFailedEventRequest{
|
||||
Database: "admin",
|
||||
ViewName: "users",
|
||||
@@ -88,7 +95,7 @@ func TestRemoveFailedEventRequestToModelFields(t *testing.T) {
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
converted := RemoveFailedEventRequestToModel(tt.args.req)
|
||||
test.AssertFieldsMapped(t, converted, "FailureCount", "ErrMsg")
|
||||
converted := RemoveFailedEventRequestToModel(tt.args.ctx, tt.args.req)
|
||||
test.AssertFieldsMapped(t, converted, "FailureCount", "LastFailed", "ErrMsg")
|
||||
}
|
||||
}
|
||||
|
@@ -3,17 +3,25 @@ package admin
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/api/authz"
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
|
||||
)
|
||||
|
||||
func (s *Server) ListViews(ctx context.Context, _ *admin_pb.ListViewsRequest) (*admin_pb.ListViewsResponse, error) {
|
||||
currentSequences, err := s.query.SearchCurrentSequences(ctx, new(query.CurrentSequencesSearchQueries))
|
||||
instanceID := authz.GetInstance(ctx).InstanceID()
|
||||
instanceIDQuery, err := query.NewCurrentSequencesInstanceIDSearchQuery(instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentSequences, err := s.query.SearchCurrentSequences(ctx, &query.CurrentSequencesSearchQueries{
|
||||
Queries: []query.SearchQuery{instanceIDQuery},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
convertedCurrentSequences := CurrentSequencesToPb(s.database, currentSequences)
|
||||
views, err := s.administrator.GetViews()
|
||||
views, err := s.administrator.GetViews(instanceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -1,10 +1,11 @@
|
||||
package admin
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/view/model"
|
||||
admin_pb "github.com/zitadel/zitadel/pkg/grpc/admin"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func ViewsToPb(views []*model.View) []*admin_pb.View {
|
||||
@@ -35,9 +36,9 @@ func CurrentSequencesToPb(database string, currentSequences *query.CurrentSequen
|
||||
|
||||
func CurrentSequenceToPb(database string, currentSequence *query.CurrentSequence) *admin_pb.View {
|
||||
return &admin_pb.View{
|
||||
Database: database,
|
||||
ViewName: currentSequence.ProjectionName,
|
||||
ProcessedSequence: currentSequence.CurrentSequence,
|
||||
EventTimestamp: timestamppb.New(currentSequence.Timestamp),
|
||||
Database: database,
|
||||
ViewName: currentSequence.ProjectionName,
|
||||
ProcessedSequence: currentSequence.CurrentSequence,
|
||||
LastSuccessfulSpoolerRun: timestamppb.New(currentSequence.Timestamp),
|
||||
}
|
||||
}
|
||||
|
@@ -7,8 +7,8 @@ import (
|
||||
system_pb "github.com/zitadel/zitadel/pkg/grpc/system"
|
||||
)
|
||||
|
||||
func (s *Server) ListFailedEvents(ctx context.Context, req *system_pb.ListFailedEventsRequest) (*system_pb.ListFailedEventsResponse, error) {
|
||||
failedEventsOld, err := s.administrator.GetFailedEvents(ctx)
|
||||
func (s *Server) ListFailedEvents(ctx context.Context, _ *system_pb.ListFailedEventsRequest) (*system_pb.ListFailedEventsResponse, error) {
|
||||
failedEventsOld, err := s.administrator.GetFailedEvents(ctx, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -27,7 +27,7 @@ func (s *Server) RemoveFailedEvent(ctx context.Context, req *system_pb.RemoveFai
|
||||
if req.Database != s.database {
|
||||
err = s.administrator.RemoveFailedEvent(ctx, RemoveFailedEventRequestToModel(req))
|
||||
} else {
|
||||
err = s.query.RemoveFailedEvent(ctx, req.ViewName, req.FailedSequence)
|
||||
err = s.query.RemoveFailedEvent(ctx, req.ViewName, req.InstanceId, req.FailedSequence)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -1,6 +1,8 @@
|
||||
package system
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/view/model"
|
||||
system_pb "github.com/zitadel/zitadel/pkg/grpc/system"
|
||||
@@ -15,12 +17,17 @@ func FailedEventsViewToPb(failedEvents []*model.FailedEvent) []*system_pb.Failed
|
||||
}
|
||||
|
||||
func FailedEventViewToPb(failedEvent *model.FailedEvent) *system_pb.FailedEvent {
|
||||
var lastFailed *timestamppb.Timestamp
|
||||
if !failedEvent.LastFailed.IsZero() {
|
||||
lastFailed = timestamppb.New(failedEvent.LastFailed)
|
||||
}
|
||||
return &system_pb.FailedEvent{
|
||||
Database: failedEvent.Database,
|
||||
ViewName: failedEvent.ViewName,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
ErrorMessage: failedEvent.ErrMsg,
|
||||
LastFailed: lastFailed,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,12 +40,17 @@ func FailedEventsToPb(database string, failedEvents *query.FailedEvents) []*syst
|
||||
}
|
||||
|
||||
func FailedEventToPb(database string, failedEvent *query.FailedEvent) *system_pb.FailedEvent {
|
||||
var lastFailed *timestamppb.Timestamp
|
||||
if !failedEvent.LastFailed.IsZero() {
|
||||
lastFailed = timestamppb.New(failedEvent.LastFailed)
|
||||
}
|
||||
return &system_pb.FailedEvent{
|
||||
Database: database,
|
||||
ViewName: failedEvent.ProjectionName,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
ErrorMessage: failedEvent.Error,
|
||||
LastFailed: lastFailed,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,5 +59,6 @@ func RemoveFailedEventRequestToModel(req *system_pb.RemoveFailedEventRequest) *m
|
||||
Database: req.Database,
|
||||
ViewName: req.ViewName,
|
||||
FailedSequence: req.FailedSequence,
|
||||
InstanceID: req.InstanceId,
|
||||
}
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ package system_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
system_grpc "github.com/zitadel/zitadel/internal/api/grpc/system"
|
||||
"github.com/zitadel/zitadel/internal/test"
|
||||
@@ -26,6 +27,7 @@ func TestFailedEventsToPbFields(t *testing.T) {
|
||||
ViewName: "users",
|
||||
FailedSequence: 456,
|
||||
FailureCount: 5,
|
||||
LastFailed: time.Now(),
|
||||
ErrMsg: "some error",
|
||||
},
|
||||
},
|
||||
@@ -58,6 +60,7 @@ func TestFailedEventToPbFields(t *testing.T) {
|
||||
ViewName: "users",
|
||||
FailedSequence: 456,
|
||||
FailureCount: 5,
|
||||
LastFailed: time.Now(),
|
||||
ErrMsg: "some error",
|
||||
},
|
||||
},
|
||||
@@ -84,12 +87,13 @@ func TestRemoveFailedEventRequestToModelFields(t *testing.T) {
|
||||
Database: "admin",
|
||||
ViewName: "users",
|
||||
FailedSequence: 456,
|
||||
InstanceId: "instanceID",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
converted := system_grpc.RemoveFailedEventRequestToModel(tt.args.req)
|
||||
test.AssertFieldsMapped(t, converted, "FailureCount", "ErrMsg")
|
||||
test.AssertFieldsMapped(t, converted, "FailureCount", "LastFailed", "ErrMsg")
|
||||
}
|
||||
}
|
||||
|
@@ -13,7 +13,7 @@ func (s *Server) ListViews(ctx context.Context, _ *system_pb.ListViewsRequest) (
|
||||
return nil, err
|
||||
}
|
||||
convertedCurrentSequences := CurrentSequencesToPb(s.database, currentSequences)
|
||||
views, err := s.administrator.GetViews()
|
||||
views, err := s.administrator.GetViews("")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -1,10 +1,11 @@
|
||||
package system
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/zitadel/zitadel/internal/query"
|
||||
"github.com/zitadel/zitadel/internal/view/model"
|
||||
system_pb "github.com/zitadel/zitadel/pkg/grpc/system"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
func ViewsToPb(views []*model.View) []*system_pb.View {
|
||||
@@ -35,9 +36,9 @@ func CurrentSequencesToPb(database string, currentSequences *query.CurrentSequen
|
||||
|
||||
func CurrentSequenceToPb(database string, currentSequence *query.CurrentSequence) *system_pb.View {
|
||||
return &system_pb.View{
|
||||
Database: database,
|
||||
ViewName: currentSequence.ProjectionName,
|
||||
ProcessedSequence: currentSequence.CurrentSequence,
|
||||
EventTimestamp: timestamppb.New(currentSequence.Timestamp),
|
||||
Database: database,
|
||||
ViewName: currentSequence.ProjectionName,
|
||||
ProcessedSequence: currentSequence.CurrentSequence,
|
||||
LastSuccessfulSpoolerRun: timestamppb.New(currentSequence.Timestamp),
|
||||
}
|
||||
}
|
||||
|
@@ -28,8 +28,8 @@ func expectFailureCount(tableName string, projectionName, instanceID string, fai
|
||||
|
||||
func expectUpdateFailureCount(tableName string, projectionName, instanceID string, seq, failureCount uint64) func(sqlmock.Sqlmock) {
|
||||
return func(m sqlmock.Sqlmock) {
|
||||
m.ExpectExec(`INSERT INTO `+tableName+` \(projection_name, failed_sequence, failure_count, error, instance_id\) VALUES \(\$1, \$2, \$3, \$4\, \$5\) ON CONFLICT \(projection_name, failed_sequence, instance_id\) DO UPDATE SET failure_count = EXCLUDED\.failure_count, error = EXCLUDED\.error`).
|
||||
WithArgs(projectionName, seq, failureCount, sqlmock.AnyArg(), instanceID).WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
m.ExpectExec(`INSERT INTO `+tableName+` \(projection_name, failed_sequence, failure_count, error, instance_id, last_failed\) VALUES \(\$1, \$2, \$3, \$4\, \$5\, \$6\) ON CONFLICT \(projection_name, failed_sequence, instance_id\) DO UPDATE SET failure_count = EXCLUDED\.failure_count, error = EXCLUDED\.error, last_failed = EXCLUDED\.last_failed`).
|
||||
WithArgs(projectionName, seq, failureCount, sqlmock.AnyArg(), instanceID, "NOW()").WillReturnResult(sqlmock.NewResult(1, 1))
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -11,9 +11,9 @@ import (
|
||||
|
||||
const (
|
||||
setFailureCountStmtFormat = "INSERT INTO %s" +
|
||||
" (projection_name, failed_sequence, failure_count, error, instance_id)" +
|
||||
" VALUES ($1, $2, $3, $4, $5) ON CONFLICT (projection_name, failed_sequence, instance_id)" +
|
||||
" DO UPDATE SET failure_count = EXCLUDED.failure_count, error = EXCLUDED.error"
|
||||
" (projection_name, failed_sequence, failure_count, error, instance_id, last_failed)" +
|
||||
" VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (projection_name, failed_sequence, instance_id)" +
|
||||
" DO UPDATE SET failure_count = EXCLUDED.failure_count, error = EXCLUDED.error, last_failed = EXCLUDED.last_failed"
|
||||
failureCountStmtFormat = "WITH failures AS (SELECT failure_count FROM %s WHERE projection_name = $1 AND failed_sequence = $2 AND instance_id = $3)" +
|
||||
" SELECT COALESCE((SELECT failure_count FROM failures), 0) AS failure_count"
|
||||
)
|
||||
@@ -43,7 +43,7 @@ func (h *StatementHandler) failureCount(tx *sql.Tx, seq uint64, instanceID strin
|
||||
}
|
||||
|
||||
func (h *StatementHandler) setFailureCount(tx *sql.Tx, seq uint64, count uint, err error, instanceID string) error {
|
||||
_, dbErr := tx.Exec(h.setFailureCountStmt, h.ProjectionName, seq, count, err.Error(), instanceID)
|
||||
_, dbErr := tx.Exec(h.setFailureCountStmt, h.ProjectionName, seq, count, err.Error(), instanceID, "NOW()")
|
||||
if dbErr != nil {
|
||||
return errors.ThrowInternal(dbErr, "CRDB-4Ht4x", "set failure count failed")
|
||||
}
|
||||
|
@@ -135,7 +135,7 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str
|
||||
select {
|
||||
case err := <-errs:
|
||||
cancel()
|
||||
logging.Log("SPOOL-OT8di").OnError(err).WithField("view", s.ViewModel()).WithField("worker", workerID).Debug("load canceled")
|
||||
logging.OnError(err).WithField("view", s.ViewModel()).WithField("worker", workerID).Debug("load canceled")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -216,6 +216,7 @@ func HandleError(event *models.Event, failedErr error,
|
||||
failedEvent.FailureCount++
|
||||
failedEvent.ErrMsg = failedErr.Error()
|
||||
failedEvent.InstanceID = event.InstanceID
|
||||
failedEvent.LastFailed = time.Now()
|
||||
err = processFailedEvent(failedEvent)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@@ -17,6 +17,10 @@ import (
|
||||
"github.com/zitadel/zitadel/internal/view/repository"
|
||||
)
|
||||
|
||||
var (
|
||||
testNow = time.Now()
|
||||
)
|
||||
|
||||
type testHandler struct {
|
||||
cycleDuration time.Duration
|
||||
processSleep time.Duration
|
||||
@@ -429,6 +433,7 @@ func TestHandleError(t *testing.T) {
|
||||
FailureCount: 6,
|
||||
ViewName: "super.table",
|
||||
InstanceID: instanceID,
|
||||
LastFailed: testNow,
|
||||
}, nil
|
||||
},
|
||||
errorCountUntilSkip: 5,
|
||||
@@ -449,6 +454,7 @@ func TestHandleError(t *testing.T) {
|
||||
FailureCount: 5,
|
||||
ViewName: "super.table",
|
||||
InstanceID: instanceID,
|
||||
LastFailed: testNow,
|
||||
}, nil
|
||||
},
|
||||
errorCountUntilSkip: 6,
|
||||
@@ -469,6 +475,7 @@ func TestHandleError(t *testing.T) {
|
||||
FailureCount: 3,
|
||||
ViewName: "super.table",
|
||||
InstanceID: instanceID,
|
||||
LastFailed: testNow,
|
||||
}, nil
|
||||
},
|
||||
errorCountUntilSkip: 5,
|
||||
|
@@ -42,6 +42,10 @@ type CurrentSequencesSearchQueries struct {
|
||||
Queries []SearchQuery
|
||||
}
|
||||
|
||||
func NewCurrentSequencesInstanceIDSearchQuery(instanceID string) (SearchQuery, error) {
|
||||
return NewTextQuery(CurrentSequenceColInstanceID, instanceID, TextEquals)
|
||||
}
|
||||
|
||||
func (q *CurrentSequencesSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuilder {
|
||||
query = q.SearchRequest.toQuery(query)
|
||||
for _, q := range q.Queries {
|
||||
|
@@ -3,7 +3,7 @@ package query
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
errs "errors"
|
||||
"time"
|
||||
|
||||
sq "github.com/Masterminds/squirrel"
|
||||
|
||||
@@ -15,6 +15,7 @@ const (
|
||||
failedEventsColumnProjectionName = "projection_name"
|
||||
failedEventsColumnFailedSequence = "failed_sequence"
|
||||
failedEventsColumnFailureCount = "failure_count"
|
||||
failedEventsColumnLastFailed = "last_failed"
|
||||
failedEventsColumnError = "error"
|
||||
failedEventsColumnInstanceID = "instance_id"
|
||||
)
|
||||
@@ -36,10 +37,18 @@ var (
|
||||
name: failedEventsColumnFailureCount,
|
||||
table: failedEventsTable,
|
||||
}
|
||||
FailedEventsColumnLastFailed = Column{
|
||||
name: failedEventsColumnLastFailed,
|
||||
table: failedEventsTable,
|
||||
}
|
||||
FailedEventsColumnError = Column{
|
||||
name: failedEventsColumnError,
|
||||
table: failedEventsTable,
|
||||
}
|
||||
FailedEventsColumnInstanceID = Column{
|
||||
name: failedEventsColumnInstanceID,
|
||||
table: failedEventsTable,
|
||||
}
|
||||
)
|
||||
|
||||
type FailedEvents struct {
|
||||
@@ -52,6 +61,7 @@ type FailedEvent struct {
|
||||
FailedSequence uint64
|
||||
FailureCount uint64
|
||||
Error string
|
||||
LastFailed time.Time
|
||||
}
|
||||
|
||||
type FailedEventSearchQueries struct {
|
||||
@@ -73,26 +83,27 @@ func (q *Queries) SearchFailedEvents(ctx context.Context, queries *FailedEventSe
|
||||
return scan(rows)
|
||||
}
|
||||
|
||||
func (q *Queries) RemoveFailedEvent(ctx context.Context, projectionName string, sequence uint64) (err error) {
|
||||
func (q *Queries) RemoveFailedEvent(ctx context.Context, projectionName, instanceID string, sequence uint64) (err error) {
|
||||
stmt, args, err := sq.Delete(projection.FailedEventsTable).
|
||||
Where(sq.Eq{
|
||||
failedEventsColumnProjectionName: projectionName,
|
||||
failedEventsColumnFailedSequence: sequence,
|
||||
failedEventsColumnInstanceID: instanceID,
|
||||
}).
|
||||
PlaceholderFormat(sq.Dollar).
|
||||
ToSql()
|
||||
if err != nil {
|
||||
return errors.ThrowInternal(err, "QUERY-DGgh3", "Errors.RemoveFailed")
|
||||
}
|
||||
_, err = q.client.Exec(stmt, args...)
|
||||
_, err = q.client.ExecContext(ctx, stmt, args...)
|
||||
if err != nil {
|
||||
return errors.ThrowInternal(err, "QUERY-0kbFF", "Errors.RemoveFailed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewFailedEventProjectionNameSearchQuery(method TextComparison, value string) (SearchQuery, error) {
|
||||
return NewTextQuery(FailedEventsColumnProjectionName, value, method)
|
||||
func NewFailedEventInstanceIDSearchQuery(instanceID string) (SearchQuery, error) {
|
||||
return NewTextQuery(FailedEventsColumnInstanceID, instanceID, TextEquals)
|
||||
}
|
||||
|
||||
func (r *ProjectSearchQueries) AppendProjectionNameQuery(projectionName string) error {
|
||||
@@ -112,36 +123,12 @@ func (q *FailedEventSearchQueries) toQuery(query sq.SelectBuilder) sq.SelectBuil
|
||||
return query
|
||||
}
|
||||
|
||||
func prepareFailedEventQuery(instanceIDs ...string) (sq.SelectBuilder, func(*sql.Row) (*FailedEvent, error)) {
|
||||
return sq.Select(
|
||||
FailedEventsColumnProjectionName.identifier(),
|
||||
FailedEventsColumnFailedSequence.identifier(),
|
||||
FailedEventsColumnFailureCount.identifier(),
|
||||
FailedEventsColumnError.identifier()).
|
||||
From(failedEventsTable.identifier()).PlaceholderFormat(sq.Dollar),
|
||||
func(row *sql.Row) (*FailedEvent, error) {
|
||||
p := new(FailedEvent)
|
||||
err := row.Scan(
|
||||
&p.ProjectionName,
|
||||
&p.FailedSequence,
|
||||
&p.FailureCount,
|
||||
&p.Error,
|
||||
)
|
||||
if err != nil {
|
||||
if errs.Is(err, sql.ErrNoRows) {
|
||||
return nil, errors.ThrowNotFound(err, "QUERY-5N00f", "Errors.FailedEvents.NotFound")
|
||||
}
|
||||
return nil, errors.ThrowInternal(err, "QUERY-0oJf3", "Errors.Internal")
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
}
|
||||
|
||||
func prepareFailedEventsQuery() (sq.SelectBuilder, func(*sql.Rows) (*FailedEvents, error)) {
|
||||
return sq.Select(
|
||||
FailedEventsColumnProjectionName.identifier(),
|
||||
FailedEventsColumnFailedSequence.identifier(),
|
||||
FailedEventsColumnFailureCount.identifier(),
|
||||
FailedEventsColumnLastFailed.identifier(),
|
||||
FailedEventsColumnError.identifier(),
|
||||
countColumn.identifier()).
|
||||
From(failedEventsTable.identifier()).PlaceholderFormat(sq.Dollar),
|
||||
@@ -150,16 +137,19 @@ func prepareFailedEventsQuery() (sq.SelectBuilder, func(*sql.Rows) (*FailedEvent
|
||||
var count uint64
|
||||
for rows.Next() {
|
||||
failedEvent := new(FailedEvent)
|
||||
var lastFailed sql.NullTime
|
||||
err := rows.Scan(
|
||||
&failedEvent.ProjectionName,
|
||||
&failedEvent.FailedSequence,
|
||||
&failedEvent.FailureCount,
|
||||
&lastFailed,
|
||||
&failedEvent.Error,
|
||||
&count,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
failedEvent.LastFailed = lastFailed.Time
|
||||
failedEvents = append(failedEvents, failedEvent)
|
||||
}
|
||||
|
||||
|
@@ -28,6 +28,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
|
||||
` projections.failed_events.failed_sequence,`+
|
||||
` projections.failed_events.failure_count,`+
|
||||
` projections.failed_events.last_failed,`+
|
||||
` projections.failed_events.error,`+
|
||||
` COUNT(*) OVER ()`+
|
||||
` FROM projections.failed_events`),
|
||||
@@ -45,6 +46,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
|
||||
` projections.failed_events.failed_sequence,`+
|
||||
` projections.failed_events.failure_count,`+
|
||||
` projections.failed_events.last_failed,`+
|
||||
` projections.failed_events.error,`+
|
||||
` COUNT(*) OVER ()`+
|
||||
` FROM projections.failed_events`),
|
||||
@@ -52,6 +54,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
"projection_name",
|
||||
"failed_sequence",
|
||||
"failure_count",
|
||||
"last_failed",
|
||||
"error",
|
||||
"count",
|
||||
},
|
||||
@@ -60,6 +63,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
"projection-name",
|
||||
uint64(20211108),
|
||||
uint64(2),
|
||||
testNow,
|
||||
"error",
|
||||
},
|
||||
},
|
||||
@@ -74,6 +78,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
ProjectionName: "projection-name",
|
||||
FailedSequence: 20211108,
|
||||
FailureCount: 2,
|
||||
LastFailed: testNow,
|
||||
Error: "error",
|
||||
},
|
||||
},
|
||||
@@ -87,6 +92,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
|
||||
` projections.failed_events.failed_sequence,`+
|
||||
` projections.failed_events.failure_count,`+
|
||||
` projections.failed_events.last_failed,`+
|
||||
` projections.failed_events.error,`+
|
||||
` COUNT(*) OVER ()`+
|
||||
` FROM projections.failed_events`),
|
||||
@@ -94,6 +100,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
"projection_name",
|
||||
"failed_sequence",
|
||||
"failure_count",
|
||||
"last_failed",
|
||||
"error",
|
||||
"count",
|
||||
},
|
||||
@@ -102,12 +109,14 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
"projection-name",
|
||||
uint64(20211108),
|
||||
2,
|
||||
testNow,
|
||||
"error",
|
||||
},
|
||||
{
|
||||
"projection-name-2",
|
||||
uint64(20211108),
|
||||
2,
|
||||
nil,
|
||||
"error",
|
||||
},
|
||||
},
|
||||
@@ -122,6 +131,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
ProjectionName: "projection-name",
|
||||
FailedSequence: 20211108,
|
||||
FailureCount: 2,
|
||||
LastFailed: testNow,
|
||||
Error: "error",
|
||||
},
|
||||
{
|
||||
@@ -141,6 +151,7 @@ func Test_FailedEventsPrepares(t *testing.T) {
|
||||
regexp.QuoteMeta(`SELECT projections.failed_events.projection_name,`+
|
||||
` projections.failed_events.failed_sequence,`+
|
||||
` projections.failed_events.failure_count,`+
|
||||
` projections.failed_events.last_failed,`+
|
||||
` projections.failed_events.error,`+
|
||||
` COUNT(*) OVER ()`+
|
||||
` FROM projections.failed_events`),
|
||||
|
@@ -1,9 +1,13 @@
|
||||
package model
|
||||
|
||||
import "time"
|
||||
|
||||
type FailedEvent struct {
|
||||
Database string
|
||||
ViewName string
|
||||
FailedSequence uint64
|
||||
FailureCount uint64
|
||||
ErrMsg string
|
||||
InstanceID string
|
||||
LastFailed time.Time
|
||||
}
|
||||
|
@@ -2,6 +2,7 @@ package repository
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/jinzhu/gorm"
|
||||
|
||||
@@ -11,11 +12,47 @@ import (
|
||||
)
|
||||
|
||||
type FailedEvent struct {
|
||||
ViewName string `gorm:"column:view_name;primary_key"`
|
||||
FailedSequence uint64 `gorm:"column:failed_sequence;primary_key"`
|
||||
FailureCount uint64 `gorm:"column:failure_count"`
|
||||
ErrMsg string `gorm:"column:err_msg"`
|
||||
InstanceID string `gorm:"column:instance_id"`
|
||||
ViewName string `gorm:"column:view_name;primary_key"`
|
||||
FailedSequence uint64 `gorm:"column:failed_sequence;primary_key"`
|
||||
FailureCount uint64 `gorm:"column:failure_count"`
|
||||
ErrMsg string `gorm:"column:err_msg"`
|
||||
InstanceID string `gorm:"column:instance_id"`
|
||||
LastFailed time.Time `gorm:"column:last_failed"`
|
||||
}
|
||||
|
||||
type failedEventSearchRequest struct {
|
||||
Offset uint64
|
||||
Limit uint64
|
||||
SortingColumn failedEventSearchKey
|
||||
Asc bool
|
||||
Queries []*FailedEventSearchQuery
|
||||
}
|
||||
|
||||
func (f failedEventSearchRequest) GetLimit() uint64 {
|
||||
return f.Limit
|
||||
}
|
||||
|
||||
func (f failedEventSearchRequest) GetOffset() uint64 {
|
||||
return f.Offset
|
||||
}
|
||||
|
||||
func (f failedEventSearchRequest) GetSortingColumn() ColumnKey {
|
||||
if f.SortingColumn == failedEventSearchKey(FailedEventKeyUndefined) {
|
||||
return nil
|
||||
}
|
||||
return f.SortingColumn
|
||||
}
|
||||
|
||||
func (f failedEventSearchRequest) GetAsc() bool {
|
||||
return f.Asc
|
||||
}
|
||||
|
||||
func (f failedEventSearchRequest) GetQueries() []SearchQuery {
|
||||
result := make([]SearchQuery, len(f.Queries))
|
||||
for i, q := range f.Queries {
|
||||
result[i] = q
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
type FailedEventSearchQuery struct {
|
||||
@@ -43,6 +80,7 @@ const (
|
||||
FailedEventKeyViewName
|
||||
FailedEventKeyFailedSequence
|
||||
FailedEventKeyInstanceID
|
||||
FailedEventKeyLastFailed
|
||||
)
|
||||
|
||||
type failedEventSearchKey FailedEventSearchKey
|
||||
@@ -55,6 +93,8 @@ func (key failedEventSearchKey) ToColumnName() string {
|
||||
return "failed_sequence"
|
||||
case FailedEventKeyInstanceID:
|
||||
return "instance_id"
|
||||
case FailedEventKeyLastFailed:
|
||||
return "last_failed"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
@@ -65,6 +105,7 @@ func FailedEventFromModel(failedEvent *view_model.FailedEvent) *FailedEvent {
|
||||
ViewName: failedEvent.Database + "." + failedEvent.ViewName,
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
InstanceID: failedEvent.InstanceID,
|
||||
ErrMsg: failedEvent.ErrMsg,
|
||||
}
|
||||
}
|
||||
@@ -76,6 +117,7 @@ func FailedEventToModel(failedEvent *FailedEvent) *view_model.FailedEvent {
|
||||
FailureCount: failedEvent.FailureCount,
|
||||
FailedSequence: failedEvent.FailedSequence,
|
||||
ErrMsg: failedEvent.ErrMsg,
|
||||
LastFailed: failedEvent.LastFailed,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,9 +165,13 @@ func LatestFailedEvent(db *gorm.DB, table, viewName, instanceID string, sequence
|
||||
|
||||
}
|
||||
|
||||
func AllFailedEvents(db *gorm.DB, table string) ([]*FailedEvent, error) {
|
||||
func AllFailedEvents(db *gorm.DB, table, instanceID string) ([]*FailedEvent, error) {
|
||||
queries := make([]*FailedEventSearchQuery, 0, 1)
|
||||
if instanceID != "" {
|
||||
queries = append(queries, &FailedEventSearchQuery{Key: FailedEventKeyInstanceID, Method: domain.SearchMethodEquals, Value: instanceID})
|
||||
}
|
||||
failedEvents := make([]*FailedEvent, 0)
|
||||
query := PrepareSearchQuery(table, GeneralSearchRequest{})
|
||||
query := PrepareSearchQuery(table, &failedEventSearchRequest{SortingColumn: failedEventSearchKey(FailedEventKeyLastFailed), Queries: queries})
|
||||
_, err := query(db, &failedEvents)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@@ -192,9 +192,13 @@ func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs ...string)
|
||||
return sequences, nil
|
||||
}
|
||||
|
||||
func AllCurrentSequences(db *gorm.DB, table string) ([]*CurrentSequence, error) {
|
||||
func AllCurrentSequences(db *gorm.DB, table, instanceID string) ([]*CurrentSequence, error) {
|
||||
queries := make([]sequenceSearchQuery, 0, 1)
|
||||
if instanceID != "" {
|
||||
queries = append(queries, sequenceSearchQuery{key: sequenceSearchKey(SequenceSearchKeyInstanceID), value: instanceID})
|
||||
}
|
||||
sequences := make([]*CurrentSequence, 0)
|
||||
query := PrepareSearchQuery(table, GeneralSearchRequest{})
|
||||
query := PrepareSearchQuery(table, &sequenceSearchRequest{queries: queries})
|
||||
_, err := query(db, &sequences)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
Reference in New Issue
Block a user