feat: jobs for projection tables (#1730)

* job queue

* wg improvements

* start handler

* statement

* statements

* imporve handler

* improve statement

* statement in seperate file

* move handlers

* move query/old to query

* handler

* read models

* bulk works

* cleanup

* contrib

* rename readmodel to projection

* rename read_models schema to projections

* rename read_models schema to projections

* search query as func,
bulk iterates as long as new events

* add event sequence less query

* update checks for events between current sequence and sequence of first statement if it has previous sequence 0

* cleanup crdb projection

* refactor projection handler

* start with testing

* tests for handler

* remove todo

* refactor statement: remove table name,
add tests

* improve projection handler shutdown,
no savepoint if noop stmt,
tests for stmt handler

* tests

* start failed events

* seperate branch for contrib

* move statement constructors to crdb pkg

* correct import

* Subscribe for eventtypes (#1800)

* fix: is default (#1737)

* fix: use email as username on global org (#1738)

* fix: use email as username on global org

* Update user_human.go

* Update register_handler.go

* chore(deps): update docusaurus (#1739)

* chore: remove PAT and use GH Token (#1716)

* chore: remove PAT and use GH Token

* fix env

* fix env

* fix env

* md lint

* trigger ci

* change user

* fix GH bug

* replace login part

* chore: add GH Token to sem rel (#1746)

* chore: add GH Token to sem rel

* try branch

* add GH Token

* remove test branch again

* docs: changes acme to acme-caos (#1744)

* changes acme to acme-caos

* Apply suggestions from code review

Co-authored-by: Florian Forster <florian@caos.ch>

Co-authored-by: Maximilian Panne <maximilian.panne@gmail.com>
Co-authored-by: Florian Forster <florian@caos.ch>

* feat: add additional origins on applications (#1691)

* feat: add additional origins on applications

* app additional redirects

* chore(deps-dev): bump @angular/cli from 11.2.8 to 11.2.11 in /console (#1706)

* fix: show org with regex (#1688)

* fix: flag mapping (#1699)

* chore(deps-dev): bump @angular/cli from 11.2.8 to 11.2.11 in /console

Bumps [@angular/cli](https://github.com/angular/angular-cli) from 11.2.8 to 11.2.11.
- [Release notes](https://github.com/angular/angular-cli/releases)
- [Commits](https://github.com/angular/angular-cli/compare/v11.2.8...v11.2.11)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps-dev): bump stylelint from 13.10.0 to 13.13.1 in /console (#1703)

* fix: show org with regex (#1688)

* fix: flag mapping (#1699)

* chore(deps-dev): bump stylelint from 13.10.0 to 13.13.1 in /console

Bumps [stylelint](https://github.com/stylelint/stylelint) from 13.10.0 to 13.13.1.
- [Release notes](https://github.com/stylelint/stylelint/releases)
- [Changelog](https://github.com/stylelint/stylelint/blob/master/CHANGELOG.md)
- [Commits](https://github.com/stylelint/stylelint/compare/13.10.0...13.13.1)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps-dev): bump @types/node from 14.14.37 to 15.0.1 in /console (#1702)

* fix: show org with regex (#1688)

* fix: flag mapping (#1699)

* chore(deps-dev): bump @types/node from 14.14.37 to 15.0.1 in /console

Bumps [@types/node](https://github.com/DefinitelyTyped/DefinitelyTyped/tree/HEAD/types/node) from 14.14.37 to 15.0.1.
- [Release notes](https://github.com/DefinitelyTyped/DefinitelyTyped/releases)
- [Commits](https://github.com/DefinitelyTyped/DefinitelyTyped/commits/HEAD/types/node)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump ts-protoc-gen from 0.14.0 to 0.15.0 in /console (#1701)

* fix: show org with regex (#1688)

* fix: flag mapping (#1699)

* chore(deps): bump ts-protoc-gen from 0.14.0 to 0.15.0 in /console

Bumps [ts-protoc-gen](https://github.com/improbable-eng/ts-protoc-gen) from 0.14.0 to 0.15.0.
- [Release notes](https://github.com/improbable-eng/ts-protoc-gen/releases)
- [Changelog](https://github.com/improbable-eng/ts-protoc-gen/blob/master/CHANGELOG.md)
- [Commits](https://github.com/improbable-eng/ts-protoc-gen/compare/0.14.0...0.15.0)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps-dev): bump @types/jasmine from 3.6.9 to 3.6.10 in /console (#1682)

Bumps [@types/jasmine](https://github.com/DefinitelyTyped/DefinitelyTyped/tree/HEAD/types/jasmine) from 3.6.9 to 3.6.10.
- [Release notes](https://github.com/DefinitelyTyped/DefinitelyTyped/releases)
- [Commits](https://github.com/DefinitelyTyped/DefinitelyTyped/commits/HEAD/types/jasmine)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump @types/google-protobuf in /console (#1681)

Bumps [@types/google-protobuf](https://github.com/DefinitelyTyped/DefinitelyTyped/tree/HEAD/types/google-protobuf) from 3.7.4 to 3.15.2.
- [Release notes](https://github.com/DefinitelyTyped/DefinitelyTyped/releases)
- [Commits](https://github.com/DefinitelyTyped/DefinitelyTyped/commits/HEAD/types/google-protobuf)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* chore(deps): bump grpc from 1.24.5 to 1.24.7 in /console (#1666)

Bumps [grpc](https://github.com/grpc/grpc-node) from 1.24.5 to 1.24.7.
- [Release notes](https://github.com/grpc/grpc-node/releases)
- [Commits](https://github.com/grpc/grpc-node/compare/grpc@1.24.5...grpc@1.24.7)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* lock

* chore(deps-dev): bump @angular/language-service from 11.2.9 to 11.2.12 in /console (#1704)

* fix: show org with regex (#1688)

* fix: flag mapping (#1699)

* chore(deps-dev): bump @angular/language-service in /console

Bumps [@angular/language-service](https://github.com/angular/angular/tree/HEAD/packages/language-service) from 11.2.9 to 11.2.12.
- [Release notes](https://github.com/angular/angular/releases)
- [Changelog](https://github.com/angular/angular/blob/master/CHANGELOG.md)
- [Commits](https://github.com/angular/angular/commits/11.2.12/packages/language-service)

Signed-off-by: dependabot[bot] <support@github.com>

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Silvan <silvan.reusser@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* package lock

* downgrade grpc

* downgrade protobuf types

* revert npm packs 🥸

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Silvan <silvan.reusser@gmail.com>

* docs: update run and start section texts (#1745)

* update run and start section texts

* adds showcase

Co-authored-by: Maximilian Panne <maximilian.panne@gmail.com>

* fix: additional origin list (#1753)

* fix: handle api configs in authz handler (#1755)

* fix(console): add model for api keys, fix toast, binding (#1757)

* fix: add model for api keys, fix toast, binding

* show api clientid

* fix: missing patchvalue (#1758)

* feat: refresh token (#1728)

* begin refresh tokens

* refresh tokens

* list and revoke refresh tokens

* handle remove

* tests for refresh tokens

* uniqueness and default expiration

* rename oidc token methods

* cleanup

* migration version

* Update internal/static/i18n/en.yaml

Co-authored-by: Fabi <38692350+fgerschwiler@users.noreply.github.com>

* fixes

* feat: update oidc pkg for refresh tokens

Co-authored-by: Fabi <38692350+fgerschwiler@users.noreply.github.com>

* fix: correct json name of clientId in key.json (#1760)

* fix: migration version (#1767)

* start subscription

* eventtypes

* fix(login): links (#1778)

* fix(login): href for help

* fix(login): correct link to tos

* fix: access tokens for service users and refresh token infos (#1779)

* fix: access token for service user

* handle info from refresh request

* uniqueness

* postpone access token uniqueness change

* chore(coc): recommend code of conduct (#1782)

* subscribe for events

* feat(console): refresh toggle out of granttype context (#1785)

* refresh toggle

* disable if not code flow, lint

* lint

* fix: change oidc config order

* accept refresh option within flow

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: refresh token activation (#1795)

* fix: oidc grant type check

* docs: add offline_access scope

* docs: update refresh token status in supported grant types

* fix: update oidc pkg

* fix: check refresh token grant type (#1796)

* configuration structs

* org admins

* failed events

* fixes

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Livio Amstutz <livio.a@gmail.com>
Co-authored-by: Florian Forster <florian@caos.ch>
Co-authored-by: mffap <mpa@caos.ch>
Co-authored-by: Maximilian Panne <maximilian.panne@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Fabi <38692350+fgerschwiler@users.noreply.github.com>

* remove comment

* aggregate reducer

* remove eventtypes

* add protoc-get-validate to mod

* fix transaltion

* upsert

* add gender on org admins,
allow to retry failed stmts after configurable time

* remove if

* sub queries

* fix: tests

* add builder to tests

* new search query

* rename searchquerybuilder to builder

* remove comment from code

* test with multiple queries

* add filters test

* current sequences

* make org and org_admins work again

* add aggregate type to current sequence

* fix(contibute): listing

* add validate module

* fix: search queries

* feat(eventstore): previous aggregate root sequence (#1810)

* feat(eventstore): previous aggregate root sequence

* fix tests

* fix: eventstore v1 test

* add col to all mocked rows

* next try

* fix mig

* rename aggregate root to aggregate type

* update comment

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* small refactorings

* allow update multiple current sequences

* unique log id

* fix migrations

* rename org admin to org owner

* improve error handling and logging

* fix(migration): optimize prev agg root seq

* fix: projection handler test

* fix: sub queries

* small fixes

* additional event types

* correct org owner projection

* fix primary key

* feat(eventstore): jobs for projections (#2026)

* fix: template names in login (#1974)

* fix: template names in login

* fix: error.html

* fix: check for features on mgmt only (#1976)

* fix: add sentry in ui, http and projection handlers (#1977)

* fix: add sentry in ui, http and projection handlers

* fix test

* fix(eventstore): sub queries (#1805)

* sub queries

* fix: tests

* add builder to tests

* new search query

* rename searchquerybuilder to builder

* remove comment from code

* test with multiple queries

* add filters test

* fix(contibute): listing

* add validate module

* fix: search queries

* remove unused event type in query

* ignore query if error in marshal

* go mod tidy

* update privacy policy query

* update queries

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* feat: Extend oidc idp with oauth endpoints (#1980)

* feat: add oauth attributes to oidc idp configuration

* feat: return idpconfig id on create idp

* feat: tests

* feat: descriptions

* feat: docs

* feat: tests

* docs: update to beta 3 (#1984)

* fix: role assertion (#1986)

* fix: enum to display access token role assertion

* improve assertion descriptions

* fix nil pointer

* docs: eventstore (#1982)

* docs: eventstore

* Apply suggestions from code review

Co-authored-by: Florian Forster <florian@caos.ch>

Co-authored-by: Florian Forster <florian@caos.ch>

* fix(sentry): trigger sentry release (#1989)

* feat(send sentry release): send sentry release

* fix(moved step and added releasetag): moved step and added releasetag

* fix: set version for sentry release (#1990)

* feat(send sentry release): send sentry release

* fix(moved step and added releasetag): moved step and added releasetag

* fix(corrected var name): corrected var name

Co-authored-by: Livio Amstutz <livio.a@gmail.com>

* fix: log error reason on terminate session (#1973)

* fix: return default language file, if requested lang does not exist for default login texts (#1988)

* fix: return default language file, if requested lang doesnt exists

* feat: read default translation file

* feat: docs

* fix: race condition in auth request unmarshalling (#1993)

* feat: handle ui_locales in login (#1994)

* fix: handle ui_locales in login

* move supportedlanguage func into i18n package

* update oidc pkg

* fix: handle closed channels on unsubscribe (#1995)

* fix: give restore more time (#1997)

* fix: translation file read (#2009)

* feat: translation file read

* feat: readme

* fix: enable idp add button for iam users (#2010)

* fix: filter event_data (#2011)

* feat: Custom message files (#1992)

* feat: add get custom message text to admin api

* feat: read custom message texts from files

* feat: get languages in apis

* feat: get languages in apis

* feat: get languages in apis

* feat: pr feedback

* feat: docs

* feat: merge main

* fix: sms notification (#2013)

* fix: phone verifications

* feat: fix password reset as sms

* fix: phone verification

* fix: grpc status in sentry and validation interceptors (#2012)

* fix: remove oauth endpoints from oidc config proto (#2014)

* try with view

* fix(console): disable sw (#2021)

* fix: disable sw

* angular.json disable sw

* project projections

* fix typos

* customize projections

* customizable projections,
add change date to projects

Co-authored-by: Livio Amstutz <livio.a@gmail.com>
Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Fabi <38692350+fgerschwiler@users.noreply.github.com>
Co-authored-by: Florian Forster <florian@caos.ch>
Co-authored-by: mffap <mpa@caos.ch>
Co-authored-by: Christian Jakob <47860090+thesephirot@users.noreply.github.com>
Co-authored-by: Elio Bischof <eliobischof@gmail.com>

* env file

* typo

* correct users

* correct migration

* fix: merge fail

* fix test

* fix(tests): unordered matcher

* improve currentSequenceMatcher

* correct certs

* correct certs

* add zitadel database on database list

* refctor switch in match

* enable all handlers

* Delete io.env

* cleanup

* add handlers

* rename view to projection

* rename view to projection

* fix type typo

* remove unnecessary logs

* refactor stmts

* simplify interval calculation

* fix tests

* fix unlock test

* fix migration

* migs

* fix(operator): update cockroach and flyway versions (#2138)

* chore(deps): bump k8s.io/apiextensions-apiserver from 0.19.2 to 0.21.3

Bumps [k8s.io/apiextensions-apiserver](https://github.com/kubernetes/apiextensions-apiserver) from 0.19.2 to 0.21.3.
- [Release notes](https://github.com/kubernetes/apiextensions-apiserver/releases)
- [Commits](https://github.com/kubernetes/apiextensions-apiserver/compare/v0.19.2...v0.21.3)

---
updated-dependencies:
- dependency-name: k8s.io/apiextensions-apiserver
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* chore(deps): bump google.golang.org/api from 0.34.0 to 0.52.0

Bumps [google.golang.org/api](https://github.com/googleapis/google-api-go-client) from 0.34.0 to 0.52.0.
- [Release notes](https://github.com/googleapis/google-api-go-client/releases)
- [Changelog](https://github.com/googleapis/google-api-go-client/blob/master/CHANGES.md)
- [Commits](https://github.com/googleapis/google-api-go-client/compare/v0.34.0...v0.52.0)

---
updated-dependencies:
- dependency-name: google.golang.org/api
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* start update dependencies

* update mods and otlp

* fix(build): update to go 1.16

* old version for k8s mods

* update k8s versions

* update orbos

* fix(operator): update cockroach and flyway version

* Update images.go

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Stefan Benz <stefan@caos.ch>

* fix import

* fix typo

* fix(migration): add org projection

* fix(projection): correct table for org events in org owners

* better insert stmt

* fix typo

* fix typo

* set max connection lifetime

* set max conns and conn lifetime in eventstore v1

* configure sql connection settings

* add mig for agg type index

* fix replace tab in yaml

* check requeue at least 500ms

* split column in column and condition

* remove useless comment

* mig versions

* fix migs

Co-authored-by: Max Peintner <max@caos.ch>
Co-authored-by: Livio Amstutz <livio.a@gmail.com>
Co-authored-by: Florian Forster <florian@caos.ch>
Co-authored-by: mffap <mpa@caos.ch>
Co-authored-by: Maximilian Panne <maximilian.panne@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Fabi <38692350+fgerschwiler@users.noreply.github.com>
Co-authored-by: Christian Jakob <47860090+thesephirot@users.noreply.github.com>
Co-authored-by: Elio Bischof <eliobischof@gmail.com>
Co-authored-by: Stefan Benz <stefan@caos.ch>
This commit is contained in:
Silvan
2021-08-19 08:31:56 +02:00
committed by GitHub
parent 9ba8184829
commit 37ee5b4bab
69 changed files with 6449 additions and 547 deletions

View File

@@ -0,0 +1,71 @@
package crdb
import (
"database/sql"
"strconv"
"strings"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
)
const (
currentSequenceStmtFormat = `SELECT current_sequence, aggregate_type FROM %s WHERE projection_name = $1 FOR UPDATE`
updateCurrentSequencesStmtFormat = `UPSERT INTO %s (projection_name, aggregate_type, current_sequence, timestamp) VALUES `
)
type currentSequences map[eventstore.AggregateType]uint64
func (h *StatementHandler) currentSequences(query func(string, ...interface{}) (*sql.Rows, error)) (currentSequences, error) {
rows, err := query(h.currentSequenceStmt, h.ProjectionName)
if err != nil {
return nil, err
}
defer rows.Close()
sequences := make(currentSequences, len(h.aggregates))
for rows.Next() {
var (
aggregateType eventstore.AggregateType
sequence uint64
)
err = rows.Scan(&sequence, &aggregateType)
if err != nil {
return nil, errors.ThrowInternal(err, "CRDB-dbatK", "scan failed")
}
sequences[aggregateType] = sequence
}
if err = rows.Close(); err != nil {
return nil, errors.ThrowInternal(err, "CRDB-h5i5m", "close rows failed")
}
if err = rows.Err(); err != nil {
return nil, errors.ThrowInternal(err, "CRDB-O8zig", "errors in scanning rows")
}
return sequences, nil
}
func (h *StatementHandler) updateCurrentSequences(tx *sql.Tx, sequences currentSequences) error {
valueQueries := make([]string, 0, len(sequences))
valueCounter := 0
values := make([]interface{}, 0, len(sequences)*3)
for aggregate, sequence := range sequences {
valueQueries = append(valueQueries, "($"+strconv.Itoa(valueCounter+1)+", $"+strconv.Itoa(valueCounter+2)+", $"+strconv.Itoa(valueCounter+3)+", NOW())")
valueCounter += 3
values = append(values, h.ProjectionName, aggregate, sequence)
}
res, err := tx.Exec(h.updateSequencesBaseStmt+strings.Join(valueQueries, ", "), values...)
if err != nil {
return errors.ThrowInternal(err, "CRDB-TrH2Z", "unable to exec update sequence")
}
if rows, _ := res.RowsAffected(); rows < 1 {
return errSeqNotUpdated
}
return nil
}

View File

@@ -0,0 +1,306 @@
package crdb
import (
"database/sql"
"database/sql/driver"
"log"
"strings"
"time"
"github.com/DATA-DOG/go-sqlmock"
"github.com/caos/zitadel/internal/eventstore"
)
type mockExpectation func(sqlmock.Sqlmock)
func expectFailureCount(tableName string, projectionName string, failedSeq, failureCount uint64) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectQuery(`WITH failures AS \(SELECT failure_count FROM `+tableName+` WHERE projection_name = \$1 AND failed_sequence = \$2\) SELECT IF\(EXISTS\(SELECT failure_count FROM failures\), \(SELECT failure_count FROM failures\), 0\) AS failure_count`).
WithArgs(projectionName, failedSeq).
WillReturnRows(
sqlmock.NewRows([]string{"failure_count"}).
AddRow(failureCount),
)
}
}
func expectUpdateFailureCount(tableName string, projectionName string, seq, failureCount uint64) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec(`UPSERT INTO `+tableName+` \(projection_name, failed_sequence, failure_count, error\) VALUES \(\$1, \$2, \$3, \$4\)`).
WithArgs(projectionName, seq, failureCount, sqlmock.AnyArg()).WillReturnResult(sqlmock.NewResult(1, 1))
}
}
func expectCreate(projectionName string, columnNames, placeholders []string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
args := make([]driver.Value, len(columnNames))
for i := 0; i < len(columnNames); i++ {
args[i] = sqlmock.AnyArg()
placeholders[i] = `\` + placeholders[i]
}
m.ExpectExec("INSERT INTO " + projectionName + ` \(` + strings.Join(columnNames, ", ") + `\) VALUES \(` + strings.Join(placeholders, ", ") + `\)`).
WithArgs(args...).
WillReturnResult(sqlmock.NewResult(1, 1))
}
}
func expectCreateErr(projectionName string, columnNames, placeholders []string, err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
args := make([]driver.Value, len(columnNames))
for i := 0; i < len(columnNames); i++ {
args[i] = sqlmock.AnyArg()
placeholders[i] = `\` + placeholders[i]
}
m.ExpectExec("INSERT INTO " + projectionName + ` \(` + strings.Join(columnNames, ", ") + `\) VALUES \(` + strings.Join(placeholders, ", ") + `\)`).
WithArgs(args...).
WillReturnError(err)
}
}
func expectBegin() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectBegin()
}
}
func expectBeginErr(err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectBegin().WillReturnError(err)
}
}
func expectCommit() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectCommit()
}
}
func expectCommitErr(err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectCommit().WillReturnError(err)
}
}
func expectRollback() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectRollback()
}
}
func expectSavePoint() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("SAVEPOINT push_stmt").
WillReturnResult(sqlmock.NewResult(1, 1))
}
}
func expectSavePointErr(err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("SAVEPOINT push_stmt").
WillReturnError(err)
}
}
func expectSavePointRollback() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("ROLLBACK TO SAVEPOINT push_stmt").
WillReturnResult(sqlmock.NewResult(1, 1))
}
}
func expectSavePointRollbackErr(err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("ROLLBACK TO SAVEPOINT push_stmt").
WillReturnError(err)
}
}
func expectSavePointRelease() func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("RELEASE push_stmt").
WillReturnResult(sqlmock.NewResult(1, 1))
}
}
func expectCurrentSequence(tableName, projection string, seq uint64, aggregateType string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectQuery(`SELECT current_sequence, aggregate_type FROM ` + tableName + ` WHERE projection_name = \$1 FOR UPDATE`).
WithArgs(
projection,
).
WillReturnRows(
sqlmock.NewRows([]string{"current_sequence", "aggregate_type"}).
AddRow(seq, aggregateType),
)
}
}
func expectCurrentSequenceErr(tableName, projection string, err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectQuery(`SELECT current_sequence, aggregate_type FROM ` + tableName + ` WHERE projection_name = \$1 FOR UPDATE`).
WithArgs(
projection,
).
WillReturnError(err)
}
}
func expectCurrentSequenceNoRows(tableName, projection string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectQuery(`SELECT current_sequence, aggregate_type FROM ` + tableName + ` WHERE projection_name = \$1 FOR UPDATE`).
WithArgs(
projection,
).
WillReturnRows(
sqlmock.NewRows([]string{"current_sequence", "aggregate_type"}),
)
}
}
func expectCurrentSequenceScanErr(tableName, projection string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectQuery(`SELECT current_sequence, aggregate_type FROM ` + tableName + ` WHERE projection_name = \$1 FOR UPDATE`).
WithArgs(
projection,
).
WillReturnRows(
sqlmock.NewRows([]string{"current_sequence", "aggregate_type"}).
RowError(0, sql.ErrTxDone).
AddRow(0, "agg"),
)
}
}
func expectUpdateCurrentSequence(tableName, projection string, seq uint64, aggregateType string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("UPSERT INTO "+tableName+` \(projection_name, aggregate_type, current_sequence, timestamp\) VALUES \(\$1, \$2, \$3, NOW\(\)\)`).
WithArgs(
projection,
aggregateType,
seq,
).
WillReturnResult(
sqlmock.NewResult(1, 1),
)
}
}
func expectUpdateTwoCurrentSequence(tableName, projection string, sequences currentSequences) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
matcher := &currentSequenceMatcher{seq: sequences}
m.ExpectExec("UPSERT INTO "+tableName+` \(projection_name, aggregate_type, current_sequence, timestamp\) VALUES \(\$1, \$2, \$3, NOW\(\)\), \(\$4, \$5, \$6, NOW\(\)\)`).
WithArgs(
projection,
matcher,
matcher,
projection,
matcher,
matcher,
).
WillReturnResult(
sqlmock.NewResult(1, 1),
)
}
}
type currentSequenceMatcher struct {
seq currentSequences
currentAggregate eventstore.AggregateType
}
func (m *currentSequenceMatcher) Match(value driver.Value) bool {
switch v := value.(type) {
case string:
if m.currentAggregate != "" {
log.Printf("expected sequence of %s but got next aggregate type %s", m.currentAggregate, value)
return false
}
_, ok := m.seq[eventstore.AggregateType(v)]
if !ok {
return false
}
m.currentAggregate = eventstore.AggregateType(v)
return true
default:
seq := m.seq[m.currentAggregate]
m.currentAggregate = ""
delete(m.seq, m.currentAggregate)
return int64(seq) == value.(int64)
}
}
func expectUpdateCurrentSequenceErr(tableName, projection string, seq uint64, err error, aggregateType string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("UPSERT INTO "+tableName+` \(projection_name, aggregate_type, current_sequence, timestamp\) VALUES \(\$1, \$2, \$3, NOW\(\)\)`).
WithArgs(
projection,
aggregateType,
seq,
).
WillReturnError(err)
}
}
func expectUpdateCurrentSequenceNoRows(tableName, projection string, seq uint64, aggregateType string) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec("UPSERT INTO "+tableName+` \(projection_name, aggregate_type, current_sequence, timestamp\) VALUES \(\$1, \$2, \$3, NOW\(\)\)`).
WithArgs(
projection,
aggregateType,
seq,
).
WillReturnResult(
sqlmock.NewResult(0, 0),
)
}
}
func expectLock(lockTable, workerName string, d time.Duration) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec(`INSERT INTO `+lockTable+
` \(locker_id, locked_until, projection_name\) VALUES \(\$1, now\(\)\+\$2::INTERVAL, \$3\)`+
` ON CONFLICT \(projection_name\)`+
` DO UPDATE SET locker_id = \$1, locked_until = now\(\)\+\$2::INTERVAL`+
` WHERE `+lockTable+`\.projection_name = \$3 AND \(`+lockTable+`\.locker_id = \$1 OR `+lockTable+`\.locked_until < now\(\)\)`).
WithArgs(
workerName,
float64(d),
projectionName,
).
WillReturnResult(
sqlmock.NewResult(1, 1),
)
}
}
func expectLockNoRows(lockTable, workerName string, d time.Duration) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec(`INSERT INTO `+lockTable+
` \(locker_id, locked_until, projection_name\) VALUES \(\$1, now\(\)\+\$2::INTERVAL, \$3\)`+
` ON CONFLICT \(projection_name\)`+
` DO UPDATE SET locker_id = \$1, locked_until = now\(\)\+\$2::INTERVAL`+
` WHERE `+lockTable+`\.projection_name = \$3 AND \(`+lockTable+`\.locker_id = \$1 OR `+lockTable+`\.locked_until < now\(\)\)`).
WithArgs(
workerName,
float64(d),
projectionName,
).
WillReturnResult(driver.ResultNoRows)
}
}
func expectLockErr(lockTable, workerName string, d time.Duration, err error) func(sqlmock.Sqlmock) {
return func(m sqlmock.Sqlmock) {
m.ExpectExec(`INSERT INTO `+lockTable+
` \(locker_id, locked_until, projection_name\) VALUES \(\$1, now\(\)\+\$2::INTERVAL, \$3\)`+
` ON CONFLICT \(projection_name\)`+
` DO UPDATE SET locker_id = \$1, locked_until = now\(\)\+\$2::INTERVAL`+
` WHERE `+lockTable+`\.projection_name = \$3 AND \(`+lockTable+`\.locker_id = \$1 OR `+lockTable+`\.locked_until < now\(\)\)`).
WithArgs(
workerName,
float64(d),
projectionName,
).
WillReturnError(err)
}
}

View File

@@ -0,0 +1,53 @@
package crdb
import (
"database/sql"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/handler"
)
const (
setFailureCountStmtFormat = "UPSERT INTO %s" +
" (projection_name, failed_sequence, failure_count, error)" +
" VALUES ($1, $2, $3, $4)"
failureCountStmtFormat = "WITH failures AS (SELECT failure_count FROM %s WHERE projection_name = $1 AND failed_sequence = $2)" +
" SELECT IF(" +
"EXISTS(SELECT failure_count FROM failures)," +
" (SELECT failure_count FROM failures)," +
" 0" +
") AS failure_count"
)
func (h *StatementHandler) handleFailedStmt(tx *sql.Tx, stmt handler.Statement, execErr error) (shouldContinue bool) {
failureCount, err := h.failureCount(tx, stmt.Sequence)
if err != nil {
logging.LogWithFields("CRDB-WJaFk", "projection", h.ProjectionName, "seq", stmt.Sequence).WithError(err).Warn("unable to get failure count")
return false
}
failureCount += 1
err = h.setFailureCount(tx, stmt.Sequence, failureCount, execErr)
logging.LogWithFields("CRDB-cI0dB", "projection", h.ProjectionName, "seq", stmt.Sequence).OnError(err).Warn("unable to update failure count")
return failureCount >= h.maxFailureCount
}
func (h *StatementHandler) failureCount(tx *sql.Tx, seq uint64) (count uint, err error) {
row := tx.QueryRow(h.failureCountStmt, h.ProjectionName, seq)
if err = row.Err(); err != nil {
return 0, errors.ThrowInternal(err, "CRDB-Unnex", "unable to update failure count")
}
if err = row.Scan(&count); err != nil {
return 0, errors.ThrowInternal(err, "CRDB-RwSMV", "unable to scann count")
}
return count, nil
}
func (h *StatementHandler) setFailureCount(tx *sql.Tx, seq uint64, count uint, err error) error {
_, dbErr := tx.Exec(h.setFailureCountStmt, h.ProjectionName, seq, count, err.Error())
if dbErr != nil {
return errors.ThrowInternal(dbErr, "CRDB-4Ht4x", "set failure count failed")
}
return nil
}

View File

@@ -0,0 +1,268 @@
package crdb
import (
"context"
"database/sql"
"fmt"
"os"
"github.com/caos/logging"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
"github.com/caos/zitadel/internal/id"
)
var (
errSeqNotUpdated = errors.ThrowInternal(nil, "CRDB-79GWt", "current sequence not updated")
)
type StatementHandlerConfig struct {
handler.ProjectionHandlerConfig
Client *sql.DB
SequenceTable string
LockTable string
FailedEventsTable string
MaxFailureCount uint
BulkLimit uint64
Reducers []handler.AggregateReducer
}
type StatementHandler struct {
*handler.ProjectionHandler
client *sql.DB
sequenceTable string
currentSequenceStmt string
updateSequencesBaseStmt string
maxFailureCount uint
failureCountStmt string
setFailureCountStmt string
lockStmt string
aggregates []eventstore.AggregateType
reduces map[eventstore.EventType]handler.Reduce
workerName string
bulkLimit uint64
}
func NewStatementHandler(
ctx context.Context,
config StatementHandlerConfig,
) StatementHandler {
workerName, err := os.Hostname()
if err != nil || workerName == "" {
workerName, err = id.SonyFlakeGenerator.Next()
logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID")
}
aggregateTypes := make([]eventstore.AggregateType, 0, len(config.Reducers))
reduces := make(map[eventstore.EventType]handler.Reduce, len(config.Reducers))
for _, aggReducer := range config.Reducers {
aggregateTypes = append(aggregateTypes, aggReducer.Aggregate)
for _, eventReducer := range aggReducer.EventRedusers {
reduces[eventReducer.Event] = eventReducer.Reduce
}
}
h := StatementHandler{
ProjectionHandler: handler.NewProjectionHandler(config.ProjectionHandlerConfig),
client: config.Client,
sequenceTable: config.SequenceTable,
maxFailureCount: config.MaxFailureCount,
currentSequenceStmt: fmt.Sprintf(currentSequenceStmtFormat, config.SequenceTable),
updateSequencesBaseStmt: fmt.Sprintf(updateCurrentSequencesStmtFormat, config.SequenceTable),
failureCountStmt: fmt.Sprintf(failureCountStmtFormat, config.FailedEventsTable),
setFailureCountStmt: fmt.Sprintf(setFailureCountStmtFormat, config.FailedEventsTable),
lockStmt: fmt.Sprintf(lockStmtFormat, config.LockTable),
aggregates: aggregateTypes,
reduces: reduces,
workerName: workerName,
bulkLimit: config.BulkLimit,
}
go h.ProjectionHandler.Process(
ctx,
h.reduce,
h.Update,
h.Lock,
h.Unlock,
h.SearchQuery,
)
h.ProjectionHandler.Handler.Subscribe(h.aggregates...)
return h
}
func (h *StatementHandler) SearchQuery() (*eventstore.SearchQueryBuilder, uint64, error) {
sequences, err := h.currentSequences(h.client.Query)
if err != nil {
return nil, 0, err
}
queryBuilder := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).Limit(h.bulkLimit)
for _, aggregateType := range h.aggregates {
queryBuilder.
AddQuery().
AggregateTypes(aggregateType).
SequenceGreater(sequences[aggregateType])
}
return queryBuilder, h.bulkLimit, nil
}
//Update implements handler.Update
func (h *StatementHandler) Update(ctx context.Context, stmts []handler.Statement, reduce handler.Reduce) (unexecutedStmts []handler.Statement, err error) {
tx, err := h.client.BeginTx(ctx, nil)
if err != nil {
return stmts, errors.ThrowInternal(err, "CRDB-e89Gq", "begin failed")
}
sequences, err := h.currentSequences(tx.Query)
if err != nil {
tx.Rollback()
return stmts, err
}
//checks for events between create statement and current sequence
// because there could be events between current sequence and a creation event
// and we cannot check via stmt.PreviousSequence
if stmts[0].PreviousSequence == 0 {
previousStmts, err := h.fetchPreviousStmts(ctx, stmts[0].Sequence, sequences, reduce)
if err != nil {
tx.Rollback()
return stmts, err
}
stmts = append(previousStmts, stmts...)
}
lastSuccessfulIdx := h.executeStmts(tx, stmts, sequences)
if lastSuccessfulIdx >= 0 {
err = h.updateCurrentSequences(tx, sequences)
if err != nil {
tx.Rollback()
return stmts, err
}
}
if err = tx.Commit(); err != nil {
return stmts, err
}
if lastSuccessfulIdx == -1 {
return stmts, handler.ErrSomeStmtsFailed
}
unexecutedStmts = make([]handler.Statement, len(stmts)-(lastSuccessfulIdx+1))
copy(unexecutedStmts, stmts[lastSuccessfulIdx+1:])
stmts = nil
if len(unexecutedStmts) > 0 {
return unexecutedStmts, handler.ErrSomeStmtsFailed
}
return unexecutedStmts, nil
}
func (h *StatementHandler) fetchPreviousStmts(
ctx context.Context,
stmtSeq uint64,
sequences currentSequences,
reduce handler.Reduce,
) (previousStmts []handler.Statement, err error) {
query := eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent)
queriesAdded := false
for _, aggregateType := range h.aggregates {
if stmtSeq <= sequences[aggregateType] {
continue
}
query.
AddQuery().
AggregateTypes(aggregateType).
SequenceGreater(sequences[aggregateType]).
SequenceLess(stmtSeq)
queriesAdded = true
}
if !queriesAdded {
return nil, nil
}
events, err := h.Eventstore.FilterEvents(ctx, query)
if err != nil {
return nil, err
}
for _, event := range events {
stmts, err := reduce(event)
if err != nil {
return nil, err
}
previousStmts = append(previousStmts, stmts...)
}
return previousStmts, nil
}
func (h *StatementHandler) executeStmts(
tx *sql.Tx,
stmts []handler.Statement,
sequences currentSequences,
) int {
lastSuccessfulIdx := -1
for i, stmt := range stmts {
if stmt.Sequence <= sequences[stmt.AggregateType] {
continue
}
if stmt.PreviousSequence > 0 && stmt.PreviousSequence != sequences[stmt.AggregateType] {
logging.LogWithFields("CRDB-jJBJn", "projection", h.ProjectionName, "aggregateType", stmt.AggregateType, "seq", stmt.Sequence, "prevSeq", stmt.PreviousSequence, "currentSeq", sequences[stmt.AggregateType]).Warn("sequences do not match")
break
}
err := h.executeStmt(tx, stmt)
if err == nil {
sequences[stmt.AggregateType], lastSuccessfulIdx = stmt.Sequence, i
continue
}
shouldContinue := h.handleFailedStmt(tx, stmt, err)
if !shouldContinue {
break
}
sequences[stmt.AggregateType], lastSuccessfulIdx = stmt.Sequence, i
}
return lastSuccessfulIdx
}
//executeStmt handles sql statements
//an error is returned if the statement could not be inserted properly
func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt handler.Statement) error {
if stmt.IsNoop() {
return nil
}
_, err := tx.Exec("SAVEPOINT push_stmt")
if err != nil {
return errors.ThrowInternal(err, "CRDB-i1wp6", "unable to create savepoint")
}
err = stmt.Execute(tx, h.ProjectionName)
if err != nil {
_, rollbackErr := tx.Exec("ROLLBACK TO SAVEPOINT push_stmt")
if rollbackErr != nil {
return errors.ThrowInternal(rollbackErr, "CRDB-zzp3P", "rollback to savepoint failed")
}
return errors.ThrowInternal(err, "CRDB-oRkaN", "unable execute stmt")
}
_, err = tx.Exec("RELEASE push_stmt")
if err != nil {
return errors.ThrowInternal(err, "CRDB-qWgwT", "unable to release savepoint")
}
return nil
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,60 @@
package crdb
import (
"context"
"time"
"github.com/caos/zitadel/internal/errors"
)
const (
lockStmtFormat = "INSERT INTO %[1]s" +
" (locker_id, locked_until, projection_name) VALUES ($1, now()+$2::INTERVAL, $3)" +
" ON CONFLICT (projection_name)" +
" DO UPDATE SET locker_id = $1, locked_until = now()+$2::INTERVAL" +
" WHERE %[1]s.projection_name = $3 AND (%[1]s.locker_id = $1 OR %[1]s.locked_until < now())"
)
func (h *StatementHandler) Lock(ctx context.Context, lockDuration time.Duration) <-chan error {
errs := make(chan error)
go h.handleLock(ctx, errs, lockDuration)
return errs
}
func (h *StatementHandler) handleLock(ctx context.Context, errs chan error, lockDuration time.Duration) {
renewLock := time.NewTimer(0)
for {
select {
case <-renewLock.C:
errs <- h.renewLock(ctx, lockDuration)
//refresh the lock 500ms before it times out. 500ms should be enough for one transaction
renewLock.Reset(lockDuration - (500 * time.Millisecond))
case <-ctx.Done():
close(errs)
renewLock.Stop()
return
}
}
}
func (h *StatementHandler) renewLock(ctx context.Context, lockDuration time.Duration) error {
//the unit of crdb interval is seconds (https://www.cockroachlabs.com/docs/stable/interval.html).
res, err := h.client.Exec(h.lockStmt, h.workerName, lockDuration.Seconds(), h.ProjectionName)
if err != nil {
return errors.ThrowInternal(err, "CRDB-uaDoR", "unable to execute lock")
}
if rows, _ := res.RowsAffected(); rows == 0 {
return errors.ThrowAlreadyExists(nil, "CRDB-mmi4J", "projection already locked")
}
return nil
}
func (h *StatementHandler) Unlock() error {
_, err := h.client.Exec(h.lockStmt, h.workerName, float64(0), h.ProjectionName)
if err != nil {
return errors.ThrowUnknown(err, "CRDB-JjfwO", "unlock failed")
}
return nil
}

View File

@@ -0,0 +1,286 @@
package crdb
import (
"context"
"database/sql"
"errors"
"fmt"
"testing"
"time"
z_errs "github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore/handler"
"github.com/DATA-DOG/go-sqlmock"
)
const (
workerName = "test_worker"
projectionName = "my_projection"
lockTable = "my_lock_table"
)
var (
renewNoRowsAffectedErr = z_errs.ThrowAlreadyExists(nil, "CRDB-mmi4J", "projection already locked")
errLock = errors.New("lock err")
)
func TestStatementHandler_handleLock(t *testing.T) {
type want struct {
expectations []mockExpectation
}
type args struct {
lockDuration time.Duration
ctx context.Context
errMock *errsMock
}
tests := []struct {
name string
want want
args args
}{
{
name: "lock fails",
want: want{
expectations: []mockExpectation{
expectLock(lockTable, workerName, 2),
expectLock(lockTable, workerName, 2),
expectLockErr(lockTable, workerName, 2, errLock),
},
},
args: args{
lockDuration: 2 * time.Second,
ctx: context.Background(),
errMock: &errsMock{
errs: make(chan error),
successfulIters: 2,
shouldErr: true,
},
},
},
{
name: "success",
want: want{
expectations: []mockExpectation{
expectLock(lockTable, workerName, 2),
expectLock(lockTable, workerName, 2),
},
},
args: args{
lockDuration: 2 * time.Second,
ctx: context.Background(),
errMock: &errsMock{
errs: make(chan error),
successfulIters: 2,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, mock, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {
expectation(mock)
}
ctx, cancel := context.WithCancel(tt.args.ctx)
go tt.args.errMock.handleErrs(t, cancel)
go h.handleLock(ctx, tt.args.errMock.errs, tt.args.lockDuration)
<-ctx.Done()
mock.MatchExpectationsInOrder(true)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("expectations not met: %v", err)
}
})
}
}
func TestStatementHandler_renewLock(t *testing.T) {
type want struct {
expectations []mockExpectation
isErr func(err error) bool
}
type args struct {
lockDuration time.Duration
}
tests := []struct {
name string
want want
args args
}{
{
name: "lock fails",
want: want{
expectations: []mockExpectation{
expectLockErr(lockTable, workerName, 1, sql.ErrTxDone),
},
isErr: func(err error) bool {
return errors.Is(err, sql.ErrTxDone)
},
},
args: args{
lockDuration: 1 * time.Second,
},
},
{
name: "lock no rows",
want: want{
expectations: []mockExpectation{
expectLockNoRows(lockTable, workerName, 2),
},
isErr: func(err error) bool {
return errors.As(err, &renewNoRowsAffectedErr)
},
},
args: args{
lockDuration: 2 * time.Second,
},
},
{
name: "success",
want: want{
expectations: []mockExpectation{
expectLock(lockTable, workerName, 3),
},
isErr: func(err error) bool {
return errors.Is(err, nil)
},
},
args: args{
lockDuration: 3 * time.Second,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, mock, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {
expectation(mock)
}
err = h.renewLock(context.Background(), tt.args.lockDuration)
if !tt.want.isErr(err) {
t.Errorf("unexpected error = %v", err)
}
mock.MatchExpectationsInOrder(true)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("expectations not met: %v", err)
}
})
}
}
func TestStatementHandler_Unlock(t *testing.T) {
type want struct {
expectations []mockExpectation
isErr func(err error) bool
}
tests := []struct {
name string
want want
}{
{
name: "unlock fails",
want: want{
expectations: []mockExpectation{
expectLockErr(lockTable, workerName, 0, sql.ErrTxDone),
},
isErr: func(err error) bool {
return errors.Is(err, sql.ErrTxDone)
},
},
},
{
name: "success",
want: want{
expectations: []mockExpectation{
expectLock(lockTable, workerName, 0),
},
isErr: func(err error) bool {
return errors.Is(err, nil)
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, mock, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
h := &StatementHandler{
ProjectionHandler: &handler.ProjectionHandler{
ProjectionName: projectionName,
},
client: client,
workerName: workerName,
lockStmt: fmt.Sprintf(lockStmtFormat, lockTable),
}
for _, expectation := range tt.want.expectations {
expectation(mock)
}
err = h.Unlock()
if !tt.want.isErr(err) {
t.Errorf("unexpected error = %v", err)
}
mock.MatchExpectationsInOrder(true)
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("expectations not met: %v", err)
}
})
}
}
type errsMock struct {
errs chan error
successfulIters int
shouldErr bool
}
func (m *errsMock) handleErrs(t *testing.T, cancel func()) {
for i := 0; i < m.successfulIters; i++ {
if err := <-m.errs; err != nil {
t.Errorf("unexpected err in iteration %d: %v", i, err)
cancel()
return
}
}
if m.shouldErr {
if err := <-m.errs; err == nil {
t.Error("error must not be nil")
}
}
cancel()
}

View File

@@ -0,0 +1,16 @@
package crdb
import (
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
)
//reduce implements handler.Reduce function
func (h *StatementHandler) reduce(event eventstore.EventReader) ([]handler.Statement, error) {
reduce, ok := h.reduces[event.Type()]
if !ok {
return []handler.Statement{NewNoOpStatement(event)}, nil
}
return reduce(event)
}

View File

@@ -0,0 +1,190 @@
package crdb
import (
"strconv"
"strings"
"github.com/caos/zitadel/internal/errors"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
)
type execOption func(*execConfig)
type execConfig struct {
tableName string
args []interface{}
err error
}
func WithTableSuffix(name string) func(*execConfig) {
return func(o *execConfig) {
o.tableName += "_" + name
}
}
func NewCreateStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
config := execConfig{
args: args,
}
if len(values) == 0 {
config.err = handler.ErrNoValues
}
q := func(config execConfig) string {
return "INSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")"
}
return handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
Execute: exec(config, q, opts),
}
}
func NewUpsertStatement(event eventstore.EventReader, values []handler.Column, opts ...execOption) handler.Statement {
cols, params, args := columnsToQuery(values)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
config := execConfig{
args: args,
}
if len(values) == 0 {
config.err = handler.ErrNoValues
}
q := func(config execConfig) string {
return "UPSERT INTO " + config.tableName + " (" + columnNames + ") VALUES (" + valuesPlaceholder + ")"
}
return handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
Execute: exec(config, q, opts),
}
}
func NewUpdateStatement(event eventstore.EventReader, values []handler.Column, conditions []handler.Condition, opts ...execOption) handler.Statement {
cols, params, args := columnsToQuery(values)
wheres, whereArgs := conditionsToWhere(conditions, len(params))
args = append(args, whereArgs...)
columnNames := strings.Join(cols, ", ")
valuesPlaceholder := strings.Join(params, ", ")
wheresPlaceholders := strings.Join(wheres, " AND ")
config := execConfig{
args: args,
}
if len(values) == 0 {
config.err = handler.ErrNoValues
}
if len(conditions) == 0 {
config.err = handler.ErrNoCondition
}
q := func(config execConfig) string {
return "UPDATE " + config.tableName + " SET (" + columnNames + ") = (" + valuesPlaceholder + ") WHERE " + wheresPlaceholders
}
return handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
Execute: exec(config, q, opts),
}
}
func NewDeleteStatement(event eventstore.EventReader, conditions []handler.Condition, opts ...execOption) handler.Statement {
wheres, args := conditionsToWhere(conditions, 0)
wheresPlaceholders := strings.Join(wheres, " AND ")
config := execConfig{
args: args,
}
if len(conditions) == 0 {
config.err = handler.ErrNoCondition
}
q := func(config execConfig) string {
return "DELETE FROM " + config.tableName + " WHERE " + wheresPlaceholders
}
return handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
Execute: exec(config, q, opts),
}
}
func NewNoOpStatement(event eventstore.EventReader) handler.Statement {
return handler.Statement{
AggregateType: event.Aggregate().Type,
Sequence: event.Sequence(),
PreviousSequence: event.PreviousAggregateTypeSequence(),
}
}
func columnsToQuery(cols []handler.Column) (names []string, parameters []string, values []interface{}) {
names = make([]string, len(cols))
values = make([]interface{}, len(cols))
parameters = make([]string, len(cols))
for i, col := range cols {
names[i] = col.Name
values[i] = col.Value
parameters[i] = "$" + strconv.Itoa(i+1)
}
return names, parameters, values
}
func conditionsToWhere(cols []handler.Condition, paramOffset int) (wheres []string, values []interface{}) {
wheres = make([]string, len(cols))
values = make([]interface{}, len(cols))
for i, col := range cols {
wheres[i] = "(" + col.Name + " = $" + strconv.Itoa(i+1+paramOffset) + ")"
values[i] = col.Value
}
return wheres, values
}
type query func(config execConfig) string
func exec(config execConfig, q query, opts []execOption) func(ex handler.Executer, projectionName string) error {
return func(ex handler.Executer, projectionName string) error {
if projectionName == "" {
return handler.ErrNoProjection
}
if config.err != nil {
return config.err
}
config.tableName = projectionName
for _, opt := range opts {
opt(&config)
}
if _, err := ex.Exec(q(config), config.args...); err != nil {
return errors.ThrowInternal(err, "CRDB-pKtsr", "exec failed")
}
return nil
}
}

View File

@@ -0,0 +1,822 @@
package crdb
import (
"database/sql"
"errors"
"reflect"
"testing"
"github.com/caos/zitadel/internal/eventstore"
"github.com/caos/zitadel/internal/eventstore/handler"
)
type wantExecuter struct {
query string
args []interface{}
t *testing.T
wasExecuted bool
shouldExecute bool
}
var errTestErr = errors.New("some error")
func (ex *wantExecuter) check(t *testing.T) {
t.Helper()
if ex.wasExecuted && !ex.shouldExecute {
t.Error("executer should not be executed")
} else if !ex.wasExecuted && ex.shouldExecute {
t.Error("executer should be executed")
} else if ex.wasExecuted != ex.shouldExecute {
t.Errorf("executed missmatched should be %t, but was %t", ex.shouldExecute, ex.wasExecuted)
}
}
func (ex *wantExecuter) Exec(query string, args ...interface{}) (sql.Result, error) {
ex.t.Helper()
ex.wasExecuted = true
if query != ex.query {
ex.t.Errorf("wrong query:\n expected:\n %q\n got:\n %q", ex.query, query)
}
if !reflect.DeepEqual(ex.args, args) {
ex.t.Errorf("wrong args:\n expected:\n %v\n got:\n %v", ex.args, args)
}
return nil, nil
}
func TestNewCreateStatement(t *testing.T) {
type args struct {
table string
event *testEvent
values []handler.Column
}
type want struct {
aggregateType eventstore.AggregateType
sequence uint64
previousSequence uint64
table string
executer *wantExecuter
isErr func(error) bool
}
tests := []struct {
name string
args args
want want
}{
{
name: "no table",
args: args{
table: "",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
},
want: want{
table: "",
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoProjection)
},
},
},
{
name: "no values",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoValues)
},
},
},
{
name: "correct",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "INSERT INTO my_table (col1) VALUES ($1)",
shouldExecute: true,
args: []interface{}{"val"},
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.want.executer.t = t
stmt := NewCreateStatement(tt.args.event, tt.args.values)
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
tt.want.executer.check(t)
})
}
}
func TestNewUpsertStatement(t *testing.T) {
type args struct {
table string
event *testEvent
values []handler.Column
}
type want struct {
aggregateType eventstore.AggregateType
sequence uint64
previousSequence uint64
table string
executer *wantExecuter
isErr func(error) bool
}
tests := []struct {
name string
args args
want want
}{
{
name: "no table",
args: args{
table: "",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
},
want: want{
table: "",
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoProjection)
},
},
},
{
name: "no values",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoValues)
},
},
},
{
name: "correct",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "UPSERT INTO my_table (col1) VALUES ($1)",
shouldExecute: true,
args: []interface{}{"val"},
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.want.executer.t = t
stmt := NewUpsertStatement(tt.args.event, tt.args.values)
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
tt.want.executer.check(t)
})
}
}
func TestNewUpdateStatement(t *testing.T) {
type args struct {
table string
event *testEvent
values []handler.Column
conditions []handler.Condition
}
type want struct {
table string
aggregateType eventstore.AggregateType
sequence uint64
previousSequence uint64
executer *wantExecuter
isErr func(error) bool
}
tests := []struct {
name string
args args
want want
}{
{
name: "no table",
args: args{
table: "",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
conditions: []handler.Condition{
{
Name: "col2",
Value: 1,
},
},
},
want: want{
table: "",
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoProjection)
},
},
},
{
name: "no values",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{},
conditions: []handler.Condition{
{
Name: "col2",
Value: 1,
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoValues)
},
},
},
{
name: "no conditions",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
conditions: []handler.Condition{},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoCondition)
},
},
},
{
name: "correct",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
},
conditions: []handler.Condition{
{
Name: "col2",
Value: 1,
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)",
shouldExecute: true,
args: []interface{}{"val", 1},
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.want.executer.t = t
stmt := NewUpdateStatement(tt.args.event, tt.args.values, tt.args.conditions)
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
tt.want.executer.check(t)
})
}
}
func TestNewDeleteStatement(t *testing.T) {
type args struct {
table string
event *testEvent
conditions []handler.Condition
}
type want struct {
table string
aggregateType eventstore.AggregateType
sequence uint64
previousSequence uint64
executer *wantExecuter
isErr func(error) bool
}
tests := []struct {
name string
args args
want want
}{
{
name: "no table",
args: args{
table: "",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
conditions: []handler.Condition{
{
Name: "col2",
Value: 1,
},
},
},
want: want{
table: "",
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoProjection)
},
},
},
{
name: "no conditions",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
conditions: []handler.Condition{},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoCondition)
},
},
},
{
name: "correct",
args: args{
table: "my_table",
event: &testEvent{
sequence: 1,
previousSequence: 0,
aggregateType: "agg",
},
conditions: []handler.Condition{
{
Name: "col1",
Value: 1,
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
query: "DELETE FROM my_table WHERE (col1 = $1)",
shouldExecute: true,
args: []interface{}{1},
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.want.executer.t = t
stmt := NewDeleteStatement(tt.args.event, tt.args.conditions)
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
tt.want.executer.check(t)
})
}
}
func TestNewNoOpStatement(t *testing.T) {
type args struct {
event *testEvent
}
tests := []struct {
name string
args args
want handler.Statement
}{
{
name: "generate correctly",
args: args{
event: &testEvent{
aggregateType: "agg",
sequence: 5,
previousSequence: 3,
},
},
want: handler.Statement{
AggregateType: "agg",
Execute: nil,
Sequence: 5,
PreviousSequence: 3,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewNoOpStatement(tt.args.event); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewNoOpStatement() = %v, want %v", got, tt.want)
}
})
}
}
func TestStatement_Execute(t *testing.T) {
type fields struct {
execute func(ex handler.Executer, projectionName string) error
}
type want struct {
isErr func(error) bool
}
type args struct {
projectionName string
}
tests := []struct {
name string
args args
fields fields
want want
}{
{
name: "execute returns no error",
fields: fields{
execute: func(ex handler.Executer, projectionName string) error { return nil },
},
args: args{
projectionName: "my_projection",
},
want: want{
isErr: func(err error) bool {
return err == nil
},
},
},
{
name: "execute returns error",
args: args{
projectionName: "my_projection",
},
fields: fields{
execute: func(ex handler.Executer, projectionName string) error { return errTestErr },
},
want: want{
isErr: func(err error) bool {
return errors.Is(err, errTestErr)
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stmt := &handler.Statement{
Execute: tt.fields.execute,
}
if err := stmt.Execute(nil, tt.args.projectionName); !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
})
}
}
func Test_columnsToQuery(t *testing.T) {
type args struct {
cols []handler.Column
}
type want struct {
names []string
params []string
values []interface{}
}
tests := []struct {
name string
args args
want want
}{
{
name: "no columns",
args: args{},
want: want{
names: []string{},
params: []string{},
values: []interface{}{},
},
},
{
name: "one column",
args: args{
cols: []handler.Column{
{
Name: "col1",
Value: 1,
},
},
},
want: want{
names: []string{"col1"},
params: []string{"$1"},
values: []interface{}{1},
},
},
{
name: "multiple columns",
args: args{
cols: []handler.Column{
{
Name: "col1",
Value: 1,
},
{
Name: "col2",
Value: 3.14,
},
},
},
want: want{
names: []string{"col1", "col2"},
params: []string{"$1", "$2"},
values: []interface{}{1, 3.14},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotNames, gotParameters, gotValues := columnsToQuery(tt.args.cols)
if !reflect.DeepEqual(gotNames, tt.want.names) {
t.Errorf("columnsToQuery() gotNames = %v, want %v", gotNames, tt.want.names)
}
if !reflect.DeepEqual(gotParameters, tt.want.params) {
t.Errorf("columnsToQuery() gotParameters = %v, want %v", gotParameters, tt.want.params)
}
if !reflect.DeepEqual(gotValues, tt.want.values) {
t.Errorf("columnsToQuery() gotValues = %v, want %v", gotValues, tt.want.values)
}
})
}
}
func Test_columnsToWhere(t *testing.T) {
type args struct {
conds []handler.Condition
paramOffset int
}
type want struct {
wheres []string
values []interface{}
}
tests := []struct {
name string
args args
want want
}{
{
name: "no wheres",
args: args{},
want: want{
wheres: []string{},
values: []interface{}{},
},
},
{
name: "no offset",
args: args{
conds: []handler.Condition{
{
Name: "col1",
Value: "val1",
},
},
paramOffset: 0,
},
want: want{
wheres: []string{"(col1 = $1)"},
values: []interface{}{"val1"},
},
},
{
name: "multiple cols",
args: args{
conds: []handler.Condition{
{
Name: "col1",
Value: "val1",
},
{
Name: "col2",
Value: "val2",
},
},
paramOffset: 0,
},
want: want{
wheres: []string{"(col1 = $1)", "(col2 = $2)"},
values: []interface{}{"val1", "val2"},
},
},
{
name: "2 offset",
args: args{
conds: []handler.Condition{
{
Name: "col1",
Value: "val1",
},
},
paramOffset: 2,
},
want: want{
wheres: []string{"(col1 = $3)"},
values: []interface{}{"val1"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotWheres, gotValues := conditionsToWhere(tt.args.conds, tt.args.paramOffset)
if !reflect.DeepEqual(gotWheres, tt.want.wheres) {
t.Errorf("columnsToWhere() gotWheres = %v, want %v", gotWheres, tt.want.wheres)
}
if !reflect.DeepEqual(gotValues, tt.want.values) {
t.Errorf("columnsToWhere() gotValues = %v, want %v", gotValues, tt.want.values)
}
})
}
}