mirror of
https://github.com/zitadel/zitadel.git
synced 2025-10-24 08:09:13 +00:00
fix: add keys to projections (#2728)
* fix: add keys to projections * change to multiple tables * merge * change migration version * merge * Update migrations/cockroach/V1.98__keys.sql Co-authored-by: Silvan <silvan.reusser@gmail.com> * Update migrations/cockroach/V1.98__keys.sql Co-authored-by: Silvan <silvan.reusser@gmail.com> * check keys Co-authored-by: Silvan <silvan.reusser@gmail.com>
This commit is contained in:
@@ -1,9 +1,11 @@
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
)
|
||||
|
||||
func CreateMockEncryptionAlg(ctrl *gomock.Controller) EncryptionAlgorithm {
|
||||
@@ -24,6 +26,14 @@ func CreateMockEncryptionAlg(ctrl *gomock.Controller) EncryptionAlgorithm {
|
||||
return string(code), nil
|
||||
},
|
||||
)
|
||||
mCrypto.EXPECT().Decrypt(gomock.Any(), gomock.Any()).AnyTimes().DoAndReturn(
|
||||
func(code []byte, keyID string) ([]byte, error) {
|
||||
if keyID != "id" {
|
||||
return nil, errors.ThrowInternal(nil, "id", "invalid key id")
|
||||
}
|
||||
return code, nil
|
||||
},
|
||||
)
|
||||
return mCrypto
|
||||
}
|
||||
|
||||
|
123
internal/query/projection/key.go
Normal file
123
internal/query/projection/key.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/caos/logging"
|
||||
|
||||
"github.com/caos/zitadel/internal/config/systemdefaults"
|
||||
"github.com/caos/zitadel/internal/crypto"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
|
||||
"github.com/caos/zitadel/internal/repository/keypair"
|
||||
)
|
||||
|
||||
type KeyProjection struct {
|
||||
crdb.StatementHandler
|
||||
encryptionAlgorithm crypto.EncryptionAlgorithm
|
||||
}
|
||||
|
||||
const (
|
||||
KeyProjectionTable = "zitadel.projections.keys"
|
||||
KeyPrivateTable = KeyProjectionTable + "_" + privateKeyTableSuffix
|
||||
KeyPublicTable = KeyProjectionTable + "_" + publicKeyTableSuffix
|
||||
)
|
||||
|
||||
func NewKeyProjection(ctx context.Context, config crdb.StatementHandlerConfig, keyConfig systemdefaults.KeyConfig) (_ *KeyProjection, err error) {
|
||||
p := &KeyProjection{}
|
||||
config.ProjectionName = KeyProjectionTable
|
||||
config.Reducers = p.reducers()
|
||||
p.StatementHandler = crdb.NewStatementHandler(ctx, config)
|
||||
p.encryptionAlgorithm, err = crypto.NewAESCrypto(keyConfig.EncryptionConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *KeyProjection) reducers() []handler.AggregateReducer {
|
||||
return []handler.AggregateReducer{
|
||||
{
|
||||
Aggregate: keypair.AggregateType,
|
||||
EventRedusers: []handler.EventReducer{
|
||||
{
|
||||
Event: keypair.AddedEventType,
|
||||
Reduce: p.reduceKeyPairAdded,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
KeyColumnID = "id"
|
||||
KeyColumnCreationDate = "creation_date"
|
||||
KeyColumnChangeDate = "change_date"
|
||||
KeyColumnResourceOwner = "resource_owner"
|
||||
KeyColumnSequence = "sequence"
|
||||
KeyColumnAlgorithm = "algorithm"
|
||||
KeyColumnUse = "use"
|
||||
|
||||
privateKeyTableSuffix = "private"
|
||||
KeyPrivateColumnID = "id"
|
||||
KeyPrivateColumnExpiry = "expiry"
|
||||
KeyPrivateColumnKey = "key"
|
||||
|
||||
publicKeyTableSuffix = "public"
|
||||
KeyPublicColumnID = "id"
|
||||
KeyPublicColumnExpiry = "expiry"
|
||||
KeyPublicColumnKey = "key"
|
||||
)
|
||||
|
||||
func (p *KeyProjection) reduceKeyPairAdded(event eventstore.EventReader) (*handler.Statement, error) {
|
||||
e, ok := event.(*keypair.AddedEvent)
|
||||
if !ok {
|
||||
logging.LogWithFields("HANDL-GEdg3", "seq", event.Sequence(), "expectedType", keypair.AddedEventType).Error("wrong event type")
|
||||
return nil, errors.ThrowInvalidArgument(nil, "HANDL-SAbr2", "reduce.wrong.event.type")
|
||||
}
|
||||
if e.PrivateKey.Expiry.Before(time.Now()) && e.PublicKey.Expiry.Before(time.Now()) {
|
||||
return crdb.NewNoOpStatement(e), nil
|
||||
}
|
||||
creates := []func(eventstore.EventReader) crdb.Exec{
|
||||
crdb.AddCreateStatement(
|
||||
[]handler.Column{
|
||||
handler.NewCol(KeyColumnID, e.Aggregate().ID),
|
||||
handler.NewCol(KeyColumnCreationDate, e.CreationDate()),
|
||||
handler.NewCol(KeyColumnChangeDate, e.CreationDate()),
|
||||
handler.NewCol(KeyColumnResourceOwner, e.Aggregate().ResourceOwner),
|
||||
handler.NewCol(KeyColumnSequence, e.Sequence()),
|
||||
handler.NewCol(KeyColumnAlgorithm, e.Algorithm),
|
||||
handler.NewCol(KeyColumnUse, e.Usage),
|
||||
},
|
||||
),
|
||||
}
|
||||
if e.PrivateKey.Expiry.After(time.Now()) {
|
||||
creates = append(creates, crdb.AddCreateStatement(
|
||||
[]handler.Column{
|
||||
handler.NewCol(KeyPrivateColumnID, e.Aggregate().ID),
|
||||
handler.NewCol(KeyPrivateColumnExpiry, e.PrivateKey.Expiry),
|
||||
handler.NewCol(KeyPrivateColumnKey, e.PrivateKey.Key),
|
||||
},
|
||||
crdb.WithTableSuffix(privateKeyTableSuffix),
|
||||
))
|
||||
}
|
||||
if e.PublicKey.Expiry.After(time.Now()) {
|
||||
publicKey, err := crypto.Decrypt(e.PublicKey.Key, p.encryptionAlgorithm)
|
||||
if err != nil {
|
||||
logging.LogWithFields("HANDL-SDfw2", "seq", event.Sequence()).Error("cannot decrypt public key")
|
||||
return nil, errors.ThrowInternal(err, "HANDL-DAg2f", "cannot decrypt public key")
|
||||
}
|
||||
creates = append(creates, crdb.AddCreateStatement(
|
||||
[]handler.Column{
|
||||
handler.NewCol(KeyPublicColumnID, e.Aggregate().ID),
|
||||
handler.NewCol(KeyPublicColumnExpiry, e.PublicKey.Expiry),
|
||||
handler.NewCol(KeyPublicColumnKey, publicKey),
|
||||
},
|
||||
crdb.WithTableSuffix(publicKeyTableSuffix),
|
||||
))
|
||||
}
|
||||
return crdb.NewMultiStatement(e, creates...), nil
|
||||
}
|
118
internal/query/projection/key_test.go
Normal file
118
internal/query/projection/key_test.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package projection
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
|
||||
"github.com/caos/zitadel/internal/crypto"
|
||||
"github.com/caos/zitadel/internal/domain"
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/repository"
|
||||
"github.com/caos/zitadel/internal/repository/keypair"
|
||||
)
|
||||
|
||||
func TestKeyProjection_reduces(t *testing.T) {
|
||||
type args struct {
|
||||
event func(t *testing.T) eventstore.EventReader
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
reduce func(event eventstore.EventReader) (*handler.Statement, error)
|
||||
want wantReduce
|
||||
}{
|
||||
{
|
||||
name: "reduceKeyPairAdded",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(keypair.AddedEventType),
|
||||
keypair.AggregateType,
|
||||
keypairAddedEventData(time.Now().Add(time.Hour)),
|
||||
), keypair.AddedEventMapper),
|
||||
},
|
||||
reduce: (&KeyProjection{encryptionAlgorithm: crypto.CreateMockEncryptionAlg(gomock.NewController(t))}).reduceKeyPairAdded,
|
||||
want: wantReduce{
|
||||
projection: KeyProjectionTable,
|
||||
aggregateType: eventstore.AggregateType("key_pair"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{
|
||||
executions: []execution{
|
||||
{
|
||||
expectedStmt: "INSERT INTO zitadel.projections.keys (id, creation_date, change_date, resource_owner, sequence, algorithm, use) VALUES ($1, $2, $3, $4, $5, $6, $7)",
|
||||
expectedArgs: []interface{}{
|
||||
"agg-id",
|
||||
anyArg{},
|
||||
anyArg{},
|
||||
"ro-id",
|
||||
uint64(15),
|
||||
"algorithm",
|
||||
domain.KeyUsageSigning,
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO zitadel.projections.keys_private (id, expiry, key) VALUES ($1, $2, $3)",
|
||||
expectedArgs: []interface{}{
|
||||
"agg-id",
|
||||
anyArg{},
|
||||
&crypto.CryptoValue{
|
||||
CryptoType: crypto.TypeEncryption,
|
||||
Algorithm: "enc",
|
||||
KeyID: "id",
|
||||
Crypted: []byte("privateKey"),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
expectedStmt: "INSERT INTO zitadel.projections.keys_public (id, expiry, key) VALUES ($1, $2, $3)",
|
||||
expectedArgs: []interface{}{
|
||||
"agg-id",
|
||||
anyArg{},
|
||||
[]byte("publicKey"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reduceKeyPairAdded expired",
|
||||
args: args{
|
||||
event: getEvent(testEvent(
|
||||
repository.EventType(keypair.AddedEventType),
|
||||
keypair.AggregateType,
|
||||
keypairAddedEventData(time.Now().Add(-time.Hour)),
|
||||
), keypair.AddedEventMapper),
|
||||
},
|
||||
reduce: (&KeyProjection{}).reduceKeyPairAdded,
|
||||
want: wantReduce{
|
||||
projection: KeyProjectionTable,
|
||||
aggregateType: eventstore.AggregateType("key_pair"),
|
||||
sequence: 15,
|
||||
previousSequence: 10,
|
||||
executer: &testExecuter{},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
event := baseEvent(t)
|
||||
got, err := tt.reduce(event)
|
||||
if !errors.IsErrorInvalidArgument(err) {
|
||||
t.Errorf("no wrong event mapping: %v, got: %v", err, got)
|
||||
}
|
||||
|
||||
event = tt.args.event(t)
|
||||
got, err = tt.reduce(event)
|
||||
assertReduce(t, got, err, tt.want)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func keypairAddedEventData(t time.Time) []byte {
|
||||
return []byte(`{"algorithm": "algorithm", "usage": 0, "privateKey": {"key": {"cryptoType": 0, "algorithm": "enc", "keyID": "id", "crypted": "cHJpdmF0ZUtleQ=="}, "expiry": "` + t.Format(time.RFC3339) + `"}, "publicKey": {"key": {"cryptoType": 0, "algorithm": "enc", "keyID": "id", "crypted": "cHVibGljS2V5"}, "expiry": "` + t.Format(time.RFC3339) + `"}}`)
|
||||
}
|
@@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"time"
|
||||
|
||||
"github.com/caos/zitadel/internal/config/systemdefaults"
|
||||
"github.com/caos/zitadel/internal/eventstore"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler"
|
||||
"github.com/caos/zitadel/internal/eventstore/handler/crdb"
|
||||
@@ -16,7 +17,7 @@ const (
|
||||
failedEventsTable = "projections.failed_events"
|
||||
)
|
||||
|
||||
func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, config Config) error {
|
||||
func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, config Config, defaults systemdefaults.SystemDefaults) error {
|
||||
projectionConfig := crdb.StatementHandlerConfig{
|
||||
ProjectionHandlerConfig: handler.ProjectionHandlerConfig{
|
||||
HandlerConfig: handler.HandlerConfig{
|
||||
@@ -62,8 +63,9 @@ func Start(ctx context.Context, sqlClient *sql.DB, es *eventstore.Eventstore, co
|
||||
NewIAMMemberProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["iam_members"]))
|
||||
NewProjectMemberProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["project_members"]))
|
||||
NewProjectGrantMemberProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["project_grant_members"]))
|
||||
_, err := NewKeyProjection(ctx, applyCustomConfig(projectionConfig, config.Customizations["keys"]), defaults.KeyConfig)
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
func applyCustomConfig(config crdb.StatementHandlerConfig, customConfig CustomConfig) crdb.StatementHandlerConfig {
|
||||
|
@@ -11,6 +11,7 @@ import (
|
||||
"github.com/caos/zitadel/internal/query/projection"
|
||||
"github.com/caos/zitadel/internal/repository/action"
|
||||
iam_repo "github.com/caos/zitadel/internal/repository/iam"
|
||||
"github.com/caos/zitadel/internal/repository/keypair"
|
||||
"github.com/caos/zitadel/internal/repository/org"
|
||||
"github.com/caos/zitadel/internal/repository/project"
|
||||
usr_repo "github.com/caos/zitadel/internal/repository/user"
|
||||
@@ -43,8 +44,9 @@ func StartQueries(ctx context.Context, es *eventstore.Eventstore, projections pr
|
||||
org.RegisterEventMappers(repo.eventstore)
|
||||
project.RegisterEventMappers(repo.eventstore)
|
||||
action.RegisterEventMappers(repo.eventstore)
|
||||
keypair.RegisterEventMappers(repo.eventstore)
|
||||
|
||||
err = projection.Start(ctx, sqlClient, es, projections)
|
||||
err = projection.Start(ctx, sqlClient, es, projections, defaults)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
27
migrations/cockroach/V1.98__keys.sql
Normal file
27
migrations/cockroach/V1.98__keys.sql
Normal file
@@ -0,0 +1,27 @@
|
||||
CREATE TABLE zitadel.projections.keys (
|
||||
id STRING,
|
||||
creation_date TIMESTAMPTZ NOT NULL,
|
||||
change_date TIMESTAMPTZ NOT NULL,
|
||||
resource_owner STRING NOT NULL,
|
||||
sequence INT8 NOT NULL,
|
||||
algorithm STRING DEFAULT '' NOT NULL,
|
||||
use STRING DEFAULT '' NOT NULL,
|
||||
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
CREATE TABLE zitadel.projections.keys_private(
|
||||
id STRING REFERENCES zitadel.projections.keys ON DELETE NO ACTION,
|
||||
expiry TIMESTAMPTZ NOT NULL,
|
||||
key JSONB NOT NULL,
|
||||
|
||||
PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
CREATE TABLE zitadel.projections.keys_public (
|
||||
id STRING REFERENCES zitadel.projections.keys ON DELETE NO ACTION,
|
||||
expiry TIMESTAMPTZ NOT NULL,
|
||||
key BYTES NOT NULL,
|
||||
|
||||
PRIMARY KEY (id)
|
||||
);
|
Reference in New Issue
Block a user