Merge branch 'main' into clean-transactional-propsal

This commit is contained in:
adlerhurst
2025-07-30 07:42:11 +02:00
117 changed files with 5490 additions and 1075 deletions

View File

@@ -6,7 +6,7 @@ import "context"
type Init func(context.Context, *Check) error
type Check struct {
Executes []func(ex Executer, projectionName string) (bool, error)
Executes []func(ctx context.Context, executer Executer, projectionName string) (bool, error)
}
func (c *Check) IsNoop() bool {

View File

@@ -646,7 +646,7 @@ func (h *Handler) executeStatements(ctx context.Context, tx *sql.Tx, statements
for i, statement := range statements {
select {
case <-ctx.Done():
break
return lastProcessedIndex, ctx.Err()
default:
err := h.executeStatement(ctx, tx, statement)
if err != nil {
@@ -669,7 +669,7 @@ func (h *Handler) executeStatement(ctx context.Context, tx *sql.Tx, statement *S
return err
}
if err = statement.Execute(tx, h.projection.Name()); err != nil {
if err = statement.Execute(ctx, tx, h.projection.Name()); err != nil {
h.log().WithError(err).Error("statement execution failed")
_, rollbackErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT exec_stmt")

View File

@@ -200,7 +200,7 @@ func (h *Handler) Init(ctx context.Context) error {
}
for i, execute := range check.Init().Executes {
logging.WithFields("projection", h.projection.Name(), "execute", i).Debug("executing check")
next, err := execute(tx, h.projection.Name())
next, err := execute(ctx, tx, h.projection.Name())
if err != nil {
logging.OnError(tx.Rollback()).Debug("unable to rollback")
return err
@@ -218,7 +218,7 @@ func NewTableCheck(table *Table, opts ...execOption) *handler.Check {
create := func(config execConfig) string {
return createTableStatement(table, config.tableName, "")
}
executes := make([]func(handler.Executer, string) (bool, error), len(table.indices)+1)
executes := make([]func(context.Context, handler.Executer, string) (bool, error), len(table.indices)+1)
executes[0] = execNextIfExists(config, create, opts, true)
for i, index := range table.indices {
executes[i+1] = execNextIfExists(config, createIndexCheck(index), opts, true)
@@ -239,7 +239,7 @@ func NewMultiTableCheck(primaryTable *Table, secondaryTables ...*SuffixedTable)
}
return &handler.Check{
Executes: []func(handler.Executer, string) (bool, error){
Executes: []func(context.Context, handler.Executer, string) (bool, error){
execNextIfExists(config, create, nil, true),
},
}
@@ -257,14 +257,14 @@ func NewViewCheck(selectStmt string, secondaryTables ...*SuffixedTable) *handler
}
return &handler.Check{
Executes: []func(handler.Executer, string) (bool, error){
Executes: []func(context.Context, handler.Executer, string) (bool, error){
execNextIfExists(config, create, nil, false),
},
}
}
func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(handler.Executer, string) (bool, error) {
return func(handler handler.Executer, name string) (shouldExecuteNext bool, err error) {
func execNextIfExists(config execConfig, q query, opts []execOption, executeNext bool) func(ctx context.Context, handler handler.Executer, name string) (bool, error) {
return func(ctx context.Context, handler handler.Executer, name string) (shouldExecuteNext bool, err error) {
_, err = handler.Exec("SAVEPOINT exec_stmt")
if err != nil {
return false, zerrors.ThrowInternal(err, "V2-U1wlz", "create savepoint failed")
@@ -280,7 +280,7 @@ func execNextIfExists(config execConfig, q query, opts []execOption, executeNext
return
}
}()
err = exec(config, q, opts)(handler, name)
err = exec(config, q, opts)(ctx, handler, name)
return false, err
}
}

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"database/sql"
"encoding/json"
"errors"
@@ -91,7 +92,7 @@ type Statement struct {
Execute Exec
}
type Exec func(ex Executer, projectionName string) error
type Exec func(ctx context.Context, ex Executer, projectionName string) error
func WithTableSuffix(name string) func(*execConfig) {
return func(o *execConfig) {
@@ -670,7 +671,7 @@ type execConfig struct {
type query func(config execConfig) string
func exec(config execConfig, q query, opts []execOption) Exec {
return func(ex Executer, projectionName string) (err error) {
return func(ctx context.Context, ex Executer, projectionName string) (err error) {
if projectionName == "" {
return ErrNoProjection
}
@@ -694,12 +695,12 @@ func exec(config execConfig, q query, opts []execOption) Exec {
}
func multiExec(execList []Exec) Exec {
return func(ex Executer, projectionName string) error {
return func(ctx context.Context, ex Executer, projectionName string) error {
for _, exec := range execList {
if exec == nil {
continue
}
if err := exec(ex, projectionName); err != nil {
if err := exec(ctx, ex, projectionName); err != nil {
return err
}
}

View File

@@ -1,6 +1,7 @@
package handler
import (
"context"
"database/sql"
"errors"
"reflect"
@@ -197,7 +198,7 @@ func TestNewCreateStatement(t *testing.T) {
tt.want.executer.t = t
stmt := NewCreateStatement(tt.args.event, tt.args.values)
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -506,7 +507,7 @@ func TestNewUpsertStatement(t *testing.T) {
tt.want.executer.t = t
stmt := NewUpsertStatement(tt.args.event, tt.args.conflictCols, tt.args.values)
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -710,7 +711,7 @@ func TestNewUpdateStatement(t *testing.T) {
tt.want.executer.t = t
stmt := NewUpdateStatement(tt.args.event, tt.args.values, tt.args.conditions)
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -827,7 +828,7 @@ func TestNewDeleteStatement(t *testing.T) {
tt.want.executer.t = t
stmt := NewDeleteStatement(tt.args.event, tt.args.conditions)
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -878,7 +879,7 @@ func TestNewNoOpStatement(t *testing.T) {
return
}
tt.want.executer.t = t
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -1054,7 +1055,7 @@ func TestNewMultiStatement(t *testing.T) {
return
}
tt.want.executer.t = t
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -1338,7 +1339,7 @@ func TestNewCopyStatement(t *testing.T) {
tt.want.executer.t = t
stmt := NewCopyStatement(tt.args.event, tt.args.conflictingCols, tt.args.from, tt.args.to, tt.args.conds)
err := stmt.Execute(tt.want.executer, tt.args.table)
err := stmt.Execute(t.Context(), tt.want.executer, tt.args.table)
if !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
@@ -1349,7 +1350,7 @@ func TestNewCopyStatement(t *testing.T) {
func TestStatement_Execute(t *testing.T) {
type fields struct {
execute func(ex Executer, projectionName string) error
execute func(ctx context.Context, ex Executer, projectionName string) error
}
type want struct {
isErr func(error) bool
@@ -1366,7 +1367,7 @@ func TestStatement_Execute(t *testing.T) {
{
name: "execute returns no error",
fields: fields{
execute: func(ex Executer, projectionName string) error { return nil },
execute: func(ctx context.Context, ex Executer, projectionName string) error { return nil },
},
args: args{
projectionName: "my_projection",
@@ -1383,7 +1384,7 @@ func TestStatement_Execute(t *testing.T) {
projectionName: "my_projection",
},
fields: fields{
execute: func(ex Executer, projectionName string) error { return errTest },
execute: func(ctx context.Context, ex Executer, projectionName string) error { return errTest },
},
want: want{
isErr: func(err error) bool {
@@ -1397,7 +1398,7 @@ func TestStatement_Execute(t *testing.T) {
stmt := &Statement{
Execute: tt.fields.execute,
}
if err := stmt.Execute(nil, tt.args.projectionName); !tt.want.isErr(err) {
if err := stmt.Execute(t.Context(), nil, tt.args.projectionName); !tt.want.isErr(err) {
t.Errorf("unexpected error: %v", err)
}
})