package sql

import (
	"context"

	"github.com/caos/logging"
	repo "github.com/caos/zitadel/internal/eventstore/repository"
)

func (db *CRDB) Step20(ctx context.Context, latestSequence uint64) error {
	currentSequence := uint64(0)
	limit := uint64(500)
	previousSequences := make(map[repo.AggregateType]Sequence)
	for currentSequence < latestSequence {
		events, err := db.Filter(ctx, &repo.SearchQuery{
			Columns: repo.ColumnsEvent,
			Limit:   limit,
			Filters: [][]*repo.Filter{
				{
					&repo.Filter{
						Field:     repo.FieldSequence,
						Operation: repo.OperationGreater,
						Value:     currentSequence,
					},
				},
			},
		})
		if err != nil {
			return err
		}

		tx, err := db.client.Begin()
		if err != nil {
			return err
		}

		for _, event := range events {
			if _, err := tx.Exec("SAVEPOINT event_update"); err != nil {
				return err
			}
			seq := Sequence(previousSequences[event.AggregateType])
			if _, err = tx.Exec("UPDATE eventstore.events SET previous_aggregate_type_sequence = $1 WHERE event_sequence = $2", &seq, event.Sequence); err != nil {
				return err
			}
			if _, err = tx.Exec("RELEASE SAVEPOINT event_update"); err != nil {
				return err
			}
			previousSequences[event.AggregateType] = Sequence(event.Sequence)
			currentSequence = event.Sequence
		}

		if err = tx.Commit(); err != nil {
			return err
		}
		logging.LogWithFields("SQL-bXVwS", "currentSeq", currentSequence, "events", len(events)).Info("events updated")
	}
	return nil
}