mirror of
https://github.com/zitadel/zitadel.git
synced 2025-12-06 19:36:41 +00:00
fix(cache): use key versioning (#10657)
# Which Problems Are Solved
Cached object may have a different schema between Zitadel versions.
# How the Problems Are Solved
Use the curent build version in DB based cache connectors PostgreSQL and
Redis.
# Additional Changes
- Cleanup the ZitadelVersion field from the authz Instance
- Solve potential race condition on global variables in build package.
# Additional Context
- Closes https://github.com/zitadel/zitadel/issues/10648
- Obsoletes https://github.com/zitadel/zitadel/pull/10646
- Needs to be back-ported to v4 over
https://github.com/zitadel/zitadel/pull/10645
(cherry picked from commit f6f37d3a31)
This commit is contained in:
committed by
Livio Spring
parent
f9b3c1ef50
commit
6e90d4a927
66
internal/cache/connector/pg/pg.go
vendored
66
internal/cache/connector/pg/pg.go
vendored
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
_ "embed"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"slices"
|
||||
"strings"
|
||||
@@ -40,21 +41,23 @@ type PGXPool interface {
|
||||
}
|
||||
|
||||
type pgCache[I ~int, K ~string, V cache.Entry[I, K]] struct {
|
||||
purpose cache.Purpose
|
||||
config *cache.Config
|
||||
indices []I
|
||||
connector *Connector
|
||||
logger *slog.Logger
|
||||
purpose cache.Purpose
|
||||
zitadelVersion string
|
||||
config *cache.Config
|
||||
indices []I
|
||||
connector *Connector
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewCache returns a cache that stores and retrieves objects using PostgreSQL unlogged tables.
|
||||
func NewCache[I ~int, K ~string, V cache.Entry[I, K]](ctx context.Context, purpose cache.Purpose, config cache.Config, indices []I, connector *Connector) (cache.PrunerCache[I, K, V], error) {
|
||||
func NewCache[I ~int, K ~string, V cache.Entry[I, K]](ctx context.Context, purpose cache.Purpose, zitadelVersion string, config cache.Config, indices []I, connector *Connector) (cache.PrunerCache[I, K, V], error) {
|
||||
c := &pgCache[I, K, V]{
|
||||
purpose: purpose,
|
||||
config: &config,
|
||||
indices: indices,
|
||||
connector: connector,
|
||||
logger: config.Log.Slog().With("cache_purpose", purpose),
|
||||
purpose: purpose,
|
||||
zitadelVersion: zitadelVersion,
|
||||
config: &config,
|
||||
indices: indices,
|
||||
connector: connector,
|
||||
logger: config.Log.Slog().With("cache_purpose", purpose),
|
||||
}
|
||||
c.logger.InfoContext(ctx, "pg cache logging enabled")
|
||||
|
||||
@@ -115,7 +118,14 @@ func (c *pgCache[I, K, V]) get(ctx context.Context, index I, key K) (value V, er
|
||||
if !slices.Contains(c.indices, index) {
|
||||
return value, cache.NewIndexUnknownErr(index)
|
||||
}
|
||||
err = c.connector.QueryRow(ctx, getQuery, c.purpose.String(), index, key, c.config.MaxAge, c.config.LastUseAge).Scan(&value)
|
||||
err = c.connector.QueryRow(ctx,
|
||||
getQuery,
|
||||
c.purpose.String(),
|
||||
index,
|
||||
c.versionedKey(key),
|
||||
c.config.MaxAge,
|
||||
c.config.LastUseAge,
|
||||
).Scan(&value)
|
||||
return value, err
|
||||
}
|
||||
|
||||
@@ -123,7 +133,8 @@ func (c *pgCache[I, K, V]) Invalidate(ctx context.Context, index I, keys ...K) (
|
||||
ctx, span := tracing.NewSpan(ctx)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
|
||||
_, err = c.connector.Exec(ctx, invalidateQuery, c.purpose.String(), index, keys)
|
||||
versionedKeys := c.versionedKeys(keys)
|
||||
_, err = c.connector.Exec(ctx, invalidateQuery, c.purpose.String(), index, versionedKeys)
|
||||
c.logger.DebugContext(ctx, "pg cache invalidate", "index", index, "keys", keys)
|
||||
return err
|
||||
}
|
||||
@@ -132,7 +143,8 @@ func (c *pgCache[I, K, V]) Delete(ctx context.Context, index I, keys ...K) (err
|
||||
ctx, span := tracing.NewSpan(ctx)
|
||||
defer func() { span.EndWithError(err) }()
|
||||
|
||||
_, err = c.connector.Exec(ctx, deleteQuery, c.purpose.String(), index, keys)
|
||||
versionedKeys := c.versionedKeys(keys)
|
||||
_, err = c.connector.Exec(ctx, deleteQuery, c.purpose.String(), index, versionedKeys)
|
||||
c.logger.DebugContext(ctx, "pg cache delete", "index", index, "keys", keys)
|
||||
return err
|
||||
}
|
||||
@@ -155,20 +167,32 @@ func (c *pgCache[I, K, V]) Truncate(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
type indexKey[I, K comparable] struct {
|
||||
IndexID I `json:"index_id"`
|
||||
IndexKey K `json:"index_key"`
|
||||
type indexKey[I comparable] struct {
|
||||
IndexID I `json:"index_id"`
|
||||
IndexKey string `json:"index_key"`
|
||||
}
|
||||
|
||||
func (c *pgCache[I, K, V]) indexKeysFromEntry(entry V) []indexKey[I, K] {
|
||||
keys := make([]indexKey[I, K], 0, len(c.indices)*3) // naive assumption
|
||||
func (c *pgCache[I, K, V]) indexKeysFromEntry(entry V) []indexKey[I] {
|
||||
keys := make([]indexKey[I], 0, len(c.indices)*3) // naive assumption
|
||||
for _, index := range c.indices {
|
||||
for _, key := range entry.Keys(index) {
|
||||
keys = append(keys, indexKey[I, K]{
|
||||
keys = append(keys, indexKey[I]{
|
||||
IndexID: index,
|
||||
IndexKey: key,
|
||||
IndexKey: c.versionedKey(key),
|
||||
})
|
||||
}
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func (c *pgCache[I, K, V]) versionedKey(key K) string {
|
||||
return fmt.Sprintf("%s:%s", c.zitadelVersion, key)
|
||||
}
|
||||
|
||||
func (c *pgCache[I, K, V]) versionedKeys(key []K) []string {
|
||||
result := make([]string, len(key))
|
||||
for i, k := range key {
|
||||
result[i] = c.versionedKey(k)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
34
internal/cache/connector/pg/pg_test.go
vendored
34
internal/cache/connector/pg/pg_test.go
vendored
@@ -80,7 +80,7 @@ func TestNewCache(t *testing.T) {
|
||||
PGXPool: pool,
|
||||
}
|
||||
|
||||
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, conf, testIndices, connector)
|
||||
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, "VERSION", conf, testIndices, connector)
|
||||
require.ErrorIs(t, err, tt.wantErr)
|
||||
if tt.wantErr == nil {
|
||||
assert.NotNil(t, c)
|
||||
@@ -115,10 +115,10 @@ func Test_pgCache_Set(t *testing.T) {
|
||||
expect: func(ppi pgxmock.PgxCommonIface) {
|
||||
ppi.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(),
|
||||
[]indexKey[testIndex, string]{
|
||||
{IndexID: testIndexID, IndexKey: "id1"},
|
||||
{IndexID: testIndexName, IndexKey: "foo"},
|
||||
{IndexID: testIndexName, IndexKey: "bar"},
|
||||
[]indexKey[testIndex]{
|
||||
{IndexID: testIndexID, IndexKey: "VERSION:id1"},
|
||||
{IndexID: testIndexName, IndexKey: "VERSION:foo"},
|
||||
{IndexID: testIndexName, IndexKey: "VERSION:bar"},
|
||||
},
|
||||
&testObject{
|
||||
ID: "id1",
|
||||
@@ -139,10 +139,10 @@ func Test_pgCache_Set(t *testing.T) {
|
||||
expect: func(ppi pgxmock.PgxCommonIface) {
|
||||
ppi.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(),
|
||||
[]indexKey[testIndex, string]{
|
||||
{IndexID: testIndexID, IndexKey: "id1"},
|
||||
{IndexID: testIndexName, IndexKey: "foo"},
|
||||
{IndexID: testIndexName, IndexKey: "bar"},
|
||||
[]indexKey[testIndex]{
|
||||
{IndexID: testIndexID, IndexKey: "VERSION:id1"},
|
||||
{IndexID: testIndexName, IndexKey: "VERSION:foo"},
|
||||
{IndexID: testIndexName, IndexKey: "VERSION:bar"},
|
||||
},
|
||||
&testObject{
|
||||
ID: "id1",
|
||||
@@ -207,7 +207,7 @@ func Test_pgCache_Get(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectQuery(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Duration(0), time.Duration(0)).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "VERSION:id1", time.Duration(0), time.Duration(0)).
|
||||
WillReturnRows(pgxmock.NewRows([]string{"payload"}))
|
||||
},
|
||||
wantOk: false,
|
||||
@@ -224,7 +224,7 @@ func Test_pgCache_Get(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectQuery(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Duration(0), time.Duration(0)).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "VERSION:id1", time.Duration(0), time.Duration(0)).
|
||||
WillReturnError(pgx.ErrTxClosed)
|
||||
},
|
||||
wantOk: false,
|
||||
@@ -241,7 +241,7 @@ func Test_pgCache_Get(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectQuery(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Minute, time.Second).
|
||||
WithArgs(cachePurpose.String(), testIndexID, "VERSION:id1", time.Minute, time.Second).
|
||||
WillReturnRows(
|
||||
pgxmock.NewRows([]string{"payload"}).AddRow(&testObject{
|
||||
ID: "id1",
|
||||
@@ -296,7 +296,7 @@ func Test_pgCache_Invalidate(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"VERSION:id1", "VERSION:id2"}).
|
||||
WillReturnError(pgx.ErrTxClosed)
|
||||
},
|
||||
wantErr: pgx.ErrTxClosed,
|
||||
@@ -313,7 +313,7 @@ func Test_pgCache_Invalidate(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"VERSION:id1", "VERSION:id2"}).
|
||||
WillReturnResult(pgxmock.NewResult("DELETE", 1))
|
||||
},
|
||||
},
|
||||
@@ -358,7 +358,7 @@ func Test_pgCache_Delete(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"VERSION:id1", "VERSION:id2"}).
|
||||
WillReturnError(pgx.ErrTxClosed)
|
||||
},
|
||||
wantErr: pgx.ErrTxClosed,
|
||||
@@ -375,7 +375,7 @@ func Test_pgCache_Delete(t *testing.T) {
|
||||
},
|
||||
expect: func(pci pgxmock.PgxCommonIface) {
|
||||
pci.ExpectExec(queryExpect).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
|
||||
WithArgs(cachePurpose.String(), testIndexID, []string{"VERSION:id1", "VERSION:id2"}).
|
||||
WillReturnResult(pgxmock.NewResult("DELETE", 1))
|
||||
},
|
||||
},
|
||||
@@ -518,7 +518,7 @@ func prepareCache(t *testing.T, conf cache.Config) (cache.PrunerCache[testIndex,
|
||||
connector := &Connector{
|
||||
PGXPool: pool,
|
||||
}
|
||||
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, conf, testIndices, connector)
|
||||
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, "VERSION", conf, testIndices, connector)
|
||||
require.NoError(t, err)
|
||||
return c, pool
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user