diff --git a/backend/LICENSE b/backend/LICENSE new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/cmd/config/config.go b/backend/cmd/config/config.go new file mode 100644 index 0000000000..cbd0478cbb --- /dev/null +++ b/backend/cmd/config/config.go @@ -0,0 +1,45 @@ +/* +Copyright © 2025 NAME HERE +*/ +package config + +import ( + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var ( + // ConfigureCmd represents the config command + ConfigureCmd = &cobra.Command{ + Use: "configure", + Short: "Guides you through configuring Zitadel", + // Long: `A longer description that spans multiple lines and likely contains examples + // and usage of using your command. For example: + + // Cobra is a CLI library for Go that empowers applications. + // This application is a tool to generate the needed files + // to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("config called") + fmt.Println(viper.AllSettings()) + fmt.Println(viper.Sub("database").AllSettings()) + viper.en + }, + } + + upgrade bool +) + +func init() { + // Here you will define your flags and configuration settings. + ConfigureCmd.Flags().BoolVarP(&upgrade, "upgrade", "u", false, "Only changed configuration values since the previously used version will be asked for") + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // configureCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: +} diff --git a/backend/cmd/prepare/prepare.go b/backend/cmd/prepare/prepare.go new file mode 100644 index 0000000000..b4f8ba4bfa --- /dev/null +++ b/backend/cmd/prepare/prepare.go @@ -0,0 +1,37 @@ +/* +Copyright © 2025 NAME HERE +*/ +package prepare + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +// PrepareCmd represents the prepare command +var PrepareCmd = &cobra.Command{ + Use: "prepare", + Short: "Prepares the environment before starting Zitadel", + // Long: `A longer description that spans multiple lines and likely contains examples + // and usage of using your command. For example: + + // Cobra is a CLI library for Go that empowers applications. + // This application is a tool to generate the needed files + // to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("prepare called") + }, +} + +func init() { + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // prepareCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // prepareCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/backend/cmd/root.go b/backend/cmd/root.go new file mode 100644 index 0000000000..2de31735d8 --- /dev/null +++ b/backend/cmd/root.go @@ -0,0 +1,84 @@ +package cmd + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/zitadel/zitadel/backend/cmd/config" + "github.com/zitadel/zitadel/backend/cmd/prepare" + "github.com/zitadel/zitadel/backend/cmd/start" + "github.com/zitadel/zitadel/backend/cmd/upgrade" +) + +var cfgFile string + +// RootCmd represents the base command when called without any subcommands +var RootCmd = &cobra.Command{ + Use: "zitadel [subcommand]", + Short: "A brief description of your application", + Long: `A longer description that spans multiple lines and likely contains +examples and usage of using your application. For example: + +Cobra is a CLI library for Go that empowers applications. +This application is a tool to generate the needed files +to quickly create a Cobra application.`, + // Uncomment the following line if your bare application + // has an action associated with it: + // Run: func(cmd *cobra.Command, args []string) { }, +} + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + err := RootCmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func init() { + RootCmd.AddCommand(config.ConfigureCmd) + RootCmd.AddCommand(prepare.PrepareCmd) + RootCmd.AddCommand(start.StartCmd) + RootCmd.AddCommand(upgrade.UpgradeCmd) + + cobra.OnInitialize(initConfig) + + // Here you will define your flags and configuration settings. + // Cobra supports persistent flags, which, if defined here, + // will be global for your application. + + RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is $HOME/.zitadel.yaml)") + + // Cobra also supports local flags, which will only run + // when this action is called directly. + RootCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} + +// initConfig reads in config file and ENV variables if set. +func initConfig() { + if cfgFile != "" { + // Use config file from the flag. + viper.SetConfigFile(cfgFile) + } else { + // Find home directory. + home, err := os.UserHomeDir() + cobra.CheckErr(err) + + // Search config in home directory with name ".zitadel" (without extension). + viper.AddConfigPath(home) + viper.SetConfigType("yaml") + viper.SetConfigName(".zitadel") + } + + viper.AutomaticEnv() // read in environment variables that match + viper.AllowEmptyEnv(true) + viper.SetEnvPrefix("ZITADEL") + + // If a config file is found, read it in. + if err := viper.ReadInConfig(); err == nil { + fmt.Fprintln(os.Stderr, "Using config file:", viper.ConfigFileUsed()) + } +} diff --git a/backend/cmd/start/start.go b/backend/cmd/start/start.go new file mode 100644 index 0000000000..40f5301a84 --- /dev/null +++ b/backend/cmd/start/start.go @@ -0,0 +1,37 @@ +/* +Copyright © 2025 NAME HERE +*/ +package start + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +// StartCmd represents the start command +var StartCmd = &cobra.Command{ + Use: "start", + Short: "Starts the Zitadel server", + // Long: `A longer description that spans multiple lines and likely contains examples + // and usage of using your command. For example: + + // Cobra is a CLI library for Go that empowers applications. + // This application is a tool to generate the needed files + // to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("start called") + }, +} + +func init() { + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // startCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // startCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/backend/cmd/test.yaml b/backend/cmd/test.yaml new file mode 100644 index 0000000000..966a8f74d2 --- /dev/null +++ b/backend/cmd/test.yaml @@ -0,0 +1,5 @@ +database: + postgres: 'something' + cockroach: + host: localhost + port: 26257 \ No newline at end of file diff --git a/backend/cmd/upgrade/upgrade.go b/backend/cmd/upgrade/upgrade.go new file mode 100644 index 0000000000..b9d17b64a4 --- /dev/null +++ b/backend/cmd/upgrade/upgrade.go @@ -0,0 +1,38 @@ +/* +Copyright © 2025 NAME HERE +*/ +package upgrade + +import ( + "fmt" + + "github.com/spf13/cobra" +) + +// UpgradeCmd represents the upgrade command +var UpgradeCmd = &cobra.Command{ + Use: "upgrade", + Short: "Upgrades Zitadel from a previous version", + // Long: `A longer description that spans multiple lines and likely contains examples + // and usage of using your command. For example: + + // Cobra is a CLI library for Go that empowers applications. + // This application is a tool to generate the needed files + // to quickly create a Cobra application.`, + Run: func(cmd *cobra.Command, args []string) { + fmt.Println("upgrade called") + }, +} + +func init() { + + // Here you will define your flags and configuration settings. + + // Cobra supports Persistent Flags which will work for this command + // and all subcommands, e.g.: + // upgradeCmd.PersistentFlags().String("foo", "", "A help for foo") + + // Cobra supports local flags which will only run when this command + // is called directly, e.g.: + // upgradeCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle") +} diff --git a/backend/domain/instance.go b/backend/domain/instance.go new file mode 100644 index 0000000000..cc0edf6433 --- /dev/null +++ b/backend/domain/instance.go @@ -0,0 +1,90 @@ +package domain + +import ( + "context" + "log/slog" + + "github.com/zitadel/zitadel/backend/storage/database" + "github.com/zitadel/zitadel/backend/storage/eventstore" + "github.com/zitadel/zitadel/backend/storage/repository" + "github.com/zitadel/zitadel/backend/storage/repository/cache" + "github.com/zitadel/zitadel/backend/storage/repository/event" + "github.com/zitadel/zitadel/backend/storage/repository/sql" + "github.com/zitadel/zitadel/backend/storage/repository/telemetry/logged" + "github.com/zitadel/zitadel/backend/storage/repository/telemetry/traced" + "github.com/zitadel/zitadel/backend/telemetry/tracing" +) + +type Instance struct { + db database.Pool + tracer *tracing.Tracer + logger *slog.Logger + cache *cache.Instance +} + +func NewInstance(db database.Pool, tracer *tracing.Tracer, logger *slog.Logger) *Instance { + b := &Instance{ + db: db, + tracer: tracer, + logger: logger, + + cache: &cache.Instance{}, + } + + return b +} + +func (b *Instance) instanceCommandRepo(tx database.Transaction) repository.InstanceRepository { + return logged.NewInstance( + b.logger, + traced.NewInstance( + b.tracer, + event.NewInstance( + eventstore.New(tx), + b.cache.SetNext( + sql.NewInstance(tx), + ), + ), + ), + ) +} + +func (b *Instance) instanceQueryRepo(tx database.QueryExecutor) repository.InstanceRepository { + return logged.NewInstance( + b.logger, + traced.NewInstance( + b.tracer, + b.cache.SetNext( + sql.NewInstance(tx), + ), + ), + ) +} + +func (b *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) { + return b.instanceQueryRepo(b.db).ByID(ctx, id) +} + +func (b *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) { + return b.instanceQueryRepo(b.db).ByDomain(ctx, domain) +} + +type SetUpInstance struct { + Instance *repository.Instance + User *repository.User +} + +func (b *Instance) SetUp(ctx context.Context, request *SetUpInstance) (err error) { + tx, err := b.db.Begin(ctx, nil) + if err != nil { + return err + } + defer func() { + err = tx.End(ctx, err) + }() + err = b.instanceCommandRepo(tx).SetUp(ctx, request.Instance) + if err != nil { + return err + } + return b.userCommandRepo(tx).Create(ctx, request.User) +} diff --git a/backend/domain/user.go b/backend/domain/user.go new file mode 100644 index 0000000000..a032bcd6bd --- /dev/null +++ b/backend/domain/user.go @@ -0,0 +1,39 @@ +package domain + +import ( + "github.com/zitadel/zitadel/backend/storage/database" + "github.com/zitadel/zitadel/backend/storage/eventstore" + "github.com/zitadel/zitadel/backend/storage/repository" + "github.com/zitadel/zitadel/backend/storage/repository/event" + "github.com/zitadel/zitadel/backend/storage/repository/sql" + "github.com/zitadel/zitadel/backend/storage/repository/telemetry/logged" + "github.com/zitadel/zitadel/backend/storage/repository/telemetry/traced" +) + +func (b *Instance) userCommandRepo(tx database.Transaction) repository.UserRepository { + return logged.NewUser( + b.logger, + traced.NewUser( + b.tracer, + event.NewUser( + eventstore.New(tx), + sql.NewUser(tx), + ), + ), + ) +} + +func (b *Instance) userQueryRepo(tx database.QueryExecutor) repository.UserRepository { + return logged.NewUser( + b.logger, + traced.NewUser( + b.tracer, + sql.NewUser(tx), + ), + ) +} + +type User struct { + ID string + Username string +} diff --git a/backend/main.go b/backend/main.go new file mode 100644 index 0000000000..3f1b3f1747 --- /dev/null +++ b/backend/main.go @@ -0,0 +1,11 @@ +/* +Copyright © 2025 NAME HERE + +*/ +package main + +import "github.com/zitadel/zitadel/backend/cmd" + +func main() { + cmd.Execute() +} diff --git a/backend/storage/01_events.sql b/backend/storage/01_events.sql new file mode 100644 index 0000000000..a9c4d106ab --- /dev/null +++ b/backend/storage/01_events.sql @@ -0,0 +1,225 @@ +DROP TABLE IF EXISTS properties; +DROP TABLE IF EXISTS parents; +DROP TABLE IF EXISTS objects; + +CREATE TABLE IF NOT EXISTS objects ( + type TEXT NOT NULL + , id TEXT NOT NULL + + , PRIMARY KEY (type, id) +); + +TRUNCATE objects CASCADE; +INSERT INTO objects VALUES + ('instance', 'i1') + , ('organization', 'o1') + , ('user', 'u1') + , ('user', 'u2') + , ('organization', 'o2') + , ('user', 'u3') + , ('project', 'p3') + + , ('instance', 'i2') + , ('organization', 'o3') + , ('user', 'u4') + , ('project', 'p1') + , ('project', 'p2') + , ('application', 'a1') + , ('application', 'a2') + , ('org_domain', 'od1') + , ('org_domain', 'od2') +; + +CREATE TABLE IF NOT EXISTS parents ( + parent_type TEXT NOT NULL + , parent_id TEXT NOT NULL + , child_type TEXT NOT NULL + , child_id TEXT NOT NULL + , PRIMARY KEY (parent_type, parent_id, child_type, child_id) + , FOREIGN KEY (parent_type, parent_id) REFERENCES objects(type, id) ON DELETE CASCADE + , FOREIGN KEY (child_type, child_id) REFERENCES objects(type, id) ON DELETE CASCADE +); + +INSERT INTO parents VALUES + ('instance', 'i1', 'organization', 'o1') + , ('organization', 'o1', 'user', 'u1') + , ('organization', 'o1', 'user', 'u2') + , ('instance', 'i1', 'organization', 'o2') + , ('organization', 'o2', 'user', 'u3') + , ('organization', 'o2', 'project', 'p3') + + , ('instance', 'i2', 'organization', 'o3') + , ('organization', 'o3', 'user', 'u4') + , ('organization', 'o3', 'project', 'p1') + , ('organization', 'o3', 'project', 'p2') + , ('project', 'p1', 'application', 'a1') + , ('project', 'p2', 'application', 'a2') + , ('organization', 'o3', 'org_domain', 'od1') + , ('organization', 'o3', 'org_domain', 'od2') +; + +CREATE TABLE properties ( + object_type TEXT NOT NULL + , object_id TEXT NOT NULL + , key TEXT NOT NULL + , value JSONB NOT NULL + , should_index BOOLEAN NOT NULL DEFAULT FALSE + + , PRIMARY KEY (object_type, object_id, key) + , FOREIGN KEY (object_type, object_id) REFERENCES objects(type, id) ON DELETE CASCADE +); + +CREATE INDEX properties_object_indexed ON properties (object_type, object_id) INCLUDE (value) WHERE should_index; +CREATE INDEX properties_value_indexed ON properties (object_type, key, value) WHERE should_index; + +TRUNCATE properties; +INSERT INTO properties VALUES + ('instance', 'i1', 'name', '"Instance 1"', TRUE) + , ('instance', 'i1', 'description', '"Instance 1 description"', FALSE) + , ('instance', 'i2', 'name', '"Instance 2"', TRUE) + , ('organization', 'o1', 'name', '"Organization 1"', TRUE) + , ('org_domain', 'od1', 'domain', '"example.com"', TRUE) + , ('org_domain', 'od1', 'is_primary', 'true', TRUE) + , ('org_domain', 'od1', 'is_verified', 'true', FALSE) + , ('org_domain', 'od2', 'domain', '"example.org"', TRUE) + , ('org_domain', 'od2', 'is_primary', 'false', TRUE) + , ('org_domain', 'od2', 'is_verified', 'false', FALSE) +; + +CREATE TABLE events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid() + , type TEXT NOT NULL + , created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + , revision SMALLINT NOT NULL + , creator TEXT NOT NULL + , payload JSONB + + , global_sequence NUMERIC NOT NULL DEFAULT pg_current_xact_id()::TEXT::NUMERIC + , sequence_order SMALLINT NOT NULL CHECK (sequence_order >= 0) + + -- , object_type TEXT NOT NULL + -- , object_id TEXT NOT NULL + + -- , FOREIGN KEY (object_type, object_id) REFERENCES objects(type, id) +); + +CREATE TYPE property ( + -- key must be a json path + key TEXT + -- value should be a primitive type + , value JSONB + -- indicates wheter the property should be indexed + , should_index BOOLEAN +); + +CREATE TYPE parent ( + parent_type TEXT + , parent_id TEXT +); + +CREATE TYPE object ( + type TEXT + , id TEXT + , properties property[] + -- an object automatically inherits the parents of its parent + , parents parent[] +); + +CREATE TYPE command ( + type TEXT + , revision SMALLINT + , creator TEXT + , payload JSONB + + -- if properties is null the objects and all its child objects get deleted + -- if the value of a property is null the property and all sub fields get deleted + -- for example if the key is 'a.b' and the value is null the property 'a.b.c' will be deleted as well + , objects object[] +); + +CREATE OR REPLACE PROCEDURE update_object(_object object) +AS $$ +DECLARE + _property property; +BEGIN + FOR _property IN ARRAY _object.properties LOOP + IF _property.value IS NULL THEN + DELETE FROM properties + WHERE object_type = _object.type + AND object_id = _object.id + AND key LIKE CONCAT(_property.key, '%'); + ELSE + INSERT INTO properties (object_type, object_id, key, value, should_index) + VALUES (_object.type, _object.id, _property.key, _property.value, _property.should_index) + ON CONFLICT (object_type, object_id, key) DO UPDATE SET (value, should_index) = (_property.value, _property.should_index); + END IF; + END LOOP; +END; + +CREATE OR REPLACE PROCEDURE delete_object(_type, _id) +AS $$ +BEGIN + WITH RECURSIVE objects_to_delete (_type, _id) AS ( + SELECT $1, $2 + + UNION + + SELECT p.child_type, p.child_id + FROM parents p + JOIN objects_to_delete o ON p.parent_type = o.type AND p.parent_id = o.id + ) + DELETE FROM objects + WHERE (type, id) IN (SELECT * FROM objects_to_delete) +END; + +CREATE OR REPLACE FUNCTION push(_commands command[]) +RETURNS NUMMERIC AS $$ +DECLARE + _command command; + _index INT; + + _object object; +BEGIN + FOR _index IN 1..array_length(_commands, 1) LOOP + _command := _commands[_index]; + INSERT INTO events (type, revision, creator, payload) + VALUES (_command.type, _command.revision, _command.creator, _command.payload); + + FOREACH _object IN ARRAY _command.objects LOOP + IF _object.properties IS NULL THEN + PERFORM delete_object(_object.type, _object.id); + ELSE + PERFORM update_object(_object); + END IF; + END LOOP; + RETURN pg_current_xact_id()::TEXT::NUMERIC; +END; +$$ LANGUAGE plpgsql; + + +BEGIN; + + +RETURNING * +; + +rollback; + +SELECT + * +FROM + properties +WHERE + (object_type, object_id) IN ( + SELECT + object_type + , object_id + FROM + properties + where + object_type = 'instance' + and key = 'name' + and value = '"Instance 1"' + and should_index + ) +; \ No newline at end of file diff --git a/backend/storage/02_next_try.sql b/backend/storage/02_next_try.sql new file mode 100644 index 0000000000..2c7243508c --- /dev/null +++ b/backend/storage/02_next_try.sql @@ -0,0 +1,310 @@ +-- postgres +DROP TABLE IF EXISTS properties; +DROP TABLE IF EXISTS parents CASCADE; +DROP TABLE IF EXISTS objects CASCADE; +DROP TABLE IF EXISTS indexed_properties; +DROP TABLE IF EXISTS events; +DROP TABLE IF EXISTS models; + +DROP TYPE IF EXISTS object CASCADe; +DROP TYPE IF EXISTS model CASCADE; + +CREATE TYPE model AS ( + name TEXT + , id TEXT +); + +CREATE TYPE object AS ( + model TEXT + , model_revision SMALLINT + , id TEXT + , payload JSONB + , parents model[] +); + +CREATE TABLE models ( + name TEXT + , revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0) + , indexed_paths TEXT[] + + , PRIMARY KEY (name, revision) +); + +CREATE TABLE objects ( + model TEXT NOT NULL + , model_revision SMALLINT NOT NULL + + , id TEXT NOT NULL + , payload JSONB + + , PRIMARY KEY (model, id) + , FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT +); + +CREATE TABLE indexed_properties ( + model TEXT NOT NULL + , model_revision SMALLINT NOT NULL + , object_id TEXT NOT NULL + + , path TEXT NOT NULL + + , value JSONB + , text_value TEXT + , number_value NUMERIC + , boolean_value BOOLEAN + + , PRIMARY KEY (model, object_id, path) + , FOREIGN KEY (model, object_id) REFERENCES objects(model, id) ON DELETE CASCADE + , FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT +); + +CREATE OR REPLACE FUNCTION ip_value_converter() +RETURNS TRIGGER AS $$ +BEGIN + CASE jsonb_typeof(NEW.value) + WHEN 'boolean' THEN + NEW.boolean_value := NEW.value::BOOLEAN; + NEW.value := NULL; + WHEN 'number' THEN + NEW.number_value := NEW.value::NUMERIC; + NEW.value := NULL; + WHEN 'string' THEN + NEW.text_value := (NEW.value#>>'{}')::TEXT; + NEW.value := NULL; + ELSE + -- do nothing + END CASE; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER ip_value_converter_before_insert +BEFORE INSERT +ON indexed_properties +FOR EACH ROW +EXECUTE FUNCTION ip_value_converter(); + +CREATE TRIGGER ip_value_converter_before_update +BEFORE UPDATE +ON indexed_properties +FOR EACH ROW +EXECUTE FUNCTION ip_value_converter(); + +CREATE INDEX ip_search_model_object ON indexed_properties (model, path, value) WHERE value IS NOT NULL; +CREATE INDEX ip_search_model_rev_object ON indexed_properties (model, model_revision, path, value) WHERE value IS NOT NULL; +CREATE INDEX ip_search_model_text ON indexed_properties (model, path, text_value) WHERE text_value IS NOT NULL; +CREATE INDEX ip_search_model_rev_text ON indexed_properties (model, model_revision, path, text_value) WHERE text_value IS NOT NULL; +CREATE INDEX ip_search_model_number ON indexed_properties (model, path, number_value) WHERE number_value IS NOT NULL; +CREATE INDEX ip_search_model_rev_number ON indexed_properties (model, model_revision, path, number_value) WHERE number_value IS NOT NULL; +CREATE INDEX ip_search_model_boolean ON indexed_properties (model, path, boolean_value) WHERE boolean_value IS NOT NULL; +CREATE INDEX ip_search_model_rev_boolean ON indexed_properties (model, model_revision, path, boolean_value) WHERE boolean_value IS NOT NULL; + +CREATE TABLE IF NOT EXISTS parents ( + parent_model TEXT NOT NULL + , parent_id TEXT NOT NULL + , child_model TEXT NOT NULL + , child_id TEXT NOT NULL + + , PRIMARY KEY (parent_model, parent_id, child_model, child_id) + , FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE + , FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE +); + +INSERT INTO models VALUES + ('instance', 1, ARRAY['name', 'domain.name']) + , ('organization', 1, ARRAY['name']) + , ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname']) +; + +CREATE OR REPLACE FUNCTION jsonb_to_rows(j jsonb, _path text[] DEFAULT ARRAY[]::text[]) +RETURNS TABLE (path text[], value jsonb) +LANGUAGE plpgsql +AS $$ +DECLARE + k text; + v jsonb; +BEGIN + FOR k, v IN SELECT * FROM jsonb_each(j) LOOP + IF jsonb_typeof(v) = 'object' THEN + -- Recursive call for nested objects, appending the key to the path + RETURN QUERY SELECT * FROM jsonb_to_rows(v, _path || k) + UNION VALUES (_path, '{}'::JSONB); + ELSE + -- Base case: return the key path and value + CASE WHEN jsonb_typeof(v) = 'null' THEN + RETURN QUERY SELECT _path || k, NULL::jsonb; + ELSE + RETURN QUERY SELECT _path || k, v; + END CASE; + END IF; + END LOOP; +END; +$$; + +CREATE OR REPLACE FUNCTION merge_payload(_old JSONB, _new JSONB) +RETURNS JSONB +LANGUAGE plpgsql +AS $$ +DECLARE + _fields CURSOR FOR SELECT DISTINCT ON (path) + path + , last_value(value) over (partition by path) as value + FROM ( + SELECT path, value FROM jsonb_to_rows(_old) + UNION ALL + SELECT path, value FROM jsonb_to_rows(_new) + ); + _path text[]; + _value jsonb; +BEGIN + OPEN _fields; + LOOP + FETCH _fields INTO _path, _value; + EXIT WHEN NOT FOUND; + IF jsonb_typeof(_value) = 'object' THEN + IF _old #> _path IS NOT NULL THEN + CONTINUE; + END IF; + _old = jsonb_set_lax(_old, _path, '{}'::jsonb, TRUE); + CONTINUE; + END IF; + + _old = jsonb_set_lax(_old, _path, _value, TRUE, 'delete_key'); + END LOOP; + + RETURN _old; +END; +$$; + +CREATE OR REPLACE FUNCTION set_object(_object object) +RETURNS VOID AS $$ +DECLARE + _parent model; +BEGIN + INSERT INTO objects (model, model_revision, id, payload) + VALUES (_object.model, _object.model_revision, _object.id, _object.payload) + ON CONFLICT (model, id) DO UPDATE + SET + payload = merge_payload(objects.payload, EXCLUDED.payload) + , model_revision = EXCLUDED.model_revision; + + INSERT INTO indexed_properties (model, model_revision, object_id, path, value) + SELECT + * + FROM ( + SELECT + _object.model + , _object.model_revision + , _object.id + , UNNEST(m.indexed_paths) AS "path" + , _object.payload #> string_to_array(UNNEST(m.indexed_paths), '.') AS "value" + FROM + models m + WHERE + m.name = _object.model + AND m.revision = _object.model_revision + GROUP BY + m.name + , m.revision + ) + WHERE + "value" IS NOT NULL + ON CONFLICT (model, object_id, path) DO UPDATE + SET + value = EXCLUDED.value + , text_value = EXCLUDED.text_value + , number_value = EXCLUDED.number_value + , boolean_value = EXCLUDED.boolean_value + ; + + INSERT INTO parents (parent_model, parent_id, child_model, child_id) + VALUES + (_object.model, _object.id, _object.model, _object.id) + ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING; + + IF _object.parents IS NULL THEN + RETURN; + END IF; + + FOREACH _parent IN ARRAY _object.parents + LOOP + INSERT INTO parents (parent_model, parent_id, child_model, child_id) + SELECT + p.parent_model + , p.parent_id + , _object.model + , _object.id + FROM parents p + WHERE + p.child_model = _parent.name + AND p.child_id = _parent.id + ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING + ; + + INSERT INTO parents (parent_model, parent_id, child_model, child_id) + VALUES + (_parent.name, _parent.id, _object.model, _object.id) + ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING; + END LOOP; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION set_objects(_objects object[]) +RETURNS VOID AS $$ +DECLARE + _object object; +BEGIN + FOREACH _object IN ARRAY _objects + LOOP + PERFORM set_object(_object); + END LOOP; +END; +$$ LANGUAGE plpgsql; + +-- CREATE OR REPLACE FUNCTION set_objects(VARIADIC _objects object[]) +-- RETURNS VOID AS $$ +-- BEGIN +-- PERFORM set_objectS(_objects); +-- END; +-- $$ LANGUAGE plpgsql; + +SELECT set_objects( + ARRAY[ + ROW('instance', 1::smallint, 'i1', '{"name": "i1", "domain": {"name": "example2.com", "isVerified": false}}', NULL)::object + , ROW('organization', 1::smallint, 'o1', '{"name": "o1", "description": "something useful"}', ARRAY[ + ROW('instance', 'i1')::model + ])::object + , ROW('user', 1::smallint, 'u1', '{"username": "u1", "description": "something useful", "firstname": "Silvan"}', ARRAY[ + ROW('instance', 'i1')::model + , ROW('organization', 'o1')::model + ])::object + ] +); + +SELECT set_objects( + ARRAY[ + ROW('instance', 1::smallint, 'i1', '{"domain": {"isVerified": true}}', NULL)::object + ] +); + + +SELECT + o.* +FROM + indexed_properties ip +JOIN + objects o +ON + ip.model = o.model + AND ip.object_id = o.id +WHERE + ip.model = 'instance' + AND ip.path = 'name' + AND ip.text_value = 'i1'; +; + +select * from merge_payload( + '{"a": "asdf", "b": {"c":{"d": 1, "g": {"h": [4,5,6]}}}, "f": [1,2,3]}'::jsonb + , '{"b": {"c":{"d": 1, "g": {"i": [4,5,6]}}}, "a": null}'::jsonb +); \ No newline at end of file diff --git a/backend/storage/03_properties.sql b/backend/storage/03_properties.sql new file mode 100644 index 0000000000..edc181ada5 --- /dev/null +++ b/backend/storage/03_properties.sql @@ -0,0 +1,272 @@ +-- postgres +DROP TABLE IF EXISTS properties; +DROP TABLE IF EXISTS parents CASCADE; +DROP TABLE IF EXISTS objects CASCADE; +DROP TABLE IF EXISTS indexed_properties; +DROP TABLE IF EXISTS events; +DROP TABLE IF EXISTS models; + +DROP TYPE IF EXISTS object CASCADe; +DROP TYPE IF EXISTS model CASCADE; + +CREATE TYPE model AS ( + name TEXT + , id TEXT +); + +CREATE TYPE object AS ( + model TEXT + , model_revision SMALLINT + , id TEXT + , payload JSONB + , parents model[] +); + +CREATE TABLE models ( + name TEXT + , revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0) + , indexed_paths TEXT[] + + , PRIMARY KEY (name, revision) +); + +CREATE TABLE objects ( + model TEXT NOT NULL + , model_revision SMALLINT NOT NULL + + , id TEXT NOT NULL + , payload JSONB + + , PRIMARY KEY (model, id) + , FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT +); + +CREATE TABLE indexed_properties ( + model TEXT NOT NULL + , model_revision SMALLINT NOT NULL + , object_id TEXT NOT NULL + + , path TEXT[] NOT NULL + + , value JSONB + , text_value TEXT + , number_value NUMERIC + , boolean_value BOOLEAN + + , PRIMARY KEY (model, object_id, path) + , FOREIGN KEY (model, object_id) REFERENCES objects(model, id) ON DELETE CASCADE + , FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT +); + +CREATE TABLE IF NOT EXISTS parents ( + parent_model TEXT NOT NULL + , parent_id TEXT NOT NULL + , child_model TEXT NOT NULL + , child_id TEXT NOT NULL + , PRIMARY KEY (parent_model, parent_id, child_model, child_id) + , FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE + , FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE +); + +CREATE OR REPLACE FUNCTION jsonb_to_rows(j jsonb, _path text[] DEFAULT ARRAY[]::text[]) +RETURNS TABLE (path text[], value jsonb) +LANGUAGE plpgsql +AS $$ +DECLARE + k text; + v jsonb; +BEGIN + FOR k, v IN SELECT * FROM jsonb_each(j) LOOP + IF jsonb_typeof(v) = 'object' THEN + -- Recursive call for nested objects, appending the key to the path + RETURN QUERY SELECT * FROM jsonb_to_rows(v, _path || k); + ELSE + -- Base case: return the key path and value + CASE WHEN jsonb_typeof(v) = 'null' THEN + RETURN QUERY SELECT _path || k, NULL::jsonb; + ELSE + RETURN QUERY SELECT _path || k, v; + END CASE; + END IF; + END LOOP; +END; +$$; + +-- after insert trigger which is called after the object was inserted and then inserts the properties +CREATE OR REPLACE FUNCTION set_ip_from_object_insert() +RETURNS TRIGGER +LANGUAGE plpgsql +AS $$ +DECLARE + _property RECORD; + _model models; +BEGIN + SELECT * INTO _model FROM models WHERE name = NEW.model AND revision = NEW.model_revision; + + FOR _property IN SELECT * FROM jsonb_to_rows(NEW.payload) LOOP + IF ARRAY_TO_STRING(_property.path, '.') = ANY(_model.indexed_paths) THEN + INSERT INTO indexed_properties (model, model_revision, object_id, path, value) + VALUES (NEW.model, NEW.model_revision, NEW.id, _property.path, _property.value); + END IF; + END LOOP; + RETURN NULL; +END; +$$; + +CREATE TRIGGER set_ip_from_object_insert +AFTER INSERT ON objects +FOR EACH ROW +EXECUTE FUNCTION set_ip_from_object_insert(); + +-- before update trigger with is called before an object is updated +-- it updates the properties table first +-- and computes the correct payload for the object +-- partial update of the object is allowed +-- if the value of a property is set to null the properties and all its children are deleted +CREATE OR REPLACE FUNCTION set_ip_from_object_update() +RETURNS TRIGGER +LANGUAGE plpgsql +AS $$ +DECLARE + _property RECORD; + _payload JSONB; + _model models; + _path_index INT; +BEGIN + _payload := OLD.payload; + SELECT * INTO _model FROM models WHERE name = NEW.model AND revision = NEW.model_revision; + + FOR _property IN SELECT * FROM jsonb_to_rows(NEW.payload) ORDER BY array_length(path, 1) LOOP + -- set the properties + CASE WHEN _property.value IS NULL THEN + RAISE NOTICE 'DELETE PROPERTY: %', _property; + DELETE FROM indexed_properties + WHERE model = NEW.model + AND model_revision = NEW.model_revision + AND object_id = NEW.id + AND path[:ARRAY_LENGTH(_property.path, 1)] = _property.path; + ELSE + RAISE NOTICE 'UPSERT PROPERTY: %', _property; + DELETE FROM indexed_properties + WHERE + model = NEW.model + AND model_revision = NEW.model_revision + AND object_id = NEW.id + AND ( + _property.path[:array_length(path, 1)] = path + OR path[:array_length(_property.path, 1)] = _property.path + ) + AND array_length(path, 1) <> array_length(_property.path, 1); + + -- insert property if should be indexed + IF ARRAY_TO_STRING(_property.path, '.') = ANY(_model.indexed_paths) THEN + RAISE NOTICE 'path should be indexed: %', _property.path; + INSERT INTO indexed_properties (model, model_revision, object_id, path, value) + VALUES (NEW.model, NEW.model_revision, NEW.id, _property.path, _property.value) + ON CONFLICT (model, object_id, path) DO UPDATE + SET value = EXCLUDED.value; + END IF; + END CASE; + + -- if property is updated we can set it directly + IF _payload #> _property.path IS NOT NULL THEN + _payload = jsonb_set_lax(COALESCE(_payload, '{}'::JSONB), _property.path, _property.value, TRUE); + EXIT; + END IF; + -- ensure parent object exists exists + FOR _path_index IN 1..(array_length(_property.path, 1)-1) LOOP + IF _payload #> _property.path[:_path_index] IS NOT NULL AND jsonb_typeof(_payload #> _property.path[:_path_index]) = 'object' THEN + CONTINUE; + END IF; + + _payload = jsonb_set(_payload, _property.path[:_path_index], '{}'::JSONB, TRUE); + EXIT; + END LOOP; + _payload = jsonb_set_lax(_payload, _property.path, _property.value, TRUE, 'delete_key'); + + END LOOP; + + -- update the payload + NEW.payload = _payload; + + RETURN NEW; +END; +$$; + +CREATE OR REPLACE TRIGGER set_ip_from_object_update +BEFORE UPDATE ON objects +FOR EACH ROW +EXECUTE FUNCTION set_ip_from_object_update(); + + +CREATE OR REPLACE FUNCTION set_object(_object object) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +BEGIN + INSERT INTO objects (model, model_revision, id, payload) + VALUES (_object.model, _object.model_revision, _object.id, _object.payload) + ON CONFLICT (model, id) DO UPDATE + SET + payload = EXCLUDED.payload + , model_revision = EXCLUDED.model_revision + ; + + INSERT INTO parents (parent_model, parent_id, child_model, child_id) + SELECT + p.name + , p.id + , _object.model + , _object.id + FROM UNNEST(_object.parents) AS p + ON CONFLICT DO NOTHING; +END; +$$; + +CREATE OR REPLACE FUNCTION set_objects(_objects object[]) +RETURNS VOID +LANGUAGE plpgsql +AS $$ +DECLARE + _object object; +BEGIN + FOREACH _object IN ARRAY _objects LOOP + PERFORM set_object(_object); + END LOOP; +END; +$$; + + + + + +INSERT INTO models VALUES + ('instance', 1, ARRAY['name', 'domain.name']) + , ('organization', 1, ARRAY['name']) + , ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname']) +; + +INSERT INTO objects VALUES + ('instance', 1, 'i2', '{"name": "i2", "domain": {"name": "example2.com", "isVerified": false}}') + , ('instance', 1, 'i3', '{"name": "i3", "domain": {"name": "example3.com", "isVerified": false}}') + , ('instance', 1, 'i4', '{"name": "i4", "domain": {"name": "example4.com", "isVerified": false}}') +; + + +begin; +UPDATE objects SET payload = '{"domain": {"isVerified": true}}' WHERE model = 'instance'; +rollback; + + +SELECT set_objects( + ARRAY[ + ROW('instance', 1::smallint, 'i1', '{"name": "i1", "domain": {"name": "example2.com", "isVerified": false}}', NULL)::object + , ROW('organization', 1::smallint, 'o1', '{"name": "o1", "description": "something useful"}', ARRAY[ + ROW('instance', 'i1')::model + ])::object + , ROW('user', 1::smallint, 'u1', '{"username": "u1", "description": "something useful", "firstname": "Silvan"}', ARRAY[ + ROW('instance', 'i1')::model + , ROW('organization', 'o1')::model + ])::object + ] +); \ No newline at end of file diff --git a/backend/storage/04_operations.sql b/backend/storage/04_operations.sql new file mode 100644 index 0000000000..1fcd725499 --- /dev/null +++ b/backend/storage/04_operations.sql @@ -0,0 +1,280 @@ +-- postgres +DROP TABLE IF EXISTS properties; +DROP TABLE IF EXISTS parents CASCADE; +DROP TABLE IF EXISTS objects CASCADE; +DROP TABLE IF EXISTS indexed_properties; +DROP TABLE IF EXISTS events; +DROP TABLE IF EXISTS models; + +DROP TYPE IF EXISTS object CASCADe; +DROP TYPE IF EXISTS model CASCADE; + +CREATE TABLE models ( + name TEXT + , revision SMALLINT NOT NULL CONSTRAINT positive_revision CHECK (revision > 0) + , indexed_paths TEXT[] + + , PRIMARY KEY (name, revision) +); + +CREATE TABLE objects ( + model TEXT NOT NULL + , model_revision SMALLINT NOT NULL + + , id TEXT NOT NULL + , payload JSONB + + , PRIMARY KEY (model, id) + , FOREIGN KEY (model, model_revision) REFERENCES models(name, revision) ON DELETE RESTRICT +); + +CREATE TYPE operation_type AS ENUM ( + -- inserts a new object, if the object already exists the operation will fail + -- path is ignored + 'create' + -- if path is null an upsert is performed and the payload is overwritten + -- if path is not null the value is set at the given path + , 'set' + -- drops an object if path is null + -- if path is set but no value, the field at the given path is dropped + -- if path and value are set and the field is an array the value is removed from the array + , 'delete' + -- adds a value to an array + -- or a field if it does not exist, if the field exists the operation will fail + , 'add' +); + +CREATE TYPE object_manipulation AS ( + path TEXT[] + , operation operation_type + , value JSONB +); + +CREATE TABLE IF NOT EXISTS parents ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid() + , parent_model TEXT NOT NULL + , parent_id TEXT NOT NULL + , child_model TEXT NOT NULL + , child_id TEXT NOT NULL + + , FOREIGN KEY (parent_model, parent_id) REFERENCES objects(model, id) ON DELETE CASCADE + , FOREIGN KEY (child_model, child_id) REFERENCES objects(model, id) ON DELETE CASCADE +); + +CREATE TYPE parent_operation AS ENUM ( + 'add' + , 'remove' +); + +CREATE TYPE parent_manipulation AS ( + model TEXT + , id TEXT + , operation parent_operation +); + +CREATE OR REPLACE FUNCTION jsonb_set_ensure_path(_jsonb JSONB, _path TEXT[], _value JSONB) +RETURNS JSONB +LANGUAGE plpgsql +AS $$ +DECLARE + i INT; +BEGIN + IF _jsonb #> _path IS NOT NULL THEN + RETURN JSONB_SET(_jsonb, _path, _value); + END IF; + + FOR i IN REVERSE ARRAY_LENGTH(_path, 1)..1 LOOP + _value := JSONB_BUILD_OBJECT(_path[i], _value); + IF _jsonb #> _path[:i] IS NOT NULL THEN + EXIT; + END IF; + END LOOP; + + RETURN _jsonb || _value; +END; +$$; + +-- current: {} +-- '{"a": {"b": {"c": {"d": {"e": 1}}}}}'::JSONB #> '{a,b,c}' = 1 + +drop function manipulate_object; +drop function object_set; +CREATE OR REPLACE FUNCTION object_set( + _model TEXT + , _model_revision SMALLINT + , _id TEXT + + , _manipulations object_manipulation[] + , _parents parent_manipulation[] +) +RETURNS objects +LANGUAGE plpgsql +AS $$ +DECLARE + _manipulation object_manipulation; +BEGIN + FOREACH _manipulation IN ARRAY _manipulations LOOP + CASE _manipulation.operation + WHEN 'create' THEN + INSERT INTO objects (model, model_revision, id, payload) + VALUES (_model, _model_revision, _id, _manipulation.value); + WHEN 'delete' THEN + CASE + WHEN _manipulation.path IS NULL THEN + DELETE FROM objects + WHERE + model = _model + AND model_revision = _model_revision + AND id = _id; + WHEN _manipulation.value IS NULL THEN + UPDATE + objects + SET + payload = payload #- _manipulation.path + WHERE + model = _model + AND model_revision = _model_revision + AND id = _id; + ELSE + UPDATE + objects + SET + payload = jsonb_set(payload, _manipulation.path, (SELECT JSONB_AGG(v) FROM JSONB_PATH_QUERY(payload, ('$.' || ARRAY_TO_STRING(_manipulation.path, '.') || '[*]')::jsonpath) AS v WHERE v <> _manipulation.value)) + WHERE + model = _model + AND model_revision = _model_revision + AND id = _id; + END CASE; + WHEN 'set' THEN + IF _manipulation.path IS NULL THEN + INSERT INTO objects (model, model_revision, id, payload) + VALUES (_model, _model_revision, _id, _manipulation.value) + ON CONFLICT (model, model_revision, id) + DO UPDATE SET payload = _manipulation.value; + ELSE + UPDATE + objects + SET + payload = jsonb_set_ensure_path(payload, _manipulation.path, _manipulation.value) + WHERE + model = _model + AND model_revision = _model_revision + AND id = _id; + END IF; + WHEN 'add' THEN + UPDATE + objects + SET + -- TODO: parent field must exist + payload = CASE + WHEN jsonb_typeof(payload #> _manipulation.path) IS NULL THEN + jsonb_set_ensure_path(payload, _manipulation.path, _manipulation.value) + WHEN jsonb_typeof(payload #> _manipulation.path) = 'array' THEN + jsonb_set(payload, _manipulation.path, COALESCE(payload #> _manipulation.path, '[]'::JSONB) || _manipulation.value) + -- ELSE + -- RAISE EXCEPTION 'Field at path % is not an array', _manipulation.path; + END + WHERE + model = _model + AND model_revision = _model_revision + AND id = _id; + -- TODO: RAISE EXCEPTION 'Field at path % is not an array', _manipulation.path; + END CASE; + END LOOP; + + FOREACH _parent IN ARRAY _parents LOOP + CASE _parent.operation + WHEN 'add' THEN + -- insert the new parent and all its parents + INSERT INTO parents (parent_model, parent_id, child_model, child_id) + ( + SELECT + id + FROM parents p + WHERE + p.child_model = _parent.model + AND p.child_id = _parent.id + UNION + SELECT + _parent.model + , _parent.id + , _model + , _id + ) + ON CONFLICT (parent_model, parent_id, child_model, child_id) DO NOTHING; + WHEN 'remove' THEN + -- remove the parent including the objects childs parent + DELETE FROM parents + WHERE id IN ( + SELECT + id + FROM + parents p + WHERE + p.child_model = _model + AND p.child_id = _id + AND p.parent_model = _parent.model + AND p.parent_id = _parent.id + UNION + SELECT + id + FROM ( + SELECT + id + FROM + parents p + WHERE + p.parent_model = _model + AND p.parent_id = _id + ) + WHERE + + ); + END CASE; + END LOOP; + RETURN NULL; +END; +$$; + +INSERT INTO models VALUES + ('instance', 1, ARRAY['name', 'domain.name']) + , ('organization', 1, ARRAY['name']) + , ('user', 1, ARRAY['username', 'email', 'firstname', 'lastname']) +; + +rollback; +BEGIN; +SELECT * FROM manipulate_object( + 'instance' + , 1::SMALLINT + , 'i1' + , ARRAY[ + ROW(NULL, 'create', '{"name": "i1"}'::JSONB)::object_manipulation + , ROW(ARRAY['domain'], 'set', '{"name": "example.com", "isVerified": false}'::JSONB)::object_manipulation + , ROW(ARRAY['domain', 'isVerified'], 'set', 'true'::JSONB)::object_manipulation + , ROW(ARRAY['domain', 'name'], 'delete', NULL)::object_manipulation + , ROW(ARRAY['domain', 'name'], 'add', '"i1.com"')::object_manipulation + , ROW(ARRAY['managers'], 'set', '[]'::JSONB)::object_manipulation + , ROW(ARRAY['managers', 'objects'], 'add', '[{"a": "asdf"}, {"a": "qewr"}]'::JSONB)::object_manipulation + , ROW(ARRAY['managers', 'objects'], 'delete', '{"a": "asdf"}'::JSONB)::object_manipulation + , ROW(ARRAY['some', 'objects'], 'set', '{"a": "asdf"}'::JSONB)::object_manipulation + -- , ROW(NULL, 'delete', NULL)::object_manipulation + ] +); +select * from objects; +ROLLBACK; + +BEGIN; +SELECT * FROM manipulate_object( + 'instance' + , 1::SMALLINT + , 'i1' + , ARRAY[ + ROW(NULL, 'create', '{"name": "i1"}'::JSONB)::object_manipulation + , ROW(ARRAY['domain', 'name'], 'set', '"example.com"'::JSONB)::object_manipulation + ] +); +select * from objects; +ROLLBACK; + +select jsonb_path_query_array('{"a": [12, 13, 14, 15]}'::JSONB, ('$.a ? (@ != $val)')::jsonpath, jsonb_build_object('val', '12')); \ No newline at end of file diff --git a/backend/storage/05_event_from_manipulation.sql b/backend/storage/05_event_from_manipulation.sql new file mode 100644 index 0000000000..371aa1bb7a --- /dev/null +++ b/backend/storage/05_event_from_manipulation.sql @@ -0,0 +1,62 @@ +CREATE TABLE IF NOT EXISTS aggregates( + id TEXT NOT NULL + , type TEXT NOT NULL + , instance_id TEXT NOT NULL + + , current_sequence INT NOT NULL DEFAULT 0 + + , PRIMARY KEY (instance_id, type, id) +); + +CREATE TABLE IF NOT EXISTS events ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid() + + -- object type that the event is related to + , aggregate TEXT NOT NULL + -- id of the object that the event is related to + , aggregate_id TEXT NOT NULL + , instance_id TEXT NOT NULL + + -- time the event was created + , created_at TIMESTAMPTZ NOT NULL DEFAULT now() + -- user that created the event + , creator TEXT + -- type of the event + , type TEXT NOT NULL + -- version of the event + , revision SMALLINT NOT NULL + -- changed fields or NULL + , payload JSONB + + , position NUMERIC NOT NULL DEFAULT pg_current_xact_id()::TEXT::NUMERIC + , in_position_order INT2 NOT NULL + + , FOREIGN KEY (instance_id, aggregate, aggregate_id) REFERENCES aggregates(instance_id, type, id) ON DELETE CASCADE +); + +CREATE TABLE IF NOT EXISTS instances( + id TEXT + , name TEXT NOT NULL + , created_at TIMESTAMPTZ NOT NULL + , updated_at TIMESTAMPTZ NOT NULL + + , default_org_id TEXT + , iam_project_id TEXT + , console_client_id TEXT + , console_app_id TEXT + , default_language VARCHAR(10) + + , PRIMARY KEY (id) +); + +CREATE TABLE IF NOT EXISTS instance_domains( + instance_id TEXT NOT NULL + , domain TEXT NOT NULL + , is_primary BOOLEAN NOT NULL DEFAULT FALSE + , is_verified BOOLEAN NOT NULL DEFAULT FALSE + + , PRIMARY KEY (instance_id, domain) + , FOREIGN KEY (instance_id) REFERENCES instances(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS instance_domain_search_idx ON instance_domains (domain); \ No newline at end of file diff --git a/backend/storage/cache/cache.go b/backend/storage/cache/cache.go new file mode 100644 index 0000000000..e0d46ec39f --- /dev/null +++ b/backend/storage/cache/cache.go @@ -0,0 +1,8 @@ +package cache + +type Cache[K comparable, V any] interface { + Get(key K) (V, bool) + Set(key K, value V) + Delete(key K) + Clear() +} diff --git a/backend/storage/cache/gomap/map.go b/backend/storage/cache/gomap/map.go new file mode 100644 index 0000000000..26e9eaa772 --- /dev/null +++ b/backend/storage/cache/gomap/map.go @@ -0,0 +1,47 @@ +package gomap + +import ( + "sync" + + "github.com/zitadel/zitadel/backend/storage/cache" +) + +type Map[K comparable, V any] struct { + mu sync.RWMutex + items map[K]V +} + +// Clear implements cache.Cache. +func (m *Map[K, V]) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + + m.items = make(map[K]V, len(m.items)) +} + +// Delete implements cache.Cache. +func (m *Map[K, V]) Delete(key K) { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.items, key) +} + +// Get implements cache.Cache. +func (m *Map[K, V]) Get(key K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + + value, exists := m.items[key] + return value, exists +} + +// Set implements cache.Cache. +func (m *Map[K, V]) Set(key K, value V) { + m.mu.Lock() + defer m.mu.Unlock() + + m.items[key] = value +} + +var _ cache.Cache[string, string] = &Map[string, string]{} diff --git a/backend/storage/database/config.go b/backend/storage/database/config.go new file mode 100644 index 0000000000..02ab4c8fff --- /dev/null +++ b/backend/storage/database/config.go @@ -0,0 +1,7 @@ +package database + +var Config = make(map[string]any) + +func AddDatabaseConfig(name string, configure func(map[string]any) error) { + Config[name] = configure +} diff --git a/backend/storage/database/database.go b/backend/storage/database/database.go new file mode 100644 index 0000000000..b8a146fbe5 --- /dev/null +++ b/backend/storage/database/database.go @@ -0,0 +1,83 @@ +package database + +import ( + "context" + "io/fs" +) + +type Row interface { + Scan(dest ...any) error +} + +type Rows interface { + Row + Next() bool + Close() + Err() error +} + +type Transaction interface { + Commit(ctx context.Context) error + Rollback(ctx context.Context) error + End(ctx context.Context, err error) error + + QueryExecutor +} + +type Client interface { + Beginner + QueryExecutor + + Release(ctx context.Context) error +} + +type Pool interface { + Beginner + QueryExecutor + + Acquire(ctx context.Context) (Client, error) + Close(ctx context.Context) error +} + +type TransactionOptions struct { + IsolationLevel IsolationLevel + AccessMode AccessMode +} + +type IsolationLevel uint8 + +const ( + IsolationLevelSerializable IsolationLevel = iota + IsolationLevelReadCommitted +) + +type AccessMode uint8 + +const ( + AccessModeReadWrite AccessMode = iota + AccessModeReadOnly +) + +type Beginner interface { + Begin(ctx context.Context, opts *TransactionOptions) (Transaction, error) +} + +type QueryExecutor interface { + Querier + Executor +} + +type Querier interface { + Query(ctx context.Context, sql string, args ...any) (Rows, error) + QueryRow(ctx context.Context, sql string, args ...any) Row +} + +type Executor interface { + Exec(ctx context.Context, sql string, args ...any) error +} + +// LoadStatements sets the sql statements strings +// TODO: implement +func LoadStatements(fs.FS) error { + return nil +} diff --git a/backend/storage/database/postgres/config.go b/backend/storage/database/postgres/config.go new file mode 100644 index 0000000000..36797ca3f7 --- /dev/null +++ b/backend/storage/database/postgres/config.go @@ -0,0 +1,4 @@ +package postgres + +type Config struct { +} diff --git a/backend/storage/database/postgres/conn.go b/backend/storage/database/postgres/conn.go new file mode 100644 index 0000000000..b36cc76403 --- /dev/null +++ b/backend/storage/database/postgres/conn.go @@ -0,0 +1,46 @@ +package postgres + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/zitadel/zitadel/backend/storage/database" +) + +type pgxConn struct{ *pgxpool.Conn } + +var _ database.Client = (*pgxConn)(nil) + +// Release implements [database.Client]. +func (c *pgxConn) Release(_ context.Context) error { + c.Conn.Release() + return nil +} + +// Begin implements [database.Client]. +func (c *pgxConn) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) { + tx, err := c.Conn.BeginTx(ctx, transactionOptionsToPgx(opts)) + if err != nil { + return nil, err + } + return &pgxTx{tx}, nil +} + +// Query implements sql.Client. +// Subtle: this method shadows the method (*Conn).Query of pgxConn.Conn. +func (c *pgxConn) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + return c.Conn.Query(ctx, sql, args...) +} + +// QueryRow implements sql.Client. +// Subtle: this method shadows the method (*Conn).QueryRow of pgxConn.Conn. +func (c *pgxConn) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return c.Conn.QueryRow(ctx, sql, args...) +} + +// Exec implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (c *pgxConn) Exec(ctx context.Context, sql string, args ...any) error { + _, err := c.Conn.Exec(ctx, sql, args...) + return err +} diff --git a/backend/storage/database/postgres/pool.go b/backend/storage/database/postgres/pool.go new file mode 100644 index 0000000000..ff8dc5dc16 --- /dev/null +++ b/backend/storage/database/postgres/pool.go @@ -0,0 +1,55 @@ +package postgres + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/zitadel/zitadel/backend/storage/database" +) + +type pgxPool struct{ pgxpool.Pool } + +var _ database.Pool = (*pgxPool)(nil) + +// Acquire implements [database.Pool]. +func (c *pgxPool) Acquire(ctx context.Context) (database.Client, error) { + conn, err := c.Pool.Acquire(ctx) + if err != nil { + return nil, err + } + return &pgxConn{conn}, nil +} + +// Query implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Query of pgxPool.Pool. +func (c *pgxPool) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + return c.Pool.Query(ctx, sql, args...) +} + +// QueryRow implements [database.Pool]. +// Subtle: this method shadows the method (Pool).QueryRow of pgxPool.Pool. +func (c *pgxPool) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return c.Pool.QueryRow(ctx, sql, args...) +} + +// Exec implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (c *pgxPool) Exec(ctx context.Context, sql string, args ...any) error { + _, err := c.Pool.Exec(ctx, sql, args...) + return err +} + +// Begin implements [database.Pool]. +func (c *pgxPool) Begin(ctx context.Context, opts *database.TransactionOptions) (database.Transaction, error) { + tx, err := c.Pool.BeginTx(ctx, transactionOptionsToPgx(opts)) + if err != nil { + return nil, err + } + return &pgxTx{tx}, nil +} + +// Close implements [database.Pool]. +func (c *pgxPool) Close(_ context.Context) error { + c.Pool.Close() + return nil +} diff --git a/backend/storage/database/postgres/tx.go b/backend/storage/database/postgres/tx.go new file mode 100644 index 0000000000..779d48f8b9 --- /dev/null +++ b/backend/storage/database/postgres/tx.go @@ -0,0 +1,83 @@ +package postgres + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/zitadel/zitadel/backend/storage/database" +) + +type pgxTx struct{ pgx.Tx } + +var _ database.Transaction = (*pgxTx)(nil) + +// Commit implements [database.Transaction]. +func (tx *pgxTx) Commit(ctx context.Context) error { + return tx.Tx.Commit(ctx) +} + +// Rollback implements [database.Transaction]. +func (tx *pgxTx) Rollback(ctx context.Context) error { + return tx.Tx.Rollback(ctx) +} + +// End implements [database.Transaction]. +func (tx *pgxTx) End(ctx context.Context, err error) error { + if err != nil { + tx.Rollback(ctx) + return err + } + return tx.Commit(ctx) +} + +// Query implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).Query of pgxTx.Tx. +func (tx *pgxTx) Query(ctx context.Context, sql string, args ...any) (database.Rows, error) { + return tx.Tx.Query(ctx, sql, args...) +} + +// QueryRow implements [database.Transaction]. +// Subtle: this method shadows the method (Tx).QueryRow of pgxTx.Tx. +func (tx *pgxTx) QueryRow(ctx context.Context, sql string, args ...any) database.Row { + return tx.Tx.QueryRow(ctx, sql, args...) +} + +// Exec implements [database.Pool]. +// Subtle: this method shadows the method (Pool).Exec of pgxPool.Pool. +func (tx *pgxTx) Exec(ctx context.Context, sql string, args ...any) error { + _, err := tx.Tx.Exec(ctx, sql, args...) + return err +} + +func transactionOptionsToPgx(opts *database.TransactionOptions) pgx.TxOptions { + if opts == nil { + return pgx.TxOptions{} + } + + return pgx.TxOptions{ + IsoLevel: isolationToPgx(opts.IsolationLevel), + AccessMode: accessModeToPgx(opts.AccessMode), + } +} + +func isolationToPgx(isolation database.IsolationLevel) pgx.TxIsoLevel { + switch isolation { + case database.IsolationLevelSerializable: + return pgx.Serializable + case database.IsolationLevelReadCommitted: + return pgx.ReadCommitted + default: + return pgx.Serializable + } +} + +func accessModeToPgx(accessMode database.AccessMode) pgx.TxAccessMode { + switch accessMode { + case database.AccessModeReadWrite: + return pgx.ReadWrite + case database.AccessModeReadOnly: + return pgx.ReadOnly + default: + return pgx.ReadWrite + } +} diff --git a/backend/storage/event_store.go b/backend/storage/event_store.go new file mode 100644 index 0000000000..cfe7e1ae96 --- /dev/null +++ b/backend/storage/event_store.go @@ -0,0 +1,155 @@ +package storage + +// import ( +// "context" +// "time" + +// "github.com/jackc/pgx/v5" +// "github.com/jackc/pgx/v5/pgconn" +// "github.com/jackc/pgx/v5/pgxpool" +// ) + +// type EventStore interface { +// Write(ctx context.Context, commands ...Command) error +// } + +// type Command interface { +// Aggregate() Aggregate + +// Type() string +// Payload() any +// Revision() uint8 +// Creator() string + +// Operations() []Operation +// } + +// type Event interface { +// ID() string + +// Aggregate() Aggregate + +// CreatedAt() time.Time +// Type() string +// UnmarshalPayload(ptr any) error + +// Revision() uint8 +// Creator() string + +// Position() float64 +// InPositionOrder() uint16 +// } + +// type Aggregate struct { +// Type string +// ID string +// Instance string +// } + +// type Operation interface { +// Exec(ctx context.Context, tx TX) error +// } + +// type TX interface { +// Query() // Placeholder for future query methods +// } + +// type createInstance struct { +// id string +// creator string +// Name string +// } + +// var ( +// _ Command = (*createInstance)(nil) +// ) + +// func (c *createInstance) Aggregate() Aggregate { +// return Aggregate{ +// Type: "instance", +// ID: c.id, +// } +// } + +// func (c *createInstance) Type() string { +// return "instance.created" +// } + +// func (c *createInstance) Payload() any { +// return c +// } + +// func (c *createInstance) Revision() uint8 { +// return 1 +// } + +// func (c *createInstance) Creator() string { +// return c.creator +// } + +// func (c *createInstance) Operations() []Operation { +// return []Operation{} +// } + +// type executor[R any] interface { +// Exec(ctx context.Context, query string, args ...any) (R, error) +// } + +// type querier[R any, Q Query[R]] interface { +// Query(ctx context.Context, query Q) (R, error) +// // Query(ctx context.Context, query string, args ...any) (R, error) +// } + +// type Query[R any] interface { +// Query(ctx context.Context) (R, error) +// } + +// var _ executor[pgconn.CommandTag] = (*pgxExecutor)(nil) + +// type pgxExecutor struct { +// conn *pgx.Conn +// } + +// func (e *pgxExecutor) Exec(ctx context.Context, query string, args ...any) (pgconn.CommandTag, error) { +// return e.conn.Exec(ctx, query, args...) +// } + +// type pgxQuerier struct { +// conn *pgx.Conn +// } + +// type pgxCreateInstanceQuery struct { +// Name string +// } + +// type instance struct{ +// id string +// name string +// } + +// type instanceByIDQuery[R Rows] struct{ +// id string +// } + +// type Rows interface { +// Next() bool +// Scan(dest ...any) error +// } + +// func (q *instanceByIDQuery[R]) Query(ctx context.Context) (*instance, error) { +// return nil, nil +// } + +// type pgxInstanceRepository struct { +// pool pgxpool.Pool +// } + +// func (r *pgxInstanceRepository) InstanceByID(ctx context.Context, id string) (*instance, error) { +// query := &instanceByIDQuery[pgx.Rows]{} +// query. +// return nil, nil +// } + +// func (q *pgxQuerier) Query(ctx context.Context, query string, args ...any) (pgx.Rows, error) { +// return q.conn.Query(ctx, query, args...) +// } diff --git a/backend/storage/eventstore/event_store.go b/backend/storage/eventstore/event_store.go new file mode 100644 index 0000000000..4fe990ee2a --- /dev/null +++ b/backend/storage/eventstore/event_store.go @@ -0,0 +1,21 @@ +package eventstore + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/database" +) + +type Eventstore struct { + executor database.Executor +} + +func New(executor database.Executor) *Eventstore { + return &Eventstore{executor: executor} +} + +type Event interface{} + +func (e *Eventstore) Push(ctx context.Context, events ...Event) error { + return nil +} diff --git a/backend/storage/instance_repository.go b/backend/storage/instance_repository.go new file mode 100644 index 0000000000..2b6fc479ad --- /dev/null +++ b/backend/storage/instance_repository.go @@ -0,0 +1,111 @@ +package storage + +// import ( +// "context" + +// "github.com/jackc/pgx/v5" +// "github.com/jackc/pgx/v5/pgxpool" +// ) + +// type Instance struct { +// ID string +// Name string +// } + +// // type InstanceRepository interface { +// // InstanceByID(id string) (*Instance, error) +// // } + +// type Row interface { +// Scan(dest ...interface{}) error +// } + +// type RowQuerier[R Row] interface { +// QueryRow(ctx context.Context, query string, args ...any) R +// } + +// type Rows interface { +// Row +// Next() bool +// Close() +// Err() error +// } + +// type RowsQuerier[R Rows] interface { +// Query(ctx context.Context, query string, args ...any) (R, error) +// } + +// type Executor interface { +// Exec(ctx context.Context, query string, args ...any) error +// } + +// type instanceByIDQuery[R Row, Q RowQuerier[R]] struct { +// id string +// querier Q +// } + +// func (q instanceByIDQuery[R, Q]) Query(ctx context.Context) (*Instance, error) { +// row := q.querier.QueryRow(ctx, "SELECT * FROM instances WHERE id = $1", q.id) +// var instance Instance +// if err := row.Scan(&instance.ID, &instance.Name); err != nil { +// return nil, err +// } +// return &instance, nil +// } + +// type InstanceRepository interface { +// ByIDQuery(ctx context.Context, id string) (*Instance, error) +// ByDomainQuery(ctx context.Context, domain string) (*Instance, error) +// } + +// type InstanceRepositorySQL struct { +// pool *pgxpool.Pool +// } + +// type InstanceRepositoryMap struct { +// instances map[string]*Instance +// domains *InstanceDomainRepositoryMao +// } + +// type InstanceDomainRepositoryMao struct { +// domains map[string]string +// } + +// func GetInstanceByID[R Row, Q RowQuerier[R]](ctx context.Context, querier Q, id string) (*Instance, error) { +// row := querier.QueryRow(ctx, "SELECT * FROM instances WHERE id = $1", id) +// var instance Instance +// if err := row.Scan(&instance.ID, &instance.Name); err != nil { +// return nil, err +// } +// return &instance, nil +// } + +// const instanceByDomainQuery = `SELECT +// i.* +// FROM +// instances i +// JOIN +// instance_domains id +// ON +// id.instance_id = i.id +// WHERE +// id.domain = $1` + +// func GetInstanceByDomain[R Row, Q RowQuerier[R]](ctx context.Context, querier Q, domain string) (*Instance, error) { +// row := querier.QueryRow(ctx, instanceByDomainQuery, domain) +// var instance Instance +// if err := row.Scan(); err != nil { +// return nil, err +// } +// return &instance, nil +// } + +// func CreateInstance[E Executor](ctx context.Context, executor E, instance *Instance) error { +// return executor.Exec(ctx, "INSERT INTO instances (id, name) VALUES ($1, $2)", instance.ID, instance.Name) +// } + +// func bla(ctx context.Context) { +// var c *pgxpool.Tx +// instance, err := instanceByIDQuery[pgx.Row, *pgxpool.Tx]{querier: c}.Query(ctx) +// _, _ = instance, err +// } diff --git a/backend/storage/repo.go b/backend/storage/repo.go new file mode 100644 index 0000000000..d2b373202c --- /dev/null +++ b/backend/storage/repo.go @@ -0,0 +1,75 @@ +package storage + +// import ( +// "context" + +// "github.com/jackc/pgx/v5" +// "github.com/jackc/pgx/v5/pgxpool" +// ) + +// type row interface { +// Scan(dest ...any) error +// } + +// type rows interface { +// row +// Next() bool +// Close() +// Err() error +// } + +// type querier interface { +// NewRowQuery() +// NewRowsQuery() +// } + +// type Client interface { +// // querier +// InstanceRepository() instanceRepository +// } + +// type instanceRepository interface { +// ByIDQuery(ctx context.Context, id string) (*Instance, error) +// ByDomainQuery(ctx context.Context, domain string) (*Instance, error) +// } + +// type pgxClient pgxpool.Pool + +// func (c *pgxClient) Begin(ctx context.Context) (*pgxTx, error) { +// tx, err := (*pgxpool.Pool)(c).Begin(ctx) +// if err != nil { +// return nil, err +// } +// return (*pgxTx)(tx.(*pgxpool.Tx)), nil +// } + +// func (c *pgxClient) InstanceRepository() instanceRepository { +// return &pgxInstanceRepository[pgxpool.Pool]{client: (*pgxpool.Pool)(c)} +// } + +// type pgxTx pgxpool.Tx + +// func (c *pgxTx) InstanceRepository() instanceRepository { +// return &pgxInstanceRepository[pgxpool.Tx]{client: (*pgxpool.Tx)(c)} +// } + +// type pgxInstanceRepository[C pgxpool.Pool | pgxpool.Tx] struct { +// client *C +// } + +// func (r *pgxInstanceRepository[C]) ByIDQuery(ctx context.Context, id string) (*Instance, error) { +// // return r.client +// pgx.Tx +// return nil, nil +// } + +// func (r *pgxInstanceRepository[C]) ByDomainQuery(ctx context.Context, domain string) (*Instance, error) { + +// return nil, nil +// } + +// func blabla() { +// var client Client = &pgxClient{&pgxpool.Pool{}} + +// client.be +// } diff --git a/backend/storage/repository/cache/instance.go b/backend/storage/repository/cache/instance.go new file mode 100644 index 0000000000..662d54fd91 --- /dev/null +++ b/backend/storage/repository/cache/instance.go @@ -0,0 +1,86 @@ +package cache + +import ( + "context" + "sync" + + "github.com/zitadel/zitadel/backend/storage/cache" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +type Instance struct { + mu *sync.RWMutex + byID cache.Cache[string, *repository.Instance] + byDomain cache.Cache[string, *repository.Instance] + + next repository.InstanceRepository +} + +func (i *Instance) SetNext(next repository.InstanceRepository) *Instance { + return &Instance{ + mu: i.mu, + byID: i.byID, + byDomain: i.byDomain, + next: next, + } +} + +// ByDomain implements repository.InstanceRepository. +func (i *Instance) ByDomain(ctx context.Context, domain string) (instance *repository.Instance, err error) { + i.mu.RLock() + defer i.mu.RUnlock() + + if instance, ok := i.byDomain.Get(domain); ok { + return instance, nil + } + + instance, err = i.next.ByDomain(ctx, domain) + if err != nil { + return nil, err + } + + i.set(instance, domain) + + return instance, nil +} + +// ByID implements repository.InstanceRepository. +func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) { + i.mu.RLock() + defer i.mu.RUnlock() + + if instance, ok := i.byID.Get(id); ok { + return instance, nil + } + + instance, err := i.next.ByID(ctx, id) + if err != nil { + return nil, err + + } + + i.set(instance, "") + return instance, nil +} + +// SetUp implements repository.InstanceRepository. +func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error { + err := i.next.SetUp(ctx, instance) + if err != nil { + return err + } + + i.set(instance, "") + return nil +} + +var _ repository.InstanceRepository = (*Instance)(nil) + +func (i *Instance) set(instance *repository.Instance, domain string) { + i.mu.Lock() + defer i.mu.Unlock() + if domain != "" { + i.byDomain.Set(domain, instance) + } + i.byID.Set(instance.ID, instance) +} diff --git a/backend/storage/repository/cache/user.go b/backend/storage/repository/cache/user.go new file mode 100644 index 0000000000..b8632c1cc8 --- /dev/null +++ b/backend/storage/repository/cache/user.go @@ -0,0 +1,45 @@ +package cache + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/cache" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +type User struct { + cache.Cache[string, *repository.User] + + next repository.UserRepository +} + +// ByID implements repository.UserRepository. +func (u *User) ByID(ctx context.Context, id string) (*repository.User, error) { + if user, ok := u.Get(id); ok { + return user, nil + } + + user, err := u.next.ByID(ctx, id) + if err != nil { + return nil, err + } + + u.set(user) + return user, nil +} + +// Create implements repository.UserRepository. +func (u *User) Create(ctx context.Context, user *repository.User) error { + err := u.next.Create(ctx, user) + if err != nil { + return err + } + u.set(user) + return nil +} + +var _ repository.UserRepository = (*User)(nil) + +func (u *User) set(user *repository.User) { + u.Cache.Set(user.ID, user) +} diff --git a/backend/storage/repository/event/instance.go b/backend/storage/repository/event/instance.go new file mode 100644 index 0000000000..1944b40881 --- /dev/null +++ b/backend/storage/repository/event/instance.go @@ -0,0 +1,37 @@ +package event + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/eventstore" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +var _ repository.InstanceRepository = (*Instance)(nil) + +type Instance struct { + *eventstore.Eventstore + + next repository.InstanceRepository +} + +func NewInstance(eventstore *eventstore.Eventstore, next repository.InstanceRepository) *Instance { + return &Instance{next: next, Eventstore: eventstore} +} + +func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) { + return i.next.ByID(ctx, id) +} + +func (i *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) { + return i.next.ByDomain(ctx, domain) +} + +func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error { + err := i.next.SetUp(ctx, instance) + if err != nil { + return err + } + + return i.Push(ctx, instance) +} diff --git a/backend/storage/repository/event/user.go b/backend/storage/repository/event/user.go new file mode 100644 index 0000000000..0a3ffe1adf --- /dev/null +++ b/backend/storage/repository/event/user.go @@ -0,0 +1,33 @@ +package event + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/eventstore" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +var _ repository.UserRepository = (*User)(nil) + +type User struct { + *eventstore.Eventstore + + next repository.UserRepository +} + +func NewUser(eventstore *eventstore.Eventstore, next repository.UserRepository) *User { + return &User{next: next, Eventstore: eventstore} +} + +func (i *User) ByID(ctx context.Context, id string) (*repository.User, error) { + return i.next.ByID(ctx, id) +} + +func (i *User) Create(ctx context.Context, user *repository.User) error { + err := i.next.Create(ctx, user) + if err != nil { + return err + } + + return i.Push(ctx, user) +} diff --git a/backend/storage/repository/instance.go b/backend/storage/repository/instance.go new file mode 100644 index 0000000000..41c4c0d247 --- /dev/null +++ b/backend/storage/repository/instance.go @@ -0,0 +1,14 @@ +package repository + +import "context" + +type InstanceRepository interface { + SetUp(ctx context.Context, instance *Instance) error + ByID(ctx context.Context, id string) (*Instance, error) + ByDomain(ctx context.Context, domain string) (*Instance, error) +} + +type Instance struct { + ID string + Name string +} diff --git a/backend/storage/repository/sql/instance.go b/backend/storage/repository/sql/instance.go new file mode 100644 index 0000000000..97f89c61e6 --- /dev/null +++ b/backend/storage/repository/sql/instance.go @@ -0,0 +1,45 @@ +package sql + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/database" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +func NewInstance(client database.QueryExecutor) repository.InstanceRepository { + return &Instance{client: client} +} + +type Instance struct { + client database.QueryExecutor +} + +const instanceByDomainQuery = `SELECT i.id, i.name FROM instances i JOIN instance_domains id ON i.id = id.instance_id WHERE id.domain = $1` + +// ByDomain implements [InstanceRepository]. +func (r *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) { + row := r.client.QueryRow(ctx, instanceByDomainQuery, domain) + var instance repository.Instance + if err := row.Scan(&instance.ID, &instance.Name); err != nil { + return nil, err + } + return &instance, nil +} + +const instanceByIDQuery = `SELECT id, name FROM instances WHERE id = $1` + +// ByID implements [InstanceRepository]. +func (r *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) { + row := r.client.QueryRow(ctx, instanceByIDQuery, id) + var instance repository.Instance + if err := row.Scan(&instance.ID, &instance.Name); err != nil { + return nil, err + } + return &instance, nil +} + +// SetUp implements [InstanceRepository]. +func (r *Instance) SetUp(ctx context.Context, instance *repository.Instance) error { + return r.client.Exec(ctx, "INSERT INTO instances (id, name) VALUES ($1, $2)", instance.ID, instance.Name) +} diff --git a/backend/storage/repository/sql/user.go b/backend/storage/repository/sql/user.go new file mode 100644 index 0000000000..477acc8a01 --- /dev/null +++ b/backend/storage/repository/sql/user.go @@ -0,0 +1,33 @@ +package sql + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/database" + "github.com/zitadel/zitadel/backend/storage/repository" +) + +func NewUser(client database.QueryExecutor) repository.UserRepository { + return &User{client: client} +} + +type User struct { + client database.QueryExecutor +} + +const userByIDQuery = `SELECT id, username FROM users WHERE id = $1` + +// ByID implements [UserRepository]. +func (r *User) ByID(ctx context.Context, id string) (*repository.User, error) { + row := r.client.QueryRow(ctx, userByIDQuery, id) + var user repository.User + if err := row.Scan(&user.ID, &user.Username); err != nil { + return nil, err + } + return &user, nil +} + +// Create implements [UserRepository]. +func (r *User) Create(ctx context.Context, user *repository.User) error { + return r.client.Exec(ctx, "INSERT INTO users (id, username) VALUES ($1, $2)", user.ID, user.Username) +} diff --git a/backend/storage/repository/telemetry/logged/instance.go b/backend/storage/repository/telemetry/logged/instance.go new file mode 100644 index 0000000000..ce8bda023d --- /dev/null +++ b/backend/storage/repository/telemetry/logged/instance.go @@ -0,0 +1,40 @@ +package logged + +import ( + "context" + "log/slog" + + "github.com/zitadel/zitadel/backend/storage/repository" +) + +type Instance struct { + *slog.Logger + + next repository.InstanceRepository +} + +func NewInstance(logger *slog.Logger, next repository.InstanceRepository) *Instance { + return &Instance{Logger: logger, next: next} +} + +var _ repository.InstanceRepository = (*Instance)(nil) + +func (i *Instance) ByID(ctx context.Context, id string) (*repository.Instance, error) { + i.Logger.InfoContext(ctx, "By ID Query", slog.String("id", id)) + return i.next.ByID(ctx, id) +} + +func (i *Instance) ByDomain(ctx context.Context, domain string) (*repository.Instance, error) { + i.Logger.InfoContext(ctx, "By Domain Query", slog.String("domain", domain)) + return i.next.ByDomain(ctx, domain) +} + +func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) error { + err := i.next.SetUp(ctx, instance) + if err != nil { + i.Logger.ErrorContext(ctx, "Failed to set up instance", slog.Any("instance", instance), slog.Any("cause", err)) + return err + } + i.Logger.InfoContext(ctx, "Instance set up", slog.Any("instance", instance)) + return nil +} diff --git a/backend/storage/repository/telemetry/logged/user.go b/backend/storage/repository/telemetry/logged/user.go new file mode 100644 index 0000000000..a8a59cfdd2 --- /dev/null +++ b/backend/storage/repository/telemetry/logged/user.go @@ -0,0 +1,35 @@ +package logged + +import ( + "context" + "log/slog" + + "github.com/zitadel/zitadel/backend/storage/repository" +) + +type User struct { + *slog.Logger + + next repository.UserRepository +} + +func NewUser(logger *slog.Logger, next repository.UserRepository) *User { + return &User{Logger: logger, next: next} +} + +var _ repository.UserRepository = (*User)(nil) + +func (i *User) ByID(ctx context.Context, id string) (*repository.User, error) { + i.Logger.InfoContext(ctx, "By ID Query", slog.String("id", id)) + return i.next.ByID(ctx, id) +} + +func (i *User) Create(ctx context.Context, user *repository.User) error { + err := i.next.Create(ctx, user) + if err != nil { + i.Logger.ErrorContext(ctx, "Failed to create user", slog.Any("user", user), slog.Any("cause", err)) + return err + } + i.Logger.InfoContext(ctx, "User created successfully", slog.Any("user", user)) + return nil +} diff --git a/backend/storage/repository/telemetry/traced/instance.go b/backend/storage/repository/telemetry/traced/instance.go new file mode 100644 index 0000000000..8466ff28a5 --- /dev/null +++ b/backend/storage/repository/telemetry/traced/instance.go @@ -0,0 +1,54 @@ +package traced + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/repository" + "github.com/zitadel/zitadel/backend/telemetry/tracing" +) + +var _ repository.InstanceRepository = (*Instance)(nil) + +type Instance struct { + *tracing.Tracer + + next repository.InstanceRepository +} + +func NewInstance(tracer *tracing.Tracer, next repository.InstanceRepository) *Instance { + return &Instance{Tracer: tracer, next: next} +} + +func (i *Instance) SetNext(next repository.InstanceRepository) *Instance { + return &Instance{Tracer: i.Tracer, next: next} +} + +// ByDomain implements [repository.InstanceRepository]. +func (i *Instance) ByDomain(ctx context.Context, domain string) (instance *repository.Instance, err error) { + i.Tracer.Decorate(ctx, func(ctx context.Context) error { + instance, err = i.next.ByDomain(ctx, domain) + return err + }) + + return instance, err +} + +// ByID implements [repository.InstanceRepository]. +func (i *Instance) ByID(ctx context.Context, id string) (instance *repository.Instance, err error) { + i.Tracer.Decorate(ctx, func(ctx context.Context) error { + instance, err = i.next.ByID(ctx, id) + return err + }) + + return instance, err +} + +// SetUp implements [repository.InstanceRepository]. +func (i *Instance) SetUp(ctx context.Context, instance *repository.Instance) (err error) { + i.Tracer.Decorate(ctx, func(ctx context.Context) error { + err = i.next.SetUp(ctx, instance) + return err + }) + + return err +} diff --git a/backend/storage/repository/telemetry/traced/user.go b/backend/storage/repository/telemetry/traced/user.go new file mode 100644 index 0000000000..4de197363e --- /dev/null +++ b/backend/storage/repository/telemetry/traced/user.go @@ -0,0 +1,44 @@ +package traced + +import ( + "context" + + "github.com/zitadel/zitadel/backend/storage/repository" + "github.com/zitadel/zitadel/backend/telemetry/tracing" +) + +var _ repository.UserRepository = (*User)(nil) + +type User struct { + *tracing.Tracer + + next repository.UserRepository +} + +func NewUser(tracer *tracing.Tracer, next repository.UserRepository) *User { + return &User{Tracer: tracer, next: next} +} + +func (i *User) SetNext(next repository.UserRepository) *User { + return &User{Tracer: i.Tracer, next: next} +} + +// ByID implements [repository.UserRepository]. +func (i *User) ByID(ctx context.Context, id string) (user *repository.User, err error) { + i.Tracer.Decorate(ctx, func(ctx context.Context) error { + user, err = i.next.ByID(ctx, id) + return err + }) + + return user, err +} + +// Create implements [repository.UserRepository]. +func (i *User) Create(ctx context.Context, user *repository.User) (err error) { + i.Tracer.Decorate(ctx, func(ctx context.Context) error { + err = i.next.Create(ctx, user) + return err + }) + + return err +} diff --git a/backend/storage/repository/user.go b/backend/storage/repository/user.go new file mode 100644 index 0000000000..53dd417ff3 --- /dev/null +++ b/backend/storage/repository/user.go @@ -0,0 +1,13 @@ +package repository + +import "context" + +type UserRepository interface { + Create(ctx context.Context, user *User) error + ByID(ctx context.Context, id string) (*User, error) +} + +type User struct { + ID string + Username string +} diff --git a/backend/storage/storage.go b/backend/storage/storage.go new file mode 100644 index 0000000000..e1fd431a6e --- /dev/null +++ b/backend/storage/storage.go @@ -0,0 +1,43 @@ +package storage + +// import "context" + +// type Storage interface { +// Write(ctx context.Context, commands ...Command) error +// } + +// type Command interface { +// // Subjects returns a list of subjects which describe the command +// // the type should always be in past tense +// // "." is used as separator +// // e.g. "user.created" +// Type() string +// // Payload returns the payload of the command +// // The payload can either be +// // - a struct +// // - a map +// // - nil +// Payload() any +// // Revision returns the revision of the command +// Revision() uint8 +// // Creator returns the user id who created the command +// Creator() string + +// // Object returns the object the command belongs to +// Object() Model +// // Parents returns the parents of the object +// // If the list is empty there are no parents +// Parents() []Model + +// // Objects returns the models to update during inserting the commands +// Objects() []Object +// } + +// type Model struct { +// Name string +// ID string +// } + +// type Object struct { +// Model Model +// } diff --git a/backend/telemetry/tracing/tracer.go b/backend/telemetry/tracing/tracer.go new file mode 100644 index 0000000000..434a364428 --- /dev/null +++ b/backend/telemetry/tracing/tracer.go @@ -0,0 +1,70 @@ +package tracing + +import ( + "context" + "runtime" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +type Tracer struct{ trace.Tracer } + +func NewTracer(name string) Tracer { + return Tracer{otel.Tracer(name)} +} + +type DecorateOption func(*decorateOptions) + +type decorateOptions struct { + startOpts []trace.SpanStartOption + endOpts []trace.SpanEndOption + + spanName string +} + +func WithSpanName(name string) DecorateOption { + return func(o *decorateOptions) { + o.spanName = name + } +} + +func WithSpanStartOptions(opts ...trace.SpanStartOption) DecorateOption { + return func(o *decorateOptions) { + o.startOpts = append(o.startOpts, opts...) + } +} + +func WithSpanEndOptions(opts ...trace.SpanEndOption) DecorateOption { + return func(o *decorateOptions) { + o.endOpts = append(o.endOpts, opts...) + } +} + +func (t Tracer) Decorate(ctx context.Context, fn func(ctx context.Context) error, opts ...DecorateOption) { + o := new(decorateOptions) + for _, opt := range opts { + opt(o) + } + + if o.spanName == "" { + o.spanName = functionName() + } + + _, span := t.Tracer.Start(ctx, o.spanName, o.startOpts...) + defer span.End(o.endOpts...) + + if err := fn(ctx); err != nil { + span.RecordError(err) + } +} + +func functionName() string { + counter, _, _, success := runtime.Caller(2) + + if !success { + return "zitadel" + } + + return runtime.FuncForPC(counter).Name() +}