mirror of
https://github.com/zitadel/zitadel.git
synced 2025-02-28 23:27:23 +00:00
feat(eventstore): add spooler (#78)
* add spooler to eventstore * Update internal/eventstore/spooler/spooler.go Co-Authored-By: Silvan <silvan.reusser@gmail.com> * Update internal/eventstore/spooler/spooler_test.go Co-Authored-By: Silvan <silvan.reusser@gmail.com> * remove comments * fix race condition in test Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
parent
33a4802425
commit
b9aae0b2e3
29
go.mod
29
go.mod
@ -12,39 +12,42 @@ require (
|
||||
github.com/Masterminds/sprig v2.22.0+incompatible
|
||||
github.com/VictoriaMetrics/fastcache v1.5.7
|
||||
github.com/allegro/bigcache v1.2.1
|
||||
github.com/aws/aws-sdk-go v1.30.4 // indirect
|
||||
github.com/aws/aws-sdk-go v1.30.16 // indirect
|
||||
github.com/caos/logging v0.0.1
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200312223839-f565e4789405
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200411195601-6f5842749cfc
|
||||
github.com/envoyproxy/protoc-gen-validate v0.3.0
|
||||
github.com/ghodss/yaml v1.0.0
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
|
||||
github.com/golang/mock v1.4.3
|
||||
github.com/golang/protobuf v1.4.0-rc.4
|
||||
github.com/golang/protobuf v1.4.0
|
||||
github.com/google/uuid v1.1.1 // indirect
|
||||
github.com/gorilla/schema v1.1.0
|
||||
github.com/gorilla/securecookie v1.1.1
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.3
|
||||
github.com/huandu/xstrings v1.3.0 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.4
|
||||
github.com/huandu/xstrings v1.3.1 // indirect
|
||||
github.com/imdario/mergo v0.3.9 // indirect
|
||||
github.com/jackc/pgconn v1.5.0 // indirect
|
||||
github.com/jinzhu/gorm v1.9.12
|
||||
github.com/lib/pq v1.3.0
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
|
||||
github.com/lib/pq v1.4.0
|
||||
github.com/mitchellh/copystructure v1.0.0 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.1 // indirect
|
||||
github.com/nicksnyder/go-i18n/v2 v2.0.3
|
||||
github.com/rs/cors v1.7.0
|
||||
github.com/sethvargo/go-password v0.1.3
|
||||
github.com/sirupsen/logrus v1.5.0 // indirect
|
||||
github.com/sony/sonyflake v1.0.0
|
||||
github.com/stretchr/testify v1.5.1
|
||||
go.opencensus.io v0.22.3
|
||||
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8
|
||||
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 // indirect
|
||||
golang.org/x/sys v0.0.0-20200428200454-593003d681fa // indirect
|
||||
golang.org/x/text v0.3.2
|
||||
golang.org/x/tools v0.0.0-20200403190813-44a64ad78b9b
|
||||
google.golang.org/api v0.21.0 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200403120447-c50568487044
|
||||
google.golang.org/grpc v1.28.0
|
||||
google.golang.org/protobuf v1.20.1
|
||||
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32
|
||||
google.golang.org/api v0.22.0 // indirect
|
||||
google.golang.org/appengine v1.6.6 // indirect
|
||||
google.golang.org/genproto v0.0.0-20200428115010-c45acf45369a
|
||||
google.golang.org/grpc v1.29.1
|
||||
google.golang.org/protobuf v1.21.0
|
||||
gopkg.in/yaml.v2 v2.2.8 // indirect
|
||||
)
|
||||
|
44
go.sum
44
go.sum
@ -50,10 +50,10 @@ github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQY
|
||||
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
|
||||
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.23.20/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
|
||||
github.com/aws/aws-sdk-go v1.30.1 h1:cUMxtoFvIHhScZgv17tGxw15r6rVKJHR1hsIFRx9hcA=
|
||||
github.com/aws/aws-sdk-go v1.30.1/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/aws/aws-sdk-go v1.30.4 h1:dpQgypC3rld2Uuz+/2u+0nbfmmyEWxau6v1hdAlvoc8=
|
||||
github.com/aws/aws-sdk-go v1.30.4/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/aws/aws-sdk-go v1.30.16 h1:1eabOZiVGl+XB02/VG9NpR+3fngaNc74pgb5RmyzgLY=
|
||||
github.com/aws/aws-sdk-go v1.30.16/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
|
||||
github.com/caos/logging v0.0.1 h1:YSGtO2/+5OWdwilBCou50akoDHAT/OhkbrolkVlR6U0=
|
||||
github.com/caos/logging v0.0.1 h1:YSGtO2/+5OWdwilBCou50akoDHAT/OhkbrolkVlR6U0=
|
||||
github.com/caos/logging v0.0.1/go.mod h1:9LKiDE2ChuGv6CHYif/kiugrfEXu9AwDiFWSreX7Wp0=
|
||||
@ -71,6 +71,8 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX
|
||||
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200312223839-f565e4789405 h1:i1XXyBMAGL7NqogtoS6NHQ/IJwCbG0R725hAhEhldOI=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200312223839-f565e4789405/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200411195601-6f5842749cfc h1:8pe9G3mY+h/qW3ysdFz9Eht4kbIsL0BS5FWih1Sn6JU=
|
||||
github.com/cockroachdb/cockroach-go v0.0.0-20200411195601-6f5842749cfc/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
|
||||
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
|
||||
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
|
||||
@ -79,6 +81,7 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
|
||||
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
|
||||
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd h1:83Wprp6ROGeiHFAP8WJdI2RoxALQYgdllERc3N5N2DM=
|
||||
github.com/denisenkom/go-mssqldb v0.0.0-20191124224453-732737034ffd/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU=
|
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
@ -87,6 +90,7 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrp
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.3.0 h1:Y2J74o+yAfcD8jpqtkLnUqRo+yshLr4eR1WPYGX0cic=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.3.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 h1:Yzb9+7DPaBjB8zlTR87/ElzFsnQfuHnVUVqpZZIcV5Y=
|
||||
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
|
||||
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
@ -103,6 +107,7 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
|
||||
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
|
||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
|
||||
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
|
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
|
||||
@ -126,6 +131,9 @@ github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:x
|
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
|
||||
github.com/golang/protobuf v1.4.0-rc.4 h1:+EOh4OY6tjM6ZueeUKinl1f0U2820HzQOuf1iqMnsks=
|
||||
github.com/golang/protobuf v1.4.0-rc.4/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
|
||||
github.com/golang/protobuf v1.4.0 h1:oOuy+ugB+P/kBdUnG5QaMXSIyJ1q38wWSojYCb3z5VQ=
|
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
|
||||
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
|
||||
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||
@ -156,10 +164,14 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.2.0 h1:0IKlLyQ3Hs9nDaiK5cSHAGmcQ
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0/go.mod h1:mJzapYve32yjrKlk9GbyCZHuPgZsrbyIbyKhSzOpg6s=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.4 h1:IOPK2xMPP3aV6/NPt4jt//ELFo3Vv8sDVD8j3+tleDU=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.14.4/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
|
||||
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
|
||||
github.com/huandu/xstrings v1.3.0 h1:gvV6jG9dTgFEncxo+AF7PH6MZXi/vZl25owA/8Dg8Wo=
|
||||
github.com/huandu/xstrings v1.3.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
|
||||
github.com/huandu/xstrings v1.3.1 h1:4jgBlKK6tLKFvO8u5pmYjG91cqytmDCDvGh7ECVFfFs=
|
||||
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
|
||||
@ -202,6 +214,7 @@ github.com/jinzhu/gorm v1.9.12 h1:Drgk1clyWT9t9ERbzHza6Mj/8FY/CqMyVzOiHviMo6Q=
|
||||
github.com/jinzhu/gorm v1.9.12/go.mod h1:vhTjlKSJUTWNtcbQtrMBFCxy7eXTzeCAzfL5fBZT/Qs=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.0.1 h1:HjfetcXq097iXP0uoPCdnM4Efp5/9MsM0/M+XOTeR3M=
|
||||
github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
|
||||
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
|
||||
@ -214,6 +227,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3 h1:CE8S1cTafDpPvMhIxNJKvHsGVBgn1xWYf1NbHQhywc8=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
|
||||
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
@ -227,9 +242,12 @@ github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
|
||||
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/lib/pq v1.4.0 h1:TmtCFbH+Aw0AixwyttznSMQDgbR5Yed/Gg6S8Funrhc=
|
||||
github.com/lib/pq v1.4.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
|
||||
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
|
||||
github.com/mattn/go-sqlite3 v2.0.1+incompatible h1:xQ15muvnzGBHpIpdrNi1DA5x0+TcBZzsIDwmw9uTHzw=
|
||||
github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
|
||||
github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ=
|
||||
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
|
||||
@ -254,8 +272,6 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
|
||||
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
|
||||
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
|
||||
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
|
||||
github.com/sethvargo/go-password v0.1.3 h1:18KkbGDkw8SuzeohAbWqBLNSfRQblVwEHOLbPa0PvWM=
|
||||
github.com/sethvargo/go-password v0.1.3/go.mod h1:2tyaaoHK/AlXwh5WWQDYjqQbHcq4cjPj5qb/ciYvu/Q=
|
||||
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
|
||||
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
|
||||
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
|
||||
@ -303,6 +319,8 @@ golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59 h1:3zb4D3T4G8jdExgVU/95+v
|
||||
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8 h1:fpnn/HnJONpIu6hkXi1u/7rR0NzilgWr4T0JmWkEitk=
|
||||
golang.org/x/crypto v0.0.0-20200403201458-baeed622b8d8/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc h1:ZGI/fILM2+ueot/UixBSoj9188jCAxVHEZEGhqq67I4=
|
||||
golang.org/x/crypto v0.0.0-20200427165652-729f1e841bcc/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
|
||||
@ -355,6 +373,8 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e h1:3G+cUijn7XD+S4eJFddp53Pv7+slrESplyjG25HgL+k=
|
||||
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U=
|
||||
golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
@ -399,6 +419,8 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20u
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d h1:nc5K6ox/4lTFbMVSL9WRR81ixkcwXThoiF6yf+R9scA=
|
||||
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200428200454-593003d681fa h1:yMbJOvnfYkO1dSAviTu/ZguZWLBTXx4xE3LYrxUCCiA=
|
||||
golang.org/x/sys v0.0.0-20200428200454-593003d681fa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
@ -444,10 +466,10 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK
|
||||
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
|
||||
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4 h1:kDtqNkeBrZb8B+atrj50B5XLHpzXXqcCdZPP/ApQ5NY=
|
||||
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
|
||||
golang.org/x/tools v0.0.0-20200331202046-9d5940d49312 h1:2PHG+Ia3gK1K2kjxZnSylizb//eyaMG8gDFbOG7wLV8=
|
||||
golang.org/x/tools v0.0.0-20200331202046-9d5940d49312/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20200403190813-44a64ad78b9b h1:AFZdJUT7jJYXQEC29hYH/WZkoV7+KhwxQGmdZ19yYoY=
|
||||
golang.org/x/tools v0.0.0-20200403190813-44a64ad78b9b/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 h1:Xvf3ZQTm5bjXPxhI7g+dwqsCqadK1rcNtwtszuatetk=
|
||||
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
@ -468,6 +490,8 @@ google.golang.org/api v0.20.0 h1:jz2KixHX7EcCPiQrySzPdnYT7DbINAypCqKZ1Z7GM40=
|
||||
google.golang.org/api v0.20.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
|
||||
google.golang.org/api v0.21.0 h1:zS+Q/CJJnVlXpXQVIz+lH0ZT2lBuT2ac7XD8Y/3w6hY=
|
||||
google.golang.org/api v0.21.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
|
||||
google.golang.org/api v0.22.0 h1:J1Pl9P2lnmYFSJvgs70DKELqHNh8CNWXPbud4njEE2s=
|
||||
google.golang.org/api v0.22.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
@ -475,6 +499,8 @@ google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww
|
||||
google.golang.org/appengine v1.6.2/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
|
||||
google.golang.org/appengine v1.6.5 h1:tycE03LOZYQNhDpS27tcQdAzLCVMaj7QT2SXxebnpCM=
|
||||
google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc=
|
||||
google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE=
|
||||
@ -497,6 +523,8 @@ google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940 h1:MRHtG0U6SnaUb+s
|
||||
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/genproto v0.0.0-20200403120447-c50568487044 h1:112OPFAnKD+XtZhdffFnz17dEbiCTCKfnNQCAWmwFzA=
|
||||
google.golang.org/genproto v0.0.0-20200403120447-c50568487044/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/genproto v0.0.0-20200428115010-c45acf45369a h1:ykRcNp3dotYGpAEIYeWCGaefklVjVy/rnSvM3zNh6j8=
|
||||
google.golang.org/genproto v0.0.0-20200428115010-c45acf45369a/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
|
||||
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
|
||||
@ -509,12 +537,16 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
|
||||
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
|
||||
google.golang.org/grpc v1.28.0 h1:bO/TA4OxCOummhSf10siHuG7vJOiwh7SpRpFZDkOgl4=
|
||||
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
|
||||
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
|
||||
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.20.1 h1:ESRXHgpUBG5D2I5mmsQIyYxB/tQIZfSZ8wLyFDf/N/U=
|
||||
google.golang.org/protobuf v1.20.1/go.mod h1:KqelGeouBkcbcuB3HCk4/YH2tmNLk6YSWA5LIWeI/lY=
|
||||
google.golang.org/protobuf v1.21.0 h1:qdOKuR/EIArgaWNjetjgTzgVTAZ+S/WXVrq9HW9zimw=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
|
||||
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
|
11
internal/eventstore/query/handler.go
Executable file
11
internal/eventstore/query/handler.go
Executable file
@ -0,0 +1,11 @@
|
||||
package query
|
||||
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
)
|
||||
|
||||
type Handler interface {
|
||||
ViewModel() string
|
||||
EventQuery() (*models.SearchQuery, error)
|
||||
Process(*models.Event) error
|
||||
}
|
29
internal/eventstore/spooler/config.go
Normal file
29
internal/eventstore/spooler/config.go
Normal file
@ -0,0 +1,29 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/sony/sonyflake"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Eventstore eventstore.Eventstore
|
||||
Locker Locker
|
||||
ViewHandlers []Handler
|
||||
ConcurrentTasks int
|
||||
}
|
||||
|
||||
func (c *Config) New() *Spooler {
|
||||
lockID, err := sonyflake.NewSonyflake(sonyflake.Settings{}).NextID()
|
||||
logging.Log("SPOOL-bdO56").OnError(err).Panic("unable to generate lockID")
|
||||
|
||||
return &Spooler{
|
||||
handlers: c.ViewHandlers,
|
||||
lockID: strconv.FormatUint(lockID, 10),
|
||||
eventstore: c.Eventstore,
|
||||
locker: c.Locker,
|
||||
queue: make(chan *spooledHandler),
|
||||
concurrentTasks: c.ConcurrentTasks,
|
||||
}
|
||||
}
|
3
internal/eventstore/spooler/mock.go
Normal file
3
internal/eventstore/spooler/mock.go
Normal file
@ -0,0 +1,3 @@
|
||||
package spooler
|
||||
|
||||
//go:generate mockgen -source spooler.go -destination ./mock/spooler.go -package mock
|
129
internal/eventstore/spooler/mock/spooler.go
Normal file
129
internal/eventstore/spooler/mock/spooler.go
Normal file
@ -0,0 +1,129 @@
|
||||
// Code generated by MockGen. DO NOT EDIT.
|
||||
// Source: spooler.go
|
||||
|
||||
// Package mock is a generated GoMock package.
|
||||
package mock
|
||||
|
||||
import (
|
||||
models "github.com/caos/zitadel/internal/eventstore/models"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
reflect "reflect"
|
||||
time "time"
|
||||
)
|
||||
|
||||
// MockHandler is a mock of Handler interface
|
||||
type MockHandler struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockHandlerMockRecorder
|
||||
}
|
||||
|
||||
// MockHandlerMockRecorder is the mock recorder for MockHandler
|
||||
type MockHandlerMockRecorder struct {
|
||||
mock *MockHandler
|
||||
}
|
||||
|
||||
// NewMockHandler creates a new mock instance
|
||||
func NewMockHandler(ctrl *gomock.Controller) *MockHandler {
|
||||
mock := &MockHandler{ctrl: ctrl}
|
||||
mock.recorder = &MockHandlerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockHandler) EXPECT() *MockHandlerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// ViewModel mocks base method
|
||||
func (m *MockHandler) ViewModel() string {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "ViewModel")
|
||||
ret0, _ := ret[0].(string)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// ViewModel indicates an expected call of ViewModel
|
||||
func (mr *MockHandlerMockRecorder) ViewModel() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ViewModel", reflect.TypeOf((*MockHandler)(nil).ViewModel))
|
||||
}
|
||||
|
||||
// EventQuery mocks base method
|
||||
func (m *MockHandler) EventQuery() (*models.SearchQuery, error) {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "EventQuery")
|
||||
ret0, _ := ret[0].(*models.SearchQuery)
|
||||
ret1, _ := ret[1].(error)
|
||||
return ret0, ret1
|
||||
}
|
||||
|
||||
// EventQuery indicates an expected call of EventQuery
|
||||
func (mr *MockHandlerMockRecorder) EventQuery() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EventQuery", reflect.TypeOf((*MockHandler)(nil).EventQuery))
|
||||
}
|
||||
|
||||
// Process mocks base method
|
||||
func (m *MockHandler) Process(arg0 *models.Event) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Process", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Process indicates an expected call of Process
|
||||
func (mr *MockHandlerMockRecorder) Process(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Process", reflect.TypeOf((*MockHandler)(nil).Process), arg0)
|
||||
}
|
||||
|
||||
// MinimumCycleDuration mocks base method
|
||||
func (m *MockHandler) MinimumCycleDuration() time.Duration {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "MinimumCycleDuration")
|
||||
ret0, _ := ret[0].(time.Duration)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// MinimumCycleDuration indicates an expected call of MinimumCycleDuration
|
||||
func (mr *MockHandlerMockRecorder) MinimumCycleDuration() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MinimumCycleDuration", reflect.TypeOf((*MockHandler)(nil).MinimumCycleDuration))
|
||||
}
|
||||
|
||||
// MockLocker is a mock of Locker interface
|
||||
type MockLocker struct {
|
||||
ctrl *gomock.Controller
|
||||
recorder *MockLockerMockRecorder
|
||||
}
|
||||
|
||||
// MockLockerMockRecorder is the mock recorder for MockLocker
|
||||
type MockLockerMockRecorder struct {
|
||||
mock *MockLocker
|
||||
}
|
||||
|
||||
// NewMockLocker creates a new mock instance
|
||||
func NewMockLocker(ctrl *gomock.Controller) *MockLocker {
|
||||
mock := &MockLocker{ctrl: ctrl}
|
||||
mock.recorder = &MockLockerMockRecorder{mock}
|
||||
return mock
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use
|
||||
func (m *MockLocker) EXPECT() *MockLockerMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// Renew mocks base method
|
||||
func (m *MockLocker) Renew(lockerID, viewModel string, waitTime time.Duration) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Renew", lockerID, viewModel, waitTime)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Renew indicates an expected call of Renew
|
||||
func (mr *MockLockerMockRecorder) Renew(lockerID, viewModel, waitTime interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Renew", reflect.TypeOf((*MockLocker)(nil).Renew), lockerID, viewModel, waitTime)
|
||||
}
|
141
internal/eventstore/spooler/spooler.go
Normal file
141
internal/eventstore/spooler/spooler.go
Normal file
@ -0,0 +1,141 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/caos/logging"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/query"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Spooler struct {
|
||||
handlers []Handler
|
||||
locker Locker
|
||||
lockID string
|
||||
eventstore eventstore.Eventstore
|
||||
concurrentTasks int
|
||||
queue chan *spooledHandler
|
||||
}
|
||||
|
||||
type Handler interface {
|
||||
query.Handler
|
||||
MinimumCycleDuration() time.Duration
|
||||
}
|
||||
|
||||
type Locker interface {
|
||||
Renew(lockerID, viewModel string, waitTime time.Duration) error
|
||||
}
|
||||
|
||||
type spooledHandler struct {
|
||||
Handler
|
||||
locker Locker
|
||||
lockID string
|
||||
queuedAt time.Time
|
||||
eventstore eventstore.Eventstore
|
||||
}
|
||||
|
||||
func (s *Spooler) Start() {
|
||||
defer logging.LogWithFields("SPOOL-N0V1g", "lockerID", s.lockID, "workers", s.concurrentTasks).Info("spooler started")
|
||||
if s.concurrentTasks < 1 {
|
||||
return
|
||||
}
|
||||
for i := 0; i < s.concurrentTasks; i++ {
|
||||
go func() {
|
||||
for handler := range s.queue {
|
||||
go func(handler *spooledHandler, queue chan<- *spooledHandler) {
|
||||
time.Sleep(handler.MinimumCycleDuration() - time.Since(handler.queuedAt))
|
||||
handler.queuedAt = time.Now()
|
||||
queue <- handler
|
||||
}(handler, s.queue)
|
||||
|
||||
handler.load()
|
||||
}
|
||||
}()
|
||||
}
|
||||
for _, handler := range s.handlers {
|
||||
handler := &spooledHandler{handler, s.locker, s.lockID, time.Now(), s.eventstore}
|
||||
s.queue <- handler
|
||||
}
|
||||
}
|
||||
|
||||
func (s *spooledHandler) load() {
|
||||
errs := make(chan error)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go s.awaitError(cancel, errs)
|
||||
hasLocked := s.lock(ctx, errs)
|
||||
|
||||
defer close(errs)
|
||||
|
||||
if <-hasLocked {
|
||||
events, err := s.query(ctx)
|
||||
if err != nil {
|
||||
errs <- err
|
||||
} else {
|
||||
errs <- s.process(ctx, events)
|
||||
}
|
||||
}
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (s *spooledHandler) awaitError(cancel func(), errs chan error) {
|
||||
select {
|
||||
case err := <-errs:
|
||||
cancel()
|
||||
logging.Log("SPOOL-K2lst").OnError(err).WithField("view", s.ViewModel()).Debug("load canceled")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *spooledHandler) process(ctx context.Context, events []*models.Event) error {
|
||||
for _, event := range events {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
logging.Log("SPOOL-FTKwH").WithField("view", s.ViewModel()).Debug("context canceled")
|
||||
return nil
|
||||
default:
|
||||
if err := s.Process(event); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *spooledHandler) query(ctx context.Context) ([]*models.Event, error) {
|
||||
query, err := s.EventQuery()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s.eventstore.FilterEvents(ctx, query)
|
||||
}
|
||||
|
||||
func (s *spooledHandler) lock(ctx context.Context, errs chan<- error) chan bool {
|
||||
renewTimer := time.After(0)
|
||||
renewDuration := s.MinimumCycleDuration() - 50*time.Millisecond
|
||||
locked := make(chan bool, 1)
|
||||
|
||||
go func(locked chan bool) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-renewTimer:
|
||||
err := s.locker.Renew(s.lockID, s.ViewModel(), s.MinimumCycleDuration()*2)
|
||||
if err == nil {
|
||||
locked <- true
|
||||
renewTimer = time.After(renewDuration)
|
||||
continue
|
||||
}
|
||||
|
||||
if ctx.Err() == nil {
|
||||
errs <- err
|
||||
}
|
||||
|
||||
locked <- false
|
||||
return
|
||||
}
|
||||
}
|
||||
}(locked)
|
||||
|
||||
return locked
|
||||
}
|
332
internal/eventstore/spooler/spooler_test.go
Normal file
332
internal/eventstore/spooler/spooler_test.go
Normal file
@ -0,0 +1,332 @@
|
||||
package spooler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/models"
|
||||
"github.com/caos/zitadel/internal/eventstore/spooler/mock"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
type testHandler struct {
|
||||
cycleDuration time.Duration
|
||||
processSleep time.Duration
|
||||
processError error
|
||||
queryError error
|
||||
viewModel string
|
||||
}
|
||||
|
||||
func (h *testHandler) ViewModel() string {
|
||||
return h.viewModel
|
||||
}
|
||||
func (h *testHandler) EventQuery() (*models.SearchQuery, error) {
|
||||
return nil, h.queryError
|
||||
}
|
||||
func (h *testHandler) Process(*models.Event) error {
|
||||
<-time.After(h.processSleep)
|
||||
return h.processError
|
||||
}
|
||||
func (h *testHandler) MinimumCycleDuration() time.Duration { return h.cycleDuration }
|
||||
|
||||
type eventstoreStub struct {
|
||||
events []*models.Event
|
||||
err error
|
||||
}
|
||||
|
||||
func (es *eventstoreStub) Health(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es *eventstoreStub) AggregateCreator() *models.AggregateCreator {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (es *eventstoreStub) FilterEvents(ctx context.Context, in *models.SearchQuery) ([]*models.Event, error) {
|
||||
if es.err != nil {
|
||||
return nil, es.err
|
||||
}
|
||||
return es.events, nil
|
||||
}
|
||||
func (es *eventstoreStub) PushAggregates(ctx context.Context, in ...*models.Aggregate) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestSpooler_process(t *testing.T) {
|
||||
type fields struct {
|
||||
currentHandler Handler
|
||||
}
|
||||
type args struct {
|
||||
timeout time.Duration
|
||||
events []*models.Event
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "process all events",
|
||||
fields: fields{
|
||||
currentHandler: &testHandler{},
|
||||
},
|
||||
args: args{
|
||||
timeout: 0,
|
||||
events: []*models.Event{&models.Event{}, &models.Event{}},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "deadline exeeded",
|
||||
fields: fields{
|
||||
currentHandler: &testHandler{processSleep: 501 * time.Millisecond},
|
||||
},
|
||||
args: args{
|
||||
timeout: 1 * time.Second,
|
||||
events: []*models.Event{&models.Event{}, &models.Event{}, &models.Event{}, &models.Event{}},
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "process error",
|
||||
fields: fields{
|
||||
currentHandler: &testHandler{processSleep: 1 * time.Second, processError: fmt.Errorf("i am an error")},
|
||||
},
|
||||
args: args{
|
||||
events: []*models.Event{&models.Event{}, &models.Event{}},
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := &spooledHandler{
|
||||
Handler: tt.fields.currentHandler,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
var start time.Time
|
||||
if tt.args.timeout > 0 {
|
||||
ctx, _ = context.WithTimeout(ctx, tt.args.timeout)
|
||||
start = time.Now()
|
||||
}
|
||||
|
||||
if err := s.process(ctx, tt.args.events); (err != nil) != tt.wantErr {
|
||||
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
|
||||
}
|
||||
|
||||
elapsed := time.Since(start).Round(1 * time.Second)
|
||||
if tt.args.timeout != 0 && elapsed != tt.args.timeout {
|
||||
t.Errorf("wrong timeout wanted %v elapsed %v since %v", tt.args.timeout, elapsed, time.Since(start))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpooler_awaitError(t *testing.T) {
|
||||
type fields struct {
|
||||
currentHandler Handler
|
||||
err error
|
||||
canceled bool
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
}{
|
||||
{
|
||||
"no error",
|
||||
fields{
|
||||
err: nil,
|
||||
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
||||
canceled: false,
|
||||
},
|
||||
},
|
||||
{
|
||||
"with error",
|
||||
fields{
|
||||
err: fmt.Errorf("hodor"),
|
||||
currentHandler: &testHandler{processSleep: 500 * time.Millisecond},
|
||||
canceled: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
s := &spooledHandler{
|
||||
Handler: tt.fields.currentHandler,
|
||||
}
|
||||
errs := make(chan error)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
go s.awaitError(cancel, errs)
|
||||
errs <- tt.fields.err
|
||||
|
||||
if ctx.Err() == nil {
|
||||
t.Error("cancel function was not called")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestSpooler_load checks if load terminates
|
||||
func TestSpooler_load(t *testing.T) {
|
||||
type fields struct {
|
||||
currentHandler Handler
|
||||
locker *testLocker
|
||||
lockID string
|
||||
eventstore eventstore.Eventstore
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
}{
|
||||
{
|
||||
"lock exists",
|
||||
fields{
|
||||
currentHandler: &testHandler{processSleep: 500 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("lock already exists"), 2000*time.Millisecond),
|
||||
},
|
||||
},
|
||||
{
|
||||
"lock fails",
|
||||
fields{
|
||||
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 1 * time.Second},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("fail"), 2000*time.Millisecond),
|
||||
eventstore: &eventstoreStub{events: []*models.Event{&models.Event{}}},
|
||||
},
|
||||
},
|
||||
{
|
||||
"query fails",
|
||||
fields{
|
||||
currentHandler: &testHandler{processSleep: 100 * time.Millisecond, viewModel: "testView", queryError: fmt.Errorf("query fail"), cycleDuration: 1 * time.Second},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
|
||||
eventstore: &eventstoreStub{err: fmt.Errorf("fail")},
|
||||
},
|
||||
},
|
||||
{
|
||||
"process event fails",
|
||||
fields{
|
||||
currentHandler: &testHandler{processError: fmt.Errorf("oups"), processSleep: 100 * time.Millisecond, viewModel: "testView", cycleDuration: 500 * time.Millisecond},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 1000*time.Millisecond),
|
||||
eventstore: &eventstoreStub{events: []*models.Event{&models.Event{}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer tt.fields.locker.finish()
|
||||
s := &spooledHandler{
|
||||
Handler: tt.fields.currentHandler,
|
||||
locker: tt.fields.locker.mock,
|
||||
lockID: tt.fields.lockID,
|
||||
eventstore: tt.fields.eventstore,
|
||||
}
|
||||
s.load()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSpooler_lock(t *testing.T) {
|
||||
type fields struct {
|
||||
currentHandler Handler
|
||||
locker *testLocker
|
||||
lockID string
|
||||
expectsErr bool
|
||||
}
|
||||
type args struct {
|
||||
deadline time.Time
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
fields fields
|
||||
args args
|
||||
}{
|
||||
{
|
||||
"renew correct",
|
||||
fields{
|
||||
currentHandler: &testHandler{cycleDuration: 1 * time.Second, viewModel: "testView"},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, nil, 2000*time.Millisecond),
|
||||
expectsErr: false,
|
||||
},
|
||||
args{
|
||||
deadline: time.Now().Add(1 * time.Second),
|
||||
},
|
||||
},
|
||||
{
|
||||
"renew fails",
|
||||
fields{
|
||||
currentHandler: &testHandler{cycleDuration: 900 * time.Millisecond, viewModel: "testView"},
|
||||
lockID: "testID",
|
||||
locker: newTestLocker(t, "testID", "testView").expectRenew(t, fmt.Errorf("renew failed"), 1800*time.Millisecond),
|
||||
expectsErr: true,
|
||||
},
|
||||
args{
|
||||
deadline: time.Now().Add(5 * time.Second),
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer tt.fields.locker.finish()
|
||||
s := &spooledHandler{
|
||||
Handler: tt.fields.currentHandler,
|
||||
locker: tt.fields.locker.mock,
|
||||
lockID: tt.fields.lockID,
|
||||
}
|
||||
|
||||
errs := make(chan error, 1)
|
||||
ctx, _ := context.WithDeadline(context.Background(), tt.args.deadline)
|
||||
|
||||
locked := s.lock(ctx, errs)
|
||||
|
||||
if tt.fields.expectsErr {
|
||||
err := <-errs
|
||||
if err == nil {
|
||||
t.Error("No error in error queue")
|
||||
}
|
||||
} else {
|
||||
lock := <-locked
|
||||
if !lock {
|
||||
t.Error("lock should be true")
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type testLocker struct {
|
||||
mock *mock.MockLocker
|
||||
lockerID string
|
||||
viewName string
|
||||
ctrl *gomock.Controller
|
||||
}
|
||||
|
||||
func newTestLocker(t *testing.T, lockerID, viewName string) *testLocker {
|
||||
ctrl := gomock.NewController(t)
|
||||
return &testLocker{mock.NewMockLocker(ctrl), lockerID, viewName, ctrl}
|
||||
}
|
||||
|
||||
func (l *testLocker) expectRenew(t *testing.T, err error, waitTime time.Duration) *testLocker {
|
||||
l.mock.EXPECT().Renew(l.lockerID, l.viewName, gomock.Any()).DoAndReturn(
|
||||
func(_, _ string, gotten time.Duration) error {
|
||||
if waitTime-gotten != 0 {
|
||||
t.Errorf("expected waittime %v got %v", waitTime, gotten)
|
||||
}
|
||||
return err
|
||||
}).Times(1)
|
||||
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *testLocker) finish() {
|
||||
l.ctrl.Finish()
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user