Tim Möhlmann fe1337536f
fix(db): add additional connection pool for projection spooling (#7094)
* fix(db): add additional connection pool for projection spooling

* use correct connection pool for projections

---------

Co-authored-by: Livio Spring <livio.a@gmail.com>
2023-12-20 16:13:04 +00:00

93 lines
1.7 KiB
Go

package dialect
import (
"database/sql"
"sync"
"time"
)
// Dialect couples a Matcher with the Connector that was registered for it.
type Dialect struct {
// Matcher decides whether a configuration key refers to this dialect.
Matcher Matcher
// Config is the Connector supplied when the dialect was registered.
Config Connector
// IsDefault is presumably meant to flag the fallback dialect.
// NOTE(review): confirm which code sets and reads this field.
IsDefault bool
}
// Package-level dialect registry, filled by Register and read by SelectByConfig.
var (
// dialects holds the non-default dialects in the order Register appended them.
dialects []*Dialect
// defaultDialect is what SelectByConfig returns when no configuration key
// matches; it stays nil until a default dialect is registered.
defaultDialect *Dialect
// dialectsMu is held by Register while it modifies the registry.
dialectsMu sync.Mutex
)
// Matcher recognizes a dialect by name and decodes values into a Connector.
type Matcher interface {
// MatchName reports whether the given name belongs to this dialect.
// SelectByConfig calls it with each top-level configuration key.
MatchName(string) bool
// Decode builds a Connector from the given values.
// NOTE(review): the expected shape of the values is not visible in this
// file — confirm against the implementations.
Decode([]interface{}) (Connector, error)
}
// Application names returned by DBPurpose.AppName.
// NOTE(review): presumably reported to the database as the connection's
// application name — confirm in the Connector implementations.
const (
// QueryAppName is the name for DBPurposeQuery.
QueryAppName = "zitadel_queries"
// EventstorePusherAppName is the name for DBPurposeEventPusher.
EventstorePusherAppName = "zitadel_es_pusher"
// ProjectionSpoolerAppName is the name for DBPurposeProjectionSpooler.
ProjectionSpoolerAppName = "zitadel_projection_spooler"
// defaultAppName is returned for any DBPurpose without a dedicated name.
defaultAppName = "zitadel"
)
// DBPurpose is what the resulting connection pool is used for.
// It is passed to Connector.Connect, and AppName maps each value to an
// application name.
type DBPurpose int
// Known purposes. Values start at 0 and increase by one in declaration order.
const (
// DBPurposeQuery maps to QueryAppName.
DBPurposeQuery DBPurpose = iota
// DBPurposeEventPusher maps to EventstorePusherAppName.
DBPurposeEventPusher
// DBPurposeProjectionSpooler maps to ProjectionSpoolerAppName.
DBPurposeProjectionSpooler
)
// AppName returns the application name that belongs to the purpose.
// Any value without a dedicated name, including values outside the declared
// constants, yields defaultAppName.
func (p DBPurpose) AppName() string {
	// Lookup table from purpose to its dedicated application name.
	names := map[DBPurpose]string{
		DBPurposeQuery:             QueryAppName,
		DBPurposeEventPusher:       EventstorePusherAppName,
		DBPurposeProjectionSpooler: ProjectionSpoolerAppName,
	}
	if name, known := names[p]; known {
		return name
	}
	return defaultAppName
}
// Connector opens database connections and, through the embedded Database
// interface, describes the database it connects to.
type Connector interface {
// Connect returns a *sql.DB for the given purpose.
// NOTE(review): useAdmin presumably selects administrative credentials, and
// pusherRatio/spoolerRatio presumably control how the connection budget is
// split between the event pusher and projection spooler pools — confirm
// against the implementations.
Connect(useAdmin bool, pusherRatio, spoolerRatio float64, purpose DBPurpose) (*sql.DB, error)
// Password returns a password string.
// NOTE(review): presumably the one used to connect — confirm.
Password() string
// Database adds DatabaseName, Username, Type and Timetravel.
Database
}
// Database exposes descriptive properties of a configured database.
type Database interface {
// DatabaseName returns the name of the database.
DatabaseName() string
// Username returns a user name.
// NOTE(review): presumably the connecting user — confirm.
Username() string
// Type returns the database type as a string.
// NOTE(review): the set of possible values is not visible here.
Type() string
// Timetravel takes a duration and returns a string.
// NOTE(review): presumably a SQL fragment for reading data as of that
// duration in the past — confirm against the implementations.
Timetravel(time.Duration) string
}
// Register adds a dialect built from matcher and config to the package
// registry.
//
// When isDefault is true the dialect becomes the fallback that SelectByConfig
// returns if no configuration key matches. It is not added to the list of
// matchable dialects, and a later default registration replaces an earlier
// one. Otherwise the dialect is appended to the matchable list.
//
// Register holds dialectsMu while it modifies the registry.
func Register(matcher Matcher, config Connector, isDefault bool) {
	dialectsMu.Lock()
	defer dialectsMu.Unlock()

	// Carry the flag on the dialect itself so Dialect.IsDefault reflects how
	// the dialect was registered instead of always being false.
	d := &Dialect{Matcher: matcher, Config: config, IsDefault: isDefault}
	if isDefault {
		defaultDialect = d
		return
	}
	dialects = append(dialects, d)
}
// SelectByConfig returns the registered dialect whose Matcher accepts one of
// the top-level keys of config, or the default dialect when none does.
//
// Dialects are tried in registration order, so if the keys match more than
// one dialect the earliest registered one wins. The result is nil when
// nothing matches and no default dialect has been registered.
func SelectByConfig(config map[string]interface{}) *Dialect {
	// Take a snapshot under the same lock Register uses. The matchers are
	// called after the lock is released so they never run while it is held.
	dialectsMu.Lock()
	registered, fallback := dialects, defaultDialect
	dialectsMu.Unlock()

	// Dialects form the outer loop on purpose: ranging over the map first
	// would make the winner depend on Go's randomized map iteration order.
	for _, d := range registered {
		for key := range config {
			if d.Matcher.MatchName(key) {
				return d
			}
		}
	}
	return fallback
}