Mirror of https://github.com/zitadel/zitadel.git (synced 2025-08-11 15:27:33 +00:00)

Commit: init

Files changed in this commit:
0
backend/LICENSE
Normal file
45
backend/cmd/config/config.go
Normal file
@@ -0,0 +1,45 @@
/*
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
*/
package config

import (
	"fmt"

	"github.com/spf13/cobra"
	"github.com/spf13/viper"
)

var (
	// ConfigureCmd represents the configure command
	ConfigureCmd = &cobra.Command{
		Use:   "configure",
		Short: "Guides you through configuring Zitadel",
		// Long: `A longer description that spans multiple lines and likely contains examples
		// and usage of using your command. For example:

		// Cobra is a CLI library for Go that empowers applications.
		// This application is a tool to generate the needed files
		// to quickly create a Cobra application.`,
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("config called")
			fmt.Println(viper.AllSettings())
			fmt.Println(viper.Sub("database").AllSettings())
		},
	}

	upgrade bool
)

func init() {
	// Here you will define your flags and configuration settings.
	ConfigureCmd.Flags().BoolVarP(&upgrade, "upgrade", "u", false, "Only ask for configuration values that changed since the previously used version")

	// Cobra supports persistent flags, which work for this command
	// and all subcommands, e.g.:
	// ConfigureCmd.PersistentFlags().String("foo", "", "A help for foo")

	// Cobra also supports local flags, which only run when this command
	// is called directly.
}
37
backend/cmd/prepare/prepare.go
Normal file
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
|
||||
*/
|
||||
package prepare
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// PrepareCmd represents the prepare command
|
||||
var PrepareCmd = &cobra.Command{
|
||||
Use: "prepare",
|
||||
Short: "Prepares the environment before starting Zitadel",
|
||||
// Long: `A longer description that spans multiple lines and likely contains examples
|
||||
// and usage of using your command. For example:
|
||||
|
||||
// Cobra is a CLI library for Go that empowers applications.
|
||||
// This application is a tool to generate the needed files
|
||||
// to quickly create a Cobra application.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
fmt.Println("prepare called")
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Here you will define your flags and configuration settings.
|
||||
|
||||
// Cobra supports Persistent Flags which will work for this command
|
||||
// and all subcommands, e.g.:
|
||||
// prepareCmd.PersistentFlags().String("foo", "", "A help for foo")
|
||||
|
||||
// Cobra supports local flags which will only run when this command
|
||||
// is called directly, e.g.:
|
||||
// prepareCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
}
|
84
backend/cmd/root.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/zitadel/zitadel/backend/cmd/config"
|
||||
"github.com/zitadel/zitadel/backend/cmd/prepare"
|
||||
"github.com/zitadel/zitadel/backend/cmd/start"
|
||||
"github.com/zitadel/zitadel/backend/cmd/upgrade"
|
||||
)
|
||||
|
||||
var cfgFile string
|
||||
|
||||
// RootCmd represents the base command when called without any subcommands
|
||||
var RootCmd = &cobra.Command{
|
||||
Use: "zitadel [subcommand]",
|
||||
Short: "A brief description of your application",
|
||||
Long: `A longer description that spans multiple lines and likely contains
|
||||
examples and usage of using your application. For example:
|
||||
|
||||
Cobra is a CLI library for Go that empowers applications.
|
||||
This application is a tool to generate the needed files
|
||||
to quickly create a Cobra application.`,
|
||||
// Uncomment the following line if your bare application
|
||||
// has an action associated with it:
|
||||
// Run: func(cmd *cobra.Command, args []string) { },
|
||||
}
|
||||
|
||||
// Execute adds all child commands to the root command and sets flags appropriately.
|
||||
// This is called by main.main(). It only needs to happen once to the rootCmd.
|
||||
func Execute() {
|
||||
err := RootCmd.Execute()
|
||||
if err != nil {
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
RootCmd.AddCommand(config.ConfigureCmd)
|
||||
RootCmd.AddCommand(prepare.PrepareCmd)
|
||||
RootCmd.AddCommand(start.StartCmd)
|
||||
RootCmd.AddCommand(upgrade.UpgradeCmd)
|
||||
|
||||
cobra.OnInitialize(initConfig)
|
||||
|
||||
// Here you will define your flags and configuration settings.
|
||||
// Cobra supports persistent flags, which, if defined here,
|
||||
// will be global for your application.
|
||||
|
||||
RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.zitadel.yaml)")
|
||||
|
||||
// Cobra also supports local flags, which will only run
|
||||
// when this action is called directly.
|
||||
RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
}
|
||||
|
||||
// initConfig reads in config file and ENV variables if set.
|
||||
func initConfig() {
|
||||
if cfgFile != "" {
|
||||
// Use config file from the flag.
|
||||
viper.SetConfigFile(cfgFile)
|
||||
} else {
|
||||
// Find home directory.
|
||||
home, err := os.UserHomeDir()
|
||||
cobra.CheckErr(err)
|
||||
|
||||
// Search config in home directory with name ".zitadel" (without extension).
|
||||
viper.AddConfigPath(home)
|
||||
viper.SetConfigType("yaml")
|
||||
viper.SetConfigName(".zitadel")
|
||||
}
|
||||
|
||||
viper.AutomaticEnv() // read in environment variables that match
|
||||
viper.AllowEmptyEnv(true)
|
||||
viper.SetEnvPrefix("ZITADEL")
|
||||
|
||||
// If a config file is found, read it in.
|
||||
if err := viper.ReadInConfig(); err == nil {
|
||||
fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed())
|
||||
}
|
||||
}
|
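Note: initConfig in root.go combines a YAML config file with environment variables. Because it calls both viper.AutomaticEnv() and viper.SetEnvPrefix("ZITADEL"), every configuration key can also be supplied as a ZITADEL_-prefixed environment variable. The snippet below is a minimal sketch of that behaviour only; the key name "port" is an illustrative assumption, not a key defined by this commit.

package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

func main() {
	// Mirrors the relevant calls from initConfig in backend/cmd/root.go.
	viper.SetEnvPrefix("ZITADEL")
	viper.AutomaticEnv()
	viper.AllowEmptyEnv(true)

	// With AutomaticEnv and the prefix set, a lookup for "port"
	// falls through to the ZITADEL_PORT environment variable.
	os.Setenv("ZITADEL_PORT", "8080")
	fmt.Println(viper.Get("port")) // 8080
}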
37
backend/cmd/start/start.go
Normal file
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
|
||||
*/
|
||||
package start
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// StartCmd represents the start command
|
||||
var StartCmd = &cobra.Command{
|
||||
Use: "start",
|
||||
Short: "Starts the Zitadel server",
|
||||
// Long: `A longer description that spans multiple lines and likely contains examples
|
||||
// and usage of using your command. For example:
|
||||
|
||||
// Cobra is a CLI library for Go that empowers applications.
|
||||
// This application is a tool to generate the needed files
|
||||
// to quickly create a Cobra application.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
fmt.Println("start called")
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
// Here you will define your flags and configuration settings.
|
||||
|
||||
// Cobra supports Persistent Flags which will work for this command
|
||||
// and all subcommands, e.g.:
|
||||
// startCmd.PersistentFlags().String("foo", "", "A help for foo")
|
||||
|
||||
// Cobra supports local flags which will only run when this command
|
||||
// is called directly, e.g.:
|
||||
// startCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
}
|
5
backend/cmd/test.yaml
Normal file
@@ -0,0 +1,5 @@
database:
  postgres: 'something'
  cockroach:
    host: localhost
    port: 26257
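Note: the configure command above prints viper.Sub("database").AllSettings(), which scopes lookups to the database block of this test file. A minimal sketch of reading these values, assuming the file is loaded the way initConfig in root.go loads a config file:

package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	// Load backend/cmd/test.yaml directly, as initConfig would via --config.
	viper.SetConfigFile("backend/cmd/test.yaml")
	if err := viper.ReadInConfig(); err != nil {
		panic(err)
	}

	// viper.Sub returns a Viper instance scoped to the "database" key.
	db := viper.Sub("database")
	fmt.Println(db.GetString("postgres"))       // something
	fmt.Println(db.GetString("cockroach.host")) // localhost
	fmt.Println(db.GetInt("cockroach.port"))    // 26257
}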
38
backend/cmd/upgrade/upgrade.go
Normal file
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
|
||||
*/
|
||||
package upgrade
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
// UpgradeCmd represents the upgrade command
|
||||
var UpgradeCmd = &cobra.Command{
|
||||
Use: "upgrade",
|
||||
Short: "Upgrades Zitadel from a previous version",
|
||||
// Long: `A longer description that spans multiple lines and likely contains examples
|
||||
// and usage of using your command. For example:
|
||||
|
||||
// Cobra is a CLI library for Go that empowers applications.
|
||||
// This application is a tool to generate the needed files
|
||||
// to quickly create a Cobra application.`,
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
fmt.Println("upgrade called")
|
||||
},
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
// Here you will define your flags and configuration settings.
|
||||
|
||||
// Cobra supports Persistent Flags which will work for this command
|
||||
// and all subcommands, e.g.:
|
||||
// upgradeCmd.PersistentFlags().String("foo", "", "A help for foo")
|
||||
|
||||
// Cobra supports local flags which will only run when this command
|
||||
// is called directly, e.g.:
|
||||
// upgradeCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
|
||||
}
|
90
backend/domain/instance.go
Normal file
@@ -0,0 +1,90 @@
package domain

import (
	"context"
	"log/slog"

	"github.com/zitadel/zitadel/backend/storage/database"
	"github.com/zitadel/zitadel/backend/storage/eventstore"
	"github.com/zitadel/zitadel/backend/storage/repository"
	"github.com/zitadel/zitadel/backend/storage/repository/cache"
	"github.com/zitadel/zitadel/backend/storage/repository/event"
	"github.com/zitadel/zitadel/backend/storage/repository/sql"
	"github.com/zitadel/zitadel/backend/storage/repository/telemetry/logged"
	"github.com/zitadel/zitadel/backend/storage/repository/telemetry/traced"
	"github.com/zitadel/zitadel/backend/telemetry/tracing"
)

type Instance struct {
	db     database.Pool
	tracer *tracing.Tracer
	logger *slog.Logger
	cache  *cache.Instance
}

func NewInstance(db database.Pool, tracer *tracing.Tracer, logger *slog.Logger) *Instance {
	b := &Instance{
		db:     db,
		tracer: tracer,
		logger: logger,

		cache: &cache.Instance{},
	}

	return b
}

func (b *Instance) instanceCommandRepo(tx database.Transaction) repository.InstanceRepository {
	return logged.NewInstance(
		b.logger,
		traced.NewInstance(
			b.tracer,
			event.NewInstance(
				eventstore.New(tx),
				b.cache.SetNext(
					sql.NewInstance(tx),
				),
			),
		),
	)
}

func (b *Instance) instanceQueryRepo(tx database.QueryExecutor) repository.InstanceRepository {
	return logged.NewInstance(
		b.logger,
		traced.NewInstance(
			b.tracer,
			b.cache.SetNext(
				sql.NewInstance(tx),
			),
		),
	)
}

func (b *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) {
	return b.instanceQueryRepo(b.db).ByID(ctx, id)
}

func (b *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) {
	return b.instanceQueryRepo(b.db).ByDomain(ctx, domain)
}

type SetUpInstance struct {
	Instance *repository.Instance
	User     *repository.User
}

func (b *Instance) SetUp(ctx context.Context, request *SetUpInstance) (err error) {
	tx, err := b.db.Begin(ctx, nil)
	if err != nil {
		return err
	}
	defer func() {
		err = tx.End(ctx, err)
	}()
	err = b.instanceCommandRepo(tx).SetUp(ctx, request.Instance)
	if err != nil {
		return err
	}
	return b.userCommandRepo(tx).Create(ctx, request.User)
}
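Note: instanceCommandRepo and instanceQueryRepo assemble the repository as a chain of decorators: logging wraps tracing, which wraps the event/cache/SQL layers, so every call passes through telemetry before it reaches storage. The logged and traced packages are not part of this diff, so the sketch below only illustrates the pattern with hypothetical stand-in types, not the actual API.

package main

import (
	"context"
	"fmt"
)

// repo is a stand-in for repository.InstanceRepository.
type repo interface {
	ByID(ctx context.Context, id string) (string, error)
}

// sqlRepo is a stand-in for sql.NewInstance(tx): the innermost storage layer.
type sqlRepo struct{}

func (sqlRepo) ByID(_ context.Context, id string) (string, error) {
	return "instance " + id, nil
}

// loggedRepo is a hypothetical decorator in the spirit of logged.NewInstance:
// it forwards the call to the next layer and logs around it.
type loggedRepo struct{ next repo }

func (l loggedRepo) ByID(ctx context.Context, id string) (string, error) {
	fmt.Println("ByID called with", id)
	res, err := l.next.ByID(ctx, id)
	fmt.Println("ByID returned", res, err)
	return res, err
}

func main() {
	var r repo = loggedRepo{next: sqlRepo{}} // a tracing decorator would wrap the same way
	_, _ = r.ByID(context.Background(), "i1")
}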
39
backend/domain/user.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"github.com/zitadel/zitadel/backend/storage/database"
|
||||
"github.com/zitadel/zitadel/backend/storage/eventstore"
|
||||
"github.com/zitadel/zitadel/backend/storage/repository"
|
||||
"github.com/zitadel/zitadel/backend/storage/repository/event"
|
||||
"github.com/zitadel/zitadel/backend/storage/repository/sql"
|
||||
"github.com/zitadel/zitadel/backend/storage/repository/telemetry/logged"
|
||||
"github.com/zitadel/zitadel/backend/storage/repository/telemetry/traced"
|
||||
)
|
||||
|
||||
func (b *Instance) userCommandRepo(tx database.Transaction) repository.UserRepository {
|
||||
return logged.NewUser(
|
||||
b.logger,
|
||||
traced.NewUser(
|
||||
b.tracer,
|
||||
event.NewUser(
|
||||
eventstore.New(tx),
|
||||
sql.NewUser(tx),
|
||||
),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
func (b *Instance) userQueryRepo(tx database.QueryExecutor) repository.UserRepository {
|
||||
return logged.NewUser(
|
||||
b.logger,
|
||||
traced.NewUser(
|
||||
b.tracer,
|
||||
sql.NewUser(tx),
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
type User struct {
|
||||
ID string
|
||||
Username string
|
||||
}
|
11
backend/main.go
Normal file
@@ -0,0 +1,11 @@
/*
Copyright © 2025 NAME HERE <EMAIL ADDRESS>
*/
package main

import "github.com/zitadel/zitadel/backend/cmd"

func main() {
	cmd.Execute()
}
225
backend/storage/01_events.sql
Normal file
@@ -0,0 +1,225 @@
|
||||
DROP TABLE IF EXISTS properties;
|
||||
DROP TABLE IF EXISTS parents;
|
||||
DROP TABLE IF EXISTS objects;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS objects (
|
||||
type TEXT NOT NULL
|
||||
, id TEXT NOT NULL
|
||||
|
||||
, PRIMARY KEY (type, id)
|
||||
);
|
||||
|
||||
TRUNCATE objects CASCADE;
|
||||
INSERT INTO objects VALUES
|
||||
('instance', 'i1')
|
||||
, ('organization', 'o1')
|
||||
, ('user', 'u1')
|
||||
, ('user', 'u2')
|
||||
, ('organization', 'o2')
|
||||
, ('user', 'u3')
|
||||
, ('project', 'p3')
|
||||
|
||||
, ('instance', 'i2')
|
||||
, ('organization', 'o3')
|
||||
, ('user', 'u4')
|
||||
, ('project', 'p1')
|
||||
, ('project', 'p2')
|
||||
, ('application', 'a1')
|
||||
, ('application', 'a2')
|
||||
, ('org_domain', 'od1')
|
||||
, ('org_domain', 'od2')
|
||||
;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS parents (
|
||||
parent_type TEXT NOT NULL
|
||||
, parent_id TEXT NOT NULL
|
||||
, child_type TEXT NOT NULL
|
||||
, child_id TEXT NOT NULL
|
||||
, PRIMARY KEY (parent_type, parent_id, child_type, child_id)
|
||||
, FOREIGN KEY (parent_type, parent_id) REFERENCES objects(type, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (child_type, child_id) REFERENCES objects(type, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
INSERT INTO parents VALUES
|
||||
('instance', 'i1', 'organization', 'o1')
|
||||
, ('organization', 'o1', 'user', 'u1')
|
||||
, ('organization', 'o1', 'user', 'u2')
|
||||
, ('instance', 'i1', 'organization', 'o2')
|
||||
, ('organization', 'o2', 'user', 'u3')
|
||||
, ('organization', 'o2', 'project', 'p3')
|
||||
|
||||
, ('instance', 'i2', 'organization', 'o3')
|
||||
, ('organization', 'o3', 'user', 'u4')
|
||||
, ('organization', 'o3', 'project', 'p1')
|
||||
, ('organization', 'o3', 'project', 'p2')
|
||||
, ('project', 'p1', 'application', 'a1')
|
||||
, ('project', 'p2', 'application', 'a2')
|
||||
, ('organization', 'o3', 'org_domain', 'od1')
|
||||
, ('organization', 'o3', 'org_domain', 'od2')
|
||||
;
|
||||
|
||||
CREATE TABLE properties (
|
||||
object_type TEXT NOT NULL
|
||||
, object_id TEXT NOT NULL
|
||||
, key TEXT NOT NULL
|
||||
, value JSONB NOT NULL
|
||||
, should_index BOOLEAN NOT NULL DEFAULT FALSE
|
||||
|
||||
, PRIMARY KEY (object_type, object_id, key)
|
||||
, FOREIGN KEY (object_type, object_id) REFERENCES objects(type, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX properties_object_indexed ON properties (object_type, object_id) INCLUDE (value) WHERE should_index;
|
||||
CREATE INDEX properties_value_indexed ON properties (object_type, key, value) WHERE should_index;
|
||||
|
||||
TRUNCATE properties;
|
||||
INSERT INTO properties VALUES
|
||||
('instance', 'i1', 'name', '"Instance 1"', TRUE)
|
||||
, ('instance', 'i1', 'description', '"Instance 1 description"', FALSE)
|
||||
, ('instance', 'i2', 'name', '"Instance 2"', TRUE)
|
||||
, ('organization', 'o1', 'name', '"Organization 1"', TRUE)
|
||||
, ('org_domain', 'od1', 'domain', '"example.com"', TRUE)
|
||||
, ('org_domain', 'od1', 'is_primary', 'true', TRUE)
|
||||
, ('org_domain', 'od1', 'is_verified', 'true', FALSE)
|
||||
, ('org_domain', 'od2', 'domain', '"example.org"', TRUE)
|
||||
, ('org_domain', 'od2', 'is_primary', 'false', TRUE)
|
||||
, ('org_domain', 'od2', 'is_verified', 'false', FALSE)
|
||||
;
|
||||
|
||||
CREATE TABLE events (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
|
||||
, type TEXT NOT NULL
|
||||
, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
, revision SMALLINT NOT NULL
|
||||
, creator TEXT NOT NULL
|
||||
, payload JSONB
|
||||
|
||||
, global_sequence NUMERIC NOT NULL DEFAULT pg_current_xact_id()::TEXT::NUMERIC
|
||||
, sequence_order SMALLINT NOT NULL CHECK (sequence_order >= 0)
|
||||
|
||||
-- , object_type TEXT NOT NULL
|
||||
-- , object_id TEXT NOT NULL
|
||||
|
||||
-- , FOREIGN KEY (object_type, object_id) REFERENCES objects(type, id)
|
||||
);
|
||||
|
||||
CREATE TYPE property AS (
|
||||
-- key must be a json path
|
||||
key TEXT
|
||||
-- value should be a primitive type
|
||||
, value JSONB
|
||||
-- indicates whether the property should be indexed
|
||||
, should_index BOOLEAN
|
||||
);
|
||||
|
||||
CREATE TYPE parent AS (
|
||||
parent_type TEXT
|
||||
, parent_id TEXT
|
||||
);
|
||||
|
||||
CREATE TYPE object AS (
|
||||
type TEXT
|
||||
, id TEXT
|
||||
, properties property[]
|
||||
-- an object automatically inherits the parents of its parent
|
||||
, parents parent[]
|
||||
);
|
||||
|
||||
CREATE TYPE command AS (
|
||||
type TEXT
|
||||
, revision SMALLINT
|
||||
, creator TEXT
|
||||
, payload JSONB
|
||||
|
||||
-- if properties is null the objects and all its child objects get deleted
|
||||
-- if the value of a property is null the property and all sub fields get deleted
|
||||
-- for example if the key is 'a.b' and the value is null the property 'a.b.c' will be deleted as well
|
||||
, objects object[]
|
||||
);
|
||||
|
||||
CREATE OR REPLACE PROCEDURE update_object(_object object)
|
||||
AS $$
|
||||
DECLARE
|
||||
_property property;
|
||||
BEGIN
|
||||
FOREACH _property IN ARRAY _object.properties LOOP
|
||||
IF _property.value IS NULL THEN
|
||||
DELETE FROM properties
|
||||
WHERE object_type = _object.type
|
||||
AND object_id = _object.id
|
||||
AND key LIKE CONCAT(_property.key, '%');
|
||||
ELSE
|
||||
INSERT INTO properties (object_type, object_id, key, value, should_index)
|
||||
VALUES (_object.type, _object.id, _property.key, _property.value, _property.should_index)
|
||||
ON CONFLICT (object_type, object_id, key) DO UPDATE SET (value, should_index) = (_property.value, _property.should_index);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE delete_object(_type TEXT, _id TEXT)
|
||||
AS $$
|
||||
BEGIN
|
||||
WITH RECURSIVE objects_to_delete (_type, _id) AS (
|
||||
SELECT $1, $2
|
||||
|
||||
UNION
|
||||
|
||||
SELECT p.child_type, p.child_id
|
||||
FROM parents p
|
||||
JOIN objects_to_delete o ON p.parent_type = o.type AND p.parent_id = o.id
|
||||
)
|
||||
DELETE FROM objects
|
||||
WHERE (type, id) IN (SELECT * FROM objects_to_delete);
|
||||
END;
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION push(_commands command[])
|
||||
RETURNS NUMERIC AS $$
|
||||
DECLARE
|
||||
_command command;
|
||||
_index INT;
|
||||
|
||||
_object object;
|
||||
BEGIN
|
||||
FOR _index IN 1..array_length(_commands, 1) LOOP
|
||||
_command := _commands[_index];
|
||||
INSERT INTO events (type, revision, creator, payload)
|
||||
VALUES (_command.type, _command.revision, _command.creator, _command.payload);
|
||||
|
||||
FOREACH _object IN ARRAY _command.objects LOOP
|
||||
IF _object.properties IS NULL THEN
|
||||
PERFORM delete_object(_object.type, _object.id);
|
||||
ELSE
|
||||
PERFORM update_object(_object);
|
||||
END IF;
|
||||
END LOOP;
|
||||
RETURN pg_current_xact_id()::TEXT::NUMERIC;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
|
||||
BEGIN;
|
||||
|
||||
|
||||
RETURNING *
|
||||
;
|
||||
|
||||
rollback;
|
||||
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
properties
|
||||
WHERE
|
||||
(object_type, object_id) IN (
|
||||
SELECT
|
||||
object_type
|
||||
, object_id
|
||||
FROM
|
||||
properties
|
||||
where
|
||||
object_type = 'instance'
|
||||
and key = 'name'
|
||||
and value = '"Instance 1"'
|
||||
and should_index
|
||||
)
|
||||
;
|
310
backend/storage/02_next_try.sql
Normal file
@@ -0,0 +1,310 @@
|
||||
-- postgres
|
||||
DROP TABLE IF EXISTS properties;
|
||||
DROP TABLE IF EXISTS parents CASCADE;
|
||||
DROP TABLE IF EXISTS objects CASCADE;
|
||||
DROP TABLE IF EXISTS indexed_properties;
|
||||
DROP TABLE IF EXISTS events;
|
||||
DROP TABLE IF EXISTS models;
|
||||
|
||||
DROP TYPE IF EXISTS object CASCADE;
|
||||
DROP TYPE IF EXISTS model CASCADE;
|
||||
|
||||
CREATE TYPE model AS (
|
||||
name TEXT
|
||||
, id TEXT
|
||||
);
|
||||
|
||||
CREATE TYPE object AS (
|
||||
model TEXT
|
||||
, model_revision SMALLINT
|
||||
, id TEXT
|
||||
, payload JSONB
|
||||
, parents model[]
|
||||
);
|
||||
|
||||
CREATE TABLE models (
|
||||
name TEXT
|
||||
, revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0)
|
||||
, indexed_paths TEXT[]
|
||||
|
||||
, PRIMARY KEY (name, revision)
|
||||
);
|
||||
|
||||
CREATE TABLE objects (
|
||||
model TEXT NOT NULL
|
||||
, model_revision SMALLINT NOT NULL
|
||||
|
||||
, id TEXT NOT NULL
|
||||
, payload JSONB
|
||||
|
||||
, PRIMARY KEY (model, id)
|
||||
, FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT
|
||||
);
|
||||
|
||||
CREATE TABLE indexed_properties (
|
||||
model TEXT NOT NULL
|
||||
, model_revision SMALLINT NOT NULL
|
||||
, object_id TEXT NOT NULL
|
||||
|
||||
, path TEXT NOT NULL
|
||||
|
||||
, value JSONB
|
||||
, text_value TEXT
|
||||
, number_value NUMERIC
|
||||
, boolean_value BOOLEAN
|
||||
|
||||
, PRIMARY KEY (model, object_id, path)
|
||||
, FOREIGN KEY (model, object_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION ip_value_converter()
|
||||
RETURNS TRIGGER AS $$
|
||||
BEGIN
|
||||
CASE jsonb_typeof(NEW.value)
|
||||
WHEN 'boolean' THEN
|
||||
NEW.boolean_value := NEW.value::BOOLEAN;
|
||||
NEW.value := NULL;
|
||||
WHEN 'number' THEN
|
||||
NEW.number_value := NEW.value::NUMERIC;
|
||||
NEW.value := NULL;
|
||||
WHEN 'string' THEN
|
||||
NEW.text_value := (NEW.value#>>'{}')::TEXT;
|
||||
NEW.value := NULL;
|
||||
ELSE
|
||||
-- do nothing
|
||||
END CASE;
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE TRIGGER ip_value_converter_before_insert
|
||||
BEFORE INSERT
|
||||
ON indexed_properties
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION ip_value_converter();
|
||||
|
||||
CREATE TRIGGER ip_value_converter_before_update
|
||||
BEFORE UPDATE
|
||||
ON indexed_properties
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION ip_value_converter();
|
||||
|
||||
CREATE INDEX ip_search_model_object ON indexed_properties (model, path, value) WHERE value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_rev_object ON indexed_properties (model, model_revision, path, value) WHERE value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_text ON indexed_properties (model, path, text_value) WHERE text_value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_rev_text ON indexed_properties (model, model_revision, path, text_value) WHERE text_value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_number ON indexed_properties (model, path, number_value) WHERE number_value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_rev_number ON indexed_properties (model, model_revision, path, number_value) WHERE number_value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_boolean ON indexed_properties (model, path, boolean_value) WHERE boolean_value IS NOT NULL;
|
||||
CREATE INDEX ip_search_model_rev_boolean ON indexed_properties (model, model_revision, path, boolean_value) WHERE boolean_value IS NOT NULL;
|
||||
|
||||
CREATE TABLE IF NOT EXISTS parents (
|
||||
parent_model TEXT NOT NULL
|
||||
, parent_id TEXT NOT NULL
|
||||
, child_model TEXT NOT NULL
|
||||
, child_id TEXT NOT NULL
|
||||
|
||||
, PRIMARY KEY (parent_model, parent_id, child_model, child_id)
|
||||
, FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
INSERT INTO models VALUES
|
||||
('instance', 1, ARRAY['name', 'domain.name'])
|
||||
, ('organization', 1, ARRAY['name'])
|
||||
, ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname'])
|
||||
;
|
||||
|
||||
CREATE OR REPLACE FUNCTION jsonb_to_rows(j jsonb, _path text[] DEFAULT ARRAY[]::text[])
|
||||
RETURNS TABLE (path text[], value jsonb)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
k text;
|
||||
v jsonb;
|
||||
BEGIN
|
||||
FOR k, v IN SELECT * FROM jsonb_each(j) LOOP
|
||||
IF jsonb_typeof(v) = 'object' THEN
|
||||
-- Recursive call for nested objects, appending the key to the path
|
||||
RETURN QUERY SELECT * FROM jsonb_to_rows(v, _path || k)
|
||||
UNION VALUES (_path, '{}'::JSONB);
|
||||
ELSE
|
||||
-- Base case: return the key path and value
|
||||
CASE WHEN jsonb_typeof(v) = 'null' THEN
|
||||
RETURN QUERY SELECT _path || k, NULL::jsonb;
|
||||
ELSE
|
||||
RETURN QUERY SELECT _path || k, v;
|
||||
END CASE;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION merge_payload(_old JSONB, _new JSONB)
|
||||
RETURNS JSONB
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
_fields CURSOR FOR SELECT DISTINCT ON (path)
|
||||
path
|
||||
, last_value(value) over (partition by path) as value
|
||||
FROM (
|
||||
SELECT path, value FROM jsonb_to_rows(_old)
|
||||
UNION ALL
|
||||
SELECT path, value FROM jsonb_to_rows(_new)
|
||||
) AS combined;
|
||||
_path text[];
|
||||
_value jsonb;
|
||||
BEGIN
|
||||
OPEN _fields;
|
||||
LOOP
|
||||
FETCH _fields INTO _path, _value;
|
||||
EXIT WHEN NOT FOUND;
|
||||
IF jsonb_typeof(_value) = 'object' THEN
|
||||
IF _old #> _path IS NOT NULL THEN
|
||||
CONTINUE;
|
||||
END IF;
|
||||
_old = jsonb_set_lax(_old, _path, '{}'::jsonb, TRUE);
|
||||
CONTINUE;
|
||||
END IF;
|
||||
|
||||
_old = jsonb_set_lax(_old, _path, _value, TRUE, 'delete_key');
|
||||
END LOOP;
|
||||
|
||||
RETURN _old;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION set_object(_object object)
|
||||
RETURNS VOID AS $$
|
||||
DECLARE
|
||||
_parent model;
|
||||
BEGIN
|
||||
INSERT INTO objects (model, model_revision, id, payload)
|
||||
VALUES (_object.model, _object.model_revision, _object.id, _object.payload)
|
||||
ON CONFLICT (model, id) DO UPDATE
|
||||
SET
|
||||
payload = merge_payload(objects.payload, EXCLUDED.payload)
|
||||
, model_revision = EXCLUDED.model_revision;
|
||||
|
||||
INSERT INTO indexed_properties (model, model_revision, object_id, path, value)
|
||||
SELECT
|
||||
*
|
||||
FROM (
|
||||
SELECT
|
||||
_object.model
|
||||
, _object.model_revision
|
||||
, _object.id
|
||||
, UNNEST(m.indexed_paths) AS "path"
|
||||
, _object.payload #> string_to_array(UNNEST(m.indexed_paths), '.') AS "value"
|
||||
FROM
|
||||
models m
|
||||
WHERE
|
||||
m.name = _object.model
|
||||
AND m.revision = _object.model_revision
|
||||
GROUP BY
|
||||
m.name
|
||||
, m.revision
|
||||
)
|
||||
WHERE
|
||||
"value" IS NOT NULL
|
||||
ON CONFLICT (model, object_id, path) DO UPDATE
|
||||
SET
|
||||
value = EXCLUDED.value
|
||||
, text_value = EXCLUDED.text_value
|
||||
, number_value = EXCLUDED.number_value
|
||||
, boolean_value = EXCLUDED.boolean_value
|
||||
;
|
||||
|
||||
INSERT INTO parents (parent_model, parent_id, child_model, child_id)
|
||||
VALUES
|
||||
(_object.model, _object.id, _object.model, _object.id)
|
||||
ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING;
|
||||
|
||||
IF _object.parents IS NULL THEN
|
||||
RETURN;
|
||||
END IF;
|
||||
|
||||
FOREACH _parent IN ARRAY _object.parents
|
||||
LOOP
|
||||
INSERT INTO parents (parent_model, parent_id, child_model, child_id)
|
||||
SELECT
|
||||
p.parent_model
|
||||
, p.parent_id
|
||||
, _object.model
|
||||
, _object.id
|
||||
FROM parents p
|
||||
WHERE
|
||||
p.child_model = _parent.name
|
||||
AND p.child_id = _parent.id
|
||||
ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING
|
||||
;
|
||||
|
||||
INSERT INTO parents (parent_model, parent_id, child_model, child_id)
|
||||
VALUES
|
||||
(_parent.name, _parent.id, _object.model, _object.id)
|
||||
ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING;
|
||||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION set_objects(_objects object[])
|
||||
RETURNS VOID AS $$
|
||||
DECLARE
|
||||
_object object;
|
||||
BEGIN
|
||||
FOREACH _object IN ARRAY _objects
|
||||
LOOP
|
||||
PERFORM set_object(_object);
|
||||
END LOOP;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
-- CREATE OR REPLACE FUNCTION set_objects(VARIADIC _objects object[])
|
||||
-- RETURNS VOID AS $$
|
||||
-- BEGIN
|
||||
-- PERFORM set_objectS(_objects);
|
||||
-- END;
|
||||
-- $$ LANGUAGE plpgsql;
|
||||
|
||||
SELECT set_objects(
|
||||
ARRAY[
|
||||
ROW('instance', 1::smallint, 'i1', '{"name": "i1", "domain": {"name": "example2.com", "isVerified": false}}', NULL)::object
|
||||
, ROW('organization', 1::smallint, 'o1', '{"name": "o1", "description": "something useful"}', ARRAY[
|
||||
ROW('instance', 'i1')::model
|
||||
])::object
|
||||
, ROW('user', 1::smallint, 'u1', '{"username": "u1", "description": "something useful", "firstname": "Silvan"}', ARRAY[
|
||||
ROW('instance', 'i1')::model
|
||||
, ROW('organization', 'o1')::model
|
||||
])::object
|
||||
]
|
||||
);
|
||||
|
||||
SELECT set_objects(
|
||||
ARRAY[
|
||||
ROW('instance', 1::smallint, 'i1', '{"domain": {"isVerified": true}}', NULL)::object
|
||||
]
|
||||
);
|
||||
|
||||
|
||||
SELECT
|
||||
o.*
|
||||
FROM
|
||||
indexed_properties ip
|
||||
JOIN
|
||||
objects o
|
||||
ON
|
||||
ip.model = o.model
|
||||
AND ip.object_id = o.id
|
||||
WHERE
|
||||
ip.model = 'instance'
|
||||
AND ip.path = 'name'
|
||||
AND ip.text_value = 'i1';
|
||||
;
|
||||
|
||||
select * from merge_payload(
|
||||
'{"a": "asdf", "b": {"c":{"d": 1, "g": {"h": [4,5,6]}}}, "f": [1,2,3]}'::jsonb
|
||||
, '{"b": {"c":{"d": 1, "g": {"i": [4,5,6]}}}, "a": null}'::jsonb
|
||||
);
|
272
backend/storage/03_properties.sql
Normal file
@@ -0,0 +1,272 @@
|
||||
-- postgres
|
||||
DROP TABLE IF EXISTS properties;
|
||||
DROP TABLE IF EXISTS parents CASCADE;
|
||||
DROP TABLE IF EXISTS objects CASCADE;
|
||||
DROP TABLE IF EXISTS indexed_properties;
|
||||
DROP TABLE IF EXISTS events;
|
||||
DROP TABLE IF EXISTS models;
|
||||
|
||||
DROP TYPE IF EXISTS object CASCADE;
|
||||
DROP TYPE IF EXISTS model CASCADE;
|
||||
|
||||
CREATE TYPE model AS (
|
||||
name TEXT
|
||||
, id TEXT
|
||||
);
|
||||
|
||||
CREATE TYPE object AS (
|
||||
model TEXT
|
||||
, model_revision SMALLINT
|
||||
, id TEXT
|
||||
, payload JSONB
|
||||
, parents model[]
|
||||
);
|
||||
|
||||
CREATE TABLE models (
|
||||
name TEXT
|
||||
, revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0)
|
||||
, indexed_paths TEXT[]
|
||||
|
||||
, PRIMARY KEY (name, revision)
|
||||
);
|
||||
|
||||
CREATE TABLE objects (
|
||||
model TEXT NOT NULL
|
||||
, model_revision SMALLINT NOT NULL
|
||||
|
||||
, id TEXT NOT NULL
|
||||
, payload JSONB
|
||||
|
||||
, PRIMARY KEY (model, id)
|
||||
, FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT
|
||||
);
|
||||
|
||||
CREATE TABLE indexed_properties (
|
||||
model TEXT NOT NULL
|
||||
, model_revision SMALLINT NOT NULL
|
||||
, object_id TEXT NOT NULL
|
||||
|
||||
, path TEXT[] NOT NULL
|
||||
|
||||
, value JSONB
|
||||
, text_value TEXT
|
||||
, number_value NUMERIC
|
||||
, boolean_value BOOLEAN
|
||||
|
||||
, PRIMARY KEY (model, object_id, path)
|
||||
, FOREIGN KEY (model, object_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS parents (
|
||||
parent_model TEXT NOT NULL
|
||||
, parent_id TEXT NOT NULL
|
||||
, child_model TEXT NOT NULL
|
||||
, child_id TEXT NOT NULL
|
||||
, PRIMARY KEY (parent_model, parent_id, child_model, child_id)
|
||||
, FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION jsonb_to_rows(j jsonb, _path text[] DEFAULT ARRAY[]::text[])
|
||||
RETURNS TABLE (path text[], value jsonb)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
k text;
|
||||
v jsonb;
|
||||
BEGIN
|
||||
FOR k, v IN SELECT * FROM jsonb_each(j) LOOP
|
||||
IF jsonb_typeof(v) = 'object' THEN
|
||||
-- Recursive call for nested objects, appending the key to the path
|
||||
RETURN QUERY SELECT * FROM jsonb_to_rows(v, _path || k);
|
||||
ELSE
|
||||
-- Base case: return the key path and value
|
||||
CASE WHEN jsonb_typeof(v) = 'null' THEN
|
||||
RETURN QUERY SELECT _path || k, NULL::jsonb;
|
||||
ELSE
|
||||
RETURN QUERY SELECT _path || k, v;
|
||||
END CASE;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- after insert trigger which is called after the object was inserted and then inserts the properties
|
||||
CREATE OR REPLACE FUNCTION set_ip_from_object_insert()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
_property RECORD;
|
||||
_model models;
|
||||
BEGIN
|
||||
SELECT * INTO _model FROM models WHERE name = NEW.model AND revision = NEW.model_revision;
|
||||
|
||||
FOR _property IN SELECT * FROM jsonb_to_rows(NEW.payload) LOOP
|
||||
IF ARRAY_TO_STRING(_property.path, '.') = ANY(_model.indexed_paths) THEN
|
||||
INSERT INTO indexed_properties (model, model_revision, object_id, path, value)
|
||||
VALUES (NEW.model, NEW.model_revision, NEW.id, _property.path, _property.value);
|
||||
END IF;
|
||||
END LOOP;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE TRIGGER set_ip_from_object_insert
|
||||
AFTER INSERT ON objects
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION set_ip_from_object_insert();
|
||||
|
||||
-- before update trigger which is called before an object is updated
|
||||
-- it updates the properties table first
|
||||
-- and computes the correct payload for the object
|
||||
-- partial update of the object is allowed
|
||||
-- if the value of a property is set to null the properties and all its children are deleted
|
||||
CREATE OR REPLACE FUNCTION set_ip_from_object_update()
|
||||
RETURNS TRIGGER
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
_property RECORD;
|
||||
_payload JSONB;
|
||||
_model models;
|
||||
_path_index INT;
|
||||
BEGIN
|
||||
_payload := OLD.payload;
|
||||
SELECT * INTO _model FROM models WHERE name = NEW.model AND revision = NEW.model_revision;
|
||||
|
||||
FOR _property IN SELECT * FROM jsonb_to_rows(NEW.payload) ORDER BY array_length(path, 1) LOOP
|
||||
-- set the properties
|
||||
CASE WHEN _property.value IS NULL THEN
|
||||
RAISE NOTICE 'DELETE PROPERTY: %', _property;
|
||||
DELETE FROM indexed_properties
|
||||
WHERE model = NEW.model
|
||||
AND model_revision = NEW.model_revision
|
||||
AND object_id = NEW.id
|
||||
AND path[:ARRAY_LENGTH(_property.path, 1)] = _property.path;
|
||||
ELSE
|
||||
RAISE NOTICE 'UPSERT PROPERTY: %', _property;
|
||||
DELETE FROM indexed_properties
|
||||
WHERE
|
||||
model = NEW.model
|
||||
AND model_revision = NEW.model_revision
|
||||
AND object_id = NEW.id
|
||||
AND (
|
||||
_property.path[:array_length(path, 1)] = path
|
||||
OR path[:array_length(_property.path, 1)] = _property.path
|
||||
)
|
||||
AND array_length(path, 1) <> array_length(_property.path, 1);
|
||||
|
||||
-- insert property if should be indexed
|
||||
IF ARRAY_TO_STRING(_property.path, '.') = ANY(_model.indexed_paths) THEN
|
||||
RAISE NOTICE 'path should be indexed: %', _property.path;
|
||||
INSERT INTO indexed_properties (model, model_revision, object_id, path, value)
|
||||
VALUES (NEW.model, NEW.model_revision, NEW.id, _property.path, _property.value)
|
||||
ON CONFLICT (model, object_id, path) DO UPDATE
|
||||
SET value = EXCLUDED.value;
|
||||
END IF;
|
||||
END CASE;
|
||||
|
||||
-- if property is updated we can set it directly
|
||||
IF _payload #> _property.path IS NOT NULL THEN
|
||||
_payload = jsonb_set_lax(COALESCE(_payload, '{}'::JSONB), _property.path, _property.value, TRUE);
|
||||
EXIT;
|
||||
END IF;
|
||||
-- ensure the parent object exists
|
||||
FOR _path_index IN 1..(array_length(_property.path, 1)-1) LOOP
|
||||
IF _payload #> _property.path[:_path_index] IS NOT NULL AND jsonb_typeof(_payload #> _property.path[:_path_index]) = 'object' THEN
|
||||
CONTINUE;
|
||||
END IF;
|
||||
|
||||
_payload = jsonb_set(_payload, _property.path[:_path_index], '{}'::JSONB, TRUE);
|
||||
EXIT;
|
||||
END LOOP;
|
||||
_payload = jsonb_set_lax(_payload, _property.path, _property.value, TRUE, 'delete_key');
|
||||
|
||||
END LOOP;
|
||||
|
||||
-- update the payload
|
||||
NEW.payload = _payload;
|
||||
|
||||
RETURN NEW;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE TRIGGER set_ip_from_object_update
|
||||
BEFORE UPDATE ON objects
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION set_ip_from_object_update();
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION set_object(_object object)
|
||||
RETURNS VOID
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
INSERT INTO objects (model, model_revision, id, payload)
|
||||
VALUES (_object.model, _object.model_revision, _object.id, _object.payload)
|
||||
ON CONFLICT (model, id) DO UPDATE
|
||||
SET
|
||||
payload = EXCLUDED.payload
|
||||
, model_revision = EXCLUDED.model_revision
|
||||
;
|
||||
|
||||
INSERT INTO parents (parent_model, parent_id, child_model, child_id)
|
||||
SELECT
|
||||
p.name
|
||||
, p.id
|
||||
, _object.model
|
||||
, _object.id
|
||||
FROM UNNEST(_object.parents) AS p
|
||||
ON CONFLICT DO NOTHING;
|
||||
END;
|
||||
$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION set_objects(_objects object[])
|
||||
RETURNS VOID
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
_object object;
|
||||
BEGIN
|
||||
FOREACH _object IN ARRAY _objects LOOP
|
||||
PERFORM set_object(_object);
|
||||
END LOOP;
|
||||
END;
|
||||
$$;
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
INSERT INTO models VALUES
|
||||
('instance', 1, ARRAY['name', 'domain.name'])
|
||||
, ('organization', 1, ARRAY['name'])
|
||||
, ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname'])
|
||||
;
|
||||
|
||||
INSERT INTO objects VALUES
|
||||
('instance', 1, 'i2', '{"name": "i2", "domain": {"name": "example2.com", "isVerified": false}}')
|
||||
, ('instance', 1, 'i3', '{"name": "i3", "domain": {"name": "example3.com", "isVerified": false}}')
|
||||
, ('instance', 1, 'i4', '{"name": "i4", "domain": {"name": "example4.com", "isVerified": false}}')
|
||||
;
|
||||
|
||||
|
||||
begin;
|
||||
UPDATE objects SET payload = '{"domain": {"isVerified": true}}' WHERE model = 'instance';
|
||||
rollback;
|
||||
|
||||
|
||||
SELECT set_objects(
|
||||
ARRAY[
|
||||
ROW('instance', 1::smallint, 'i1', '{"name": "i1", "domain": {"name": "example2.com", "isVerified": false}}', NULL)::object
|
||||
, ROW('organization', 1::smallint, 'o1', '{"name": "o1", "description": "something useful"}', ARRAY[
|
||||
ROW('instance', 'i1')::model
|
||||
])::object
|
||||
, ROW('user', 1::smallint, 'u1', '{"username": "u1", "description": "something useful", "firstname": "Silvan"}', ARRAY[
|
||||
ROW('instance', 'i1')::model
|
||||
, ROW('organization', 'o1')::model
|
||||
])::object
|
||||
]
|
||||
);
|
280
backend/storage/04_operations.sql
Normal file
@@ -0,0 +1,280 @@
|
||||
-- postgres
|
||||
DROP TABLE IF EXISTS properties;
|
||||
DROP TABLE IF EXISTS parents CASCADE;
|
||||
DROP TABLE IF EXISTS objects CASCADE;
|
||||
DROP TABLE IF EXISTS indexed_properties;
|
||||
DROP TABLE IF EXISTS events;
|
||||
DROP TABLE IF EXISTS models;
|
||||
|
||||
DROP TYPE IF EXISTS object CASCADE;
|
||||
DROP TYPE IF EXISTS model CASCADE;
|
||||
|
||||
CREATE TABLE models (
|
||||
name TEXT
|
||||
, revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0)
|
||||
, indexed_paths TEXT[]
|
||||
|
||||
, PRIMARY KEY (name, revision)
|
||||
);
|
||||
|
||||
CREATE TABLE objects (
|
||||
model TEXT NOT NULL
|
||||
, model_revision SMALLINT NOT NULL
|
||||
|
||||
, id TEXT NOT NULL
|
||||
, payload JSONB
|
||||
|
||||
, PRIMARY KEY (model, id)
|
||||
, FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT
|
||||
);
|
||||
|
||||
CREATE TYPE operation_type AS ENUM (
|
||||
-- inserts a new object, if the object already exists the operation will fail
|
||||
-- path is ignored
|
||||
'create'
|
||||
-- if path is null an upsert is performed and the payload is overwritten
|
||||
-- if path is not null the value is set at the given path
|
||||
, 'set'
|
||||
-- drops an object if path is null
|
||||
-- if path is set but no value, the field at the given path is dropped
|
||||
-- if path and value are set and the field is an array the value is removed from the array
|
||||
, 'delete'
|
||||
-- adds a value to an array
|
||||
-- or a field if it does not exist, if the field exists the operation will fail
|
||||
, 'add'
|
||||
);
|
||||
|
||||
CREATE TYPE object_manipulation AS (
|
||||
path TEXT[]
|
||||
, operation operation_type
|
||||
, value JSONB
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS parents (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
|
||||
, parent_model TEXT NOT NULL
|
||||
, parent_id TEXT NOT NULL
|
||||
, child_model TEXT NOT NULL
|
||||
, child_id TEXT NOT NULL
|
||||
|
||||
, FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
, FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TYPE parent_operation AS ENUM (
|
||||
'add'
|
||||
, 'remove'
|
||||
);
|
||||
|
||||
CREATE TYPE parent_manipulation AS (
|
||||
model TEXT
|
||||
, id TEXT
|
||||
, operation parent_operation
|
||||
);
|
||||
|
||||
CREATE OR REPLACE FUNCTION jsonb_set_ensure_path(_jsonb JSONB, _path TEXT[], _value JSONB)
|
||||
RETURNS JSONB
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
i INT;
|
||||
BEGIN
|
||||
IF _jsonb #> _path IS NOT NULL THEN
|
||||
RETURN JSONB_SET(_jsonb, _path, _value);
|
||||
END IF;
|
||||
|
||||
FOR i IN REVERSE ARRAY_LENGTH(_path, 1)..1 LOOP
|
||||
_value := JSONB_BUILD_OBJECT(_path[i], _value);
|
||||
IF _jsonb #> _path[:i] IS NOT NULL THEN
|
||||
EXIT;
|
||||
END IF;
|
||||
END LOOP;
|
||||
|
||||
RETURN _jsonb || _value;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- current: {}
|
||||
-- '{"a": {"b": {"c": {"d": {"e": 1}}}}}'::JSONB #> '{a,b,c}' = 1
|
||||
|
||||
drop function manipulate_object;
|
||||
drop function object_set;
|
||||
CREATE OR REPLACE FUNCTION object_set(
|
||||
_model TEXT
|
||||
, _model_revision SMALLINT
|
||||
, _id TEXT
|
||||
|
||||
, _manipulations object_manipulation[]
|
||||
, _parents parent_manipulation[]
|
||||
)
|
||||
RETURNS objects
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
_manipulation object_manipulation;
_parent parent_manipulation;
|
||||
BEGIN
|
||||
FOREACH _manipulation IN ARRAY _manipulations LOOP
|
||||
CASE _manipulation.operation
|
||||
WHEN 'create' THEN
|
||||
INSERT INTO objects (model, model_revision, id, payload)
|
||||
VALUES (_model, _model_revision, _id, _manipulation.value);
|
||||
WHEN 'delete' THEN
|
||||
CASE
|
||||
WHEN _manipulation.path IS NULL THEN
|
||||
DELETE FROM objects
|
||||
WHERE
|
||||
model = _model
|
||||
AND model_revision = _model_revision
|
||||
AND id = _id;
|
||||
WHEN _manipulation.value IS NULL THEN
|
||||
UPDATE
|
||||
objects
|
||||
SET
|
||||
payload = payload #- _manipulation.path
|
||||
WHERE
|
||||
model = _model
|
||||
AND model_revision = _model_revision
|
||||
AND id = _id;
|
||||
ELSE
|
||||
UPDATE
|
||||
objects
|
||||
SET
|
||||
payload = jsonb_set(payload, _manipulation.path, (SELECT JSONB_AGG(v) FROM JSONB_PATH_QUERY(payload, ('$.' || ARRAY_TO_STRING(_manipulation.path, '.') || '[*]')::jsonpath) AS v WHERE v <> _manipulation.value))
|
||||
WHERE
|
||||
model = _model
|
||||
AND model_revision = _model_revision
|
||||
AND id = _id;
|
||||
END CASE;
|
||||
WHEN 'set' THEN
|
||||
IF _manipulation.path IS NULL THEN
|
||||
INSERT INTO objects (model, model_revision, id, payload)
|
||||
VALUES (_model, _model_revision, _id, _manipulation.value)
|
||||
ON CONFLICT (model, model_revision, id)
|
||||
DO UPDATE SET payload = _manipulation.value;
|
||||
ELSE
|
||||
UPDATE
|
||||
objects
|
||||
SET
|
||||
payload = jsonb_set_ensure_path(payload, _manipulation.path, _manipulation.value)
|
||||
WHERE
|
||||
model = _model
|
||||
AND model_revision = _model_revision
|
||||
AND id = _id;
|
||||
END IF;
|
||||
WHEN 'add' THEN
|
||||
UPDATE
|
||||
objects
|
||||
SET
|
||||
-- TODO: parent field must exist
|
||||
payload = CASE
|
||||
WHEN jsonb_typeof(payload #> _manipulation.path) IS NULL THEN
|
||||
jsonb_set_ensure_path(payload, _manipulation.path, _manipulation.value)
|
||||
WHEN jsonb_typeof(payload #> _manipulation.path) = 'array' THEN
|
||||
jsonb_set(payload, _manipulation.path, COALESCE(payload #> _manipulation.path, '[]'::JSONB) || _manipulation.value)
|
||||
-- ELSE
|
||||
-- RAISE EXCEPTION 'Field at path % is not an array', _manipulation.path;
|
||||
END
|
||||
WHERE
|
||||
model = _model
|
||||
AND model_revision = _model_revision
|
||||
AND id = _id;
|
||||
-- TODO: RAISE EXCEPTION 'Field at path % is not an array', _manipulation.path;
|
||||
END CASE;
|
||||
END LOOP;
|
||||
|
||||
FOREACH _parent IN ARRAY _parents LOOP
|
||||
CASE _parent.operation
|
||||
WHEN 'add' THEN
|
||||
-- insert the new parent and all its parents
|
||||
INSERT INTO parents (parent_model, parent_id, child_model, child_id)
|
||||
(
|
||||
SELECT
|
||||
id
|
||||
FROM parents p
|
||||
WHERE
|
||||
p.child_model = _parent.model
|
||||
AND p.child_id = _parent.id
|
||||
UNION
|
||||
SELECT
|
||||
_parent.model
|
||||
, _parent.id
|
||||
, _model
|
||||
, _id
|
||||
)
|
||||
ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING;
|
||||
WHEN 'remove' THEN
|
||||
-- remove the parent, including the parent entries of the object's children
|
||||
DELETE FROM parents
|
||||
WHERE id IN (
|
||||
SELECT
|
||||
id
|
||||
FROM
|
||||
parents p
|
||||
WHERE
|
||||
p.child_model = _model
|
||||
AND p.child_id = _id
|
||||
AND p.parent_model = _parent.model
|
||||
AND p.parent_id = _parent.id
|
||||
UNION
|
||||
SELECT
|
||||
id
|
||||
FROM (
|
||||
SELECT
|
||||
id
|
||||
FROM
|
||||
parents p
|
||||
WHERE
|
||||
p.parent_model = _model
|
||||
AND p.parent_id = _id
|
||||
)
|
||||
WHERE
|
||||
|
||||
);
|
||||
END CASE;
|
||||
END LOOP;
|
||||
RETURN NULL;
|
||||
END;
|
||||
$$;
|
||||
|
||||
INSERT INTO models VALUES
|
||||
('instance', 1, ARRAY['name', 'domain.name'])
|
||||
, ('organization', 1, ARRAY['name'])
|
||||
, ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname'])
|
||||
;
|
||||
|
||||
rollback;
|
||||
BEGIN;
|
||||
SELECT * FROM manipulate_object(
|
||||
'instance'
|
||||
, 1::SMALLINT
|
||||
, 'i1'
|
||||
, ARRAY[
|
||||
ROW(NULL, 'create', '{"name": "i1"}'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['domain'], 'set', '{"name": "example.com", "isVerified": false}'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['domain', 'isVerified'], 'set', 'true'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['domain', 'name'], 'delete', NULL)::object_manipulation
|
||||
, ROW(ARRAY['domain', 'name'], 'add', '"i1.com"')::object_manipulation
|
||||
, ROW(ARRAY['managers'], 'set', '[]'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['managers', 'objects'], 'add', '[{"a": "asdf"}, {"a": "qewr"}]'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['managers', 'objects'], 'delete', '{"a": "asdf"}'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['some', 'objects'], 'set', '{"a": "asdf"}'::JSONB)::object_manipulation
|
||||
-- , ROW(NULL, 'delete', NULL)::object_manipulation
|
||||
]
|
||||
);
|
||||
select * from objects;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
SELECT * FROM manipulate_object(
|
||||
'instance'
|
||||
, 1::SMALLINT
|
||||
, 'i1'
|
||||
, ARRAY[
|
||||
ROW(NULL, 'create', '{"name": "i1"}'::JSONB)::object_manipulation
|
||||
, ROW(ARRAY['domain', 'name'], 'set', '"example.com"'::JSONB)::object_manipulation
|
||||
]
|
||||
);
|
||||
select * from objects;
|
||||
ROLLBACK;
|
||||
|
||||
select jsonb_path_query_array('{"a": [12, 13, 14, 15]}'::JSONB, ('$.a ? (@ != $val)')::jsonpath, jsonb_build_object('val', '12'));
|
62
backend/storage/05_event_from_manipulation.sql
Normal file
@@ -0,0 +1,62 @@
|
||||
CREATE TABLE IF NOT EXISTS aggregates(
|
||||
id TEXT NOT NULL
|
||||
, type TEXT NOT NULL
|
||||
, instance_id TEXT NOT NULL
|
||||
|
||||
, current_sequence INT NOT NULL DEFAULT 0
|
||||
|
||||
, PRIMARY KEY (instance_id, type, id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS events (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid()
|
||||
|
||||
-- object type that the event is related to
|
||||
, aggregate TEXT NOT NULL
|
||||
-- id of the object that the event is related to
|
||||
, aggregate_id TEXT NOT NULL
|
||||
, instance_id TEXT NOT NULL
|
||||
|
||||
-- time the event was created
|
||||
, created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
-- user that created the event
|
||||
, creator TEXT
|
||||
-- type of the event
|
||||
, type TEXT NOT NULL
|
||||
-- version of the event
|
||||
, revision SMALLINT NOT NULL
|
||||
-- changed fields or NULL
|
||||
, payload JSONB
|
||||
|
||||
, position NUMERIC NOT NULL DEFAULT pg_current_xact_id()::TEXT::NUMERIC
|
||||
, in_position_order INT2 NOT NULL
|
||||
|
||||
, FOREIGN KEY (instance_id, aggregate, aggregate_id) REFERENCES aggregates(instance_id, type, id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS instances(
|
||||
id TEXT
|
||||
, name TEXT NOT NULL
|
||||
, created_at TIMESTAMPTZ NOT NULL
|
||||
, updated_at TIMESTAMPTZ NOT NULL
|
||||
|
||||
, default_org_id TEXT
|
||||
, iam_project_id TEXT
|
||||
, console_client_id TEXT
|
||||
, console_app_id TEXT
|
||||
, default_language VARCHAR(10)
|
||||
|
||||
, PRIMARY KEY (id)
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS instance_domains(
|
||||
instance_id TEXT NOT NULL
|
||||
, domain TEXT NOT NULL
|
||||
, is_primary BOOLEAN NOT NULL DEFAULT FALSE
|
||||
, is_verified BOOLEAN NOT NULL DEFAULT FALSE
|
||||
|
||||
, PRIMARY KEY (instance_id, domain)
|
||||
, FOREIGN KEY (instance_id) REFERENCES instances(id) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS instance_domain_search_idx ON instance_domains (domain);
|
8
backend/storage/cache/cache.go
vendored
Normal file
@@ -0,0 +1,8 @@
package cache

type Cache[K comparable, V any] interface {
	Get(key K) (V, bool)
	Set(key K, value V)
	Delete(key K)
	Clear()
}
47
backend/storage/cache/gomap/map.go
vendored
Normal file
@@ -0,0 +1,47 @@
package gomap

import (
	"sync"

	"github.com/zitadel/zitadel/backend/storage/cache"
)

type Map[K comparable, V any] struct {
	mu    sync.RWMutex
	items map[K]V
}

// Clear implements cache.Cache.
func (m *Map[K, V]) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.items = make(map[K]V, len(m.items))
}

// Delete implements cache.Cache.
func (m *Map[K, V]) Delete(key K) {
	m.mu.Lock()
	defer m.mu.Unlock()

	delete(m.items, key)
}

// Get implements cache.Cache.
func (m *Map[K, V]) Get(key K) (V, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	value, exists := m.items[key]
	return value, exists
}

// Set implements cache.Cache.
func (m *Map[K, V]) Set(key K, value V) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.items[key] = value
}

var _ cache.Cache[string, string] = &Map[string, string]{}
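Note: the zero value of Map has a nil items map, so Set panics until Clear has been called once; the diff does not include a constructor. Below is a hedged sketch of one — NewMap is an assumption for illustration, not part of this change.

// NewMap returns a Map whose items map is already initialized, so Set can be
// used immediately on a fresh instance. NewMap is not part of this commit;
// it is only a suggested convenience constructor.
func NewMap[K comparable, V any]() *Map[K, V] {
	return &Map[K, V]{items: make(map[K]V)}
}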
7
backend/storage/database/config.go
Normal file
@@ -0,0 +1,7 @@
package database

var Config = make(map[string]any)

func AddDatabaseConfig(name string, configure func(map[string]any) error) {
	Config[name] = configure
}
83
backend/storage/database/database.go
Normal file
@@ -0,0 +1,83 @@
package database

import (
	"context"
	"io/fs"
)

type Row interface {
	Scan(dest ...any) error
}

type Rows interface {
	Row
	Next() bool
	Close()
	Err() error
}

type Transaction interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
	End(ctx context.Context, err error) error

	QueryExecutor
}

type Client interface {
	Beginner
	QueryExecutor

	Release(ctx context.Context) error
}

type Pool interface {
	Beginner
	QueryExecutor

	Acquire(ctx context.Context) (Client, error)
	Close(ctx context.Context) error
}

type TransactionOptions struct {
	IsolationLevel IsolationLevel
	AccessMode     AccessMode
}

type IsolationLevel uint8

const (
	IsolationLevelSerializable IsolationLevel = iota
	IsolationLevelReadCommitted
)

type AccessMode uint8

const (
	AccessModeReadWrite AccessMode = iota
	AccessModeReadOnly
)

type Beginner interface {
	Begin(ctx context.Context, opts *TransactionOptions) (Transaction, error)
}

type QueryExecutor interface {
	Querier
	Executor
}

type Querier interface {
	Query(ctx context.Context, sql string, args ...any) (Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) Row
}

type Executor interface {
	Exec(ctx context.Context, sql string, args ...any) error
}

// LoadStatements sets the SQL statement strings.
// TODO: implement
func LoadStatements(fs.FS) error {
	return nil
}
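Note: Transaction adds End(ctx, err) next to Commit and Rollback, which is what domain.Instance.SetUp relies on: begin, defer End with the named error, and let End decide whether to commit or roll back. The sketch below shows that calling pattern against these interfaces under stated assumptions — the pool value and the SQL statement are placeholders, not part of this commit.

package example

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/database"
)

// createSomething illustrates the Begin / deferred End pattern used in
// backend/domain/instance.go. The pool argument and the statement are
// placeholders for illustration only.
func createSomething(ctx context.Context, pool database.Pool) (err error) {
	tx, err := pool.Begin(ctx, &database.TransactionOptions{
		IsolationLevel: database.IsolationLevelSerializable,
		AccessMode:     database.AccessModeReadWrite,
	})
	if err != nil {
		return err
	}
	// End receives the named return value: commit on nil, rollback on error
	// (the contract implied by its use in domain.Instance.SetUp).
	defer func() {
		err = tx.End(ctx, err)
	}()

	return tx.Exec(ctx, "INSERT INTO objects (type, id) VALUES ($1, $2)", "instance", "i1")
}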
4
backend/storage/database/postgres/config.go
Normal file
@@ -0,0 +1,4 @@
package postgres

type Config struct {
}
46
backend/storage/database/postgres/conn.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/zitadel/zitadel/backend/storage/database"
|
||||
)
|
||||
|
||||
type pgxConn struct{ *pgxpool.Conn }
|
||||
|
||||
var _ database.Client = (*pgxConn)(nil)
|
||||
|
||||
// Release implements [database.Client].
|
||||
func (c *pgxConn) Release(_ context.Context) error {
|
||||
c.Conn.Release()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Begin implements [database.Client].
|
||||
func (c *pgxConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
|
||||
tx, err := c.Conn.BeginTx(ctx, transactionOptionsToPgx(opts))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pgxTx{tx}, nil
|
||||
}
|
||||
|
||||
// Query implements sql.Client.
|
||||
// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn.
|
||||
func (c *pgxConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
|
||||
return c.Conn.Query(ctx, sql, args...)
|
||||
}
|
||||
|
||||
// QueryRow implements sql.Client.
|
||||
// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn.
|
||||
func (c *pgxConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
|
||||
return c.Conn.QueryRow(ctx, sql, args...)
|
||||
}
|
||||
|
||||
// Exec implements [database.Pool].
|
||||
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
|
||||
func (c *pgxConn) Exec(ctx context.Context, sql string, args ...any) error {
|
||||
_, err := c.Conn.Exec(ctx, sql, args...)
|
||||
return err
|
||||
}
|
55
backend/storage/database/postgres/pool.go
Normal file
@@ -0,0 +1,55 @@
package postgres

import (
	"context"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/zitadel/zitadel/backend/storage/database"
)

type pgxPool struct{ pgxpool.Pool }

var _ database.Pool = (*pgxPool)(nil)

// Acquire implements [database.Pool].
func (c *pgxPool) Acquire(ctx context.Context) (database.Client, error) {
	conn, err := c.Pool.Acquire(ctx)
	if err != nil {
		return nil, err
	}
	return &pgxConn{conn}, nil
}

// Query implements [database.Pool].
// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool.
func (c *pgxPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
	return c.Pool.Query(ctx, sql, args...)
}

// QueryRow implements [database.Pool].
// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool.
func (c *pgxPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
	return c.Pool.QueryRow(ctx, sql, args...)
}

// Exec implements [database.Pool].
// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool.
func (c *pgxPool) Exec(ctx context.Context, sql string, args ...any) error {
	_, err := c.Pool.Exec(ctx, sql, args...)
	return err
}

// Begin implements [database.Pool].
func (c *pgxPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) {
	tx, err := c.Pool.BeginTx(ctx, transactionOptionsToPgx(opts))
	if err != nil {
		return nil, err
	}
	return &pgxTx{tx}, nil
}

// Close implements [database.Pool].
func (c *pgxPool) Close(_ context.Context) error {
	c.Pool.Close()
	return nil
}
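A small sketch, not part of this commit: borrowing a dedicated connection from a database.Pool and making sure it is released. It assumes it lives outside the driver package with the database package imported; the helper name is an assumption.

// Sketch: acquire a client, run fn against it, and always release it.
func withClient(ctx context.Context, pool database.Pool, fn func(database.Client) error) error {
	client, err := pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer client.Release(ctx)
	return fn(client)
}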
83
backend/storage/database/postgres/tx.go
Normal file
@@ -0,0 +1,83 @@
package postgres

import (
	"context"

	"github.com/jackc/pgx/v5"
	"github.com/zitadel/zitadel/backend/storage/database"
)

type pgxTx struct{ pgx.Tx }

var _ database.Transaction = (*pgxTx)(nil)

// Commit implements [database.Transaction].
func (tx *pgxTx) Commit(ctx context.Context) error {
	return tx.Tx.Commit(ctx)
}

// Rollback implements [database.Transaction].
func (tx *pgxTx) Rollback(ctx context.Context) error {
	return tx.Tx.Rollback(ctx)
}

// End implements [database.Transaction].
func (tx *pgxTx) End(ctx context.Context, err error) error {
	if err != nil {
		tx.Rollback(ctx)
		return err
	}
	return tx.Commit(ctx)
}

// Query implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx.
func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) {
	return tx.Tx.Query(ctx, sql, args...)
}

// QueryRow implements [database.Transaction].
// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx.
func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row {
	return tx.Tx.QueryRow(ctx, sql, args...)
}

// Exec implements [database.Transaction].
// Subtle: this method shadows the method (Tx).Exec of pgxTx.Tx.
func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) error {
	_, err := tx.Tx.Exec(ctx, sql, args...)
	return err
}

func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions {
	if opts == nil {
		return pgx.TxOptions{}
	}

	return pgx.TxOptions{
		IsoLevel:   isolationToPgx(opts.IsolationLevel),
		AccessMode: accessModeToPgx(opts.AccessMode),
	}
}

func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel {
	switch isolation {
	case database.IsolationLevelSerializable:
		return pgx.Serializable
	case database.IsolationLevelReadCommitted:
		return pgx.ReadCommitted
	default:
		return pgx.Serializable
	}
}

func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode {
	switch accessMode {
	case database.AccessModeReadWrite:
		return pgx.ReadWrite
	case database.AccessModeReadOnly:
		return pgx.ReadOnly
	default:
		return pgx.ReadWrite
	}
}
155
backend/storage/event_store.go
Normal file
@@ -0,0 +1,155 @@
package storage

// import (
// 	"context"
// 	"time"

// 	"github.com/jackc/pgx/v5"
// 	"github.com/jackc/pgx/v5/pgconn"
// 	"github.com/jackc/pgx/v5/pgxpool"
// )

// type EventStore interface {
// 	Write(ctx context.Context, commands ...Command) error
// }

// type Command interface {
// 	Aggregate() Aggregate

// 	Type() string
// 	Payload() any
// 	Revision() uint8
// 	Creator() string

// 	Operations() []Operation
// }

// type Event interface {
// 	ID() string

// 	Aggregate() Aggregate

// 	CreatedAt() time.Time
// 	Type() string
// 	UnmarshalPayload(ptr any) error

// 	Revision() uint8
// 	Creator() string

// 	Position() float64
// 	InPositionOrder() uint16
// }

// type Aggregate struct {
// 	Type     string
// 	ID       string
// 	Instance string
// }

// type Operation interface {
// 	Exec(ctx context.Context, tx TX) error
// }

// type TX interface {
// 	Query() // Placeholder for future query methods
// }

// type createInstance struct {
// 	id      string
// 	creator string
// 	Name    string
// }

// var (
// 	_ Command = (*createInstance)(nil)
// )

// func (c *createInstance) Aggregate() Aggregate {
// 	return Aggregate{
// 		Type: "instance",
// 		ID:   c.id,
// 	}
// }

// func (c *createInstance) Type() string {
// 	return "instance.created"
// }

// func (c *createInstance) Payload() any {
// 	return c
// }

// func (c *createInstance) Revision() uint8 {
// 	return 1
// }

// func (c *createInstance) Creator() string {
// 	return c.creator
// }

// func (c *createInstance) Operations() []Operation {
// 	return []Operation{}
// }

// type executor[R any] interface {
// 	Exec(ctx context.Context, query string, args ...any) (R, error)
// }

// type querier[R any, Q Query[R]] interface {
// 	Query(ctx context.Context, query Q) (R, error)
// 	// Query(ctx context.Context, query string, args ...any) (R, error)
// }

// type Query[R any] interface {
// 	Query(ctx context.Context) (R, error)
// }

// var _ executor[pgconn.CommandTag] = (*pgxExecutor)(nil)

// type pgxExecutor struct {
// 	conn *pgx.Conn
// }

// func (e *pgxExecutor) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error) {
// 	return e.conn.Exec(ctx, query, args...)
// }

// type pgxQuerier struct {
// 	conn *pgx.Conn
// }

// type pgxCreateInstanceQuery struct {
// 	Name string
// }

// type instance struct{
// 	id   string
// 	name string
// }

// type instanceByIDQuery[R Rows] struct{
// 	id string
// }

// type Rows interface {
// 	Next() bool
// 	Scan(dest ...any) error
// }

// func (q *instanceByIDQuery[R]) Query(ctx context.Context) (*instance, error) {
// 	return nil, nil
// }

// type pgxInstanceRepository struct {
// 	pool pgxpool.Pool
// }

// func (r *pgxInstanceRepository) InstanceByID(ctx context.Context, id string) (*instance, error) {
// 	query := &instanceByIDQuery[pgx.Rows]{}
// 	query.
// 	return nil, nil
// }

// func (q *pgxQuerier) Query(ctx context.Context, query string, args ...any) (pgx.Rows, error) {
// 	return q.conn.Query(ctx, query, args...)
// }
21
backend/storage/eventstore/event_store.go
Normal file
@@ -0,0 +1,21 @@
package eventstore

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/database"
)

type Eventstore struct {
	executor database.Executor
}

func New(executor database.Executor) *Eventstore {
	return &Eventstore{executor: executor}
}

type Event interface{}

func (e *Eventstore) Push(ctx context.Context, events ...Event) error {
	return nil
}
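A short sketch, not part of this commit: wiring the event store on top of any database.Executor and pushing a value. Push is still a stub in this commit, so the call currently has no effect; the helper assumes the database and repository packages are imported alongside eventstore.

// Sketch: construct the event store over an executor and push a domain object.
func recordSetUp(ctx context.Context, exec database.Executor, instance *repository.Instance) error {
	es := eventstore.New(exec)
	return es.Push(ctx, instance)
}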
111
backend/storage/instance_repository.go
Normal file
@@ -0,0 +1,111 @@
package storage

// import (
// 	"context"

// 	"github.com/jackc/pgx/v5"
// 	"github.com/jackc/pgx/v5/pgxpool"
// )

// type Instance struct {
// 	ID   string
// 	Name string
// }

// // type InstanceRepository interface {
// // 	InstanceByID(id string) (*Instance, error)
// // }

// type Row interface {
// 	Scan(dest ...interface{}) error
// }

// type RowQuerier[R Row] interface {
// 	QueryRow(ctx context.Context, query string, args ...any) R
// }

// type Rows interface {
// 	Row
// 	Next() bool
// 	Close()
// 	Err() error
// }

// type RowsQuerier[R Rows] interface {
// 	Query(ctx context.Context, query string, args ...any) (R, error)
// }

// type Executor interface {
// 	Exec(ctx context.Context, query string, args ...any) error
// }

// type instanceByIDQuery[R Row, Q RowQuerier[R]] struct {
// 	id      string
// 	querier Q
// }

// func (q instanceByIDQuery[R, Q]) Query(ctx context.Context) (*Instance, error) {
// 	row := q.querier.QueryRow(ctx, "SELECT * FROM instances WHERE id = $1", q.id)
// 	var instance Instance
// 	if err := row.Scan(&instance.ID, &instance.Name); err != nil {
// 		return nil, err
// 	}
// 	return &instance, nil
// }

// type InstanceRepository interface {
// 	ByIDQuery(ctx context.Context, id string) (*Instance, error)
// 	ByDomainQuery(ctx context.Context, domain string) (*Instance, error)
// }

// type InstanceRepositorySQL struct {
// 	pool *pgxpool.Pool
// }

// type InstanceRepositoryMap struct {
// 	instances map[string]*Instance
// 	domains   *InstanceDomainRepositoryMap
// }

// type InstanceDomainRepositoryMap struct {
// 	domains map[string]string
// }

// func GetInstanceByID[R Row, Q RowQuerier[R]](ctx context.Context, querier Q, id string) (*Instance, error) {
// 	row := querier.QueryRow(ctx, "SELECT * FROM instances WHERE id = $1", id)
// 	var instance Instance
// 	if err := row.Scan(&instance.ID, &instance.Name); err != nil {
// 		return nil, err
// 	}
// 	return &instance, nil
// }

// const instanceByDomainQuery = `SELECT
// 	i.*
// FROM
// 	instances i
// JOIN
// 	instance_domains id
// ON
// 	id.instance_id = i.id
// WHERE
// 	id.domain = $1`

// func GetInstanceByDomain[R Row, Q RowQuerier[R]](ctx context.Context, querier Q, domain string) (*Instance, error) {
// 	row := querier.QueryRow(ctx, instanceByDomainQuery, domain)
// 	var instance Instance
// 	if err := row.Scan(); err != nil {
// 		return nil, err
// 	}
// 	return &instance, nil
// }

// func CreateInstance[E Executor](ctx context.Context, executor E, instance *Instance) error {
// 	return executor.Exec(ctx, "INSERT INTO instances (id, name) VALUES ($1, $2)", instance.ID, instance.Name)
// }

// func bla(ctx context.Context) {
// 	var c *pgxpool.Tx
// 	instance, err := instanceByIDQuery[pgx.Row, *pgxpool.Tx]{querier: c}.Query(ctx)
// 	_, _ = instance, err
// }
75
backend/storage/repo.go
Normal file
@@ -0,0 +1,75 @@
package storage

// import (
// 	"context"

// 	"github.com/jackc/pgx/v5"
// 	"github.com/jackc/pgx/v5/pgxpool"
// )

// type row interface {
// 	Scan(dest ...any) error
// }

// type rows interface {
// 	row
// 	Next() bool
// 	Close()
// 	Err() error
// }

// type querier interface {
// 	NewRowQuery()
// 	NewRowsQuery()
// }

// type Client interface {
// 	// querier
// 	InstanceRepository() instanceRepository
// }

// type instanceRepository interface {
// 	ByIDQuery(ctx context.Context, id string) (*Instance, error)
// 	ByDomainQuery(ctx context.Context, domain string) (*Instance, error)
// }

// type pgxClient pgxpool.Pool

// func (c *pgxClient) Begin(ctx context.Context) (*pgxTx, error) {
// 	tx, err := (*pgxpool.Pool)(c).Begin(ctx)
// 	if err != nil {
// 		return nil, err
// 	}
// 	return (*pgxTx)(tx.(*pgxpool.Tx)), nil
// }

// func (c *pgxClient) InstanceRepository() instanceRepository {
// 	return &pgxInstanceRepository[pgxpool.Pool]{client: (*pgxpool.Pool)(c)}
// }

// type pgxTx pgxpool.Tx

// func (c *pgxTx) InstanceRepository() instanceRepository {
// 	return &pgxInstanceRepository[pgxpool.Tx]{client: (*pgxpool.Tx)(c)}
// }

// type pgxInstanceRepository[C pgxpool.Pool | pgxpool.Tx] struct {
// 	client *C
// }

// func (r *pgxInstanceRepository[C]) ByIDQuery(ctx context.Context, id string) (*Instance, error) {
// 	// return r.client
// 	pgx.Tx
// 	return nil, nil
// }

// func (r *pgxInstanceRepository[C]) ByDomainQuery(ctx context.Context, domain string) (*Instance, error) {

// 	return nil, nil
// }

// func blabla() {
// 	var client Client = &pgxClient{&pgxpool.Pool{}}

// 	client.be
// }
86
backend/storage/repository/cache/instance.go
vendored
Normal file
@@ -0,0 +1,86 @@
package cache

import (
	"context"
	"sync"

	"github.com/zitadel/zitadel/backend/storage/cache"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

type Instance struct {
	mu       *sync.RWMutex
	byID     cache.Cache[string, *repository.Instance]
	byDomain cache.Cache[string, *repository.Instance]

	next repository.InstanceRepository
}

func (i *Instance) SetNext(next repository.InstanceRepository) *Instance {
	return &Instance{
		mu:       i.mu,
		byID:     i.byID,
		byDomain: i.byDomain,
		next:     next,
	}
}

// ByDomain implements repository.InstanceRepository.
func (i *Instance) ByDomain(ctx context.Context, domain string) (instance *repository.Instance, err error) {
	// Release the read lock before falling through to next, otherwise the
	// write lock taken in set would deadlock against a still-held read lock.
	i.mu.RLock()
	instance, ok := i.byDomain.Get(domain)
	i.mu.RUnlock()
	if ok {
		return instance, nil
	}

	instance, err = i.next.ByDomain(ctx, domain)
	if err != nil {
		return nil, err
	}

	i.set(instance, domain)

	return instance, nil
}

// ByID implements repository.InstanceRepository.
func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) {
	i.mu.RLock()
	instance, ok := i.byID.Get(id)
	i.mu.RUnlock()
	if ok {
		return instance, nil
	}

	instance, err := i.next.ByID(ctx, id)
	if err != nil {
		return nil, err
	}

	i.set(instance, "")
	return instance, nil
}

// SetUp implements repository.InstanceRepository.
func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error {
	err := i.next.SetUp(ctx, instance)
	if err != nil {
		return err
	}

	i.set(instance, "")
	return nil
}

var _ repository.InstanceRepository = (*Instance)(nil)

func (i *Instance) set(instance *repository.Instance, domain string) {
	i.mu.Lock()
	defer i.mu.Unlock()
	if domain != "" {
		i.byDomain.Set(domain, instance)
	}
	i.byID.Set(instance.ID, instance)
}
45
backend/storage/repository/cache/user.go
vendored
Normal file
@@ -0,0 +1,45 @@
package cache

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/cache"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

type User struct {
	cache.Cache[string, *repository.User]

	next repository.UserRepository
}

// ByID implements repository.UserRepository.
func (u *User) ByID(ctx context.Context, id string) (*repository.User, error) {
	if user, ok := u.Get(id); ok {
		return user, nil
	}

	user, err := u.next.ByID(ctx, id)
	if err != nil {
		return nil, err
	}

	u.set(user)
	return user, nil
}

// Create implements repository.UserRepository.
func (u *User) Create(ctx context.Context, user *repository.User) error {
	err := u.next.Create(ctx, user)
	if err != nil {
		return err
	}
	u.set(user)
	return nil
}

var _ repository.UserRepository = (*User)(nil)

func (u *User) set(user *repository.User) {
	u.Cache.Set(user.ID, user)
}
37
backend/storage/repository/event/instance.go
Normal file
@@ -0,0 +1,37 @@
package event

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/eventstore"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

var _ repository.InstanceRepository = (*Instance)(nil)

type Instance struct {
	*eventstore.Eventstore

	next repository.InstanceRepository
}

func NewInstance(eventstore *eventstore.Eventstore, next repository.InstanceRepository) *Instance {
	return &Instance{next: next, Eventstore: eventstore}
}

func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) {
	return i.next.ByID(ctx, id)
}

func (i *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) {
	return i.next.ByDomain(ctx, domain)
}

func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error {
	err := i.next.SetUp(ctx, instance)
	if err != nil {
		return err
	}

	return i.Push(ctx, instance)
}
33
backend/storage/repository/event/user.go
Normal file
@@ -0,0 +1,33 @@
package event

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/eventstore"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

var _ repository.UserRepository = (*User)(nil)

type User struct {
	*eventstore.Eventstore

	next repository.UserRepository
}

func NewUser(eventstore *eventstore.Eventstore, next repository.UserRepository) *User {
	return &User{next: next, Eventstore: eventstore}
}

func (i *User) ByID(ctx context.Context, id string) (*repository.User, error) {
	return i.next.ByID(ctx, id)
}

func (i *User) Create(ctx context.Context, user *repository.User) error {
	err := i.next.Create(ctx, user)
	if err != nil {
		return err
	}

	return i.Push(ctx, user)
}
14
backend/storage/repository/instance.go
Normal file
@@ -0,0 +1,14 @@
package repository

import "context"

type InstanceRepository interface {
	SetUp(ctx context.Context, instance *Instance) error
	ByID(ctx context.Context, id string) (*Instance, error)
	ByDomain(ctx context.Context, domain string) (*Instance, error)
}

type Instance struct {
	ID   string
	Name string
}
45
backend/storage/repository/sql/instance.go
Normal file
@@ -0,0 +1,45 @@
package sql

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/database"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

func NewInstance(client database.QueryExecutor) repository.InstanceRepository {
	return &Instance{client: client}
}

type Instance struct {
	client database.QueryExecutor
}

const instanceByDomainQuery = `SELECT i.id, i.name FROM instances i JOIN instance_domains id ON i.id = id.instance_id WHERE id.domain = $1`

// ByDomain implements [InstanceRepository].
func (r *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) {
	row := r.client.QueryRow(ctx, instanceByDomainQuery, domain)
	var instance repository.Instance
	if err := row.Scan(&instance.ID, &instance.Name); err != nil {
		return nil, err
	}
	return &instance, nil
}

const instanceByIDQuery = `SELECT id, name FROM instances WHERE id = $1`

// ByID implements [InstanceRepository].
func (r *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) {
	row := r.client.QueryRow(ctx, instanceByIDQuery, id)
	var instance repository.Instance
	if err := row.Scan(&instance.ID, &instance.Name); err != nil {
		return nil, err
	}
	return &instance, nil
}

// SetUp implements [InstanceRepository].
func (r *Instance) SetUp(ctx context.Context, instance *repository.Instance) error {
	return r.client.Exec(ctx, "INSERT INTO instances (id, name) VALUES ($1, $2)", instance.ID, instance.Name)
}
33
backend/storage/repository/sql/user.go
Normal file
@@ -0,0 +1,33 @@
package sql

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/database"
	"github.com/zitadel/zitadel/backend/storage/repository"
)

func NewUser(client database.QueryExecutor) repository.UserRepository {
	return &User{client: client}
}

type User struct {
	client database.QueryExecutor
}

const userByIDQuery = `SELECT id, username FROM users WHERE id = $1`

// ByID implements [UserRepository].
func (r *User) ByID(ctx context.Context, id string) (*repository.User, error) {
	row := r.client.QueryRow(ctx, userByIDQuery, id)
	var user repository.User
	if err := row.Scan(&user.ID, &user.Username); err != nil {
		return nil, err
	}
	return &user, nil
}

// Create implements [UserRepository].
func (r *User) Create(ctx context.Context, user *repository.User) error {
	return r.client.Exec(ctx, "INSERT INTO users (id, username) VALUES ($1, $2)", user.ID, user.Username)
}
40
backend/storage/repository/telemetry/logged/instance.go
Normal file
@@ -0,0 +1,40 @@
package logged

import (
	"context"
	"log/slog"

	"github.com/zitadel/zitadel/backend/storage/repository"
)

type Instance struct {
	*slog.Logger

	next repository.InstanceRepository
}

func NewInstance(logger *slog.Logger, next repository.InstanceRepository) *Instance {
	return &Instance{Logger: logger, next: next}
}

var _ repository.InstanceRepository = (*Instance)(nil)

func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) {
	i.Logger.InfoContext(ctx, "By ID Query", slog.String("id", id))
	return i.next.ByID(ctx, id)
}

func (i *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) {
	i.Logger.InfoContext(ctx, "By Domain Query", slog.String("domain", domain))
	return i.next.ByDomain(ctx, domain)
}

func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error {
	err := i.next.SetUp(ctx, instance)
	if err != nil {
		i.Logger.ErrorContext(ctx, "Failed to set up instance", slog.Any("instance", instance), slog.Any("cause", err))
		return err
	}
	i.Logger.InfoContext(ctx, "Instance set up", slog.Any("instance", instance))
	return nil
}
35
backend/storage/repository/telemetry/logged/user.go
Normal file
@@ -0,0 +1,35 @@
package logged

import (
	"context"
	"log/slog"

	"github.com/zitadel/zitadel/backend/storage/repository"
)

type User struct {
	*slog.Logger

	next repository.UserRepository
}

func NewUser(logger *slog.Logger, next repository.UserRepository) *User {
	return &User{Logger: logger, next: next}
}

var _ repository.UserRepository = (*User)(nil)

func (i *User) ByID(ctx context.Context, id string) (*repository.User, error) {
	i.Logger.InfoContext(ctx, "By ID Query", slog.String("id", id))
	return i.next.ByID(ctx, id)
}

func (i *User) Create(ctx context.Context, user *repository.User) error {
	err := i.next.Create(ctx, user)
	if err != nil {
		i.Logger.ErrorContext(ctx, "Failed to create user", slog.Any("user", user), slog.Any("cause", err))
		return err
	}
	i.Logger.InfoContext(ctx, "User created successfully", slog.Any("user", user))
	return nil
}
54
backend/storage/repository/telemetry/traced/instance.go
Normal file
@@ -0,0 +1,54 @@
package traced

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/repository"
	"github.com/zitadel/zitadel/backend/telemetry/tracing"
)

var _ repository.InstanceRepository = (*Instance)(nil)

type Instance struct {
	*tracing.Tracer

	next repository.InstanceRepository
}

func NewInstance(tracer *tracing.Tracer, next repository.InstanceRepository) *Instance {
	return &Instance{Tracer: tracer, next: next}
}

func (i *Instance) SetNext(next repository.InstanceRepository) *Instance {
	return &Instance{Tracer: i.Tracer, next: next}
}

// ByDomain implements [repository.InstanceRepository].
func (i *Instance) ByDomain(ctx context.Context, domain string) (instance *repository.Instance, err error) {
	i.Tracer.Decorate(ctx, func(ctx context.Context) error {
		instance, err = i.next.ByDomain(ctx, domain)
		return err
	})

	return instance, err
}

// ByID implements [repository.InstanceRepository].
func (i *Instance) ByID(ctx context.Context, id string) (instance *repository.Instance, err error) {
	i.Tracer.Decorate(ctx, func(ctx context.Context) error {
		instance, err = i.next.ByID(ctx, id)
		return err
	})

	return instance, err
}

// SetUp implements [repository.InstanceRepository].
func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) (err error) {
	i.Tracer.Decorate(ctx, func(ctx context.Context) error {
		err = i.next.SetUp(ctx, instance)
		return err
	})

	return err
}
44
backend/storage/repository/telemetry/traced/user.go
Normal file
@@ -0,0 +1,44 @@
package traced

import (
	"context"

	"github.com/zitadel/zitadel/backend/storage/repository"
	"github.com/zitadel/zitadel/backend/telemetry/tracing"
)

var _ repository.UserRepository = (*User)(nil)

type User struct {
	*tracing.Tracer

	next repository.UserRepository
}

func NewUser(tracer *tracing.Tracer, next repository.UserRepository) *User {
	return &User{Tracer: tracer, next: next}
}

func (i *User) SetNext(next repository.UserRepository) *User {
	return &User{Tracer: i.Tracer, next: next}
}

// ByID implements [repository.UserRepository].
func (i *User) ByID(ctx context.Context, id string) (user *repository.User, err error) {
	i.Tracer.Decorate(ctx, func(ctx context.Context) error {
		user, err = i.next.ByID(ctx, id)
		return err
	})

	return user, err
}

// Create implements [repository.UserRepository].
func (i *User) Create(ctx context.Context, user *repository.User) (err error) {
	i.Tracer.Decorate(ctx, func(ctx context.Context) error {
		err = i.next.Create(ctx, user)
		return err
	})

	return err
}
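A wiring sketch, not part of this commit: the repositories above are written as decorators, so a concrete stack can be assembled by chaining the constructors. The surrounding package name, the logger handler, and the tracer name are assumptions; the constructors and packages are the ones introduced by the files above.

// Illustration only: chain tracing -> logging -> event publishing -> SQL.
package wiring

import (
	"log/slog"
	"os"

	"github.com/zitadel/zitadel/backend/storage/database"
	"github.com/zitadel/zitadel/backend/storage/eventstore"
	"github.com/zitadel/zitadel/backend/storage/repository"
	"github.com/zitadel/zitadel/backend/storage/repository/event"
	"github.com/zitadel/zitadel/backend/storage/repository/sql"
	"github.com/zitadel/zitadel/backend/storage/repository/telemetry/logged"
	"github.com/zitadel/zitadel/backend/storage/repository/telemetry/traced"
	"github.com/zitadel/zitadel/backend/telemetry/tracing"
)

func newInstanceRepository(pool database.Pool, es *eventstore.Eventstore) repository.InstanceRepository {
	var repo repository.InstanceRepository = sql.NewInstance(pool)
	repo = event.NewInstance(es, repo)
	repo = logged.NewInstance(slog.New(slog.NewTextHandler(os.Stderr, nil)), repo)

	tracer := tracing.NewTracer("instance-repository")
	return traced.NewInstance(&tracer, repo)
}

Each layer only depends on the repository.InstanceRepository interface, so layers can be reordered or dropped without touching the SQL implementation.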
13
backend/storage/repository/user.go
Normal file
@@ -0,0 +1,13 @@
package repository

import "context"

type UserRepository interface {
	Create(ctx context.Context, user *User) error
	ByID(ctx context.Context, id string) (*User, error)
}

type User struct {
	ID       string
	Username string
}
43
backend/storage/storage.go
Normal file
@@ -0,0 +1,43 @@
package storage

// import "context"

// type Storage interface {
// 	Write(ctx context.Context, commands ...Command) error
// }

// type Command interface {
// 	// Subjects returns a list of subjects which describe the command
// 	// the type should always be in past tense
// 	// "." is used as separator
// 	// e.g. "user.created"
// 	Type() string
// 	// Payload returns the payload of the command
// 	// The payload can either be
// 	// - a struct
// 	// - a map
// 	// - nil
// 	Payload() any
// 	// Revision returns the revision of the command
// 	Revision() uint8
// 	// Creator returns the user id who created the command
// 	Creator() string

// 	// Object returns the object the command belongs to
// 	Object() Model
// 	// Parents returns the parents of the object
// 	// If the list is empty there are no parents
// 	Parents() []Model

// 	// Objects returns the models to update during inserting the commands
// 	Objects() []Object
// }

// type Model struct {
// 	Name string
// 	ID   string
// }

// type Object struct {
// 	Model Model
// }
70
backend/telemetry/tracing/tracer.go
Normal file
@@ -0,0 +1,70 @@
package tracing

import (
	"context"
	"runtime"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

type Tracer struct{ trace.Tracer }

func NewTracer(name string) Tracer {
	return Tracer{otel.Tracer(name)}
}

type DecorateOption func(*decorateOptions)

type decorateOptions struct {
	startOpts []trace.SpanStartOption
	endOpts   []trace.SpanEndOption

	spanName string
}

func WithSpanName(name string) DecorateOption {
	return func(o *decorateOptions) {
		o.spanName = name
	}
}

func WithSpanStartOptions(opts ...trace.SpanStartOption) DecorateOption {
	return func(o *decorateOptions) {
		o.startOpts = append(o.startOpts, opts...)
	}
}

func WithSpanEndOptions(opts ...trace.SpanEndOption) DecorateOption {
	return func(o *decorateOptions) {
		o.endOpts = append(o.endOpts, opts...)
	}
}

func (t Tracer) Decorate(ctx context.Context, fn func(ctx context.Context) error, opts ...DecorateOption) {
	o := new(decorateOptions)
	for _, opt := range opts {
		opt(o)
	}

	if o.spanName == "" {
		o.spanName = functionName()
	}

	ctx, span := t.Tracer.Start(ctx, o.spanName, o.startOpts...) // hand the span context to fn so nested spans attach to it
	defer span.End(o.endOpts...)

	if err := fn(ctx); err != nil {
		span.RecordError(err)
	}
}

func functionName() string {
	counter, _, _, success := runtime.Caller(2)

	if !success {
		return "zitadel"
	}

	return runtime.FuncForPC(counter).Name()
}
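A usage sketch, not part of this commit: decorating an arbitrary operation with a span. It assumes the tracing package is imported; the tracer and span names are placeholders.

// Illustration only: wrap a piece of work in a span via Decorate.
func tracedWork(ctx context.Context) {
	tracer := tracing.NewTracer("example")
	tracer.Decorate(ctx, func(ctx context.Context) error {
		// The traced work goes here; a returned error is recorded on the span.
		return nil
	}, tracing.WithSpanName("example.work"))
}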