fix(query): keys (#2755)

* fix: add keys to projections

* change to multiple tables

* query keys

* query keys

* fix race condition

* fix timer reset

* begin tests

* tests

* remove migration

* only send to keyChannel if not nil
This commit is contained in:
Livio Amstutz
2022-01-12 13:22:04 +01:00
committed by GitHub
parent ead61d240d
commit 9ab566fdeb
23 changed files with 927 additions and 419 deletions

View File

@@ -4,13 +4,12 @@ import (
"context"
"database/sql"
"fmt"
"os"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
"github.com/caos/zitadel/internal/id"
)
var (
@@ -32,6 +31,7 @@ type StatementHandlerConfig struct {
type StatementHandler struct {
*handler.ProjectionHandler
Locker
client *sql.DB
sequenceTable string
@@ -40,25 +40,17 @@ type StatementHandler struct {
maxFailureCount uint
failureCountStmt string
setFailureCountStmt string
lockStmt string
aggregates []eventstore.AggregateType
reduces map[eventstore.EventType]handler.Reduce
workerName string
bulkLimit uint64
bulkLimit uint64
}
func NewStatementHandler(
ctx context.Context,
config StatementHandlerConfig,
) StatementHandler {
workerName, err := os.Hostname()
if err != nil || workerName == "" {
workerName, err = id.SonyFlakeGenerator.Next()
logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID")
}
aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers))
reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers))
for _, aggReducer := range config.Reducers {
@@ -77,11 +69,10 @@ func NewStatementHandler(
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
lockStmt: fmt.Sprintf(lockStmtFormat, config.LockTable),
aggregates: aggregateTypes,
reduces: reduces,
workerName: workerName,
bulkLimit: config.BulkLimit,
Locker: NewLocker(config.Client, config.LockTable, config.ProjectionHandlerConfig.ProjectionName),
}
go h.ProjectionHandler.Process(

View File

@@ -2,9 +2,15 @@ package crdb
import (
"context"
"database/sql"
"fmt"
"os"
"time"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/id"
)
const (
@@ -15,13 +21,39 @@ const (
" WHERE %[1]s.projection_name = $3 AND (%[1]s.locker_id = $1 OR %[1]s.locked_until < now())"
)
func (h *StatementHandler) Lock(ctx context.Context, lockDuration time.Duration) <-chan error {
type Locker interface {
Lock(ctx context.Context, lockDuration time.Duration) <-chan error
Unlock() error
}
type locker struct {
client *sql.DB
lockStmt string
workerName string
projectionName string
}
func NewLocker(client *sql.DB, lockTable, projectionName string) Locker {
workerName, err := os.Hostname()
if err != nil || workerName == "" {
workerName, err = id.SonyFlakeGenerator.Next()
logging.Log("CRDB-bdO56").OnError(err).Panic("unable to generate lockID")
}
return &locker{
client: client,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
workerName: workerName,
projectionName: projectionName,
}
}
func (h *locker) Lock(ctx context.Context, lockDuration time.Duration) <-chan error {
errs := make(chan error)
go h.handleLock(ctx, errs, lockDuration)
return errs
}
func (h *StatementHandler) handleLock(ctx context.Context, errs chan error, lockDuration time.Duration) {
func (h *locker) handleLock(ctx context.Context, errs chan error, lockDuration time.Duration) {
renewLock := time.NewTimer(0)
for {
select {
@@ -37,9 +69,9 @@ func (h *StatementHandler) handleLock(ctx context.Context, errs chan error, lock
}
}
func (h *StatementHandler) renewLock(ctx context.Context, lockDuration time.Duration) error {
func (h *locker) renewLock(ctx context.Context, lockDuration time.Duration) error {
//the unit of crdb interval is seconds (https://www.cockroachlabs.com/docs/stable/interval.html).
res, err := h.client.Exec(h.lockStmt, h.workerName, lockDuration.Seconds(), h.ProjectionName)
res, err := h.client.Exec(h.lockStmt, h.workerName, lockDuration.Seconds(), h.projectionName)
if err != nil {
return errors.ThrowInternal(err, "CRDB-uaDoR", "unable to execute lock")
}
@@ -51,8 +83,8 @@ func (h *StatementHandler) renewLock(ctx context.Context, lockDuration time.Dura
return nil
}
func (h *StatementHandler) Unlock() error {
_, err := h.client.Exec(h.lockStmt, h.workerName, float64(0), h.ProjectionName)
func (h *locker) Unlock() error {
_, err := h.client.Exec(h.lockStmt, h.workerName, float64(0), h.projectionName)
if err != nil {
return errors.ThrowUnknown(err, "CRDB-JjfwO", "unlock failed")
}

View File

@@ -8,10 +8,9 @@ import (
"testing"
"time"
z_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/handler"
"github.com/DATA-DOG/go-sqlmock"
z_errs "github.com/caos/zitadel/internal/errors"
)
const (
@@ -82,13 +81,11 @@ func TestStatementHandler_handleLock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
h := &locker{
projectionName: projectionName,
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {
@@ -173,13 +170,11 @@ func TestStatementHandler_renewLock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
h := &locker{
projectionName: projectionName,
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {
@@ -237,13 +232,11 @@ func TestStatementHandler_Unlock(t *testing.T) {
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
h := &locker{
projectionName: projectionName,
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {