feat(cache): redis cache (#8822)

# Which Problems Are Solved

Add a cache implementation using Redis single mode. This does not add
support for Redis Cluster or sentinel.

# How the Problems Are Solved

Added the `internal/cache/redis` package. All operations occur
atomically, including setting of secondary indexes, using LUA scripts
where needed.

The [`miniredis`](https://github.com/alicebob/miniredis) package is used
to run unit tests.

# Additional Changes

- Move connector code to `internal/cache/connector/...` and remove
duplicate code from `query` and `command` packages.
- Fix a missed invalidation on the restrictions projection

# Additional Context

Closes #8130
This commit is contained in:
Tim Möhlmann
2024-11-04 11:44:51 +01:00
committed by GitHub
parent 9c3e5e467b
commit 250f2344c8
50 changed files with 1767 additions and 293 deletions

View File

@@ -0,0 +1,28 @@
package pg
import (
"github.com/zitadel/zitadel/internal/cache"
"github.com/zitadel/zitadel/internal/database"
)
type Config struct {
Enabled bool
AutoPrune cache.AutoPruneConfig
}
type Connector struct {
PGXPool
Dialect string
Config Config
}
func NewConnector(config Config, client *database.DB) *Connector {
if !config.Enabled {
return nil
}
return &Connector{
PGXPool: client.Pool,
Dialect: client.Type(),
Config: config,
}
}

View File

@@ -0,0 +1,7 @@
create unlogged table if not exists cache.objects_{{ . }}
partition of cache.objects
for values in ('{{ . }}');
create unlogged table if not exists cache.string_keys_{{ . }}
partition of cache.string_keys
for values in ('{{ . }}');

View File

@@ -0,0 +1,5 @@
delete from cache.string_keys k
where k.cache_name = $1
and k.index_id = $2
and k.index_key = any($3)
;

19
internal/cache/connector/pg/get.sql vendored Normal file
View File

@@ -0,0 +1,19 @@
update cache.objects
set last_used_at = now()
where cache_name = $1
and (
select object_id
from cache.string_keys k
where cache_name = $1
and index_id = $2
and index_key = $3
) = id
and case when $4::interval > '0s'
then created_at > now()-$4::interval -- max age
else true
end
and case when $5::interval > '0s'
then last_used_at > now()-$5::interval -- last use
else true
end
returning payload;

View File

@@ -0,0 +1,9 @@
delete from cache.objects o
using cache.string_keys k
where k.cache_name = $1
and k.index_id = $2
and k.index_key = any($3)
and o.cache_name = k.cache_name
and o.id = k.object_id
;

176
internal/cache/connector/pg/pg.go vendored Normal file
View File

@@ -0,0 +1,176 @@
package pg
import (
"context"
_ "embed"
"errors"
"log/slog"
"strings"
"text/template"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"golang.org/x/exp/slices"
"github.com/zitadel/zitadel/internal/cache"
"github.com/zitadel/zitadel/internal/telemetry/tracing"
)
var (
//go:embed create_partition.sql.tmpl
createPartitionQuery string
createPartitionTmpl = template.Must(template.New("create_partition").Parse(createPartitionQuery))
//go:embed set.sql
setQuery string
//go:embed get.sql
getQuery string
//go:embed invalidate.sql
invalidateQuery string
//go:embed delete.sql
deleteQuery string
//go:embed prune.sql
pruneQuery string
//go:embed truncate.sql
truncateQuery string
)
type PGXPool interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
type pgCache[I ~int, K ~string, V cache.Entry[I, K]] struct {
purpose cache.Purpose
config *cache.Config
indices []I
connector *Connector
logger *slog.Logger
}
// NewCache returns a cache that stores and retrieves objects using PostgreSQL unlogged tables.
func NewCache[I ~int, K ~string, V cache.Entry[I, K]](ctx context.Context, purpose cache.Purpose, config cache.Config, indices []I, connector *Connector) (cache.PrunerCache[I, K, V], error) {
c := &pgCache[I, K, V]{
purpose: purpose,
config: &config,
indices: indices,
connector: connector,
logger: config.Log.Slog().With("cache_purpose", purpose),
}
c.logger.InfoContext(ctx, "pg cache logging enabled")
if connector.Dialect == "postgres" {
if err := c.createPartition(ctx); err != nil {
return nil, err
}
}
return c, nil
}
func (c *pgCache[I, K, V]) createPartition(ctx context.Context) error {
var query strings.Builder
if err := createPartitionTmpl.Execute(&query, c.purpose.String()); err != nil {
return err
}
_, err := c.connector.Exec(ctx, query.String())
return err
}
func (c *pgCache[I, K, V]) Set(ctx context.Context, entry V) {
//nolint:errcheck
c.set(ctx, entry)
}
func (c *pgCache[I, K, V]) set(ctx context.Context, entry V) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
keys := c.indexKeysFromEntry(entry)
c.logger.DebugContext(ctx, "pg cache set", "index_key", keys)
_, err = c.connector.Exec(ctx, setQuery, c.purpose.String(), keys, entry)
if err != nil {
c.logger.ErrorContext(ctx, "pg cache set", "err", err)
return err
}
return nil
}
func (c *pgCache[I, K, V]) Get(ctx context.Context, index I, key K) (value V, ok bool) {
value, err := c.get(ctx, index, key)
if err == nil {
c.logger.DebugContext(ctx, "pg cache get", "index", index, "key", key)
return value, true
}
logger := c.logger.With("err", err, "index", index, "key", key)
if errors.Is(err, pgx.ErrNoRows) {
logger.InfoContext(ctx, "pg cache miss")
return value, false
}
logger.ErrorContext(ctx, "pg cache get", "err", err)
return value, false
}
func (c *pgCache[I, K, V]) get(ctx context.Context, index I, key K) (value V, err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
if !slices.Contains(c.indices, index) {
return value, cache.NewIndexUnknownErr(index)
}
err = c.connector.QueryRow(ctx, getQuery, c.purpose.String(), index, key, c.config.MaxAge, c.config.LastUseAge).Scan(&value)
return value, err
}
func (c *pgCache[I, K, V]) Invalidate(ctx context.Context, index I, keys ...K) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
_, err = c.connector.Exec(ctx, invalidateQuery, c.purpose.String(), index, keys)
c.logger.DebugContext(ctx, "pg cache invalidate", "index", index, "keys", keys)
return err
}
func (c *pgCache[I, K, V]) Delete(ctx context.Context, index I, keys ...K) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
_, err = c.connector.Exec(ctx, deleteQuery, c.purpose.String(), index, keys)
c.logger.DebugContext(ctx, "pg cache delete", "index", index, "keys", keys)
return err
}
func (c *pgCache[I, K, V]) Prune(ctx context.Context) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
_, err = c.connector.Exec(ctx, pruneQuery, c.purpose.String(), c.config.MaxAge, c.config.LastUseAge)
c.logger.DebugContext(ctx, "pg cache prune")
return err
}
func (c *pgCache[I, K, V]) Truncate(ctx context.Context) (err error) {
ctx, span := tracing.NewSpan(ctx)
defer func() { span.EndWithError(err) }()
_, err = c.connector.Exec(ctx, truncateQuery, c.purpose.String())
c.logger.DebugContext(ctx, "pg cache truncate")
return err
}
type indexKey[I, K comparable] struct {
IndexID I `json:"index_id"`
IndexKey K `json:"index_key"`
}
func (c *pgCache[I, K, V]) indexKeysFromEntry(entry V) []indexKey[I, K] {
keys := make([]indexKey[I, K], 0, len(c.indices)*3) // naive assumption
for _, index := range c.indices {
for _, key := range entry.Keys(index) {
keys = append(keys, indexKey[I, K]{
IndexID: index,
IndexKey: key,
})
}
}
return keys
}

526
internal/cache/connector/pg/pg_test.go vendored Normal file
View File

@@ -0,0 +1,526 @@
package pg
import (
"context"
"regexp"
"testing"
"time"
"github.com/jackc/pgx/v5"
"github.com/pashagolub/pgxmock/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/cache"
)
type testIndex int
const (
testIndexID testIndex = iota
testIndexName
)
var testIndices = []testIndex{
testIndexID,
testIndexName,
}
type testObject struct {
ID string
Name []string
}
func (o *testObject) Keys(index testIndex) []string {
switch index {
case testIndexID:
return []string{o.ID}
case testIndexName:
return o.Name
default:
return nil
}
}
func TestNewCache(t *testing.T) {
tests := []struct {
name string
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(regexp.QuoteMeta(expectedCreatePartitionQuery)).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "success",
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(regexp.QuoteMeta(expectedCreatePartitionQuery)).
WillReturnResult(pgxmock.NewResult("CREATE TABLE", 0))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conf := cache.Config{
Log: &logging.Config{
Level: "debug",
AddSource: true,
},
}
pool, err := pgxmock.NewPool()
require.NoError(t, err)
tt.expect(pool)
connector := &Connector{
PGXPool: pool,
Dialect: "postgres",
}
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, conf, testIndices, connector)
require.ErrorIs(t, err, tt.wantErr)
if tt.wantErr == nil {
assert.NotNil(t, c)
}
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Set(t *testing.T) {
queryExpect := regexp.QuoteMeta(setQuery)
type args struct {
entry *testObject
}
tests := []struct {
name string
args args
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
args: args{
&testObject{
ID: "id1",
Name: []string{"foo", "bar"},
},
},
expect: func(ppi pgxmock.PgxCommonIface) {
ppi.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(),
[]indexKey[testIndex, string]{
{IndexID: testIndexID, IndexKey: "id1"},
{IndexID: testIndexName, IndexKey: "foo"},
{IndexID: testIndexName, IndexKey: "bar"},
},
&testObject{
ID: "id1",
Name: []string{"foo", "bar"},
}).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "success",
args: args{
&testObject{
ID: "id1",
Name: []string{"foo", "bar"},
},
},
expect: func(ppi pgxmock.PgxCommonIface) {
ppi.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(),
[]indexKey[testIndex, string]{
{IndexID: testIndexID, IndexKey: "id1"},
{IndexID: testIndexName, IndexKey: "foo"},
{IndexID: testIndexName, IndexKey: "bar"},
},
&testObject{
ID: "id1",
Name: []string{"foo", "bar"},
}).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, cache.Config{})
defer pool.Close()
tt.expect(pool)
err := c.(*pgCache[testIndex, string, *testObject]).
set(context.Background(), tt.args.entry)
require.ErrorIs(t, err, tt.wantErr)
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Get(t *testing.T) {
queryExpect := regexp.QuoteMeta(getQuery)
type args struct {
index testIndex
key string
}
tests := []struct {
name string
config cache.Config
args args
expect func(pgxmock.PgxCommonIface)
want *testObject
wantOk bool
}{
{
name: "invalid index",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
args: args{
index: 99,
key: "id1",
},
expect: func(pci pgxmock.PgxCommonIface) {},
wantOk: false,
},
{
name: "no rows",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
args: args{
index: testIndexID,
key: "id1",
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectQuery(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Duration(0), time.Duration(0)).
WillReturnRows(pgxmock.NewRows([]string{"payload"}))
},
wantOk: false,
},
{
name: "error",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
args: args{
index: testIndexID,
key: "id1",
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectQuery(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Duration(0), time.Duration(0)).
WillReturnError(pgx.ErrTxClosed)
},
wantOk: false,
},
{
name: "ok",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
args: args{
index: testIndexID,
key: "id1",
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectQuery(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, "id1", time.Minute, time.Second).
WillReturnRows(
pgxmock.NewRows([]string{"payload"}).AddRow(&testObject{
ID: "id1",
Name: []string{"foo", "bar"},
}),
)
},
want: &testObject{
ID: "id1",
Name: []string{"foo", "bar"},
},
wantOk: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, tt.config)
defer pool.Close()
tt.expect(pool)
got, ok := c.Get(context.Background(), tt.args.index, tt.args.key)
assert.Equal(t, tt.wantOk, ok)
assert.Equal(t, tt.want, got)
err := pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Invalidate(t *testing.T) {
queryExpect := regexp.QuoteMeta(invalidateQuery)
type args struct {
index testIndex
keys []string
}
tests := []struct {
name string
config cache.Config
args args
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
args: args{
index: testIndexID,
keys: []string{"id1", "id2"},
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "ok",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
args: args{
index: testIndexID,
keys: []string{"id1", "id2"},
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, tt.config)
defer pool.Close()
tt.expect(pool)
err := c.Invalidate(context.Background(), tt.args.index, tt.args.keys...)
assert.ErrorIs(t, err, tt.wantErr)
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Delete(t *testing.T) {
queryExpect := regexp.QuoteMeta(deleteQuery)
type args struct {
index testIndex
keys []string
}
tests := []struct {
name string
config cache.Config
args args
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
args: args{
index: testIndexID,
keys: []string{"id1", "id2"},
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "ok",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
args: args{
index: testIndexID,
keys: []string{"id1", "id2"},
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), testIndexID, []string{"id1", "id2"}).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, tt.config)
defer pool.Close()
tt.expect(pool)
err := c.Delete(context.Background(), tt.args.index, tt.args.keys...)
assert.ErrorIs(t, err, tt.wantErr)
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Prune(t *testing.T) {
queryExpect := regexp.QuoteMeta(pruneQuery)
tests := []struct {
name string
config cache.Config
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), time.Duration(0), time.Duration(0)).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "ok",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String(), time.Minute, time.Second).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, tt.config)
defer pool.Close()
tt.expect(pool)
err := c.Prune(context.Background())
assert.ErrorIs(t, err, tt.wantErr)
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
func Test_pgCache_Truncate(t *testing.T) {
queryExpect := regexp.QuoteMeta(truncateQuery)
tests := []struct {
name string
config cache.Config
expect func(pgxmock.PgxCommonIface)
wantErr error
}{
{
name: "error",
config: cache.Config{
MaxAge: 0,
LastUseAge: 0,
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String()).
WillReturnError(pgx.ErrTxClosed)
},
wantErr: pgx.ErrTxClosed,
},
{
name: "ok",
config: cache.Config{
MaxAge: time.Minute,
LastUseAge: time.Second,
},
expect: func(pci pgxmock.PgxCommonIface) {
pci.ExpectExec(queryExpect).
WithArgs(cachePurpose.String()).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, pool := prepareCache(t, tt.config)
defer pool.Close()
tt.expect(pool)
err := c.Truncate(context.Background())
assert.ErrorIs(t, err, tt.wantErr)
err = pool.ExpectationsWereMet()
assert.NoError(t, err)
})
}
}
const (
cachePurpose = cache.PurposeAuthzInstance
expectedCreatePartitionQuery = `create unlogged table if not exists cache.objects_authz_instance
partition of cache.objects
for values in ('authz_instance');
create unlogged table if not exists cache.string_keys_authz_instance
partition of cache.string_keys
for values in ('authz_instance');
`
)
func prepareCache(t *testing.T, conf cache.Config) (cache.PrunerCache[testIndex, string, *testObject], pgxmock.PgxPoolIface) {
conf.Log = &logging.Config{
Level: "debug",
AddSource: true,
}
pool, err := pgxmock.NewPool()
require.NoError(t, err)
pool.ExpectExec(regexp.QuoteMeta(expectedCreatePartitionQuery)).
WillReturnResult(pgxmock.NewResult("CREATE TABLE", 0))
connector := &Connector{
PGXPool: pool,
Dialect: "postgres",
}
c, err := NewCache[testIndex, string, *testObject](context.Background(), cachePurpose, conf, testIndices, connector)
require.NoError(t, err)
return c, pool
}

18
internal/cache/connector/pg/prune.sql vendored Normal file
View File

@@ -0,0 +1,18 @@
delete from cache.objects o
where o.cache_name = $1
and (
case when $2::interval > '0s'
then created_at < now()-$2::interval -- max age
else false
end
or case when $3::interval > '0s'
then last_used_at < now()-$3::interval -- last use
else false
end
or o.id not in (
select object_id
from cache.string_keys
where cache_name = $1
)
)
;

19
internal/cache/connector/pg/set.sql vendored Normal file
View File

@@ -0,0 +1,19 @@
with object as (
insert into cache.objects (cache_name, payload)
values ($1, $3)
returning id
)
insert into cache.string_keys (
cache_name,
index_id,
index_key,
object_id
)
select $1, keys.index_id, keys.index_key, id as object_id
from object, jsonb_to_recordset($2) keys (
index_id bigint,
index_key text
)
on conflict (cache_name, index_id, index_key) do
update set object_id = EXCLUDED.object_id
;

View File

@@ -0,0 +1,3 @@
delete from cache.objects o
where o.cache_name = $1
;