diff --git a/.releaserc.js b/.releaserc.js index f0238e0dc9..74da457461 100644 --- a/.releaserc.js +++ b/.releaserc.js @@ -2,7 +2,6 @@ module.exports = { branches: [ {name: 'main'}, {name: '1.87.x', range: '1.87.x', channel: '1.87.x'}, - {name: 'startup-times', prerelease: 'beta'} ], plugins: [ "@semantic-release/commit-analyzer" diff --git a/cmd/defaults.yaml b/cmd/defaults.yaml index 1ce991644d..76c6def756 100644 --- a/cmd/defaults.yaml +++ b/cmd/defaults.yaml @@ -160,7 +160,7 @@ Auth: SearchLimit: 1000 Spooler: ConcurrentWorkers: 1 - ConcurrentInstances: 10 + ConcurrentInstances: 1 BulkLimit: 10000 FailureCountUntilSkip: 5 @@ -168,7 +168,7 @@ Admin: SearchLimit: 1000 Spooler: ConcurrentWorkers: 1 - ConcurrentInstances: 10 + ConcurrentInstances: 1 BulkLimit: 10000 FailureCountUntilSkip: 5 diff --git a/cmd/setup/05.sql b/cmd/setup/05.sql index c30f74081d..e3d09c6fb5 100644 --- a/cmd/setup/05.sql +++ b/cmd/setup/05.sql @@ -1,10 +1,10 @@ -CREATE INDEX instance_id_idx ON adminapi.current_sequences (instance_id); -CREATE INDEX instance_id_idx ON auth.current_sequences (instance_id); -CREATE INDEX instance_id_idx ON projections.current_sequences (instance_id); +CREATE INDEX current_sequences_instance_id_idx ON adminapi.current_sequences (instance_id); +CREATE INDEX current_sequences_instance_id_idx ON auth.current_sequences (instance_id); +CREATE INDEX current_sequences_instance_id_idx ON projections.current_sequences (instance_id); -CREATE INDEX instance_id_idx ON adminapi.failed_events (instance_id); -CREATE INDEX instance_id_idx ON auth.failed_events (instance_id); -CREATE INDEX instance_id_idx ON projections.failed_events (instance_id); +CREATE INDEX failed_events_instance_id_idx ON adminapi.failed_events (instance_id); +CREATE INDEX failed_events_instance_id_idx ON auth.failed_events (instance_id); +CREATE INDEX failed_events_instance_id_idx ON projections.failed_events (instance_id); ALTER TABLE adminapi.failed_events ADD COLUMN last_failed TIMESTAMPTZ; ALTER TABLE auth.failed_events ADD COLUMN last_failed TIMESTAMPTZ; diff --git a/cmd/start/start.go b/cmd/start/start.go index 3ce7d759ac..486d551689 100644 --- a/cmd/start/start.go +++ b/cmd/start/start.go @@ -173,11 +173,11 @@ func startAPIs(ctx context.Context, router *mux.Router, commands *command.Comman return err } apis := api.New(config.Port, router, queries, verifier, config.InternalAuthZ, config.ExternalSecure, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader) - authRepo, err := auth_es.Start(config.Auth, config.SystemDefaults, commands, queries, dbClient, keys.OIDC, keys.User) + authRepo, err := auth_es.Start(ctx, config.Auth, config.SystemDefaults, commands, queries, dbClient, eventstore, keys.OIDC, keys.User) if err != nil { return fmt.Errorf("error starting auth repo: %w", err) } - adminRepo, err := admin_es.Start(config.Admin, store, dbClient) + adminRepo, err := admin_es.Start(ctx, config.Admin, store, dbClient, eventstore) if err != nil { return fmt.Errorf("error starting admin repo: %w", err) } diff --git a/docs/docs/examples/introduction.mdx b/docs/docs/examples/introduction.mdx index f9ff8500b6..1e006232c8 100644 --- a/docs/docs/examples/introduction.mdx +++ b/docs/docs/examples/introduction.mdx @@ -2,30 +2,71 @@ title: Overview --- -import Tabs from '@theme/Tabs'; -import TabItem from '@theme/TabItem'; -import {Card, CardWrapper} from '../../src/components/card'; +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import { Card, CardWrapper } from "../../src/components/card"; -Get started with ZITADEL quickly by reading a quickstart or by cloning an example from our [ZITADEL examples](https://github.com/zitadel/zitadel-examples) repo. +Get started with ZITADEL quickly by reading a quickstart or by cloning a [ZITADEL example](https://github.com/search?q=topic%3Aexamples+org%3Azitadel) repo. - - - - + + + + + - - + + - + @@ -33,18 +74,33 @@ Get started with ZITADEL quickly by reading a quickstart or by cloning an exampl ## Clone a sample project - - - - + + + + ## Libraries - -| Language | Description | Link | -| ------------ | ---------------------|-------------| -| Go | Go client library for ZITADEL. | [https://github.com/zitadel/zitadel-go](https://github.com/zitadel/zitadel-go) -| .Net | Authentication / Authorization library written in dotnet for the asp.net web application package. | [https://github.com/zitadel/zitadel-net](https://github.com/zitadel/zitadel-net) -| Dart | Dart library for ZITADEL, contains gRPC and API access elements. | [https://github.com/zitadel/zitadel-dart](https://github.com/zitadel/zitadel-dart) | -| Elixir | API Client for the ZITADEL API. | [https://github.com/jshmrtn/zitadel_api](https://github.com/jshmrtn/zitadel_api) | +| Language | Description | Link | +| -------- | ------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------- | +| Go | Go client library for ZITADEL. | [https://github.com/zitadel/zitadel-go](https://github.com/zitadel/zitadel-go) | +| .Net | Authentication / Authorization library written in dotnet for the asp.net web application package. | [https://github.com/zitadel/zitadel-net](https://github.com/zitadel/zitadel-net) | +| Dart | Dart library for ZITADEL, contains gRPC and API access elements. | [https://github.com/zitadel/zitadel-dart](https://github.com/zitadel/zitadel-dart) | +| Elixir | API Client for the ZITADEL API. | [https://github.com/jshmrtn/zitadel_api](https://github.com/jshmrtn/zitadel_api) | diff --git a/docs/docs/examples/login/nextjs-b2b.md b/docs/docs/examples/login/nextjs-b2b.md new file mode 100644 index 0000000000..51a2e4fdbc --- /dev/null +++ b/docs/docs/examples/login/nextjs-b2b.md @@ -0,0 +1,177 @@ +--- +title: Next.js B2B Scenario +--- + +This is our Zitadel [Next.js](https://nextjs.org/) B2B template. It shows how to authenticate as a user with multiple organizations. The application shows your users roles on the selected organizations, other projects your organization is allowed to use and other users having a grant to use the application. + +If you need more info on B2B use cases consider reading our guide for the [B2B solution scenario](../../guides/solution-scenarios/b2b.mdx). + +> You can follow along with the template code in our [zitadel-nextjs-b2b](https://github.com/zitadel/zitadel-nextjs-b2b) repo. + +![B2B Application](/img/nextjs-b2b/home.png) + +## What does it do? + +Users with `view` role can view granted projects on their organization which were granted by your organization (owning this portal application). +Users with `admin` role can view granted projects and list users of the selected organization who are granted to use the portal application too. + +## Setup Vendor application and users in ZITADEL + +First we need to create an organization that holds the Vendor's users, projects and applications. + +### Vendor Organization + +Navigate to `https://{YourDomain}.zitadel.cloud/ui/console/orgs` (replace {YourDomain}), and click on the button "New". +Toggle the setting "Use your personal account as organization owner". + +Enter the name `Demo-Vendor`, and click "Create". Then click on that organization. + +### Portal Web Application + +To setup this sample you have to create a project and an application in the vendor organization (`Demo-Vendor`) first. + +Open the Console (`https://{YourDomain}.zitadel.cloud/ui/console/projects`) and create a new project. Let's call it `Portal`. + +Then on the project detail page click on new application and enter a name for this app. +Let's call this one `portal-web`. +Select `Web`, continue, `PKCE`, then enter `http://localhost:3000/api/auth/callback/zitadel` for the redirect, and `http://localhost:3000` for the post redirect. Then press on `create`. + +Copy the "Resource Id" of the project `Portal` as you will need this in your environment configuration file later. + +Click on the application `portal-web`. +On the application detail page click on the section under redirect settings and enable `Development Mode`. This will allow you application to work on `localhost:3000`. +To read the user data and roles from ID Token, go to the section Token Settings and make sure both checkboxes, `User roles inside ID Token` and `User Info inside ID Token` are enabled. +Make sure to save your changes. + +Copy the "Resource Id" of the application `portal-web` as you will need this in your environment configuration file later. + +### Roles + +To setup the needed roles for your project, navigate to your `Portal` project, and add the following roles + +| Key | Display Name | Group | Description | +| :----- | :------------ | :---- | ---------------------------------------------------------------------- | +| admin | Administrator | | The administrator, allowed to read granted projects and to user grants | +| reader | Reader | | A user who is allowed to read his organizations granted projects only | + +Now in the `General` section of the Portal project, make sure to enable `Assert Roles on Authentication`. +This makes sure that roles, which is used by the application to enable UI components, are set in your OIDC ID Token. + +### Service User + +To make the application work you need a service user which loads granted-projects and user-grants for you. +In the B2B-Demo organization, navigate to `Users` in navigation of Console, click on `Service Users` and create a new user. +Let's set its username to `nextjs` and its name to `NextJS`. Then press `create`. + +On the detail page of that user, navigate to "Personal Access Tokens" and add a new entry, set an optional expiration date. + +Copy the generated Token as you will need this in your environment configuration file later. + +Go back to the `Portal` project and add the Service User as Manager (top right). +Make sure to select `Project Owner Viewer` as the management role. + +To show granted projects, go to the `Demo-Vendor` organization and add the Service User as `Org Project Permission Editor` Manager. + +## Configuration + +Now clone this project and navigate to its root folder. +Create a file `.env.local` and copy paste the following: + +```text +NEXTAUTH_URL=http://localhost:3000 +NEXT_PUBLIC_ZITADEL_ISSUER=https://{YourDomain}.zitadel.cloud +ZITADEL_API=https://{YourDomain}.zitadel.cloud +ORG_ID={YourOrgId} +PROJECT_ID={YourProjectId} +ZITADEL_CLIENT_ID={YourClientID} +SERVICE_ACCOUNT_ACCESS_TOKEN={YourServiceAccountSecret} +NEXTAUTH_SECRET=randomsecret +``` + +Replace the values as follows + +`NEXTAUTH_URL`: Base url of this demo app (B2B portal); runs per default on [http://localhost:3000](http://localhost:3000) + +`NEXT_PUBLIC_ZITADEL_ISSUER`: The url to your zitadel instance. When using zitadel.cloud for this demo you can find the domain of your ZITADEL instance in the customer portal. You can also find this information by going to your application `portal-web` and click 'Urls' in the navigation. The variable is prefixed with `NEXT_PUBLIC_` such that it can be accessed from the client. + +`ZITADEL_API`: URL of the Management API. Typically the same as `ZITADEL_ISSUER`. + +`ORG_ID`: We will create an organization during later steps. You can find `{YourOrgId}` by selecting the `Demo-Vendor` organization in Console. `{YourOrgId}` is displayed on top of the organization detail page as "Resource Id". + +`PROJECT_ID`: You can find `{YourProjectId}` by clicking on "Projects" in the navigation and select the Project `Portal`. `{YourProjectId}` is displayed on the top as "Resource Id". + +`ZITADEL_CLIENT_ID`: Having the project `Portal` selected, click on the Application `portal-web`. `{YourClientID}` is displayed as a field in the OIDC configuration, labelled "Client ID" and has the format `12345678@portal`. + +`SERVICE_ACCOUNT_ACCESS_TOKEN`: Setup a service user, add a Personal Access Token and copy the secret here (see below). + +## Install and Run + +To run this sample locally you need to install dependencies first. + +Type and execute: + +```bash +yarn install +``` + +then, to run the development server: + +```bash +npm run dev +# or +yarn dev +``` + +and open [http://localhost:3000](http://localhost:3000) with your browser to see the result. + +## Create a customer organization + +### Customer organization + +Create a new organization in Console. Easiest way is to use the organization dropdown on the top left. +Let's call this new organization `Demo-Customer`. + +### Users + +Now switch back to the organization `Demo-Customer` and [create a new user](https://docs.zitadel.com/docs/manuals/user-register) in this organization. +Let's call the first user `Alice Admin`. Create a second user called `Eric Employee`. + +### Manager Role + +We want to enable Alice to assign roles to users in her organization in a self-service manner. +To make this happen, we need give Alice an [Manager Role](https://docs.zitadel.com/docs/concepts/structure/managers) within the Organization `Demo-Customer`. + +Still in the organization `Demo-Customer`, navigate to Organization. Click on the plus on the top right and give `Alice Admin` the Manager Role `Org Owner`. + +Login with your user on the customer organization to validate the setup. + +## Create a project grant + +### Organization Grant + +Switch to the `Demo-Vendor` organization, select Projects in the navigation, and click on `Portal` and then `Grants`. +[Grant all roles of the Project](https://docs.zitadel.com/docs/guides/basics/projects#exercise---grant-a-project) to the organization `demo-customer.{YourDomain}.zitadel.cloud`. + +### Authorization + +As you have guessed, these two users need to be authorized. +On the `Demo-Customer` organization, navigate to Projects and select "Granted Projects" in the sub-navigation. +Select the project portal `Portal` and navigate to "Authorizations". + +Give `Alice Admin` the roles `reader` and `admin`. +`Eric Employee` will get only the role `reader`. + +### Login + +You should be able to login to the Demo Application with `Alice Admin` and see all granted projects. + +You can log out and log in with `Eric Employee` and you should only have access to the granted projects, but not to the Authorizations tab. + +## What next + +You could create another project (eg, `Data Cube`) and grant that project to the customer organization. The granted project should appear after a reload automatically. This gives you an idea of how you could do Service Discovery with ZITADEL. + +You could also build out the code (PRs welcome :wink:) for this application, for example: + +- Create a mock `datacube-web` application and show how SSO between the portal and the application works with ZITADEL. +- Implement a feature in the Authorization tab to assign roles directly from the customer portal. diff --git a/docs/docs/examples/login/nextjs.md b/docs/docs/examples/login/nextjs.md index 0951dd288d..00a6066bce 100644 --- a/docs/docs/examples/login/nextjs.md +++ b/docs/docs/examples/login/nextjs.md @@ -8,25 +8,15 @@ This is our Zitadel [Next.js](https://nextjs.org/) template. It shows how to aut ## Getting Started -First, we start by creating a new NextJS app with `npx create-next-app`, which sets up everything automatically for you. To create a project, run: +### Install dependencies + +To install the dependencies type: ```bash -npx create-next-app --typescript -# or -yarn create next-app --typescript +yarn install ``` -### Install Authentication library - -To keep the template as easy as possible we use [next-auth](https://next-auth.js.org/) as our main authentication library. To install, run: - -```bash -npm i next-auth -# or -yarn add next-auth -``` - -To run the app, type: +then to run the app: ```bash npm run dev diff --git a/docs/docs/guides/integrate/pat.md b/docs/docs/guides/integrate/pat.md new file mode 100644 index 0000000000..102209fb49 --- /dev/null +++ b/docs/docs/guides/integrate/pat.md @@ -0,0 +1,45 @@ +--- +title: PAT (Personal Access Token) +--- + + +A Personal Access Token (PAT) is a ready to use token which can be used as _Authorization_ header. +At the moment ZITADEL only allows PATs for machine accounts (service users). + +It is an alternative to the JWT profile authentication where the service user has a key to authenticate. Read more about that [here](serviceusers) + +## Create a Service User with a PAT + + +1. Navigate to Service Users +2. Click on **New** +3. Enter a user name and a display name +4. Click on the Personal Access Token menu point in the detail of your user +5. Click on **New** +6. You can either set an expiration date or leave it empty if you don't want it to expire +7. Copy the token from the dialog (You will not see this again) + +![Create new service user](/img/guides/console-service-user-pat.gif) + +## Grant role for ZITADEL + +To be able to access the ZITADEL APIs your service user needs permissions to ZITADEL. + +1. Go to the detail page of your organization +2. Click in the top right corner the "+" button +3. Search for your service user +4. Give the user the role you need, for the example we choose Org Owner (More about [ZITADEL Permissions](../manage/console/managers)) + +![Add org owner to service user](/img/guides/console-service-user-org-owner.gif) + + +## Call ZITADEL API with PAT + +Because the PAT is a ready to use Token, you can add it as Authorization Header and send it in your requests to the ZITADEL API. +In this example we read the organization of the service user. + +```bash +curl --request GET \ + --url {your-domain}/management/v1/orgs/me \ + --header 'Authorization: Bearer {PAT}' +``` \ No newline at end of file diff --git a/docs/docs/guides/manage/console/instance-settings.mdx b/docs/docs/guides/manage/console/instance-settings.mdx index ed1528f4df..0ad29e4a49 100644 --- a/docs/docs/guides/manage/console/instance-settings.mdx +++ b/docs/docs/guides/manage/console/instance-settings.mdx @@ -173,7 +173,7 @@ To be able to use the email as username you have to disable the attribute "User This means that all your users will not be suffixed with the domain of your organization and you can enter the email as username. All usernames will then be globally unique within your instance. -You can either set this attribute on your whole ZITADEL instance or just on some specific orgnizations. +You can either set this attribute on your whole ZITADEL instance or just on some specific organizations. ## Privacy Policy and TOS diff --git a/docs/docs/guides/manage/self-hosted/configure/configure.mdx b/docs/docs/guides/manage/self-hosted/configure/configure.mdx index 221b5a163c..561ff5e286 100644 --- a/docs/docs/guides/manage/self-hosted/configure/configure.mdx +++ b/docs/docs/guides/manage/self-hosted/configure/configure.mdx @@ -19,7 +19,7 @@ See a description of all possible _runtime configuration_ options with their def The `zitadel` binary expects the `--config` flag for this configuration. ### Database Initialization -Apart from these options, ZITADEL uses a [different configuration](https://github.com/zitadel/zitadel/blob/main/cmd/admin/setup/steps.yaml) for _database initialization steps_. +Apart from these options, ZITADEL uses a [different configuration](https://github.com/zitadel/zitadel/blob/main/cmd/setup/steps.yaml) for _database initialization steps_. The `zitadel` binary expects the `--steps` flag for this configuration. ### Split Configuration diff --git a/docs/docs/guides/manage/self-hosted/production.md b/docs/docs/guides/manage/self-hosted/production.md index 9a3fca85ac..83079e7c97 100644 --- a/docs/docs/guides/manage/self-hosted/production.md +++ b/docs/docs/guides/manage/self-hosted/production.md @@ -19,6 +19,8 @@ Read [on the configure page](/docs/guides/manage/self-hosted/configure) about th - To enable and restrict access to **HTTPS**, head over to [the description of your TLS options](/docs/guides/manage/self-hosted/tls_modes). - If you want to front ZITADEL with a reverse proxy, web application firewall or content delivery network, make sure to support **[HTTP/2](/docs/guides/manage/self-hosted/http2)**. - You can also refer to some **[example reverse proxy configurations](/docs/guides/manage/self-hosted/reverseproxy/reverse_proxy)**. +- The ZITADEL Console web GUI uses many gRPC-Web stubs. This results in a fairly big JavaScript bundle. You might want to compress it using [Gzip](https://www.gnu.org/software/gzip/) or [Brotli](https://github.com/google/brotli). +- Serving and caching the assets using a content delivery network could improve network latencies and shield your ZITADEL runtime. ## Monitoring @@ -36,6 +38,16 @@ Tracing: ## Database +### Prefer CockroachDB + +ZITADEL supports [CockroachDB](https://www.cockroachlabs.com/) and [PostgreSQL](https://www.postgresql.org/). +We highly recommend using CockroachDB, +as horizontal scaling is much easier than with PostgreSQL. +Also, if you are concerned about multi-regional data locality, +[the way to go is with CockroachDB](https://www.cockroachlabs.com/docs/stable/multiregion-overview.html). + +### Configure ZITADEL + Depending on your environment, you maybe would want to tweak some settings about how ZITADEL interacts with the database in the database section of your ZITADEL configuration. Read more about your [database configuration options](/docs/guides/manage/self-hosted/database). ```yaml @@ -67,6 +79,25 @@ Projections: BulkLimit: 2000 ``` +### Manage your Data + +When designing your backup strategy, +it is worth knowing that +[ZITADEL is event sourced](/docs/concepts/eventstore/overview). +That means, ZITADEL itself is able to recompute its +whole state from the records in the table eventstore.events. +The timestamp of your last record in the events table +defines up to which point in time ZITADEL can restore its state. + +The ZITADEL binary itself is stateless, +so there is no need for a special backup job. + +Generally, for maintaining your database management system in production, +please refer to the corresponding docs +[for CockroachDB](https://www.cockroachlabs.com/docs/stable/recommended-production-settings.html) +or [for PostgreSQL](https://www.postgresql.org/docs/current/admin.html). + + ## Data Initialization - You can configure instance defaults in the DefaultInstance section. diff --git a/docs/docs/guides/solution-scenarios/b2b.mdx b/docs/docs/guides/solution-scenarios/b2b.mdx index 9760d77c99..2c8f1156c4 100644 --- a/docs/docs/guides/solution-scenarios/b2b.mdx +++ b/docs/docs/guides/solution-scenarios/b2b.mdx @@ -2,7 +2,7 @@ title: B2B --- -import { B2B } from '../../../src/components/b2b'; +import { B2B } from "../../../src/components/b2b"; ## Business to Business @@ -13,6 +13,7 @@ In ZITADEL a B2B organization represents a business partner or partner who typic B2B can be a simple scenario where an organization only shares one of its projects with another organization or have a more complex case where an organization is offering a portal application to all its partners with included (self)administration. + ## Sample scenario Octagon is a fictitious company which is used throughout this guide to explain the details and key concepts of such a B2B scenario. @@ -21,9 +22,10 @@ Octagon tries to solve multiple tasks in the banking field. Its portfolio includ ### Portal Application Octagon has a **Portal application** where its employees can access their account and list all applications they are allowed to use. -Employees work for a department within Octagon or for Octagon itself. +Employees work for a department within Octagon or for Octagon itself. Some of the users have enhanced features because they supervise certain teams. Those can onboard new employees and manage their roles and features. Target groups of the application can be split into: + - **Employees:** users who are using the application as a starting point for their work. - **Supervisors:** users who are mainly using the application to manage users and their access of their department. - **Administrators:** this users are able to grant additional organizations or departments and elect supervisors. @@ -39,7 +41,7 @@ In order to define the need of the **Portal Application** some planning consider ### Login You can decide whether a organization is preselected for the login or if the user is redirected to the default login screen. You can send the user to a specific organization by defining the organization in a custom scope. (primary domain) -Settings to the branding or the login options of the organization can be made from the organization section in [Console](https://{your_domain}.zitadel.cloud/ui/console/org). +Settings to the branding or the login options of the organization can be made from the organization section in [Console](https://{your_domain}.zitadel.cloud/ui/console/org). The behaviour of the login branding can be set in your projects detail page. You can choose the branding of the selected organization, the user resource owner, or the projects resource owner. ### Organizations @@ -56,7 +58,7 @@ In our sample scenario, we assume to have the following users: - **Bill:** is employed at Octagon as Administrator of the Portal Application. Bill also uses a Microsoft Account in combination with a Security Key to secure his account. After having determined the constellation of the organizations and its users, all the necessary data (Portal project with roles and app, users, login requirements, identity providers, branding) should be set up in [Console](https://{your_domain}.zitadel.cloud/ui/console/org). -A B2B sample application for NextJS can be found in our [Example Repo](https://github.com/zitadel/zitadel-examples). +A B2B [sample application](https://github.com/zitadel/zitadel-nextjs-b2b). for NextJS can be found [here](../../examples/login/nextjs-b2b). To allow another organization to use a project, a project grant has to be created. Upon creation, roles for a grant can be limited to a subset of the total project roles. @@ -70,7 +72,7 @@ In this scenario, Dimitri and Michael share the same organization Pentagon, wher > Note: Roles are meant for internal business logic and therefore need to be validated separately, none of the users described are allowed to create user grants, at least if they do not own a ZITADEL manager role. -If you made a dashboard where some users are able to create user grants, the Management API to do such operations should be triggered with the personal access token of the users, not with a token of a machine user, to create a meaningful audit log. +If you made a dashboard where some users are able to create user grants, the Management API to do such operations should be triggered with the personal access token of the users, not with a token of a machine user, to create a meaningful audit log. If you had such a use case, ZITADEL manager roles must be assigned to those users. ### Noteworthy @@ -82,4 +84,4 @@ In such a case with this high potential of scalability where user counts can gro - [Creating an organization](../manage/console/organizations) - [Organization Branding](../manage/customize/branding) -- [Authorization](../integrate/oauth-recommended-flows) \ No newline at end of file +- [Authorization](../integrate/oauth-recommended-flows) diff --git a/docs/docs/guides/solution-scenarios/configurations.mdx b/docs/docs/guides/solution-scenarios/configurations.mdx index d7f97b7a1e..746bbc2325 100644 --- a/docs/docs/guides/solution-scenarios/configurations.mdx +++ b/docs/docs/guides/solution-scenarios/configurations.mdx @@ -45,4 +45,32 @@ This will have the following impacts: - Only allow users from selected organization to login To request the organization send either the the organization id (`urn:zitadel:iam:org:id:{id}`) or organization primary domain (`urn:zitadel:iam:org:domain:primary:{domainname}`) scope on your authentication request from your application. -More about the [scopes](../../apis/openidoauth/scopes#reserved-scopes) \ No newline at end of file +More about the [scopes](../../apis/openidoauth/scopes#reserved-scopes) + +## Use email to login + +There are two different possibilities to achieve login with an email. + +1. Use an email address as username +2. Use the email field of the user as additional login to the username + +![Domain Policy: Organization domain as suffix](/img/guides/scenarios/domain_policy_org_domain_disabled.png) + +### Use an email address as username + +To be able to use the email as username you have to disable the attribute "User Loginname must contain orgdomain" on your domain settings. +This means that all your users will not be suffixed with the domain of your organization and you can enter the email as username. +All usernames will then be globally unique within your instance. + +You can either set this attribute on your whole ZITADEL instance or just on some specific organizations. + +### Use the email field of the user as additional login to the username + +No matter how the username of your user does look like. +You can additionally allow login with the email attribute of the user. + +You can find this in the "Login Behaviour and Security" Setting of your instance or organizations. +Go to the "Advanced" section, per default login with email address should be allowed. It is possible to disable it. + +![Login Policy Advanced Setting: Disable email for login](/img/guides/scenarios/login_policy_advanced.png) + diff --git a/docs/netlify.toml b/docs/netlify.toml index 939f53f66b..e77bce6989 100644 --- a/docs/netlify.toml +++ b/docs/netlify.toml @@ -3,7 +3,7 @@ [[redirects]] from = "/proxy/js/script.js" - to = "https://plausible.io/js/plausible.js" + to = "https://plausible.io/js/script.outbound-links.js" status = 200 force = true diff --git a/docs/sidebars.js b/docs/sidebars.js index bc82d64342..d733751cb1 100644 --- a/docs/sidebars.js +++ b/docs/sidebars.js @@ -9,6 +9,7 @@ module.exports = { "examples/login/react", "examples/login/flutter", "examples/login/nextjs", + "examples/login/nextjs-b2b", ], collapsed: false, }, @@ -132,6 +133,7 @@ module.exports = { items: [ "guides/integrate/serviceusers", "guides/integrate/access-zitadel-apis", + "guides/integrate/pat", "guides/integrate/access-zitadel-system-api", "guides/integrate/export-and-import", ], diff --git a/docs/src/css/card.module.css b/docs/src/css/card.module.css index 1a37215a96..315cb02f6a 100644 --- a/docs/src/css/card.module.css +++ b/docs/src/css/card.module.css @@ -29,6 +29,7 @@ background-position: center; padding: 0.5rem 0; pointer-events: none; + box-shadow: none !important; } .fillspace { diff --git a/docs/static/img/guides/console-service-user-org-owner.gif b/docs/static/img/guides/console-service-user-org-owner.gif new file mode 100644 index 0000000000..fc44516b9f Binary files /dev/null and b/docs/static/img/guides/console-service-user-org-owner.gif differ diff --git a/docs/static/img/guides/console-service-user-pat.gif b/docs/static/img/guides/console-service-user-pat.gif new file mode 100644 index 0000000000..932759aa87 Binary files /dev/null and b/docs/static/img/guides/console-service-user-pat.gif differ diff --git a/docs/static/img/guides/scenarios/domain_policy_org_domain_disabled.png b/docs/static/img/guides/scenarios/domain_policy_org_domain_disabled.png new file mode 100644 index 0000000000..1c44c3ce00 Binary files /dev/null and b/docs/static/img/guides/scenarios/domain_policy_org_domain_disabled.png differ diff --git a/docs/static/img/guides/scenarios/login_policy_advanced.png b/docs/static/img/guides/scenarios/login_policy_advanced.png new file mode 100644 index 0000000000..3804f76973 Binary files /dev/null and b/docs/static/img/guides/scenarios/login_policy_advanced.png differ diff --git a/docs/static/img/nextjs-b2b/home.png b/docs/static/img/nextjs-b2b/home.png new file mode 100644 index 0000000000..966ecddb1f Binary files /dev/null and b/docs/static/img/nextjs-b2b/home.png differ diff --git a/internal/admin/repository/eventsourcing/handler/handler.go b/internal/admin/repository/eventsourcing/handler/handler.go index 4a89030a09..e95e0232e3 100644 --- a/internal/admin/repository/eventsourcing/handler/handler.go +++ b/internal/admin/repository/eventsourcing/handler/handler.go @@ -1,6 +1,7 @@ package handler import ( + "context" "time" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" @@ -28,10 +29,10 @@ func (h *handler) Eventstore() v1.Eventstore { return h.es } -func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler { +func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler { handlers := []query.Handler{} if static != nil { - handlers = append(handlers, newStyling( + handlers = append(handlers, newStyling(ctx, handler{view, bulkLimit, configs.cycleDuration("Styling"), errorCount, es}, static)) } diff --git a/internal/admin/repository/eventsourcing/handler/styling.go b/internal/admin/repository/eventsourcing/handler/styling.go index 028e116cc0..8a8553b528 100644 --- a/internal/admin/repository/eventsourcing/handler/styling.go +++ b/internal/admin/repository/eventsourcing/handler/styling.go @@ -34,21 +34,21 @@ type Styling struct { subscription *v1.Subscription } -func newStyling(handler handler, static static.Storage) *Styling { +func newStyling(ctx context.Context, handler handler, static static.Storage) *Styling { h := &Styling{ handler: handler, static: static, } - h.subscribe() + h.subscribe(ctx) return h } -func (m *Styling) subscribe() { +func (m *Styling) subscribe(ctx context.Context) { m.subscription = m.es.Subscribe(m.AggregateTypes()...) go func() { for event := range m.subscription.Events { - query.ReduceEvent(m, event) + query.ReduceEvent(ctx, m, event) } }() } @@ -73,15 +73,15 @@ func (m *Styling) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := m.view.GetLatestStylingSequences(instanceIDs...) +func (m *Styling) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := m.view.GetLatestStylingSequences(instanceIDs) if err != nil { return nil, err } searchQuery := models.NewSearchQuery() - for _, sequence := range sequences { + for _, instanceID := range instanceIDs { var seq uint64 - for _, instanceID := range instanceIDs { + for _, sequence := range sequences { if sequence.InstanceID == instanceID { seq = sequence.CurrentSequence break @@ -90,7 +90,7 @@ func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) searchQuery.AddQuery(). AggregateTypeFilter(m.AggregateTypes()...). LatestSequenceFilter(seq). - InstanceIDFilter(sequence.InstanceID) + InstanceIDFilter(instanceID) } return searchQuery, nil } @@ -166,12 +166,12 @@ func (m *Styling) processLabelPolicy(event *models.Event) (err error) { } func (m *Styling) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-2m9fs", "id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler") return spooler.HandleError(event, err, m.view.GetLatestStylingFailedEvent, m.view.ProcessedStylingFailedEvent, m.view.ProcessedStylingSequence, m.errorCountUntilSkip) } -func (m *Styling) OnSuccess() error { - return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp) +func (m *Styling) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp, instanceIDs) } func (m *Styling) generateStylingFile(policy *iam_model.LabelPolicyView) error { diff --git a/internal/admin/repository/eventsourcing/repository.go b/internal/admin/repository/eventsourcing/repository.go index 0d8167cca8..63bcb6b15f 100644 --- a/internal/admin/repository/eventsourcing/repository.go +++ b/internal/admin/repository/eventsourcing/repository.go @@ -7,6 +7,7 @@ import ( "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/eventstore" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/spooler" admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" + eventstore2 "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/static" @@ -22,7 +23,7 @@ type EsRepository struct { eventstore.AdministratorRepo } -func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, error) { +func Start(ctx context.Context, conf Config, static static.Storage, dbClient *sql.DB, esV2 *eventstore2.Eventstore) (*EsRepository, error) { es, err := v1.Start(dbClient) if err != nil { return nil, err @@ -32,7 +33,7 @@ func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, return nil, err } - spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, static) + spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, static) return &EsRepository{ spooler: spool, diff --git a/internal/admin/repository/eventsourcing/spooler/spooler.go b/internal/admin/repository/eventsourcing/spooler/spooler.go index 6ab695fd75..98542b309f 100644 --- a/internal/admin/repository/eventsourcing/spooler/spooler.go +++ b/internal/admin/repository/eventsourcing/spooler/spooler.go @@ -1,14 +1,15 @@ package spooler import ( + "context" "database/sql" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - "github.com/zitadel/zitadel/internal/static" - "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" + "github.com/zitadel/zitadel/internal/eventstore" + v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" + "github.com/zitadel/zitadel/internal/static" ) type SpoolerConfig struct { @@ -19,13 +20,14 @@ type SpoolerConfig struct { Handlers handler.Configs } -func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler { +func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler { spoolerConfig := spooler.Config{ Eventstore: es, + EventstoreV2: esV2, Locker: &locker{dbClient: sql}, ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentInstances: c.ConcurrentInstances, - ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static), + ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static), } spool := spoolerConfig.New() spool.Start() diff --git a/internal/admin/repository/eventsourcing/view/sequence.go b/internal/admin/repository/eventsourcing/view/sequence.go index 868d9633b3..4985fbef48 100644 --- a/internal/admin/repository/eventsourcing/view/sequence.go +++ b/internal/admin/repository/eventsourcing/view/sequence.go @@ -19,16 +19,16 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } -func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) +func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { + return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) } func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) { return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID) } -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) +func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error { + currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) if err != nil { return err } @@ -41,12 +41,6 @@ func (v *View) updateSpoolerRunSequence(viewName string) error { return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences) } -func (v *View) GetCurrentSequence(db, viewName string) ([]*repository.CurrentSequence, error) { - sequenceTable := db + ".current_sequences" - fullView := db + "." + viewName - return repository.LatestSequences(v.Db, sequenceTable, fullView) -} - func (v *View) ClearView(db, viewName string) error { truncateView := db + "." + viewName sequenceTable := db + ".current_sequences" diff --git a/internal/admin/repository/eventsourcing/view/styling.go b/internal/admin/repository/eventsourcing/view/styling.go index e0b3aa24a4..b688a5544b 100644 --- a/internal/admin/repository/eventsourcing/view/styling.go +++ b/internal/admin/repository/eventsourcing/view/styling.go @@ -35,16 +35,16 @@ func (v *View) GetLatestStylingSequence(instanceID string) (*global_view.Current return v.latestSequence(stylingTyble, instanceID) } -func (v *View) GetLatestStylingSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(stylingTyble, instanceIDs...) +func (v *View) GetLatestStylingSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(stylingTyble, instanceIDs) } func (v *View) ProcessedStylingSequence(event *models.Event) error { return v.saveCurrentSequence(stylingTyble, event) } -func (v *View) UpdateStylingSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(stylingTyble) +func (v *View) UpdateStylingSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(stylingTyble, instanceIDs) } func (v *View) GetLatestStylingFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/handler/handler.go b/internal/auth/repository/eventsourcing/handler/handler.go index 59f50554d5..6eb3a836c6 100644 --- a/internal/auth/repository/eventsourcing/handler/handler.go +++ b/internal/auth/repository/eventsourcing/handler/handler.go @@ -33,24 +33,24 @@ func (h *handler) Eventstore() v1.Eventstore { return h.es } -func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler { +func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler { return []query.Handler{ - newUser( + newUser(ctx, handler{view, bulkLimit, configs.cycleDuration("User"), errorCount, es}, queries), - newUserSession( + newUserSession(ctx, handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount, es}, queries), - newToken( + newToken(ctx, handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount, es}), - newIDPConfig( + newIDPConfig(ctx, handler{view, bulkLimit, configs.cycleDuration("IDPConfig"), errorCount, es}), - newIDPProvider( + newIDPProvider(ctx, handler{view, bulkLimit, configs.cycleDuration("IDPProvider"), errorCount, es}, systemDefaults, queries), - newExternalIDP( + newExternalIDP(ctx, handler{view, bulkLimit, configs.cycleDuration("ExternalIDP"), errorCount, es}, systemDefaults, queries), - newRefreshToken(handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}), - newOrgProjectMapping(handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}), + newRefreshToken(ctx, handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}), + newOrgProjectMapping(ctx, handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}), } } @@ -80,9 +80,9 @@ func withInstanceID(ctx context.Context, instanceID string) context.Context { func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []models.AggregateType, instanceIDs []string) *models.SearchQuery { searchQuery := models.NewSearchQuery() - for _, sequence := range sequences { + for _, instanceID := range instanceIDs { var seq uint64 - for _, instanceID := range instanceIDs { + for _, sequence := range sequences { if sequence.InstanceID == instanceID { seq = sequence.CurrentSequence break @@ -91,7 +91,7 @@ func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []mo searchQuery.AddQuery(). AggregateTypeFilter(aggregateTypes...). LatestSequenceFilter(seq). - InstanceIDFilter(sequence.InstanceID) + InstanceIDFilter(instanceID) } return searchQuery } diff --git a/internal/auth/repository/eventsourcing/handler/idp_config.go b/internal/auth/repository/eventsourcing/handler/idp_config.go index de0315d85e..c2157f1df2 100644 --- a/internal/auth/repository/eventsourcing/handler/idp_config.go +++ b/internal/auth/repository/eventsourcing/handler/idp_config.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -23,21 +25,21 @@ type IDPConfig struct { subscription *v1.Subscription } -func newIDPConfig(h handler) *IDPConfig { +func newIDPConfig(ctx context.Context, h handler) *IDPConfig { idpConfig := &IDPConfig{ handler: h, } - idpConfig.subscribe() + idpConfig.subscribe(ctx) return idpConfig } -func (i *IDPConfig) subscribe() { +func (i *IDPConfig) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -62,8 +64,8 @@ func (i *IDPConfig) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *IDPConfig) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs...) +func (i *IDPConfig) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs) if err != nil { return nil, err } @@ -129,10 +131,10 @@ func (i *IDPConfig) processIdpConfig(providerType iam_model.IDPProviderType, eve } func (i *IDPConfig) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-Ejf8s", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler") return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip) } -func (i *IDPConfig) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp) +func (i *IDPConfig) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/idp_providers.go b/internal/auth/repository/eventsourcing/handler/idp_providers.go index 7d71340bad..35bec063d4 100644 --- a/internal/auth/repository/eventsourcing/handler/idp_providers.go +++ b/internal/auth/repository/eventsourcing/handler/idp_providers.go @@ -32,6 +32,7 @@ type IDPProvider struct { } func newIDPProvider( + ctx context.Context, h handler, defaults systemdefaults.SystemDefaults, queries *query2.Queries, @@ -42,16 +43,16 @@ func newIDPProvider( queries: queries, } - idpProvider.subscribe() + idpProvider.subscribe(ctx) return idpProvider } -func (i *IDPProvider) subscribe() { +func (i *IDPProvider) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -76,8 +77,8 @@ func (i *IDPProvider) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *IDPProvider) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs...) +func (i *IDPProvider) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs) if err != nil { return nil, err } @@ -188,14 +189,14 @@ func (i *IDPProvider) OnError(event *es_models.Event, err error) error { return spooler.HandleError(event, err, i.view.GetLatestIDPProviderFailedEvent, i.view.ProcessedIDPProviderFailedEvent, i.view.ProcessedIDPProviderSequence, i.errorCountUntilSkip) } -func (i *IDPProvider) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp) +func (i *IDPProvider) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp, instanceIDs) } func (i *IDPProvider) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, aggregateID) } -func (u *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) { - return u.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID) +func (i *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) { + return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID) } diff --git a/internal/auth/repository/eventsourcing/handler/org_project_mapping.go b/internal/auth/repository/eventsourcing/handler/org_project_mapping.go index 4148292829..7236c8a115 100644 --- a/internal/auth/repository/eventsourcing/handler/org_project_mapping.go +++ b/internal/auth/repository/eventsourcing/handler/org_project_mapping.go @@ -1,6 +1,8 @@ package handler import ( + "context" + "github.com/zitadel/logging" "github.com/zitadel/zitadel/internal/eventstore" @@ -23,22 +25,23 @@ type OrgProjectMapping struct { } func newOrgProjectMapping( + ctx context.Context, handler handler, ) *OrgProjectMapping { h := &OrgProjectMapping{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *OrgProjectMapping) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (p *OrgProjectMapping) subscribe(ctx context.Context) { + p.subscription = p.es.Subscribe(p.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range p.subscription.Events { + query.ReduceEvent(ctx, p, event) } }() } @@ -63,8 +66,8 @@ func (p *OrgProjectMapping) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (p *OrgProjectMapping) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs...) +func (p *OrgProjectMapping) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs) if err != nil { return nil, err } @@ -85,15 +88,21 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) { } case project.GrantAddedType: projectGrant := new(view_model.ProjectGrant) - projectGrant.SetData(event) + err := projectGrant.SetData(event) + if err != nil { + return err + } mapping.OrgID = projectGrant.GrantedOrgID mapping.ProjectID = event.AggregateID mapping.ProjectGrantID = projectGrant.GrantID mapping.InstanceID = event.InstanceID case project.GrantRemovedType: projectGrant := new(view_model.ProjectGrant) - projectGrant.SetData(event) - err := p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID) + err := projectGrant.SetData(event) + if err != nil { + return err + } + err = p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID) if err == nil { return p.view.ProcessedOrgProjectMappingSequence(event) } @@ -109,10 +118,10 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) { } func (p *OrgProjectMapping) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-2k0fS", "id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler") return spooler.HandleError(event, err, p.view.GetLatestOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingSequence, p.errorCountUntilSkip) } -func (p *OrgProjectMapping) OnSuccess() error { - return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp) +func (p *OrgProjectMapping) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/refresh_token.go b/internal/auth/repository/eventsourcing/handler/refresh_token.go index 7dce7d7c34..99499f73b6 100644 --- a/internal/auth/repository/eventsourcing/handler/refresh_token.go +++ b/internal/auth/repository/eventsourcing/handler/refresh_token.go @@ -1,6 +1,7 @@ package handler import ( + "context" "encoding/json" "github.com/zitadel/logging" @@ -27,22 +28,23 @@ type RefreshToken struct { } func newRefreshToken( + ctx context.Context, handler handler, ) *RefreshToken { h := &RefreshToken{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (t *RefreshToken) subscribe() { +func (t *RefreshToken) subscribe(ctx context.Context) { t.subscription = t.es.Subscribe(t.AggregateTypes()...) go func() { for event := range t.subscription.Events { - query.ReduceEvent(t, event) + query.ReduceEvent(ctx, t, event) } }() } @@ -67,8 +69,8 @@ func (t *RefreshToken) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (t *RefreshToken) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs...) +func (t *RefreshToken) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs) if err != nil { return nil, err } @@ -87,7 +89,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { case user.HumanRefreshTokenRenewedType: e := new(user.HumanRefreshTokenRenewedEvent) if err := json.Unmarshal(event.Data, e); err != nil { - logging.Log("EVEN-DBbn4").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return caos_errs.ThrowInternal(nil, "MODEL-BHn75", "could not unmarshal data") } token, err := t.view.RefreshTokenByID(e.TokenID, event.InstanceID) @@ -102,7 +104,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { case user.HumanRefreshTokenRemovedType: e := new(user.HumanRefreshTokenRemovedEvent) if err := json.Unmarshal(event.Data, e); err != nil { - logging.Log("EVEN-BDbh3").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return caos_errs.ThrowInternal(nil, "MODEL-Bz653", "could not unmarshal data") } return t.view.DeleteRefreshToken(e.TokenID, event.InstanceID, event) @@ -118,10 +120,10 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) { } func (t *RefreshToken) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") - return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") + return spooler.HandleError(event, err, t.view.GetLatestRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenSequence, t.errorCountUntilSkip) } -func (t *RefreshToken) OnSuccess() error { - return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) +func (t *RefreshToken) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(t.view.UpdateRefreshTokenSpoolerRunTimestamp, instanceIDs) } diff --git a/internal/auth/repository/eventsourcing/handler/token.go b/internal/auth/repository/eventsourcing/handler/token.go index e93ef7b14e..22c531ccb4 100644 --- a/internal/auth/repository/eventsourcing/handler/token.go +++ b/internal/auth/repository/eventsourcing/handler/token.go @@ -33,22 +33,23 @@ type Token struct { } func newToken( + ctx context.Context, handler handler, ) *Token { h := &Token{ handler: handler, } - h.subscribe() + h.subscribe(ctx) return h } -func (t *Token) subscribe() { +func (t *Token) subscribe(ctx context.Context) { t.subscription = t.es.Subscribe(t.AggregateTypes()...) go func() { for event := range t.subscription.Events { - query.ReduceEvent(t, event) + query.ReduceEvent(ctx, t, event) } }() } @@ -65,16 +66,16 @@ func (_ *Token) AggregateTypes() []es_models.AggregateType { return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType} } -func (p *Token) CurrentSequence(instanceID string) (uint64, error) { - sequence, err := p.view.GetLatestTokenSequence(instanceID) +func (t *Token) CurrentSequence(instanceID string) (uint64, error) { + sequence, err := t.view.GetLatestTokenSequence(instanceID) if err != nil { return 0, err } return sequence.CurrentSequence, nil } -func (t *Token) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := t.view.GetLatestTokenSequences(instanceIDs...) +func (t *Token) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := t.view.GetLatestTokenSequences(instanceIDs) if err != nil { return nil, err } @@ -94,7 +95,10 @@ func (t *Token) Reduce(event *es_models.Event) (err error) { case user.UserV1ProfileChangedType, user.HumanProfileChangedType: user := new(view_model.UserView) - user.AppendEvent(event) + err := user.AppendEvent(event) + if err != nil { + return err + } tokens, err := t.view.TokensByUserID(event.AggregateID, event.InstanceID) if err != nil { return err @@ -153,14 +157,14 @@ func (t *Token) Reduce(event *es_models.Event) (err error) { } func (t *Token) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) } func agentIDFromSession(event *es_models.Event) (string, error) { session := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &session); err != nil { - logging.Log("EVEN-s3bq9").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-sd325", "could not unmarshal data") } return session["userAgentID"].(string), nil @@ -169,7 +173,7 @@ func agentIDFromSession(event *es_models.Event) (string, error) { func applicationFromSession(event *es_models.Event) (*project_es_model.Application, error) { application := new(project_es_model.Application) if err := json.Unmarshal(event.Data, &application); err != nil { - logging.Log("EVEN-GRE2q").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return nil, caos_errs.ThrowInternal(nil, "MODEL-Hrw1q", "could not unmarshal data") } return application, nil @@ -178,7 +182,7 @@ func applicationFromSession(event *es_models.Event) (*project_es_model.Applicati func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) { removed := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &removed); err != nil { - logging.Log("EVEN-Sdff3").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-Sff32", "could not unmarshal data") } return removed["tokenId"].(string), nil @@ -187,14 +191,14 @@ func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) { func refreshTokenIDFromRemovedEvent(event *es_models.Event) (string, error) { removed := make(map[string]interface{}) if err := json.Unmarshal(event.Data, &removed); err != nil { - logging.Log("EVEN-Ff23g").WithError(err).Error("could not unmarshal event data") + logging.WithError(err).Error("could not unmarshal event data") return "", caos_errs.ThrowInternal(nil, "MODEL-Dfb3w", "could not unmarshal data") } return removed["tokenId"].(string), nil } -func (t *Token) OnSuccess() error { - return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) +func (t *Token) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp, instanceIDs) } func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user.go b/internal/auth/repository/eventsourcing/handler/user.go index e17ecffc9c..34130219be 100644 --- a/internal/auth/repository/eventsourcing/handler/user.go +++ b/internal/auth/repository/eventsourcing/handler/user.go @@ -34,6 +34,7 @@ type User struct { } func newUser( + ctx context.Context, handler handler, queries *query2.Queries, ) *User { @@ -42,16 +43,16 @@ func newUser( queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *User) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (u *User) subscribe(ctx context.Context) { + u.subscription = u.es.Subscribe(u.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range u.subscription.Events { + query.ReduceEvent(ctx, u, event) } }() } @@ -75,8 +76,8 @@ func (u *User) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (u *User) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := u.view.GetLatestUserSequences(instanceIDs...) +func (u *User) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := u.view.GetLatestUserSequences(instanceIDs) if err != nil { return nil, err } @@ -275,12 +276,12 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *es_models.Event) error { } func (u *User) OnError(event *es_models.Event, err error) error { - logging.LogWithFields("SPOOL-is8aAWima", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user handler") return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip) } -func (u *User) OnSuccess() error { - return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp) +func (u *User) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp, instanceIDs) } func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user_external_idps.go b/internal/auth/repository/eventsourcing/handler/user_external_idps.go index 101c9e05f0..08e5a5ccef 100644 --- a/internal/auth/repository/eventsourcing/handler/user_external_idps.go +++ b/internal/auth/repository/eventsourcing/handler/user_external_idps.go @@ -33,6 +33,7 @@ type ExternalIDP struct { } func newExternalIDP( + ctx context.Context, handler handler, defaults systemdefaults.SystemDefaults, queries *query2.Queries, @@ -43,16 +44,16 @@ func newExternalIDP( queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (i *ExternalIDP) subscribe() { +func (i *ExternalIDP) subscribe(ctx context.Context) { i.subscription = i.es.Subscribe(i.AggregateTypes()...) go func() { for event := range i.subscription.Events { - query.ReduceEvent(i, event) + query.ReduceEvent(ctx, i, event) } }() } @@ -77,8 +78,8 @@ func (i *ExternalIDP) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (i *ExternalIDP) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { - sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs...) +func (i *ExternalIDP) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) { + sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs) if err != nil { return nil, err } @@ -178,8 +179,8 @@ func (i *ExternalIDP) OnError(event *es_models.Event, err error) error { return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip) } -func (i *ExternalIDP) OnSuccess() error { - return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp) +func (i *ExternalIDP) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp, instanceIDs) } func (i *ExternalIDP) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { diff --git a/internal/auth/repository/eventsourcing/handler/user_session.go b/internal/auth/repository/eventsourcing/handler/user_session.go index 2d81f189fb..f04105cbc0 100644 --- a/internal/auth/repository/eventsourcing/handler/user_session.go +++ b/internal/auth/repository/eventsourcing/handler/user_session.go @@ -33,22 +33,22 @@ type UserSession struct { queries *query2.Queries } -func newUserSession(handler handler, queries *query2.Queries) *UserSession { +func newUserSession(ctx context.Context, handler handler, queries *query2.Queries) *UserSession { h := &UserSession{ handler: handler, queries: queries, } - h.subscribe() + h.subscribe(ctx) return h } -func (k *UserSession) subscribe() { - k.subscription = k.es.Subscribe(k.AggregateTypes()...) +func (u *UserSession) subscribe(ctx context.Context) { + u.subscription = u.es.Subscribe(u.AggregateTypes()...) go func() { - for event := range k.subscription.Events { - query.ReduceEvent(k, event) + for event := range u.subscription.Events { + query.ReduceEvent(ctx, u, event) } }() } @@ -73,8 +73,8 @@ func (u *UserSession) CurrentSequence(instanceID string) (uint64, error) { return sequence.CurrentSequence, nil } -func (u *UserSession) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { - sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs...) +func (u *UserSession) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { + sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs) if err != nil { return nil, err } @@ -162,12 +162,12 @@ func (u *UserSession) Reduce(event *models.Event) (err error) { } func (u *UserSession) OnError(event *models.Event, err error) error { - logging.LogWithFields("SPOOL-sdfw3s", "id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler") + logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler") return spooler.HandleError(event, err, u.view.GetLatestUserSessionFailedEvent, u.view.ProcessedUserSessionFailedEvent, u.view.ProcessedUserSessionSequence, u.errorCountUntilSkip) } -func (u *UserSession) OnSuccess() error { - return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp) +func (u *UserSession) OnSuccess(instanceIDs []string) error { + return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp, instanceIDs) } func (u *UserSession) updateSession(session *view_model.UserSessionView, event *models.Event) error { diff --git a/internal/auth/repository/eventsourcing/repository.go b/internal/auth/repository/eventsourcing/repository.go index 4efdd041d0..a523108622 100644 --- a/internal/auth/repository/eventsourcing/repository.go +++ b/internal/auth/repository/eventsourcing/repository.go @@ -11,6 +11,7 @@ import ( "github.com/zitadel/zitadel/internal/command" sd "github.com/zitadel/zitadel/internal/config/systemdefaults" "github.com/zitadel/zitadel/internal/crypto" + eventstore2 "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/id" @@ -33,7 +34,7 @@ type EsRepository struct { eventstore.OrgRepository } -func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) { +func Start(ctx context.Context, conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, esV2 *eventstore2.Eventstore, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) { es, err := v1.Start(dbClient) if err != nil { return nil, err @@ -47,7 +48,7 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Comma authReq := cache.Start(dbClient) - spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, systemDefaults, queries) + spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, systemDefaults, queries) userRepo := eventstore.UserRepo{ SearchLimit: conf.SearchLimit, diff --git a/internal/auth/repository/eventsourcing/spooler/spooler.go b/internal/auth/repository/eventsourcing/spooler/spooler.go index a45c53a870..63185e2b0d 100644 --- a/internal/auth/repository/eventsourcing/spooler/spooler.go +++ b/internal/auth/repository/eventsourcing/spooler/spooler.go @@ -1,15 +1,16 @@ package spooler import ( + "context" "database/sql" - v1 "github.com/zitadel/zitadel/internal/eventstore/v1" - "github.com/zitadel/zitadel/internal/query" - "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" sd "github.com/zitadel/zitadel/internal/config/systemdefaults" + "github.com/zitadel/zitadel/internal/eventstore" + v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" + "github.com/zitadel/zitadel/internal/query" ) type SpoolerConfig struct { @@ -20,13 +21,14 @@ type SpoolerConfig struct { Handlers handler.Configs } -func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler { +func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler { spoolerConfig := spooler.Config{ Eventstore: es, + EventstoreV2: esV2, Locker: &locker{dbClient: client}, ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentInstances: c.ConcurrentInstances, - ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries), + ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries), } spool := spoolerConfig.New() spool.Start() diff --git a/internal/auth/repository/eventsourcing/view/external_idps.go b/internal/auth/repository/eventsourcing/view/external_idps.go index 5b7506c18c..103666e69d 100644 --- a/internal/auth/repository/eventsourcing/view/external_idps.go +++ b/internal/auth/repository/eventsourcing/view/external_idps.go @@ -68,16 +68,16 @@ func (v *View) GetLatestExternalIDPSequence(instanceID string) (*global_view.Cur return v.latestSequence(externalIDPTable, instanceID) } -func (v *View) GetLatestExternalIDPSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(externalIDPTable, instanceIDs...) +func (v *View) GetLatestExternalIDPSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(externalIDPTable, instanceIDs) } func (v *View) ProcessedExternalIDPSequence(event *models.Event) error { return v.saveCurrentSequence(externalIDPTable, event) } -func (v *View) UpdateExternalIDPSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(externalIDPTable) +func (v *View) UpdateExternalIDPSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(externalIDPTable, instanceIDs) } func (v *View) GetLatestExternalIDPFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/idp_configs.go b/internal/auth/repository/eventsourcing/view/idp_configs.go index 15d2f965fa..8b55d0b040 100644 --- a/internal/auth/repository/eventsourcing/view/idp_configs.go +++ b/internal/auth/repository/eventsourcing/view/idp_configs.go @@ -53,16 +53,16 @@ func (v *View) GetLatestIDPConfigSequence(instanceID string) (*global_view.Curre return v.latestSequence(idpConfigTable, instanceID) } -func (v *View) GetLatestIDPConfigSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(idpConfigTable, instanceIDs...) +func (v *View) GetLatestIDPConfigSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(idpConfigTable, instanceIDs) } func (v *View) ProcessedIDPConfigSequence(event *models.Event) error { return v.saveCurrentSequence(idpConfigTable, event) } -func (v *View) UpdateIDPConfigSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(idpConfigTable) +func (v *View) UpdateIDPConfigSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(idpConfigTable, instanceIDs) } func (v *View) GetLatestIDPConfigFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/idp_providers.go b/internal/auth/repository/eventsourcing/view/idp_providers.go index ff96e56344..7b3fbe83ae 100644 --- a/internal/auth/repository/eventsourcing/view/idp_providers.go +++ b/internal/auth/repository/eventsourcing/view/idp_providers.go @@ -73,16 +73,16 @@ func (v *View) GetLatestIDPProviderSequence(instanceID string) (*global_view.Cur return v.latestSequence(idpProviderTable, instanceID) } -func (v *View) GetLatestIDPProviderSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { - return v.latestSequences(idpProviderTable, instanceIDs...) +func (v *View) GetLatestIDPProviderSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) { + return v.latestSequences(idpProviderTable, instanceIDs) } func (v *View) ProcessedIDPProviderSequence(event *models.Event) error { return v.saveCurrentSequence(idpProviderTable, event) } -func (v *View) UpdateIDPProviderSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(idpProviderTable) +func (v *View) UpdateIDPProviderSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(idpProviderTable, instanceIDs) } func (v *View) GetLatestIDPProviderFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/org_project_mapping.go b/internal/auth/repository/eventsourcing/view/org_project_mapping.go index 6c6ed4e579..4ccac5dccd 100644 --- a/internal/auth/repository/eventsourcing/view/org_project_mapping.go +++ b/internal/auth/repository/eventsourcing/view/org_project_mapping.go @@ -52,16 +52,16 @@ func (v *View) GetLatestOrgProjectMappingSequence(instanceID string) (*repositor return v.latestSequence(orgPrgojectMappingTable, instanceID) } -func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(orgPrgojectMappingTable, instanceIDs...) +func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(orgPrgojectMappingTable, instanceIDs) } func (v *View) ProcessedOrgProjectMappingSequence(event *models.Event) error { return v.saveCurrentSequence(orgPrgojectMappingTable, event) } -func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(orgPrgojectMappingTable) +func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(orgPrgojectMappingTable, instanceIDs) } func (v *View) GetLatestOrgProjectMappingFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/refresh_token.go b/internal/auth/repository/eventsourcing/view/refresh_token.go index 638c5d44ba..5f7bd50401 100644 --- a/internal/auth/repository/eventsourcing/view/refresh_token.go +++ b/internal/auth/repository/eventsourcing/view/refresh_token.go @@ -77,16 +77,16 @@ func (v *View) GetLatestRefreshTokenSequence(instanceID string) (*repository.Cur return v.latestSequence(refreshTokenTable, instanceID) } -func (v *View) GetLatestRefreshTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(refreshTokenTable, instanceIDs...) +func (v *View) GetLatestRefreshTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(refreshTokenTable, instanceIDs) } func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error { return v.saveCurrentSequence(refreshTokenTable, event) } -func (v *View) UpdateRefreshTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(refreshTokenTable) +func (v *View) UpdateRefreshTokenSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(refreshTokenTable, instanceIDs) } func (v *View) GetLatestRefreshTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/sequence.go b/internal/auth/repository/eventsourcing/view/sequence.go index 831a3e9aa2..c1e3b0b4e2 100644 --- a/internal/auth/repository/eventsourcing/view/sequence.go +++ b/internal/auth/repository/eventsourcing/view/sequence.go @@ -19,12 +19,12 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } -func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) +func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) { + return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) } -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) +func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error { + currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs) if err != nil { return err } diff --git a/internal/auth/repository/eventsourcing/view/token.go b/internal/auth/repository/eventsourcing/view/token.go index ffd6f2845e..dfc05a51a1 100644 --- a/internal/auth/repository/eventsourcing/view/token.go +++ b/internal/auth/repository/eventsourcing/view/token.go @@ -88,16 +88,16 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq return v.latestSequence(tokenTable, instanceID) } -func (v *View) GetLatestTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(tokenTable, instanceIDs...) +func (v *View) GetLatestTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(tokenTable, instanceIDs) } func (v *View) ProcessedTokenSequence(event *models.Event) error { return v.saveCurrentSequence(tokenTable, event) } -func (v *View) UpdateTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(tokenTable) +func (v *View) UpdateTokenSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(tokenTable, instanceIDs) } func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/user.go b/internal/auth/repository/eventsourcing/view/user.go index 300e697654..8ff62f5579 100644 --- a/internal/auth/repository/eventsourcing/view/user.go +++ b/internal/auth/repository/eventsourcing/view/user.go @@ -193,16 +193,16 @@ func (v *View) GetLatestUserSequence(instanceID string) (*repository.CurrentSequ return v.latestSequence(userTable, instanceID) } -func (v *View) GetLatestUserSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(userTable, instanceIDs...) +func (v *View) GetLatestUserSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(userTable, instanceIDs) } func (v *View) ProcessedUserSequence(event *models.Event) error { return v.saveCurrentSequence(userTable, event) } -func (v *View) UpdateUserSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(userTable) +func (v *View) UpdateUserSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(userTable, instanceIDs) } func (v *View) GetLatestUserFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/auth/repository/eventsourcing/view/user_session.go b/internal/auth/repository/eventsourcing/view/user_session.go index 1947ee6e6d..8a8d71bf03 100644 --- a/internal/auth/repository/eventsourcing/view/user_session.go +++ b/internal/auth/repository/eventsourcing/view/user_session.go @@ -68,16 +68,16 @@ func (v *View) GetLatestUserSessionSequence(instanceID string) (*repository.Curr return v.latestSequence(userSessionTable, instanceID) } -func (v *View) GetLatestUserSessionSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { - return v.latestSequences(userSessionTable, instanceIDs...) +func (v *View) GetLatestUserSessionSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) { + return v.latestSequences(userSessionTable, instanceIDs) } func (v *View) ProcessedUserSessionSequence(event *models.Event) error { return v.saveCurrentSequence(userSessionTable, event) } -func (v *View) UpdateUserSessionSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(userSessionTable) +func (v *View) UpdateUserSessionSpoolerRunTimestamp(instanceIDs []string) error { + return v.updateSpoolerRunSequence(userSessionTable, instanceIDs) } func (v *View) GetLatestUserSessionFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { diff --git a/internal/authz/repository/eventsourcing/view/sequence.go b/internal/authz/repository/eventsourcing/view/sequence.go index 8f1c720e6f..6810e420d6 100644 --- a/internal/authz/repository/eventsourcing/view/sequence.go +++ b/internal/authz/repository/eventsourcing/view/sequence.go @@ -1,8 +1,6 @@ package view import ( - "time" - "github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/view/repository" ) @@ -18,21 +16,3 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error { func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) } - -func (v *View) latestSequences(viewName string) ([]*repository.CurrentSequence, error) { - return repository.LatestSequences(v.Db, sequencesTable, viewName) -} - -func (v *View) updateSpoolerRunSequence(viewName string) error { - currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) - if err != nil { - return err - } - for _, currentSequence := range currentSequences { - if currentSequence.ViewName == "" { - currentSequence.ViewName = viewName - } - currentSequence.LastSuccessfulSpoolerRun = time.Now() - } - return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences) -} diff --git a/internal/authz/repository/eventsourcing/view/token.go b/internal/authz/repository/eventsourcing/view/token.go index 2c8aead649..486d72008d 100644 --- a/internal/authz/repository/eventsourcing/view/token.go +++ b/internal/authz/repository/eventsourcing/view/token.go @@ -47,15 +47,3 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq func (v *View) ProcessedTokenSequence(event *models.Event) error { return v.saveCurrentSequence(tokenTable, event) } - -func (v *View) UpdateTokenSpoolerRunTimestamp() error { - return v.updateSpoolerRunSequence(tokenTable) -} - -func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { - return v.latestFailedEvent(tokenTable, instanceID, sequence) -} - -func (v *View) ProcessedTokenFailedEvent(failedEvent *repository.FailedEvent) error { - return v.saveFailedEvent(failedEvent) -} diff --git a/internal/eventstore/v1/query/handler.go b/internal/eventstore/v1/query/handler.go index 1aa2a27781..7e126a8f2a 100755 --- a/internal/eventstore/v1/query/handler.go +++ b/internal/eventstore/v1/query/handler.go @@ -17,10 +17,10 @@ const ( type Handler interface { ViewModel() string - EventQuery(instanceIDs ...string) (*models.SearchQuery, error) + EventQuery(instanceIDs []string) (*models.SearchQuery, error) Reduce(*models.Event) error OnError(event *models.Event, err error) error - OnSuccess() error + OnSuccess(instanceIDs []string) error MinimumCycleDuration() time.Duration LockDuration() time.Duration QueryLimit() uint64 @@ -32,7 +32,7 @@ type Handler interface { Subscription() *v1.Subscription } -func ReduceEvent(handler Handler, event *models.Event) { +func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) { defer func() { err := recover() @@ -42,7 +42,7 @@ func ReduceEvent(handler Handler, event *models.Event) { "cause", err, "stack", string(debug.Stack()), "sequence", event.Sequence, - "instnace", event.InstanceID, + "instance", event.InstanceID, ).Error("reduce panicked") } }() @@ -60,7 +60,7 @@ func ReduceEvent(handler Handler, event *models.Event) { SearchQuery(). SetLimit(eventLimit) - unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery) + unprocessedEvents, err := handler.Eventstore().FilterEvents(ctx, searchQuery) if err != nil { logging.WithFields("sequence", event.Sequence).Warn("filter failed") return diff --git a/internal/eventstore/v1/spooler/config.go b/internal/eventstore/v1/spooler/config.go index 04f5b7e659..92f2de1299 100644 --- a/internal/eventstore/v1/spooler/config.go +++ b/internal/eventstore/v1/spooler/config.go @@ -5,6 +5,7 @@ import ( "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/eventstore" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/query" "github.com/zitadel/zitadel/internal/id" @@ -12,6 +13,7 @@ import ( type Config struct { Eventstore v1.Eventstore + EventstoreV2 *eventstore.Eventstore Locker Locker ViewHandlers []query.Handler ConcurrentWorkers int @@ -31,6 +33,7 @@ func (c *Config) New() *Spooler { handlers: c.ViewHandlers, lockID: lockID, eventstore: c.Eventstore, + esV2: c.EventstoreV2, locker: c.Locker, queue: make(chan *spooledHandler, len(c.ViewHandlers)), workers: c.ConcurrentWorkers, diff --git a/internal/eventstore/v1/spooler/spooler.go b/internal/eventstore/v1/spooler/spooler.go index cd9133fe96..24e72ce5fd 100644 --- a/internal/eventstore/v1/spooler/spooler.go +++ b/internal/eventstore/v1/spooler/spooler.go @@ -9,6 +9,8 @@ import ( "github.com/zitadel/logging" + "github.com/zitadel/zitadel/internal/eventstore" + "github.com/zitadel/zitadel/internal/eventstore/handler" v1 "github.com/zitadel/zitadel/internal/eventstore/v1" "github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/query" @@ -16,13 +18,19 @@ import ( "github.com/zitadel/zitadel/internal/view/repository" ) -const systemID = "system" +const ( + systemID = "system" + schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded") + aggregateType = eventstore.AggregateType("system") + aggregateID = "SYSTEM" +) type Spooler struct { handlers []query.Handler locker Locker lockID string eventstore v1.Eventstore + esV2 *eventstore.Eventstore workers int queue chan *spooledHandler concurrentInstances int @@ -37,7 +45,9 @@ type spooledHandler struct { locker Locker queuedAt time.Time eventstore v1.Eventstore + esV2 *eventstore.Eventstore concurrentInstances int + succeededOnce bool } func (s *Spooler) Start() { @@ -57,7 +67,7 @@ func (s *Spooler) Start() { } go func() { for _, handler := range s.handlers { - s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, concurrentInstances: s.concurrentInstances} + s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, esV2: s.esV2, concurrentInstances: s.concurrentInstances} } }() } @@ -68,6 +78,32 @@ func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) { queue <- task } +func (s *spooledHandler) hasSucceededOnce(ctx context.Context) (bool, error) { + events, err := s.esV2.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). + AddQuery(). + AggregateTypes(aggregateType). + AggregateIDs(aggregateID). + EventTypes(schedulerSucceeded). + EventData(map[string]interface{}{ + "name": s.ViewModel(), + }). + Builder(), + ) + return len(events) > 0 && err == nil, err +} + +func (s *spooledHandler) setSucceededOnce(ctx context.Context) error { + _, err := s.esV2.Push(ctx, &handler.ProjectionSucceededEvent{ + BaseEvent: *eventstore.NewBaseEventForPush(ctx, + eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"), + schedulerSucceeded, + ), + Name: s.ViewModel(), + }) + s.succeededOnce = err == nil + return err +} + func (s *spooledHandler) load(workerID string) { errs := make(chan error) defer func() { @@ -86,8 +122,24 @@ func (s *spooledHandler) load(workerID string) { hasLocked := s.lock(ctx, errs, workerID) if <-hasLocked { + if !s.succeededOnce { + var err error + s.succeededOnce, err = s.hasSucceededOnce(ctx) + if err != nil { + logging.WithFields("view", s.ViewModel()).OnError(err).Warn("initial lock failed for first schedule") + errs <- err + return + } + } + + instanceIDQuery := models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("") for { - ids, err := s.eventstore.InstanceIDs(ctx, models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("").SearchQuery()) + if s.succeededOnce { + // since we have at least one successful run, we can restrict it to events not older than + // twice the requeue time (just to be sure not to miss an event) + instanceIDQuery = instanceIDQuery.CreationDateNewerFilter(time.Now().Add(-2 * s.MinimumCycleDuration())) + } + ids, err := s.eventstore.InstanceIDs(ctx, instanceIDQuery.SearchQuery()) if err != nil { errs <- err break @@ -97,12 +149,16 @@ func (s *spooledHandler) load(workerID string) { if max > len(ids) { max = len(ids) } - err = s.processInstances(ctx, workerID, ids[i:max]...) + err = s.processInstances(ctx, workerID, ids[i:max]) if err != nil { errs <- err } } if ctx.Err() == nil { + if !s.succeededOnce { + err = s.setSucceededOnce(ctx) + logging.WithFields("view", s.ViewModel()).OnError(err).Warn("unable to push first schedule succeeded") + } errs <- nil } break @@ -111,16 +167,20 @@ func (s *spooledHandler) load(workerID string) { <-ctx.Done() } -func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids ...string) error { +func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids []string) error { for { - events, err := s.query(ctx, ids...) + processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + events, err := s.query(processCtx, ids) if err != nil { + cancel() return err } if len(events) == 0 { + cancel() return nil } - err = s.process(ctx, events, workerID) + err = s.process(processCtx, events, workerID, ids) + cancel() if err != nil { return err } @@ -139,7 +199,7 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str } } -func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error { +func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string, instanceIDs []string) error { for i, event := range events { select { case <-ctx.Done(): @@ -152,17 +212,17 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo continue } time.Sleep(100 * time.Millisecond) - return s.process(ctx, events[i:], workerID) + return s.process(ctx, events[i:], workerID, instanceIDs) } } } - err := s.OnSuccess() + err := s.OnSuccess(instanceIDs) logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func") return err } -func (s *spooledHandler) query(ctx context.Context, instanceIDs ...string) ([]*models.Event, error) { - query, err := s.EventQuery(instanceIDs...) +func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) { + query, err := s.EventQuery(instanceIDs) if err != nil { return nil, err } @@ -227,6 +287,6 @@ func HandleError(event *models.Event, failedErr error, return failedErr } -func HandleSuccess(updateSpoolerRunTimestamp func() error) error { - return updateSpoolerRunTimestamp() +func HandleSuccess(updateSpoolerRunTimestamp func([]string) error, instanceIDs []string) error { + return updateSpoolerRunTimestamp(instanceIDs) } diff --git a/internal/eventstore/v1/spooler/spooler_test.go b/internal/eventstore/v1/spooler/spooler_test.go index 315a16a2a5..9aa0c75431 100644 --- a/internal/eventstore/v1/spooler/spooler_test.go +++ b/internal/eventstore/v1/spooler/spooler_test.go @@ -51,7 +51,7 @@ func (h *testHandler) Subscription() *v1.Subscription { return nil } -func (h *testHandler) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { +func (h *testHandler) EventQuery(instanceIDs []string) (*models.SearchQuery, error) { if h.queryError != nil { return nil, h.queryError } @@ -71,7 +71,7 @@ func (h *testHandler) OnError(event *models.Event, err error) error { return err } -func (h *testHandler) OnSuccess() error { +func (h *testHandler) OnSuccess([]string) error { return nil } @@ -127,8 +127,9 @@ func TestSpooler_process(t *testing.T) { currentHandler *testHandler } type args struct { - timeout time.Duration - events []*models.Event + timeout time.Duration + events []*models.Event + instanceIDs []string } tests := []struct { name string @@ -184,7 +185,7 @@ func TestSpooler_process(t *testing.T) { start = time.Now() } - if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr { + if err := s.process(ctx, tt.args.events, "test", tt.args.instanceIDs); (err != nil) != tt.wantErr { t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr) } if tt.fields.currentHandler.maxErrCount != tt.wantRetries { diff --git a/internal/migration/migration.go b/internal/migration/migration.go index 2f27a3715c..3608332a8a 100644 --- a/internal/migration/migration.go +++ b/internal/migration/migration.go @@ -2,6 +2,8 @@ package migration import ( "context" + errs "errors" + "time" "github.com/zitadel/logging" @@ -18,6 +20,10 @@ const ( aggregateID = "SYSTEM" ) +var ( + errMigrationAlreadyStarted = errs.New("already started") +) + type Migration interface { String() string Execute(context.Context) error @@ -32,7 +38,7 @@ type RepeatableMigration interface { func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { logging.Infof("verify migration %s", migration.String()) - if should, err := shouldExec(ctx, es, migration); !should || err != nil { + if should, err := checkExec(ctx, es, migration); !should || err != nil { return err } @@ -52,6 +58,30 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration return pushErr } +// checkExec ensures that only one setup step is done concurrently +// if a setup step is already started, it calls shouldExec after some time again +func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) { + timer := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return false, errors.ThrowInternal(nil, "MIGR-as3f7", "Errors.Internal") + case <-timer.C: + should, err := shouldExec(ctx, es, migration) + if err != nil { + if !errs.Is(err, errMigrationAlreadyStarted) { + return false, err + } + logging.WithFields("migration step", migration.String()). + Warn("migration already started, will check again in 5 seconds") + timer.Reset(5 * time.Second) + break + } + return should, nil + } + } +} + func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) { events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). OrderAsc(). @@ -90,7 +120,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat } if isStarted { - return false, nil + return false, errMigrationAlreadyStarted } repeatable, ok := migration.(RepeatableMigration) if !ok { diff --git a/internal/user/repository/view/token_view.go b/internal/user/repository/view/token_view.go index 405b66dc6e..e24a8f671a 100644 --- a/internal/user/repository/view/token_view.go +++ b/internal/user/repository/view/token_view.go @@ -36,8 +36,13 @@ func TokensByUserID(db *gorm.DB, table, userID, instanceID string) ([]*usr_model Method: domain.SearchMethodEquals, Value: instanceID, } + expirationQuery := &model.TokenSearchQuery{ + Key: model.TokenSearchKeyExpiration, + Method: domain.SearchMethodGreaterThan, + Value: "now()", + } query := repository.PrepareSearchQuery(table, usr_model.TokenSearchRequest{ - Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery}, + Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery, expirationQuery}, }) _, err := query(db, &tokens) return tokens, err diff --git a/internal/view/repository/sequence.go b/internal/view/repository/sequence.go index 2f1a2e1b25..5ccb7a36f7 100644 --- a/internal/view/repository/sequence.go +++ b/internal/view/repository/sequence.go @@ -169,7 +169,7 @@ func LatestSequence(db *gorm.DB, table, viewName, instanceID string) (*CurrentSe return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName) } -func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs ...string) ([]*CurrentSequence, error) { +func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs []string) ([]*CurrentSequence, error) { searchQueries := []sequenceSearchQuery{ {key: sequenceSearchKey(SequenceSearchKeyViewName), value: viewName, method: domain.SearchMethodEquals}, }