fix: initial failures (#4291)

* fix(cmd): read configuration correctly

* fix(database): read weakly typed config

* fix(database): correct handling of update columns

Co-authored-by: Livio Spring <livio.a@gmail.com>
This commit is contained in:
Silvan
2022-09-01 09:24:26 +02:00
committed by GitHub
parent 32b751a3a0
commit 2f647ce9a2
13 changed files with 164 additions and 48 deletions

View File

@@ -112,7 +112,7 @@ func (h *StatementHandler) SearchQuery(ctx context.Context, instanceIDs []string
return queryBuilder, h.bulkLimit, nil
}
//Update implements handler.Update
// Update implements handler.Update
func (h *StatementHandler) Update(ctx context.Context, stmts []*handler.Statement, reduce handler.Reduce) (index int, err error) {
if len(stmts) == 0 {
return -1, nil
@@ -248,8 +248,8 @@ stmts:
return lastSuccessfulIdx
}
//executeStmt handles sql statements
//an error is returned if the statement could not be inserted properly
// executeStmt handles sql statements
// an error is returned if the statement could not be inserted properly
func (h *StatementHandler) executeStmt(tx *sql.Tx, stmt *handler.Statement) error {
if stmt.IsNoop() {
return nil

View File

@@ -72,8 +72,17 @@ func NewUpsertStatement(event eventstore.Event, conflictCols []handler.Column, v
}
q := func(config execConfig) string {
var updateStmt string
// the postgres standard does not allow to update a single column using a multi-column update
// discussion: https://www.postgresql.org/message-id/17451.1509381766%40sss.pgh.pa.us
// see Compatibility in https://www.postgresql.org/docs/current/sql-update.html
if len(updateCols) == 1 && !strings.HasPrefix(updateVals[0], "SELECT") {
updateStmt = "UPDATE SET " + updateCols[0] + " = " + updateVals[0]
} else {
updateStmt = "UPDATE SET (" + strings.Join(updateCols, ", ") + ") = (" + strings.Join(updateVals, ", ") + ")"
}
return "INSERT INTO " + config.tableName + " (" + strings.Join(cols, ", ") + ") VALUES (" + strings.Join(params, ", ") + ")" +
" ON CONFLICT (" + strings.Join(conflictTarget, ", ") + ") DO UPDATE SET (" + strings.Join(updateCols, ", ") + ") = (" + strings.Join(updateVals, ", ") + ")"
" ON CONFLICT (" + strings.Join(conflictTarget, ", ") + ") DO " + updateStmt
}
return &handler.Statement{
@@ -130,6 +139,12 @@ func NewUpdateStatement(event eventstore.Event, values []handler.Column, conditi
}
q := func(config execConfig) string {
// the postgres standard does not allow to update a single column using a multi-column update
// discussion: https://www.postgresql.org/message-id/17451.1509381766%40sss.pgh.pa.us
// see Compatibility in https://www.postgresql.org/docs/current/sql-update.html
if len(cols) == 1 && !strings.HasPrefix(params[0], "SELECT") {
return "UPDATE " + config.tableName + " SET " + cols[0] + " = " + params[0] + " WHERE " + strings.Join(wheres, " AND ")
}
return "UPDATE " + config.tableName + " SET (" + strings.Join(cols, ", ") + ") = (" + strings.Join(params, ", ") + ") WHERE " + strings.Join(wheres, " AND ")
}

View File

@@ -282,7 +282,53 @@ func TestNewUpsertStatement(t *testing.T) {
},
},
{
name: "correct",
name: "correct UPDATE multi col",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
conflictCols: []handler.Column{
handler.NewCol("col1", nil),
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
{
Name: "col2",
Value: "val",
},
{
Name: "col3",
Value: "val",
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
params: []params{
{
query: "INSERT INTO my_table (col1, col2, col3) VALUES ($1, $2, $3) ON CONFLICT (col1) DO UPDATE SET (col2, col3) = (EXCLUDED.col2, EXCLUDED.col3)",
args: []interface{}{"val", "val", "val"},
},
},
shouldExecute: true,
},
isErr: func(err error) bool {
return err == nil
},
},
},
{
name: "correct UPDATE single col",
args: args{
table: "my_table",
event: &testEvent{
@@ -312,7 +358,7 @@ func TestNewUpsertStatement(t *testing.T) {
executer: &wantExecuter{
params: []params{
{
query: "INSERT INTO my_table (col1, col2) VALUES ($1, $2) ON CONFLICT (col1) DO UPDATE SET (col2) = (EXCLUDED.col2)",
query: "INSERT INTO my_table (col1, col2) VALUES ($1, $2) ON CONFLICT (col1) DO UPDATE SET col2 = EXCLUDED.col2",
args: []interface{}{"val", "val"},
},
},
@@ -454,7 +500,7 @@ func TestNewUpdateStatement(t *testing.T) {
},
},
{
name: "correct",
name: "correct single column",
args: args{
table: "my_table",
event: &testEvent{
@@ -483,7 +529,7 @@ func TestNewUpdateStatement(t *testing.T) {
executer: &wantExecuter{
params: []params{
{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col2 = $2)",
query: "UPDATE my_table SET col1 = $1 WHERE (col2 = $2)",
args: []interface{}{"val", 1},
},
},
@@ -494,6 +540,51 @@ func TestNewUpdateStatement(t *testing.T) {
},
},
},
{
name: "correct multi column",
args: args{
table: "my_table",
event: &testEvent{
aggregateType: "agg",
sequence: 1,
previousSequence: 0,
},
values: []handler.Column{
{
Name: "col1",
Value: "val",
},
{
Name: "col3",
Value: "val5",
},
},
conditions: []handler.Condition{
{
Name: "col2",
Value: 1,
},
},
},
want: want{
table: "my_table",
aggregateType: "agg",
sequence: 1,
previousSequence: 1,
executer: &wantExecuter{
params: []params{
{
query: "UPDATE my_table SET (col1, col3) = ($1, $2) WHERE (col2 = $3)",
args: []interface{}{"val", "val5", 1},
},
},
shouldExecute: true,
},
isErr: func(err error) bool {
return err == nil
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -808,11 +899,11 @@ func TestNewMultiStatement(t *testing.T) {
args: []interface{}{1},
},
{
query: "INSERT INTO my_table (col1, col2) VALUES ($1, $2) ON CONFLICT (col1) DO UPDATE SET (col2) = (EXCLUDED.col2)",
query: "INSERT INTO my_table (col1, col2) VALUES ($1, $2) ON CONFLICT (col1) DO UPDATE SET col2 = EXCLUDED.col2",
args: []interface{}{1, 2},
},
{
query: "UPDATE my_table SET (col1) = ($1) WHERE (col1 = $2)",
query: "UPDATE my_table SET col1 = $1 WHERE (col1 = $2)",
args: []interface{}{1, 1},
},
},