fix(notify): notify user in projection (#3889)

* start implement notify user in projection

* fix(stmt): add copy to multi stmt

* use projections for notify users

* feat: notifications from projections

* feat: notifications from projections

* cleanup

* pre-release

* fix tests

* fix types

* fix command

* fix queryNotifyUser

* fix: build version

* fix: HumanPasswordlessInitCodeSent

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
This commit is contained in:
Livio Spring
2022-07-06 14:09:49 +02:00
committed by GitHub
parent d15a15c809
commit a1d404291d
46 changed files with 2018 additions and 1839 deletions

View File

@@ -189,6 +189,12 @@ func AddDeleteStatement(conditions []handler.Condition, opts ...execOption) func
}
}
func AddCopyStatement(from, to []handler.Column, conditions []handler.Condition, opts ...execOption) func(eventstore.Event) Exec {
return func(event eventstore.Event) Exec {
return NewCopyStatement(event, from, to, conditions, opts...).Execute
}
}
func NewArrayAppendCol(column string, value interface{}) handler.Column {
return handler.Column{
Name: column,
@@ -233,19 +239,19 @@ func NewArrayIntersectCol(column string, value interface{}) handler.Column {
// if the value of a col is empty the data will be copied from the selected row
// if the value of a col is not empty the data will be set by the static value
// conds represent the conditions for the selection subquery
func NewCopyStatement(event eventstore.Event, cols []handler.Column, conds []handler.Condition, opts ...execOption) *handler.Statement {
columnNames := make([]string, len(cols))
selectColumns := make([]string, len(cols))
func NewCopyStatement(event eventstore.Event, from, to []handler.Column, conds []handler.Condition, opts ...execOption) *handler.Statement {
columnNames := make([]string, len(to))
selectColumns := make([]string, len(from))
argCounter := 0
args := []interface{}{}
for i, col := range cols {
columnNames[i] = col.Name
selectColumns[i] = col.Name
if col.Value != nil {
for i := range from {
columnNames[i] = to[i].Name
selectColumns[i] = from[i].Name
if from[i].Value != nil {
argCounter++
selectColumns[i] = "$" + strconv.Itoa(argCounter)
args = append(args, col.Value)
args = append(args, from[i].Value)
}
}
@@ -260,7 +266,7 @@ func NewCopyStatement(event eventstore.Event, cols []handler.Column, conds []han
args: args,
}
if len(cols) == 0 {
if len(from) == 0 || len(to) == 0 || len(from) != len(to) {
config.err = handler.ErrNoValues
}

View File

@@ -801,7 +801,8 @@ func TestNewCopyStatement(t *testing.T) {
type args struct {
table string
event *testEvent
cols []handler.Column
from []handler.Column
to []handler.Column
conds []handler.Condition
}
type want struct {
@@ -856,7 +857,12 @@ func TestNewCopyStatement(t *testing.T) {
previousSequence: 0,
},
conds: []handler.Condition{},
cols: []handler.Column{
from: []handler.Column{
{
Name: "col",
},
},
to: []handler.Column{
{
Name: "col",
},
@@ -876,7 +882,44 @@ func TestNewCopyStatement(t *testing.T) {
},
},
{
name: "no values",
name: "more to than from cols",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
conds: []handler.Condition{},
from: []handler.Column{
{
Name: "col",
},
},
to: []handler.Column{
{
Name: "col",
},
{
Name: "col2",
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
shouldExecute: false,
},
isErr: func(err error) bool {
return errors.Is(err, handler.ErrNoCondition)
},
},
},
{
name: "no columns",
args: args{
table: "my_table",
event: &testEvent{
@@ -889,7 +932,7 @@ func TestNewCopyStatement(t *testing.T) {
Name: "col",
},
},
cols: []handler.Column{},
from: []handler.Column{},
},
want: want{
table: "my_table",
@@ -905,7 +948,7 @@ func TestNewCopyStatement(t *testing.T) {
},
},
{
name: "correct",
name: "correct same column names",
args: args{
table: "my_table",
event: &testEvent{
@@ -913,7 +956,7 @@ func TestNewCopyStatement(t *testing.T) {
sequence: 1,
previousSequence: 0,
},
cols: []handler.Column{
from: []handler.Column{
{
Name: "state",
Value: 1,
@@ -928,6 +971,20 @@ func TestNewCopyStatement(t *testing.T) {
Name: "col_b",
},
},
to: []handler.Column{
{
Name: "state",
},
{
Name: "id",
},
{
Name: "col_a",
},
{
Name: "col_b",
},
},
conds: []handler.Condition{
{
Name: "id",
@@ -958,11 +1015,78 @@ func TestNewCopyStatement(t *testing.T) {
},
},
},
{
name: "correct different column names",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
from: []handler.Column{
{
Value: 1,
},
{
Name: "id",
},
{
Name: "col_a",
},
{
Name: "col_b",
},
},
to: []handler.Column{
{
Name: "state",
},
{
Name: "id",
},
{
Name: "col_c",
},
{
Name: "col_d",
},
},
conds: []handler.Condition{
{
Name: "id",
Value: 2,
},
{
Name: "state",
Value: 3,
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
params: []params{
{
query: "UPSERT INTO my_table (state, id, col_c, col_d) SELECT $1, id, col_a, col_b FROM my_table AS copy_table WHERE copy_table.id = $2 AND copy_table.state = $3",
args: []interface{}{1, 2, 3},
},
},
shouldExecute: true,
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.want.executer.t = t
stmt := NewCopyStatement(tt.args.event, tt.args.cols, tt.args.conds)
stmt := NewCopyStatement(tt.args.event, tt.args.from, tt.args.to, tt.args.conds)
err := stmt.Execute(tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {