mirror of
				https://github.com/zitadel/zitadel.git
				synced 2025-10-25 02:19:22 +00:00 
			
		
		
		
	 4645045987
			
		
	
	4645045987
	
	
	
		
			
			# Which Problems Are Solved Zitadel currently uses 3 database pools: 1 for queries, 1 for pushing events, and 1 for scheduled projection updates. This defeats the purpose of a connection pool, which already handles multiple connections. During load tests we found that the current structure of connection pools consumes a lot of database resources. The resource usage dropped after we reduced the number of database pools to 1 because existing connections can be used more efficiently. # How the Problems Are Solved Removed the logic for handling multiple connection pools; a single pool is used instead. # Additional Changes none # Additional Context part of https://github.com/zitadel/zitadel/issues/8352
		
			
				
	
	
		
			109 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			109 lines
		
	
	
		
			2.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package eventstore
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"database/sql"
 | |
| 	_ "embed"
 | |
| 	"fmt"
 | |
| 
 | |
| 	"github.com/zitadel/logging"
 | |
| 
 | |
| 	"github.com/zitadel/zitadel/internal/api/authz"
 | |
| 	"github.com/zitadel/zitadel/internal/database"
 | |
| 	"github.com/zitadel/zitadel/internal/eventstore"
 | |
| 	"github.com/zitadel/zitadel/internal/telemetry/tracing"
 | |
| )
 | |
| 
 | |
// pushTxOpts describes a read-write transaction running at the
// read-committed isolation level. ReadOnly is left at its zero value (false).
var pushTxOpts = &sql.TxOptions{Isolation: sql.LevelReadCommitted}
 | |
| 
 | |
| func (es *Eventstore) Push(ctx context.Context, client database.ContextQueryExecuter, commands ...eventstore.Command) (events []eventstore.Event, err error) {
 | |
| 	ctx, span := tracing.NewSpan(ctx)
 | |
| 	defer func() { span.EndWithError(err) }()
 | |
| 
 | |
| 	events, err = es.writeCommands(ctx, client, commands)
 | |
| 	if isSetupNotExecutedError(err) {
 | |
| 		return es.pushWithoutFunc(ctx, client, commands...)
 | |
| 	}
 | |
| 
 | |
| 	return events, err
 | |
| }
 | |
| 
 | |
| func (es *Eventstore) writeCommands(ctx context.Context, client database.ContextQueryExecuter, commands []eventstore.Command) (_ []eventstore.Event, err error) {
 | |
| 	var conn *sql.Conn
 | |
| 	switch c := client.(type) {
 | |
| 	case database.Client:
 | |
| 		conn, err = c.Conn(ctx)
 | |
| 	case nil:
 | |
| 		conn, err = es.client.Conn(ctx)
 | |
| 		client = conn
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if conn != nil {
 | |
| 		defer conn.Close()
 | |
| 	}
 | |
| 
 | |
| 	tx, close, err := es.pushTx(ctx, client)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	if close != nil {
 | |
| 		defer func() {
 | |
| 			err = close(err)
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL application_name = '%s'", fmt.Sprintf("zitadel_es_pusher_%s", authz.GetInstance(ctx).InstanceID())))
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	events, err := writeEvents(ctx, tx, commands)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if err = handleUniqueConstraints(ctx, tx, commands); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	err = es.handleFieldCommands(ctx, tx, commands)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return events, nil
 | |
| }
 | |
| 
 | |
| func writeEvents(ctx context.Context, tx database.Tx, commands []eventstore.Command) (_ []eventstore.Event, err error) {
 | |
| 	ctx, span := tracing.NewSpan(ctx)
 | |
| 	defer func() { span.EndWithError(err) }()
 | |
| 
 | |
| 	events, cmds, err := commandsToEvents(ctx, commands)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	rows, err := tx.QueryContext(ctx, `select owner, created_at, "sequence", position from eventstore.push($1::eventstore.command[])`, cmds)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer rows.Close()
 | |
| 
 | |
| 	for i := 0; rows.Next(); i++ {
 | |
| 		err = rows.Scan(&events[i].(*event).command.Owner, &events[i].(*event).createdAt, &events[i].(*event).sequence, &events[i].(*event).position)
 | |
| 		if err != nil {
 | |
| 			logging.WithError(err).Warn("failed to scan events")
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 	if err = rows.Err(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return events, nil
 | |
| }
 |