Merge branch 'instance-create' of github.com:caos/zitadel into instance-create

This commit is contained in:
Stefan Benz 2022-11-25 14:44:40 +01:00
commit a52aef8223
No known key found for this signature in database
GPG Key ID: 9D2FE4EA50BEFE68
56 changed files with 712 additions and 293 deletions

View File

@ -2,7 +2,6 @@ module.exports = {
branches: [ branches: [
{name: 'main'}, {name: 'main'},
{name: '1.87.x', range: '1.87.x', channel: '1.87.x'}, {name: '1.87.x', range: '1.87.x', channel: '1.87.x'},
{name: 'startup-times', prerelease: 'beta'}
], ],
plugins: [ plugins: [
"@semantic-release/commit-analyzer" "@semantic-release/commit-analyzer"

View File

@ -160,7 +160,7 @@ Auth:
SearchLimit: 1000 SearchLimit: 1000
Spooler: Spooler:
ConcurrentWorkers: 1 ConcurrentWorkers: 1
ConcurrentInstances: 10 ConcurrentInstances: 1
BulkLimit: 10000 BulkLimit: 10000
FailureCountUntilSkip: 5 FailureCountUntilSkip: 5
@ -168,7 +168,7 @@ Admin:
SearchLimit: 1000 SearchLimit: 1000
Spooler: Spooler:
ConcurrentWorkers: 1 ConcurrentWorkers: 1
ConcurrentInstances: 10 ConcurrentInstances: 1
BulkLimit: 10000 BulkLimit: 10000
FailureCountUntilSkip: 5 FailureCountUntilSkip: 5

View File

@ -1,10 +1,10 @@
CREATE INDEX instance_id_idx ON adminapi.current_sequences (instance_id); CREATE INDEX current_sequences_instance_id_idx ON adminapi.current_sequences (instance_id);
CREATE INDEX instance_id_idx ON auth.current_sequences (instance_id); CREATE INDEX current_sequences_instance_id_idx ON auth.current_sequences (instance_id);
CREATE INDEX instance_id_idx ON projections.current_sequences (instance_id); CREATE INDEX current_sequences_instance_id_idx ON projections.current_sequences (instance_id);
CREATE INDEX instance_id_idx ON adminapi.failed_events (instance_id); CREATE INDEX failed_events_instance_id_idx ON adminapi.failed_events (instance_id);
CREATE INDEX instance_id_idx ON auth.failed_events (instance_id); CREATE INDEX failed_events_instance_id_idx ON auth.failed_events (instance_id);
CREATE INDEX instance_id_idx ON projections.failed_events (instance_id); CREATE INDEX failed_events_instance_id_idx ON projections.failed_events (instance_id);
ALTER TABLE adminapi.failed_events ADD COLUMN last_failed TIMESTAMPTZ; ALTER TABLE adminapi.failed_events ADD COLUMN last_failed TIMESTAMPTZ;
ALTER TABLE auth.failed_events ADD COLUMN last_failed TIMESTAMPTZ; ALTER TABLE auth.failed_events ADD COLUMN last_failed TIMESTAMPTZ;

View File

@ -173,11 +173,11 @@ func startAPIs(ctx context.Context, router *mux.Router, commands *command.Comman
return err return err
} }
apis := api.New(config.Port, router, queries, verifier, config.InternalAuthZ, config.ExternalSecure, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader) apis := api.New(config.Port, router, queries, verifier, config.InternalAuthZ, config.ExternalSecure, tlsConfig, config.HTTP2HostHeader, config.HTTP1HostHeader)
authRepo, err := auth_es.Start(config.Auth, config.SystemDefaults, commands, queries, dbClient, keys.OIDC, keys.User) authRepo, err := auth_es.Start(ctx, config.Auth, config.SystemDefaults, commands, queries, dbClient, eventstore, keys.OIDC, keys.User)
if err != nil { if err != nil {
return fmt.Errorf("error starting auth repo: %w", err) return fmt.Errorf("error starting auth repo: %w", err)
} }
adminRepo, err := admin_es.Start(config.Admin, store, dbClient) adminRepo, err := admin_es.Start(ctx, config.Admin, store, dbClient, eventstore)
if err != nil { if err != nil {
return fmt.Errorf("error starting admin repo: %w", err) return fmt.Errorf("error starting admin repo: %w", err)
} }

View File

@ -2,30 +2,71 @@
title: Overview title: Overview
--- ---
import Tabs from '@theme/Tabs'; import Tabs from "@theme/Tabs";
import TabItem from '@theme/TabItem'; import TabItem from "@theme/TabItem";
import {Card, CardWrapper} from '../../src/components/card'; import { Card, CardWrapper } from "../../src/components/card";
Get started with ZITADEL quickly by reading a quickstart or by cloning an example from our [ZITADEL examples](https://github.com/zitadel/zitadel-examples) repo. Get started with ZITADEL quickly by reading a quickstart or by cloning a [ZITADEL example](https://github.com/search?q=topic%3Aexamples+org%3Azitadel) repo.
<Tabs> <Tabs>
<TabItem value="app" label="Web · Native applications" default> <TabItem value="app" label="Web · Native applications" default>
<CardWrapper> <CardWrapper>
<Card link="/docs/examples/login/angular" imageSource="/img/tech/angular.svg" title="Angular" description="Add the user login to your application and query some data from the userinfo endpoint" /> <Card
<Card link="/docs/examples/login/react" imageSource="/img/tech/react.png" title="React" description="Logs into your application and queries some data from the userinfo endpoint" /> link="/docs/examples/login/angular"
<Card link="/docs/examples/login/flutter" imageSource="/img/tech/flutter.svg" title="Flutter" description="Mobile Application working for iOS and Android that authenticates your user." /> imageSource="/img/tech/angular.svg"
<Card link="/docs/examples/login/nextjs" imageSource="/img/tech/nextjs.svg" title="NextJS" description="A simple application to log into your user account and query some data from User endpoint." /> title="Angular"
description="Add the user login to your application and query some data from the userinfo endpoint"
/>
<Card
link="/docs/examples/login/react"
imageSource="/img/tech/react.png"
title="React"
description="Logs into your application and queries some data from the userinfo endpoint"
/>
<Card
link="/docs/examples/login/flutter"
imageSource="/img/tech/flutter.svg"
title="Flutter"
description="Mobile Application working for iOS and Android that authenticates your user."
/>
<Card
link="/docs/examples/login/nextjs"
imageSource="/img/tech/nextjs.svg"
title="NextJS"
description="A simple application to log into your user account and query some data from User endpoint."
/>
<Card
link="/docs/examples/login/nextjs-b2b"
imageSource="/img/tech/nextjs.svg"
title="NextJS B2B Scenario"
description="An application to showcase your user account having multiple organizations and the use of Personal Access Tokens."
/>
</CardWrapper> </CardWrapper>
</TabItem> </TabItem>
<TabItem value="backend" label="Backend · API"> <TabItem value="backend" label="Backend · API">
<CardWrapper> <CardWrapper>
<Card link="/docs/examples/call-zitadel-api/go" imageSource="/img/tech/golang.svg" title="GO" description="Demonstrates how to fetch some data from the ZITADEL management API." /> <Card
<Card link="/docs/examples/call-zitadel-api/dot-net" imageSource="/img/tech/dotnet.svg" title=".NET" description="This integration guide shows you how to integrate ZITADEL into your .NET application. It demonstrates how to fetch some data from the ZITADEL management API." /> link="/docs/examples/call-zitadel-api/go"
imageSource="/img/tech/golang.svg"
title="GO"
description="Demonstrates how to fetch some data from the ZITADEL management API."
/>
<Card
link="/docs/examples/call-zitadel-api/dot-net"
imageSource="/img/tech/dotnet.svg"
title=".NET"
description="This integration guide shows you how to integrate ZITADEL into your .NET application. It demonstrates how to fetch some data from the ZITADEL management API."
/>
</CardWrapper> </CardWrapper>
</TabItem> </TabItem>
<TabItem value="proxy" label="Proxy"> <TabItem value="proxy" label="Proxy">
<CardWrapper> <CardWrapper>
<Card link="/docs/examples/identity-proxy/oauth2-proxy" imageSource="/img/tech/oauth2-proxy.svg" title="OAuth 2.0 Proxy" description="Allows services to delegate the authentication flow to a IDP, for example ZITADEL" /> <Card
link="/docs/examples/identity-proxy/oauth2-proxy"
imageSource="/img/tech/oauth2-proxy.svg"
title="OAuth 2.0 Proxy"
description="Allows services to delegate the authentication flow to a IDP, for example ZITADEL"
/>
</CardWrapper> </CardWrapper>
</TabItem> </TabItem>
</Tabs> </Tabs>
@ -33,18 +74,33 @@ Get started with ZITADEL quickly by reading a quickstart or by cloning an exampl
## Clone a sample project ## Clone a sample project
<CardWrapper> <CardWrapper>
<Card githubLink="https://github.com/zitadel/zitadel-examples/tree/main/java/spring-boot" title="Java" label="Java" /> <Card
<Card githubLink="https://github.com/zitadel/zitadel-examples/tree/main/python3" title="Python" label="Python" /> githubLink="https://github.com/zitadel/zitadel-examples/tree/main/java/spring-boot"
<Card githubLink="https://github.com/zitadel/zitadel-examples/tree/main/angular" title="Angular" label="Web · Mobile Web" /> title="Java"
<Card githubLink="https://github.com/zitadel/zitadel-examples/tree/main/nextjs" title="NextJS" label="Web · Mobile Web" /> label="Java"
/>
<Card
githubLink="https://github.com/zitadel/zitadel-examples/tree/main/python3"
title="Python"
label="Python"
/>
<Card
githubLink="https://github.com/zitadel/zitadel-examples/tree/main/angular"
title="Angular"
label="Web · Mobile Web"
/>
<Card
githubLink="https://github.com/zitadel/zitadel-examples/tree/main/nextjs"
title="NextJS"
label="Web · Mobile Web"
/>
</CardWrapper> </CardWrapper>
## Libraries ## Libraries
| Language | Description | Link | | Language | Description | Link |
| ------------ | ---------------------|-------------| | -------- | ------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------- |
| Go | Go client library for ZITADEL. | [https://github.com/zitadel/zitadel-go](https://github.com/zitadel/zitadel-go) | Go | Go client library for ZITADEL. | [https://github.com/zitadel/zitadel-go](https://github.com/zitadel/zitadel-go) |
| .Net | Authentication / Authorization library written in dotnet for the asp.net web application package. | [https://github.com/zitadel/zitadel-net](https://github.com/zitadel/zitadel-net) | .Net | Authentication / Authorization library written in dotnet for the asp.net web application package. | [https://github.com/zitadel/zitadel-net](https://github.com/zitadel/zitadel-net) |
| Dart | Dart library for ZITADEL, contains gRPC and API access elements. | [https://github.com/zitadel/zitadel-dart](https://github.com/zitadel/zitadel-dart) | | Dart | Dart library for ZITADEL, contains gRPC and API access elements. | [https://github.com/zitadel/zitadel-dart](https://github.com/zitadel/zitadel-dart) |
| Elixir | API Client for the ZITADEL API. | [https://github.com/jshmrtn/zitadel_api](https://github.com/jshmrtn/zitadel_api) | | Elixir | API Client for the ZITADEL API. | [https://github.com/jshmrtn/zitadel_api](https://github.com/jshmrtn/zitadel_api) |

View File

@ -0,0 +1,177 @@
---
title: Next.js B2B Scenario
---
This is our Zitadel [Next.js](https://nextjs.org/) B2B template. It shows how to authenticate as a user with multiple organizations. The application shows your users roles on the selected organizations, other projects your organization is allowed to use and other users having a grant to use the application.
If you need more info on B2B use cases consider reading our guide for the [B2B solution scenario](../../guides/solution-scenarios/b2b.mdx).
> You can follow along with the template code in our [zitadel-nextjs-b2b](https://github.com/zitadel/zitadel-nextjs-b2b) repo.
![B2B Application](/img/nextjs-b2b/home.png)
## What does it do?
Users with `view` role can view granted projects on their organization which were granted by your organization (owning this portal application).
Users with `admin` role can view granted projects and list users of the selected organization who are granted to use the portal application too.
## Setup Vendor application and users in ZITADEL
First we need to create an organization that holds the Vendor's users, projects and applications.
### Vendor Organization
Navigate to `https://{YourDomain}.zitadel.cloud/ui/console/orgs` (replace {YourDomain}), and click on the button "New".
Toggle the setting "Use your personal account as organization owner".
Enter the name `Demo-Vendor`, and click "Create". Then click on that organization.
### Portal Web Application
To setup this sample you have to create a project and an application in the vendor organization (`Demo-Vendor`) first.
Open the Console (`https://{YourDomain}.zitadel.cloud/ui/console/projects`) and create a new project. Let's call it `Portal`.
Then on the project detail page click on new application and enter a name for this app.
Let's call this one `portal-web`.
Select `Web`, continue, `PKCE`, then enter `http://localhost:3000/api/auth/callback/zitadel` for the redirect, and `http://localhost:3000` for the post redirect. Then press on `create`.
Copy the "Resource Id" of the project `Portal` as you will need this in your environment configuration file later.
Click on the application `portal-web`.
On the application detail page click on the section under redirect settings and enable `Development Mode`. This will allow you application to work on `localhost:3000`.
To read the user data and roles from ID Token, go to the section Token Settings and make sure both checkboxes, `User roles inside ID Token` and `User Info inside ID Token` are enabled.
Make sure to save your changes.
Copy the "Resource Id" of the application `portal-web` as you will need this in your environment configuration file later.
### Roles
To setup the needed roles for your project, navigate to your `Portal` project, and add the following roles
| Key | Display Name | Group | Description |
| :----- | :------------ | :---- | ---------------------------------------------------------------------- |
| admin | Administrator | | The administrator, allowed to read granted projects and to user grants |
| reader | Reader | | A user who is allowed to read his organizations granted projects only |
Now in the `General` section of the Portal project, make sure to enable `Assert Roles on Authentication`.
This makes sure that roles, which is used by the application to enable UI components, are set in your OIDC ID Token.
### Service User
To make the application work you need a service user which loads granted-projects and user-grants for you.
In the B2B-Demo organization, navigate to `Users` in navigation of Console, click on `Service Users` and create a new user.
Let's set its username to `nextjs` and its name to `NextJS`. Then press `create`.
On the detail page of that user, navigate to "Personal Access Tokens" and add a new entry, set an optional expiration date.
Copy the generated Token as you will need this in your environment configuration file later.
Go back to the `Portal` project and add the Service User as Manager (top right).
Make sure to select `Project Owner Viewer` as the management role.
To show granted projects, go to the `Demo-Vendor` organization and add the Service User as `Org Project Permission Editor` Manager.
## Configuration
Now clone this project and navigate to its root folder.
Create a file `.env.local` and copy paste the following:
```text
NEXTAUTH_URL=http://localhost:3000
NEXT_PUBLIC_ZITADEL_ISSUER=https://{YourDomain}.zitadel.cloud
ZITADEL_API=https://{YourDomain}.zitadel.cloud
ORG_ID={YourOrgId}
PROJECT_ID={YourProjectId}
ZITADEL_CLIENT_ID={YourClientID}
SERVICE_ACCOUNT_ACCESS_TOKEN={YourServiceAccountSecret}
NEXTAUTH_SECRET=randomsecret
```
Replace the values as follows
`NEXTAUTH_URL`: Base url of this demo app (B2B portal); runs per default on [http://localhost:3000](http://localhost:3000)
`NEXT_PUBLIC_ZITADEL_ISSUER`: The url to your zitadel instance. When using zitadel.cloud for this demo you can find the domain of your ZITADEL instance in the customer portal. You can also find this information by going to your application `portal-web` and click 'Urls' in the navigation. The variable is prefixed with `NEXT_PUBLIC_` such that it can be accessed from the client.
`ZITADEL_API`: URL of the Management API. Typically the same as `ZITADEL_ISSUER`.
`ORG_ID`: We will create an organization during later steps. You can find `{YourOrgId}` by selecting the `Demo-Vendor` organization in Console. `{YourOrgId}` is displayed on top of the organization detail page as "Resource Id".
`PROJECT_ID`: You can find `{YourProjectId}` by clicking on "Projects" in the navigation and select the Project `Portal`. `{YourProjectId}` is displayed on the top as "Resource Id".
`ZITADEL_CLIENT_ID`: Having the project `Portal` selected, click on the Application `portal-web`. `{YourClientID}` is displayed as a field in the OIDC configuration, labelled "Client ID" and has the format `12345678@portal`.
`SERVICE_ACCOUNT_ACCESS_TOKEN`: Setup a service user, add a Personal Access Token and copy the secret here (see below).
## Install and Run
To run this sample locally you need to install dependencies first.
Type and execute:
```bash
yarn install
```
then, to run the development server:
```bash
npm run dev
# or
yarn dev
```
and open [http://localhost:3000](http://localhost:3000) with your browser to see the result.
## Create a customer organization
### Customer organization
Create a new organization in Console. Easiest way is to use the organization dropdown on the top left.
Let's call this new organization `Demo-Customer`.
### Users
Now switch back to the organization `Demo-Customer` and [create a new user](https://docs.zitadel.com/docs/manuals/user-register) in this organization.
Let's call the first user `Alice Admin`. Create a second user called `Eric Employee`.
### Manager Role
We want to enable Alice to assign roles to users in her organization in a self-service manner.
To make this happen, we need give Alice an [Manager Role](https://docs.zitadel.com/docs/concepts/structure/managers) within the Organization `Demo-Customer`.
Still in the organization `Demo-Customer`, navigate to Organization. Click on the plus on the top right and give `Alice Admin` the Manager Role `Org Owner`.
Login with your user on the customer organization to validate the setup.
## Create a project grant
### Organization Grant
Switch to the `Demo-Vendor` organization, select Projects in the navigation, and click on `Portal` and then `Grants`.
[Grant all roles of the Project](https://docs.zitadel.com/docs/guides/basics/projects#exercise---grant-a-project) to the organization `demo-customer.{YourDomain}.zitadel.cloud`.
### Authorization
As you have guessed, these two users need to be authorized.
On the `Demo-Customer` organization, navigate to Projects and select "Granted Projects" in the sub-navigation.
Select the project portal `Portal` and navigate to "Authorizations".
Give `Alice Admin` the roles `reader` and `admin`.
`Eric Employee` will get only the role `reader`.
### Login
You should be able to login to the Demo Application with `Alice Admin` and see all granted projects.
You can log out and log in with `Eric Employee` and you should only have access to the granted projects, but not to the Authorizations tab.
## What next
You could create another project (eg, `Data Cube`) and grant that project to the customer organization. The granted project should appear after a reload automatically. This gives you an idea of how you could do Service Discovery with ZITADEL.
You could also build out the code (PRs welcome :wink:) for this application, for example:
- Create a mock `datacube-web` application and show how SSO between the portal and the application works with ZITADEL.
- Implement a feature in the Authorization tab to assign roles directly from the customer portal.

View File

@ -8,25 +8,15 @@ This is our Zitadel [Next.js](https://nextjs.org/) template. It shows how to aut
## Getting Started ## Getting Started
First, we start by creating a new NextJS app with `npx create-next-app`, which sets up everything automatically for you. To create a project, run: ### Install dependencies
To install the dependencies type:
```bash ```bash
npx create-next-app --typescript yarn install
# or
yarn create next-app --typescript
``` ```
### Install Authentication library then to run the app:
To keep the template as easy as possible we use [next-auth](https://next-auth.js.org/) as our main authentication library. To install, run:
```bash
npm i next-auth
# or
yarn add next-auth
```
To run the app, type:
```bash ```bash
npm run dev npm run dev

View File

@ -0,0 +1,45 @@
---
title: PAT (Personal Access Token)
---
A Personal Access Token (PAT) is a ready to use token which can be used as _Authorization_ header.
At the moment ZITADEL only allows PATs for machine accounts (service users).
It is an alternative to the JWT profile authentication where the service user has a key to authenticate. Read more about that [here](serviceusers)
## Create a Service User with a PAT
1. Navigate to Service Users
2. Click on **New**
3. Enter a user name and a display name
4. Click on the Personal Access Token menu point in the detail of your user
5. Click on **New**
6. You can either set an expiration date or leave it empty if you don't want it to expire
7. Copy the token from the dialog (You will not see this again)
![Create new service user](/img/guides/console-service-user-pat.gif)
## Grant role for ZITADEL
To be able to access the ZITADEL APIs your service user needs permissions to ZITADEL.
1. Go to the detail page of your organization
2. Click in the top right corner the "+" button
3. Search for your service user
4. Give the user the role you need, for the example we choose Org Owner (More about [ZITADEL Permissions](../manage/console/managers))
![Add org owner to service user](/img/guides/console-service-user-org-owner.gif)
## Call ZITADEL API with PAT
Because the PAT is a ready to use Token, you can add it as Authorization Header and send it in your requests to the ZITADEL API.
In this example we read the organization of the service user.
```bash
curl --request GET \
--url {your-domain}/management/v1/orgs/me \
--header 'Authorization: Bearer {PAT}'
```

View File

@ -173,7 +173,7 @@ To be able to use the email as username you have to disable the attribute "User
This means that all your users will not be suffixed with the domain of your organization and you can enter the email as username. This means that all your users will not be suffixed with the domain of your organization and you can enter the email as username.
All usernames will then be globally unique within your instance. All usernames will then be globally unique within your instance.
You can either set this attribute on your whole ZITADEL instance or just on some specific orgnizations. You can either set this attribute on your whole ZITADEL instance or just on some specific organizations.
## Privacy Policy and TOS ## Privacy Policy and TOS

View File

@ -19,7 +19,7 @@ See a description of all possible _runtime configuration_ options with their def
The `zitadel` binary expects the `--config` flag for this configuration. The `zitadel` binary expects the `--config` flag for this configuration.
### Database Initialization ### Database Initialization
Apart from these options, ZITADEL uses a [different configuration](https://github.com/zitadel/zitadel/blob/main/cmd/admin/setup/steps.yaml) for _database initialization steps_. Apart from these options, ZITADEL uses a [different configuration](https://github.com/zitadel/zitadel/blob/main/cmd/setup/steps.yaml) for _database initialization steps_.
The `zitadel` binary expects the `--steps` flag for this configuration. The `zitadel` binary expects the `--steps` flag for this configuration.
### Split Configuration ### Split Configuration

View File

@ -19,6 +19,8 @@ Read [on the configure page](/docs/guides/manage/self-hosted/configure) about th
- To enable and restrict access to **HTTPS**, head over to [the description of your TLS options](/docs/guides/manage/self-hosted/tls_modes). - To enable and restrict access to **HTTPS**, head over to [the description of your TLS options](/docs/guides/manage/self-hosted/tls_modes).
- If you want to front ZITADEL with a reverse proxy, web application firewall or content delivery network, make sure to support **[HTTP/2](/docs/guides/manage/self-hosted/http2)**. - If you want to front ZITADEL with a reverse proxy, web application firewall or content delivery network, make sure to support **[HTTP/2](/docs/guides/manage/self-hosted/http2)**.
- You can also refer to some **[example reverse proxy configurations](/docs/guides/manage/self-hosted/reverseproxy/reverse_proxy)**. - You can also refer to some **[example reverse proxy configurations](/docs/guides/manage/self-hosted/reverseproxy/reverse_proxy)**.
- The ZITADEL Console web GUI uses many gRPC-Web stubs. This results in a fairly big JavaScript bundle. You might want to compress it using [Gzip](https://www.gnu.org/software/gzip/) or [Brotli](https://github.com/google/brotli).
- Serving and caching the assets using a content delivery network could improve network latencies and shield your ZITADEL runtime.
## Monitoring ## Monitoring
@ -36,6 +38,16 @@ Tracing:
## Database ## Database
### Prefer CockroachDB
ZITADEL supports [CockroachDB](https://www.cockroachlabs.com/) and [PostgreSQL](https://www.postgresql.org/).
We highly recommend using CockroachDB,
as horizontal scaling is much easier than with PostgreSQL.
Also, if you are concerned about multi-regional data locality,
[the way to go is with CockroachDB](https://www.cockroachlabs.com/docs/stable/multiregion-overview.html).
### Configure ZITADEL
Depending on your environment, you maybe would want to tweak some settings about how ZITADEL interacts with the database in the database section of your ZITADEL configuration. Read more about your [database configuration options](/docs/guides/manage/self-hosted/database). Depending on your environment, you maybe would want to tweak some settings about how ZITADEL interacts with the database in the database section of your ZITADEL configuration. Read more about your [database configuration options](/docs/guides/manage/self-hosted/database).
```yaml ```yaml
@ -67,6 +79,25 @@ Projections:
BulkLimit: 2000 BulkLimit: 2000
``` ```
### Manage your Data
When designing your backup strategy,
it is worth knowing that
[ZITADEL is event sourced](/docs/concepts/eventstore/overview).
That means, ZITADEL itself is able to recompute its
whole state from the records in the table eventstore.events.
The timestamp of your last record in the events table
defines up to which point in time ZITADEL can restore its state.
The ZITADEL binary itself is stateless,
so there is no need for a special backup job.
Generally, for maintaining your database management system in production,
please refer to the corresponding docs
[for CockroachDB](https://www.cockroachlabs.com/docs/stable/recommended-production-settings.html)
or [for PostgreSQL](https://www.postgresql.org/docs/current/admin.html).
## Data Initialization ## Data Initialization
- You can configure instance defaults in the DefaultInstance section. - You can configure instance defaults in the DefaultInstance section.

View File

@ -2,7 +2,7 @@
title: B2B title: B2B
--- ---
import { B2B } from '../../../src/components/b2b'; import { B2B } from "../../../src/components/b2b";
## Business to Business ## Business to Business
@ -13,6 +13,7 @@ In ZITADEL a B2B organization represents a business partner or partner who typic
B2B can be a simple scenario where an organization only shares one of its projects with another organization or have a more complex case where an organization is offering a portal application to all its partners with included (self)administration. B2B can be a simple scenario where an organization only shares one of its projects with another organization or have a more complex case where an organization is offering a portal application to all its partners with included (self)administration.
<!-- This guide describes an application --> <!-- This guide describes an application -->
## Sample scenario ## Sample scenario
Octagon is a fictitious company which is used throughout this guide to explain the details and key concepts of such a B2B scenario. Octagon is a fictitious company which is used throughout this guide to explain the details and key concepts of such a B2B scenario.
@ -24,6 +25,7 @@ Octagon has a **Portal application** where its employees can access their accoun
Employees work for a department within Octagon or for Octagon itself. Employees work for a department within Octagon or for Octagon itself.
Some of the users have enhanced features because they supervise certain teams. Those can onboard new employees and manage their roles and features. Some of the users have enhanced features because they supervise certain teams. Those can onboard new employees and manage their roles and features.
Target groups of the application can be split into: Target groups of the application can be split into:
- **Employees:** users who are using the application as a starting point for their work. - **Employees:** users who are using the application as a starting point for their work.
- **Supervisors:** users who are mainly using the application to manage users and their access of their department. - **Supervisors:** users who are mainly using the application to manage users and their access of their department.
- **Administrators:** this users are able to grant additional organizations or departments and elect supervisors. - **Administrators:** this users are able to grant additional organizations or departments and elect supervisors.
@ -56,7 +58,7 @@ In our sample scenario, we assume to have the following users:
- **Bill:** is employed at Octagon as Administrator of the Portal Application. Bill also uses a Microsoft Account in combination with a Security Key to secure his account. - **Bill:** is employed at Octagon as Administrator of the Portal Application. Bill also uses a Microsoft Account in combination with a Security Key to secure his account.
After having determined the constellation of the organizations and its users, all the necessary data (Portal project with roles and app, users, login requirements, identity providers, branding) should be set up in [Console](https://{your_domain}.zitadel.cloud/ui/console/org). After having determined the constellation of the organizations and its users, all the necessary data (Portal project with roles and app, users, login requirements, identity providers, branding) should be set up in [Console](https://{your_domain}.zitadel.cloud/ui/console/org).
A B2B sample application for NextJS can be found in our [Example Repo](https://github.com/zitadel/zitadel-examples). A B2B [sample application](https://github.com/zitadel/zitadel-nextjs-b2b). for NextJS can be found [here](../../examples/login/nextjs-b2b).
To allow another organization to use a project, a project grant has to be created. Upon creation, roles for a grant can be limited to a subset of the total project roles. To allow another organization to use a project, a project grant has to be created. Upon creation, roles for a grant can be limited to a subset of the total project roles.

View File

@ -46,3 +46,31 @@ This will have the following impacts:
To request the organization send either the the organization id (`urn:zitadel:iam:org:id:{id}`) or organization primary domain (`urn:zitadel:iam:org:domain:primary:{domainname}`) scope on your authentication request from your application. To request the organization send either the the organization id (`urn:zitadel:iam:org:id:{id}`) or organization primary domain (`urn:zitadel:iam:org:domain:primary:{domainname}`) scope on your authentication request from your application.
More about the [scopes](../../apis/openidoauth/scopes#reserved-scopes) More about the [scopes](../../apis/openidoauth/scopes#reserved-scopes)
## Use email to login
There are two different possibilities to achieve login with an email.
1. Use an email address as username
2. Use the email field of the user as additional login to the username
![Domain Policy: Organization domain as suffix](/img/guides/scenarios/domain_policy_org_domain_disabled.png)
### Use an email address as username
To be able to use the email as username you have to disable the attribute "User Loginname must contain orgdomain" on your domain settings.
This means that all your users will not be suffixed with the domain of your organization and you can enter the email as username.
All usernames will then be globally unique within your instance.
You can either set this attribute on your whole ZITADEL instance or just on some specific organizations.
### Use the email field of the user as additional login to the username
No matter how the username of your user does look like.
You can additionally allow login with the email attribute of the user.
You can find this in the "Login Behaviour and Security" Setting of your instance or organizations.
Go to the "Advanced" section, per default login with email address should be allowed. It is possible to disable it.
![Login Policy Advanced Setting: Disable email for login](/img/guides/scenarios/login_policy_advanced.png)

View File

@ -3,7 +3,7 @@
[[redirects]] [[redirects]]
from = "/proxy/js/script.js" from = "/proxy/js/script.js"
to = "https://plausible.io/js/plausible.js" to = "https://plausible.io/js/script.outbound-links.js"
status = 200 status = 200
force = true force = true

View File

@ -9,6 +9,7 @@ module.exports = {
"examples/login/react", "examples/login/react",
"examples/login/flutter", "examples/login/flutter",
"examples/login/nextjs", "examples/login/nextjs",
"examples/login/nextjs-b2b",
], ],
collapsed: false, collapsed: false,
}, },
@ -132,6 +133,7 @@ module.exports = {
items: [ items: [
"guides/integrate/serviceusers", "guides/integrate/serviceusers",
"guides/integrate/access-zitadel-apis", "guides/integrate/access-zitadel-apis",
"guides/integrate/pat",
"guides/integrate/access-zitadel-system-api", "guides/integrate/access-zitadel-system-api",
"guides/integrate/export-and-import", "guides/integrate/export-and-import",
], ],

View File

@ -29,6 +29,7 @@
background-position: center; background-position: center;
padding: 0.5rem 0; padding: 0.5rem 0;
pointer-events: none; pointer-events: none;
box-shadow: none !important;
} }
.fillspace { .fillspace {

Binary file not shown.

After

Width:  |  Height:  |  Size: 648 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 67 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 52 KiB

BIN
docs/static/img/nextjs-b2b/home.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

View File

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"time" "time"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
@ -28,10 +29,10 @@ func (h *handler) Eventstore() v1.Eventstore {
return h.es return h.es
} }
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler { func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, static static.Storage) []query.Handler {
handlers := []query.Handler{} handlers := []query.Handler{}
if static != nil { if static != nil {
handlers = append(handlers, newStyling( handlers = append(handlers, newStyling(ctx,
handler{view, bulkLimit, configs.cycleDuration("Styling"), errorCount, es}, handler{view, bulkLimit, configs.cycleDuration("Styling"), errorCount, es},
static)) static))
} }

View File

@ -34,21 +34,21 @@ type Styling struct {
subscription *v1.Subscription subscription *v1.Subscription
} }
func newStyling(handler handler, static static.Storage) *Styling { func newStyling(ctx context.Context, handler handler, static static.Storage) *Styling {
h := &Styling{ h := &Styling{
handler: handler, handler: handler,
static: static, static: static,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (m *Styling) subscribe() { func (m *Styling) subscribe(ctx context.Context) {
m.subscription = m.es.Subscribe(m.AggregateTypes()...) m.subscription = m.es.Subscribe(m.AggregateTypes()...)
go func() { go func() {
for event := range m.subscription.Events { for event := range m.subscription.Events {
query.ReduceEvent(m, event) query.ReduceEvent(ctx, m, event)
} }
}() }()
} }
@ -73,15 +73,15 @@ func (m *Styling) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { func (m *Styling) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := m.view.GetLatestStylingSequences(instanceIDs...) sequences, err := m.view.GetLatestStylingSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
searchQuery := models.NewSearchQuery() searchQuery := models.NewSearchQuery()
for _, sequence := range sequences {
var seq uint64
for _, instanceID := range instanceIDs { for _, instanceID := range instanceIDs {
var seq uint64
for _, sequence := range sequences {
if sequence.InstanceID == instanceID { if sequence.InstanceID == instanceID {
seq = sequence.CurrentSequence seq = sequence.CurrentSequence
break break
@ -90,7 +90,7 @@ func (m *Styling) EventQuery(instanceIDs ...string) (*models.SearchQuery, error)
searchQuery.AddQuery(). searchQuery.AddQuery().
AggregateTypeFilter(m.AggregateTypes()...). AggregateTypeFilter(m.AggregateTypes()...).
LatestSequenceFilter(seq). LatestSequenceFilter(seq).
InstanceIDFilter(sequence.InstanceID) InstanceIDFilter(instanceID)
} }
return searchQuery, nil return searchQuery, nil
} }
@ -166,12 +166,12 @@ func (m *Styling) processLabelPolicy(event *models.Event) (err error) {
} }
func (m *Styling) OnError(event *models.Event, err error) error { func (m *Styling) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-2m9fs", "id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in label policy handler")
return spooler.HandleError(event, err, m.view.GetLatestStylingFailedEvent, m.view.ProcessedStylingFailedEvent, m.view.ProcessedStylingSequence, m.errorCountUntilSkip) return spooler.HandleError(event, err, m.view.GetLatestStylingFailedEvent, m.view.ProcessedStylingFailedEvent, m.view.ProcessedStylingSequence, m.errorCountUntilSkip)
} }
func (m *Styling) OnSuccess() error { func (m *Styling) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp) return spooler.HandleSuccess(m.view.UpdateStylingSpoolerRunTimestamp, instanceIDs)
} }
func (m *Styling) generateStylingFile(policy *iam_model.LabelPolicyView) error { func (m *Styling) generateStylingFile(policy *iam_model.LabelPolicyView) error {

View File

@ -7,6 +7,7 @@ import (
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/eventstore" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/eventstore"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/spooler" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/spooler"
admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" admin_view "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
eventstore2 "github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1" v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/static" "github.com/zitadel/zitadel/internal/static"
@ -22,7 +23,7 @@ type EsRepository struct {
eventstore.AdministratorRepo eventstore.AdministratorRepo
} }
func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository, error) { func Start(ctx context.Context, conf Config, static static.Storage, dbClient *sql.DB, esV2 *eventstore2.Eventstore) (*EsRepository, error) {
es, err := v1.Start(dbClient) es, err := v1.Start(dbClient)
if err != nil { if err != nil {
return nil, err return nil, err
@ -32,7 +33,7 @@ func Start(conf Config, static static.Storage, dbClient *sql.DB) (*EsRepository,
return nil, err return nil, err
} }
spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, static) spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, static)
return &EsRepository{ return &EsRepository{
spooler: spool, spooler: spool,

View File

@ -1,14 +1,15 @@
package spooler package spooler
import ( import (
"context"
"database/sql" "database/sql"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/static"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/handler"
"github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/admin/repository/eventsourcing/view"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/static"
) )
type SpoolerConfig struct { type SpoolerConfig struct {
@ -19,13 +20,14 @@ type SpoolerConfig struct {
Handlers handler.Configs Handlers handler.Configs
} }
func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler { func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, sql *sql.DB, static static.Storage) *spooler.Spooler {
spoolerConfig := spooler.Config{ spoolerConfig := spooler.Config{
Eventstore: es, Eventstore: es,
EventstoreV2: esV2,
Locker: &locker{dbClient: sql}, Locker: &locker{dbClient: sql},
ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentWorkers: c.ConcurrentWorkers,
ConcurrentInstances: c.ConcurrentInstances, ConcurrentInstances: c.ConcurrentInstances,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static), ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, static),
} }
spool := spoolerConfig.New() spool := spoolerConfig.New()
spool.Start() spool.Start()

View File

@ -19,16 +19,16 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
} }
func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
} }
func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) { func (v *View) AllCurrentSequences(db, instanceID string) ([]*repository.CurrentSequence, error) {
return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID) return repository.AllCurrentSequences(v.Db, db+".current_sequences", instanceID)
} }
func (v *View) updateSpoolerRunSequence(viewName string) error { func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
if err != nil { if err != nil {
return err return err
} }
@ -41,12 +41,6 @@ func (v *View) updateSpoolerRunSequence(viewName string) error {
return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences) return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences)
} }
func (v *View) GetCurrentSequence(db, viewName string) ([]*repository.CurrentSequence, error) {
sequenceTable := db + ".current_sequences"
fullView := db + "." + viewName
return repository.LatestSequences(v.Db, sequenceTable, fullView)
}
func (v *View) ClearView(db, viewName string) error { func (v *View) ClearView(db, viewName string) error {
truncateView := db + "." + viewName truncateView := db + "." + viewName
sequenceTable := db + ".current_sequences" sequenceTable := db + ".current_sequences"

View File

@ -35,16 +35,16 @@ func (v *View) GetLatestStylingSequence(instanceID string) (*global_view.Current
return v.latestSequence(stylingTyble, instanceID) return v.latestSequence(stylingTyble, instanceID)
} }
func (v *View) GetLatestStylingSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { func (v *View) GetLatestStylingSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(stylingTyble, instanceIDs...) return v.latestSequences(stylingTyble, instanceIDs)
} }
func (v *View) ProcessedStylingSequence(event *models.Event) error { func (v *View) ProcessedStylingSequence(event *models.Event) error {
return v.saveCurrentSequence(stylingTyble, event) return v.saveCurrentSequence(stylingTyble, event)
} }
func (v *View) UpdateStylingSpoolerRunTimestamp() error { func (v *View) UpdateStylingSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(stylingTyble) return v.updateSpoolerRunSequence(stylingTyble, instanceIDs)
} }
func (v *View) GetLatestStylingFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { func (v *View) GetLatestStylingFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -33,24 +33,24 @@ func (h *handler) Eventstore() v1.Eventstore {
return h.es return h.es
} }
func Register(configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler { func Register(ctx context.Context, configs Configs, bulkLimit, errorCount uint64, view *view.View, es v1.Eventstore, systemDefaults sd.SystemDefaults, queries *query2.Queries) []query.Handler {
return []query.Handler{ return []query.Handler{
newUser( newUser(ctx,
handler{view, bulkLimit, configs.cycleDuration("User"), errorCount, es}, queries), handler{view, bulkLimit, configs.cycleDuration("User"), errorCount, es}, queries),
newUserSession( newUserSession(ctx,
handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount, es}, queries), handler{view, bulkLimit, configs.cycleDuration("UserSession"), errorCount, es}, queries),
newToken( newToken(ctx,
handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount, es}), handler{view, bulkLimit, configs.cycleDuration("Token"), errorCount, es}),
newIDPConfig( newIDPConfig(ctx,
handler{view, bulkLimit, configs.cycleDuration("IDPConfig"), errorCount, es}), handler{view, bulkLimit, configs.cycleDuration("IDPConfig"), errorCount, es}),
newIDPProvider( newIDPProvider(ctx,
handler{view, bulkLimit, configs.cycleDuration("IDPProvider"), errorCount, es}, handler{view, bulkLimit, configs.cycleDuration("IDPProvider"), errorCount, es},
systemDefaults, queries), systemDefaults, queries),
newExternalIDP( newExternalIDP(ctx,
handler{view, bulkLimit, configs.cycleDuration("ExternalIDP"), errorCount, es}, handler{view, bulkLimit, configs.cycleDuration("ExternalIDP"), errorCount, es},
systemDefaults, queries), systemDefaults, queries),
newRefreshToken(handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}), newRefreshToken(ctx, handler{view, bulkLimit, configs.cycleDuration("RefreshToken"), errorCount, es}),
newOrgProjectMapping(handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}), newOrgProjectMapping(ctx, handler{view, bulkLimit, configs.cycleDuration("OrgProjectMapping"), errorCount, es}),
} }
} }
@ -80,9 +80,9 @@ func withInstanceID(ctx context.Context, instanceID string) context.Context {
func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []models.AggregateType, instanceIDs []string) *models.SearchQuery { func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []models.AggregateType, instanceIDs []string) *models.SearchQuery {
searchQuery := models.NewSearchQuery() searchQuery := models.NewSearchQuery()
for _, sequence := range sequences {
var seq uint64
for _, instanceID := range instanceIDs { for _, instanceID := range instanceIDs {
var seq uint64
for _, sequence := range sequences {
if sequence.InstanceID == instanceID { if sequence.InstanceID == instanceID {
seq = sequence.CurrentSequence seq = sequence.CurrentSequence
break break
@ -91,7 +91,7 @@ func newSearchQuery(sequences []*repository.CurrentSequence, aggregateTypes []mo
searchQuery.AddQuery(). searchQuery.AddQuery().
AggregateTypeFilter(aggregateTypes...). AggregateTypeFilter(aggregateTypes...).
LatestSequenceFilter(seq). LatestSequenceFilter(seq).
InstanceIDFilter(sequence.InstanceID) InstanceIDFilter(instanceID)
} }
return searchQuery return searchQuery
} }

View File

@ -1,6 +1,8 @@
package handler package handler
import ( import (
"context"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
@ -23,21 +25,21 @@ type IDPConfig struct {
subscription *v1.Subscription subscription *v1.Subscription
} }
func newIDPConfig(h handler) *IDPConfig { func newIDPConfig(ctx context.Context, h handler) *IDPConfig {
idpConfig := &IDPConfig{ idpConfig := &IDPConfig{
handler: h, handler: h,
} }
idpConfig.subscribe() idpConfig.subscribe(ctx)
return idpConfig return idpConfig
} }
func (i *IDPConfig) subscribe() { func (i *IDPConfig) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...) i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() { go func() {
for event := range i.subscription.Events { for event := range i.subscription.Events {
query.ReduceEvent(i, event) query.ReduceEvent(ctx, i, event)
} }
}() }()
} }
@ -62,8 +64,8 @@ func (i *IDPConfig) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (i *IDPConfig) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { func (i *IDPConfig) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs...) sequences, err := i.view.GetLatestIDPConfigSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -129,10 +131,10 @@ func (i *IDPConfig) processIdpConfig(providerType iam_model.IDPProviderType, eve
} }
func (i *IDPConfig) OnError(event *models.Event, err error) error { func (i *IDPConfig) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-Ejf8s", "id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in idp config handler")
return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip) return spooler.HandleError(event, err, i.view.GetLatestIDPConfigFailedEvent, i.view.ProcessedIDPConfigFailedEvent, i.view.ProcessedIDPConfigSequence, i.errorCountUntilSkip)
} }
func (i *IDPConfig) OnSuccess() error { func (i *IDPConfig) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp) return spooler.HandleSuccess(i.view.UpdateIDPConfigSpoolerRunTimestamp, instanceIDs)
} }

View File

@ -32,6 +32,7 @@ type IDPProvider struct {
} }
func newIDPProvider( func newIDPProvider(
ctx context.Context,
h handler, h handler,
defaults systemdefaults.SystemDefaults, defaults systemdefaults.SystemDefaults,
queries *query2.Queries, queries *query2.Queries,
@ -42,16 +43,16 @@ func newIDPProvider(
queries: queries, queries: queries,
} }
idpProvider.subscribe() idpProvider.subscribe(ctx)
return idpProvider return idpProvider
} }
func (i *IDPProvider) subscribe() { func (i *IDPProvider) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...) i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() { go func() {
for event := range i.subscription.Events { for event := range i.subscription.Events {
query.ReduceEvent(i, event) query.ReduceEvent(ctx, i, event)
} }
}() }()
} }
@ -76,8 +77,8 @@ func (i *IDPProvider) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (i *IDPProvider) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { func (i *IDPProvider) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs...) sequences, err := i.view.GetLatestIDPProviderSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -188,14 +189,14 @@ func (i *IDPProvider) OnError(event *es_models.Event, err error) error {
return spooler.HandleError(event, err, i.view.GetLatestIDPProviderFailedEvent, i.view.ProcessedIDPProviderFailedEvent, i.view.ProcessedIDPProviderSequence, i.errorCountUntilSkip) return spooler.HandleError(event, err, i.view.GetLatestIDPProviderFailedEvent, i.view.ProcessedIDPProviderFailedEvent, i.view.ProcessedIDPProviderSequence, i.errorCountUntilSkip)
} }
func (i *IDPProvider) OnSuccess() error { func (i *IDPProvider) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp) return spooler.HandleSuccess(i.view.UpdateIDPProviderSpoolerRunTimestamp, instanceIDs)
} }
func (i *IDPProvider) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { func (i *IDPProvider) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) {
return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, aggregateID) return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, aggregateID)
} }
func (u *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) { func (i *IDPProvider) getDefaultIDPConfig(instanceID, idpConfigID string) (*query2.IDP, error) {
return u.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID) return i.queries.IDPByIDAndResourceOwner(withInstanceID(context.Background(), instanceID), false, idpConfigID, instanceID)
} }

View File

@ -1,6 +1,8 @@
package handler package handler
import ( import (
"context"
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore" "github.com/zitadel/zitadel/internal/eventstore"
@ -23,22 +25,23 @@ type OrgProjectMapping struct {
} }
func newOrgProjectMapping( func newOrgProjectMapping(
ctx context.Context,
handler handler, handler handler,
) *OrgProjectMapping { ) *OrgProjectMapping {
h := &OrgProjectMapping{ h := &OrgProjectMapping{
handler: handler, handler: handler,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (k *OrgProjectMapping) subscribe() { func (p *OrgProjectMapping) subscribe(ctx context.Context) {
k.subscription = k.es.Subscribe(k.AggregateTypes()...) p.subscription = p.es.Subscribe(p.AggregateTypes()...)
go func() { go func() {
for event := range k.subscription.Events { for event := range p.subscription.Events {
query.ReduceEvent(k, event) query.ReduceEvent(ctx, p, event)
} }
}() }()
} }
@ -63,8 +66,8 @@ func (p *OrgProjectMapping) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (p *OrgProjectMapping) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { func (p *OrgProjectMapping) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs...) sequences, err := p.view.GetLatestOrgProjectMappingSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -85,15 +88,21 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) {
} }
case project.GrantAddedType: case project.GrantAddedType:
projectGrant := new(view_model.ProjectGrant) projectGrant := new(view_model.ProjectGrant)
projectGrant.SetData(event) err := projectGrant.SetData(event)
if err != nil {
return err
}
mapping.OrgID = projectGrant.GrantedOrgID mapping.OrgID = projectGrant.GrantedOrgID
mapping.ProjectID = event.AggregateID mapping.ProjectID = event.AggregateID
mapping.ProjectGrantID = projectGrant.GrantID mapping.ProjectGrantID = projectGrant.GrantID
mapping.InstanceID = event.InstanceID mapping.InstanceID = event.InstanceID
case project.GrantRemovedType: case project.GrantRemovedType:
projectGrant := new(view_model.ProjectGrant) projectGrant := new(view_model.ProjectGrant)
projectGrant.SetData(event) err := projectGrant.SetData(event)
err := p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID) if err != nil {
return err
}
err = p.view.DeleteOrgProjectMappingsByProjectGrantID(event.AggregateID, event.InstanceID)
if err == nil { if err == nil {
return p.view.ProcessedOrgProjectMappingSequence(event) return p.view.ProcessedOrgProjectMappingSequence(event)
} }
@ -109,10 +118,10 @@ func (p *OrgProjectMapping) Reduce(event *es_models.Event) (err error) {
} }
func (p *OrgProjectMapping) OnError(event *es_models.Event, err error) error { func (p *OrgProjectMapping) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-2k0fS", "id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in org project mapping handler")
return spooler.HandleError(event, err, p.view.GetLatestOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingSequence, p.errorCountUntilSkip) return spooler.HandleError(event, err, p.view.GetLatestOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingFailedEvent, p.view.ProcessedOrgProjectMappingSequence, p.errorCountUntilSkip)
} }
func (p *OrgProjectMapping) OnSuccess() error { func (p *OrgProjectMapping) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp) return spooler.HandleSuccess(p.view.UpdateOrgProjectMappingSpoolerRunTimestamp, instanceIDs)
} }

View File

@ -1,6 +1,7 @@
package handler package handler
import ( import (
"context"
"encoding/json" "encoding/json"
"github.com/zitadel/logging" "github.com/zitadel/logging"
@ -27,22 +28,23 @@ type RefreshToken struct {
} }
func newRefreshToken( func newRefreshToken(
ctx context.Context,
handler handler, handler handler,
) *RefreshToken { ) *RefreshToken {
h := &RefreshToken{ h := &RefreshToken{
handler: handler, handler: handler,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (t *RefreshToken) subscribe() { func (t *RefreshToken) subscribe(ctx context.Context) {
t.subscription = t.es.Subscribe(t.AggregateTypes()...) t.subscription = t.es.Subscribe(t.AggregateTypes()...)
go func() { go func() {
for event := range t.subscription.Events { for event := range t.subscription.Events {
query.ReduceEvent(t, event) query.ReduceEvent(ctx, t, event)
} }
}() }()
} }
@ -67,8 +69,8 @@ func (t *RefreshToken) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (t *RefreshToken) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { func (t *RefreshToken) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs...) sequences, err := t.view.GetLatestRefreshTokenSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -87,7 +89,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
case user.HumanRefreshTokenRenewedType: case user.HumanRefreshTokenRenewedType:
e := new(user.HumanRefreshTokenRenewedEvent) e := new(user.HumanRefreshTokenRenewedEvent)
if err := json.Unmarshal(event.Data, e); err != nil { if err := json.Unmarshal(event.Data, e); err != nil {
logging.Log("EVEN-DBbn4").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return caos_errs.ThrowInternal(nil, "MODEL-BHn75", "could not unmarshal data") return caos_errs.ThrowInternal(nil, "MODEL-BHn75", "could not unmarshal data")
} }
token, err := t.view.RefreshTokenByID(e.TokenID, event.InstanceID) token, err := t.view.RefreshTokenByID(e.TokenID, event.InstanceID)
@ -102,7 +104,7 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
case user.HumanRefreshTokenRemovedType: case user.HumanRefreshTokenRemovedType:
e := new(user.HumanRefreshTokenRemovedEvent) e := new(user.HumanRefreshTokenRemovedEvent)
if err := json.Unmarshal(event.Data, e); err != nil { if err := json.Unmarshal(event.Data, e); err != nil {
logging.Log("EVEN-BDbh3").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return caos_errs.ThrowInternal(nil, "MODEL-Bz653", "could not unmarshal data") return caos_errs.ThrowInternal(nil, "MODEL-Bz653", "could not unmarshal data")
} }
return t.view.DeleteRefreshToken(e.TokenID, event.InstanceID, event) return t.view.DeleteRefreshToken(e.TokenID, event.InstanceID, event)
@ -118,10 +120,10 @@ func (t *RefreshToken) Reduce(event *es_models.Event) (err error) {
} }
func (t *RefreshToken) OnError(event *es_models.Event, err error) error { func (t *RefreshToken) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) return spooler.HandleError(event, err, t.view.GetLatestRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenFailedEvent, t.view.ProcessedRefreshTokenSequence, t.errorCountUntilSkip)
} }
func (t *RefreshToken) OnSuccess() error { func (t *RefreshToken) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) return spooler.HandleSuccess(t.view.UpdateRefreshTokenSpoolerRunTimestamp, instanceIDs)
} }

View File

@ -33,22 +33,23 @@ type Token struct {
} }
func newToken( func newToken(
ctx context.Context,
handler handler, handler handler,
) *Token { ) *Token {
h := &Token{ h := &Token{
handler: handler, handler: handler,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (t *Token) subscribe() { func (t *Token) subscribe(ctx context.Context) {
t.subscription = t.es.Subscribe(t.AggregateTypes()...) t.subscription = t.es.Subscribe(t.AggregateTypes()...)
go func() { go func() {
for event := range t.subscription.Events { for event := range t.subscription.Events {
query.ReduceEvent(t, event) query.ReduceEvent(ctx, t, event)
} }
}() }()
} }
@ -65,16 +66,16 @@ func (_ *Token) AggregateTypes() []es_models.AggregateType {
return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType} return []es_models.AggregateType{user.AggregateType, project.AggregateType, instance.AggregateType}
} }
func (p *Token) CurrentSequence(instanceID string) (uint64, error) { func (t *Token) CurrentSequence(instanceID string) (uint64, error) {
sequence, err := p.view.GetLatestTokenSequence(instanceID) sequence, err := t.view.GetLatestTokenSequence(instanceID)
if err != nil { if err != nil {
return 0, err return 0, err
} }
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (t *Token) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { func (t *Token) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := t.view.GetLatestTokenSequences(instanceIDs...) sequences, err := t.view.GetLatestTokenSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -94,7 +95,10 @@ func (t *Token) Reduce(event *es_models.Event) (err error) {
case user.UserV1ProfileChangedType, case user.UserV1ProfileChangedType,
user.HumanProfileChangedType: user.HumanProfileChangedType:
user := new(view_model.UserView) user := new(view_model.UserView)
user.AppendEvent(event) err := user.AppendEvent(event)
if err != nil {
return err
}
tokens, err := t.view.TokensByUserID(event.AggregateID, event.InstanceID) tokens, err := t.view.TokensByUserID(event.AggregateID, event.InstanceID)
if err != nil { if err != nil {
return err return err
@ -153,14 +157,14 @@ func (t *Token) Reduce(event *es_models.Event) (err error) {
} }
func (t *Token) OnError(event *es_models.Event, err error) error { func (t *Token) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-3jkl4", "id", event.AggregateID).WithError(err).Warn("something went wrong in token handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in token handler")
return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip) return spooler.HandleError(event, err, t.view.GetLatestTokenFailedEvent, t.view.ProcessedTokenFailedEvent, t.view.ProcessedTokenSequence, t.errorCountUntilSkip)
} }
func agentIDFromSession(event *es_models.Event) (string, error) { func agentIDFromSession(event *es_models.Event) (string, error) {
session := make(map[string]interface{}) session := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &session); err != nil { if err := json.Unmarshal(event.Data, &session); err != nil {
logging.Log("EVEN-s3bq9").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-sd325", "could not unmarshal data") return "", caos_errs.ThrowInternal(nil, "MODEL-sd325", "could not unmarshal data")
} }
return session["userAgentID"].(string), nil return session["userAgentID"].(string), nil
@ -169,7 +173,7 @@ func agentIDFromSession(event *es_models.Event) (string, error) {
func applicationFromSession(event *es_models.Event) (*project_es_model.Application, error) { func applicationFromSession(event *es_models.Event) (*project_es_model.Application, error) {
application := new(project_es_model.Application) application := new(project_es_model.Application)
if err := json.Unmarshal(event.Data, &application); err != nil { if err := json.Unmarshal(event.Data, &application); err != nil {
logging.Log("EVEN-GRE2q").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return nil, caos_errs.ThrowInternal(nil, "MODEL-Hrw1q", "could not unmarshal data") return nil, caos_errs.ThrowInternal(nil, "MODEL-Hrw1q", "could not unmarshal data")
} }
return application, nil return application, nil
@ -178,7 +182,7 @@ func applicationFromSession(event *es_models.Event) (*project_es_model.Applicati
func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) { func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
removed := make(map[string]interface{}) removed := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &removed); err != nil { if err := json.Unmarshal(event.Data, &removed); err != nil {
logging.Log("EVEN-Sdff3").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-Sff32", "could not unmarshal data") return "", caos_errs.ThrowInternal(nil, "MODEL-Sff32", "could not unmarshal data")
} }
return removed["tokenId"].(string), nil return removed["tokenId"].(string), nil
@ -187,14 +191,14 @@ func tokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
func refreshTokenIDFromRemovedEvent(event *es_models.Event) (string, error) { func refreshTokenIDFromRemovedEvent(event *es_models.Event) (string, error) {
removed := make(map[string]interface{}) removed := make(map[string]interface{})
if err := json.Unmarshal(event.Data, &removed); err != nil { if err := json.Unmarshal(event.Data, &removed); err != nil {
logging.Log("EVEN-Ff23g").WithError(err).Error("could not unmarshal event data") logging.WithError(err).Error("could not unmarshal event data")
return "", caos_errs.ThrowInternal(nil, "MODEL-Dfb3w", "could not unmarshal data") return "", caos_errs.ThrowInternal(nil, "MODEL-Dfb3w", "could not unmarshal data")
} }
return removed["tokenId"].(string), nil return removed["tokenId"].(string), nil
} }
func (t *Token) OnSuccess() error { func (t *Token) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp) return spooler.HandleSuccess(t.view.UpdateTokenSpoolerRunTimestamp, instanceIDs)
} }
func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) { func (t *Token) getProjectByID(ctx context.Context, projID, instanceID string) (*proj_model.Project, error) {

View File

@ -34,6 +34,7 @@ type User struct {
} }
func newUser( func newUser(
ctx context.Context,
handler handler, handler handler,
queries *query2.Queries, queries *query2.Queries,
) *User { ) *User {
@ -42,16 +43,16 @@ func newUser(
queries: queries, queries: queries,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (k *User) subscribe() { func (u *User) subscribe(ctx context.Context) {
k.subscription = k.es.Subscribe(k.AggregateTypes()...) u.subscription = u.es.Subscribe(u.AggregateTypes()...)
go func() { go func() {
for event := range k.subscription.Events { for event := range u.subscription.Events {
query.ReduceEvent(k, event) query.ReduceEvent(ctx, u, event)
} }
}() }()
} }
@ -75,8 +76,8 @@ func (u *User) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (u *User) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { func (u *User) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSequences(instanceIDs...) sequences, err := u.view.GetLatestUserSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -275,12 +276,12 @@ func (u *User) fillPreferredLoginNamesOnOrgUsers(event *es_models.Event) error {
} }
func (u *User) OnError(event *es_models.Event, err error) error { func (u *User) OnError(event *es_models.Event, err error) error {
logging.LogWithFields("SPOOL-is8aAWima", "id", event.AggregateID).WithError(err).Warn("something went wrong in user handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user handler")
return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip) return spooler.HandleError(event, err, u.view.GetLatestUserFailedEvent, u.view.ProcessedUserFailedEvent, u.view.ProcessedUserSequence, u.errorCountUntilSkip)
} }
func (u *User) OnSuccess() error { func (u *User) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp) return spooler.HandleSuccess(u.view.UpdateUserSpoolerRunTimestamp, instanceIDs)
} }
func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) { func (u *User) getOrgByID(ctx context.Context, orgID, instanceID string) (*org_model.Org, error) {

View File

@ -33,6 +33,7 @@ type ExternalIDP struct {
} }
func newExternalIDP( func newExternalIDP(
ctx context.Context,
handler handler, handler handler,
defaults systemdefaults.SystemDefaults, defaults systemdefaults.SystemDefaults,
queries *query2.Queries, queries *query2.Queries,
@ -43,16 +44,16 @@ func newExternalIDP(
queries: queries, queries: queries,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (i *ExternalIDP) subscribe() { func (i *ExternalIDP) subscribe(ctx context.Context) {
i.subscription = i.es.Subscribe(i.AggregateTypes()...) i.subscription = i.es.Subscribe(i.AggregateTypes()...)
go func() { go func() {
for event := range i.subscription.Events { for event := range i.subscription.Events {
query.ReduceEvent(i, event) query.ReduceEvent(ctx, i, event)
} }
}() }()
} }
@ -77,8 +78,8 @@ func (i *ExternalIDP) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (i *ExternalIDP) EventQuery(instanceIDs ...string) (*es_models.SearchQuery, error) { func (i *ExternalIDP) EventQuery(instanceIDs []string) (*es_models.SearchQuery, error) {
sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs...) sequences, err := i.view.GetLatestExternalIDPSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -178,8 +179,8 @@ func (i *ExternalIDP) OnError(event *es_models.Event, err error) error {
return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip) return spooler.HandleError(event, err, i.view.GetLatestExternalIDPFailedEvent, i.view.ProcessedExternalIDPFailedEvent, i.view.ProcessedExternalIDPSequence, i.errorCountUntilSkip)
} }
func (i *ExternalIDP) OnSuccess() error { func (i *ExternalIDP) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp) return spooler.HandleSuccess(i.view.UpdateExternalIDPSpoolerRunTimestamp, instanceIDs)
} }
func (i *ExternalIDP) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) { func (i *ExternalIDP) getOrgIDPConfig(instanceID, aggregateID, idpConfigID string) (*query2.IDP, error) {

View File

@ -33,22 +33,22 @@ type UserSession struct {
queries *query2.Queries queries *query2.Queries
} }
func newUserSession(handler handler, queries *query2.Queries) *UserSession { func newUserSession(ctx context.Context, handler handler, queries *query2.Queries) *UserSession {
h := &UserSession{ h := &UserSession{
handler: handler, handler: handler,
queries: queries, queries: queries,
} }
h.subscribe() h.subscribe(ctx)
return h return h
} }
func (k *UserSession) subscribe() { func (u *UserSession) subscribe(ctx context.Context) {
k.subscription = k.es.Subscribe(k.AggregateTypes()...) u.subscription = u.es.Subscribe(u.AggregateTypes()...)
go func() { go func() {
for event := range k.subscription.Events { for event := range u.subscription.Events {
query.ReduceEvent(k, event) query.ReduceEvent(ctx, u, event)
} }
}() }()
} }
@ -73,8 +73,8 @@ func (u *UserSession) CurrentSequence(instanceID string) (uint64, error) {
return sequence.CurrentSequence, nil return sequence.CurrentSequence, nil
} }
func (u *UserSession) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { func (u *UserSession) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs...) sequences, err := u.view.GetLatestUserSessionSequences(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -162,12 +162,12 @@ func (u *UserSession) Reduce(event *models.Event) (err error) {
} }
func (u *UserSession) OnError(event *models.Event, err error) error { func (u *UserSession) OnError(event *models.Event, err error) error {
logging.LogWithFields("SPOOL-sdfw3s", "id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler") logging.WithFields("id", event.AggregateID).WithError(err).Warn("something went wrong in user session handler")
return spooler.HandleError(event, err, u.view.GetLatestUserSessionFailedEvent, u.view.ProcessedUserSessionFailedEvent, u.view.ProcessedUserSessionSequence, u.errorCountUntilSkip) return spooler.HandleError(event, err, u.view.GetLatestUserSessionFailedEvent, u.view.ProcessedUserSessionFailedEvent, u.view.ProcessedUserSessionSequence, u.errorCountUntilSkip)
} }
func (u *UserSession) OnSuccess() error { func (u *UserSession) OnSuccess(instanceIDs []string) error {
return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp) return spooler.HandleSuccess(u.view.UpdateUserSessionSpoolerRunTimestamp, instanceIDs)
} }
func (u *UserSession) updateSession(session *view_model.UserSessionView, event *models.Event) error { func (u *UserSession) updateSession(session *view_model.UserSessionView, event *models.Event) error {

View File

@ -11,6 +11,7 @@ import (
"github.com/zitadel/zitadel/internal/command" "github.com/zitadel/zitadel/internal/command"
sd "github.com/zitadel/zitadel/internal/config/systemdefaults" sd "github.com/zitadel/zitadel/internal/config/systemdefaults"
"github.com/zitadel/zitadel/internal/crypto" "github.com/zitadel/zitadel/internal/crypto"
eventstore2 "github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1" v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler" es_spol "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/id"
@ -33,7 +34,7 @@ type EsRepository struct {
eventstore.OrgRepository eventstore.OrgRepository
} }
func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) { func Start(ctx context.Context, conf Config, systemDefaults sd.SystemDefaults, command *command.Commands, queries *query.Queries, dbClient *sql.DB, esV2 *eventstore2.Eventstore, oidcEncryption crypto.EncryptionAlgorithm, userEncryption crypto.EncryptionAlgorithm) (*EsRepository, error) {
es, err := v1.Start(dbClient) es, err := v1.Start(dbClient)
if err != nil { if err != nil {
return nil, err return nil, err
@ -47,7 +48,7 @@ func Start(conf Config, systemDefaults sd.SystemDefaults, command *command.Comma
authReq := cache.Start(dbClient) authReq := cache.Start(dbClient)
spool := spooler.StartSpooler(conf.Spooler, es, view, dbClient, systemDefaults, queries) spool := spooler.StartSpooler(ctx, conf.Spooler, es, esV2, view, dbClient, systemDefaults, queries)
userRepo := eventstore.UserRepo{ userRepo := eventstore.UserRepo{
SearchLimit: conf.SearchLimit, SearchLimit: conf.SearchLimit,

View File

@ -1,15 +1,16 @@
package spooler package spooler
import ( import (
"context"
"database/sql" "database/sql"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/query"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/handler"
"github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view" "github.com/zitadel/zitadel/internal/auth/repository/eventsourcing/view"
sd "github.com/zitadel/zitadel/internal/config/systemdefaults" sd "github.com/zitadel/zitadel/internal/config/systemdefaults"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/spooler" "github.com/zitadel/zitadel/internal/eventstore/v1/spooler"
"github.com/zitadel/zitadel/internal/query"
) )
type SpoolerConfig struct { type SpoolerConfig struct {
@ -20,13 +21,14 @@ type SpoolerConfig struct {
Handlers handler.Configs Handlers handler.Configs
} }
func StartSpooler(c SpoolerConfig, es v1.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler { func StartSpooler(ctx context.Context, c SpoolerConfig, es v1.Eventstore, esV2 *eventstore.Eventstore, view *view.View, client *sql.DB, systemDefaults sd.SystemDefaults, queries *query.Queries) *spooler.Spooler {
spoolerConfig := spooler.Config{ spoolerConfig := spooler.Config{
Eventstore: es, Eventstore: es,
EventstoreV2: esV2,
Locker: &locker{dbClient: client}, Locker: &locker{dbClient: client},
ConcurrentWorkers: c.ConcurrentWorkers, ConcurrentWorkers: c.ConcurrentWorkers,
ConcurrentInstances: c.ConcurrentInstances, ConcurrentInstances: c.ConcurrentInstances,
ViewHandlers: handler.Register(c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries), ViewHandlers: handler.Register(ctx, c.Handlers, c.BulkLimit, c.FailureCountUntilSkip, view, es, systemDefaults, queries),
} }
spool := spoolerConfig.New() spool := spoolerConfig.New()
spool.Start() spool.Start()

View File

@ -68,16 +68,16 @@ func (v *View) GetLatestExternalIDPSequence(instanceID string) (*global_view.Cur
return v.latestSequence(externalIDPTable, instanceID) return v.latestSequence(externalIDPTable, instanceID)
} }
func (v *View) GetLatestExternalIDPSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { func (v *View) GetLatestExternalIDPSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(externalIDPTable, instanceIDs...) return v.latestSequences(externalIDPTable, instanceIDs)
} }
func (v *View) ProcessedExternalIDPSequence(event *models.Event) error { func (v *View) ProcessedExternalIDPSequence(event *models.Event) error {
return v.saveCurrentSequence(externalIDPTable, event) return v.saveCurrentSequence(externalIDPTable, event)
} }
func (v *View) UpdateExternalIDPSpoolerRunTimestamp() error { func (v *View) UpdateExternalIDPSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(externalIDPTable) return v.updateSpoolerRunSequence(externalIDPTable, instanceIDs)
} }
func (v *View) GetLatestExternalIDPFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { func (v *View) GetLatestExternalIDPFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -53,16 +53,16 @@ func (v *View) GetLatestIDPConfigSequence(instanceID string) (*global_view.Curre
return v.latestSequence(idpConfigTable, instanceID) return v.latestSequence(idpConfigTable, instanceID)
} }
func (v *View) GetLatestIDPConfigSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { func (v *View) GetLatestIDPConfigSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpConfigTable, instanceIDs...) return v.latestSequences(idpConfigTable, instanceIDs)
} }
func (v *View) ProcessedIDPConfigSequence(event *models.Event) error { func (v *View) ProcessedIDPConfigSequence(event *models.Event) error {
return v.saveCurrentSequence(idpConfigTable, event) return v.saveCurrentSequence(idpConfigTable, event)
} }
func (v *View) UpdateIDPConfigSpoolerRunTimestamp() error { func (v *View) UpdateIDPConfigSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(idpConfigTable) return v.updateSpoolerRunSequence(idpConfigTable, instanceIDs)
} }
func (v *View) GetLatestIDPConfigFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { func (v *View) GetLatestIDPConfigFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -73,16 +73,16 @@ func (v *View) GetLatestIDPProviderSequence(instanceID string) (*global_view.Cur
return v.latestSequence(idpProviderTable, instanceID) return v.latestSequence(idpProviderTable, instanceID)
} }
func (v *View) GetLatestIDPProviderSequences(instanceIDs ...string) ([]*global_view.CurrentSequence, error) { func (v *View) GetLatestIDPProviderSequences(instanceIDs []string) ([]*global_view.CurrentSequence, error) {
return v.latestSequences(idpProviderTable, instanceIDs...) return v.latestSequences(idpProviderTable, instanceIDs)
} }
func (v *View) ProcessedIDPProviderSequence(event *models.Event) error { func (v *View) ProcessedIDPProviderSequence(event *models.Event) error {
return v.saveCurrentSequence(idpProviderTable, event) return v.saveCurrentSequence(idpProviderTable, event)
} }
func (v *View) UpdateIDPProviderSpoolerRunTimestamp() error { func (v *View) UpdateIDPProviderSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(idpProviderTable) return v.updateSpoolerRunSequence(idpProviderTable, instanceIDs)
} }
func (v *View) GetLatestIDPProviderFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) { func (v *View) GetLatestIDPProviderFailedEvent(sequence uint64, instanceID string) (*global_view.FailedEvent, error) {

View File

@ -52,16 +52,16 @@ func (v *View) GetLatestOrgProjectMappingSequence(instanceID string) (*repositor
return v.latestSequence(orgPrgojectMappingTable, instanceID) return v.latestSequence(orgPrgojectMappingTable, instanceID)
} }
func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestOrgProjectMappingSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(orgPrgojectMappingTable, instanceIDs...) return v.latestSequences(orgPrgojectMappingTable, instanceIDs)
} }
func (v *View) ProcessedOrgProjectMappingSequence(event *models.Event) error { func (v *View) ProcessedOrgProjectMappingSequence(event *models.Event) error {
return v.saveCurrentSequence(orgPrgojectMappingTable, event) return v.saveCurrentSequence(orgPrgojectMappingTable, event)
} }
func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp() error { func (v *View) UpdateOrgProjectMappingSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(orgPrgojectMappingTable) return v.updateSpoolerRunSequence(orgPrgojectMappingTable, instanceIDs)
} }
func (v *View) GetLatestOrgProjectMappingFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { func (v *View) GetLatestOrgProjectMappingFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -77,16 +77,16 @@ func (v *View) GetLatestRefreshTokenSequence(instanceID string) (*repository.Cur
return v.latestSequence(refreshTokenTable, instanceID) return v.latestSequence(refreshTokenTable, instanceID)
} }
func (v *View) GetLatestRefreshTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestRefreshTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(refreshTokenTable, instanceIDs...) return v.latestSequences(refreshTokenTable, instanceIDs)
} }
func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error { func (v *View) ProcessedRefreshTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(refreshTokenTable, event) return v.saveCurrentSequence(refreshTokenTable, event)
} }
func (v *View) UpdateRefreshTokenSpoolerRunTimestamp() error { func (v *View) UpdateRefreshTokenSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(refreshTokenTable) return v.updateSpoolerRunSequence(refreshTokenTable, instanceIDs)
} }
func (v *View) GetLatestRefreshTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { func (v *View) GetLatestRefreshTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -19,12 +19,12 @@ func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentS
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
} }
func (v *View) latestSequences(viewName string, instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) latestSequences(viewName string, instanceIDs []string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs...) return repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
} }
func (v *View) updateSpoolerRunSequence(viewName string) error { func (v *View) updateSpoolerRunSequence(viewName string, instanceIDs []string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName) currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName, instanceIDs)
if err != nil { if err != nil {
return err return err
} }

View File

@ -88,16 +88,16 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq
return v.latestSequence(tokenTable, instanceID) return v.latestSequence(tokenTable, instanceID)
} }
func (v *View) GetLatestTokenSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestTokenSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(tokenTable, instanceIDs...) return v.latestSequences(tokenTable, instanceIDs)
} }
func (v *View) ProcessedTokenSequence(event *models.Event) error { func (v *View) ProcessedTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(tokenTable, event) return v.saveCurrentSequence(tokenTable, event)
} }
func (v *View) UpdateTokenSpoolerRunTimestamp() error { func (v *View) UpdateTokenSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(tokenTable) return v.updateSpoolerRunSequence(tokenTable, instanceIDs)
} }
func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -193,16 +193,16 @@ func (v *View) GetLatestUserSequence(instanceID string) (*repository.CurrentSequ
return v.latestSequence(userTable, instanceID) return v.latestSequence(userTable, instanceID)
} }
func (v *View) GetLatestUserSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestUserSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userTable, instanceIDs...) return v.latestSequences(userTable, instanceIDs)
} }
func (v *View) ProcessedUserSequence(event *models.Event) error { func (v *View) ProcessedUserSequence(event *models.Event) error {
return v.saveCurrentSequence(userTable, event) return v.saveCurrentSequence(userTable, event)
} }
func (v *View) UpdateUserSpoolerRunTimestamp() error { func (v *View) UpdateUserSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(userTable) return v.updateSpoolerRunSequence(userTable, instanceIDs)
} }
func (v *View) GetLatestUserFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { func (v *View) GetLatestUserFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -68,16 +68,16 @@ func (v *View) GetLatestUserSessionSequence(instanceID string) (*repository.Curr
return v.latestSequence(userSessionTable, instanceID) return v.latestSequence(userSessionTable, instanceID)
} }
func (v *View) GetLatestUserSessionSequences(instanceIDs ...string) ([]*repository.CurrentSequence, error) { func (v *View) GetLatestUserSessionSequences(instanceIDs []string) ([]*repository.CurrentSequence, error) {
return v.latestSequences(userSessionTable, instanceIDs...) return v.latestSequences(userSessionTable, instanceIDs)
} }
func (v *View) ProcessedUserSessionSequence(event *models.Event) error { func (v *View) ProcessedUserSessionSequence(event *models.Event) error {
return v.saveCurrentSequence(userSessionTable, event) return v.saveCurrentSequence(userSessionTable, event)
} }
func (v *View) UpdateUserSessionSpoolerRunTimestamp() error { func (v *View) UpdateUserSessionSpoolerRunTimestamp(instanceIDs []string) error {
return v.updateSpoolerRunSequence(userSessionTable) return v.updateSpoolerRunSequence(userSessionTable, instanceIDs)
} }
func (v *View) GetLatestUserSessionFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) { func (v *View) GetLatestUserSessionFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {

View File

@ -1,8 +1,6 @@
package view package view
import ( import (
"time"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/view/repository" "github.com/zitadel/zitadel/internal/view/repository"
) )
@ -18,21 +16,3 @@ func (v *View) saveCurrentSequence(viewName string, event *models.Event) error {
func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) { func (v *View) latestSequence(viewName, instanceID string) (*repository.CurrentSequence, error) {
return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID) return repository.LatestSequence(v.Db, sequencesTable, viewName, instanceID)
} }
func (v *View) latestSequences(viewName string) ([]*repository.CurrentSequence, error) {
return repository.LatestSequences(v.Db, sequencesTable, viewName)
}
func (v *View) updateSpoolerRunSequence(viewName string) error {
currentSequences, err := repository.LatestSequences(v.Db, sequencesTable, viewName)
if err != nil {
return err
}
for _, currentSequence := range currentSequences {
if currentSequence.ViewName == "" {
currentSequence.ViewName = viewName
}
currentSequence.LastSuccessfulSpoolerRun = time.Now()
}
return repository.UpdateCurrentSequences(v.Db, sequencesTable, currentSequences)
}

View File

@ -47,15 +47,3 @@ func (v *View) GetLatestTokenSequence(instanceID string) (*repository.CurrentSeq
func (v *View) ProcessedTokenSequence(event *models.Event) error { func (v *View) ProcessedTokenSequence(event *models.Event) error {
return v.saveCurrentSequence(tokenTable, event) return v.saveCurrentSequence(tokenTable, event)
} }
func (v *View) UpdateTokenSpoolerRunTimestamp() error {
return v.updateSpoolerRunSequence(tokenTable)
}
func (v *View) GetLatestTokenFailedEvent(sequence uint64, instanceID string) (*repository.FailedEvent, error) {
return v.latestFailedEvent(tokenTable, instanceID, sequence)
}
func (v *View) ProcessedTokenFailedEvent(failedEvent *repository.FailedEvent) error {
return v.saveFailedEvent(failedEvent)
}

View File

@ -17,10 +17,10 @@ const (
type Handler interface { type Handler interface {
ViewModel() string ViewModel() string
EventQuery(instanceIDs ...string) (*models.SearchQuery, error) EventQuery(instanceIDs []string) (*models.SearchQuery, error)
Reduce(*models.Event) error Reduce(*models.Event) error
OnError(event *models.Event, err error) error OnError(event *models.Event, err error) error
OnSuccess() error OnSuccess(instanceIDs []string) error
MinimumCycleDuration() time.Duration MinimumCycleDuration() time.Duration
LockDuration() time.Duration LockDuration() time.Duration
QueryLimit() uint64 QueryLimit() uint64
@ -32,7 +32,7 @@ type Handler interface {
Subscription() *v1.Subscription Subscription() *v1.Subscription
} }
func ReduceEvent(handler Handler, event *models.Event) { func ReduceEvent(ctx context.Context, handler Handler, event *models.Event) {
defer func() { defer func() {
err := recover() err := recover()
@ -42,7 +42,7 @@ func ReduceEvent(handler Handler, event *models.Event) {
"cause", err, "cause", err,
"stack", string(debug.Stack()), "stack", string(debug.Stack()),
"sequence", event.Sequence, "sequence", event.Sequence,
"instnace", event.InstanceID, "instance", event.InstanceID,
).Error("reduce panicked") ).Error("reduce panicked")
} }
}() }()
@ -60,7 +60,7 @@ func ReduceEvent(handler Handler, event *models.Event) {
SearchQuery(). SearchQuery().
SetLimit(eventLimit) SetLimit(eventLimit)
unprocessedEvents, err := handler.Eventstore().FilterEvents(context.Background(), searchQuery) unprocessedEvents, err := handler.Eventstore().FilterEvents(ctx, searchQuery)
if err != nil { if err != nil {
logging.WithFields("sequence", event.Sequence).Warn("filter failed") logging.WithFields("sequence", event.Sequence).Warn("filter failed")
return return

View File

@ -5,6 +5,7 @@ import (
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1" v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/query" "github.com/zitadel/zitadel/internal/eventstore/v1/query"
"github.com/zitadel/zitadel/internal/id" "github.com/zitadel/zitadel/internal/id"
@ -12,6 +13,7 @@ import (
type Config struct { type Config struct {
Eventstore v1.Eventstore Eventstore v1.Eventstore
EventstoreV2 *eventstore.Eventstore
Locker Locker Locker Locker
ViewHandlers []query.Handler ViewHandlers []query.Handler
ConcurrentWorkers int ConcurrentWorkers int
@ -31,6 +33,7 @@ func (c *Config) New() *Spooler {
handlers: c.ViewHandlers, handlers: c.ViewHandlers,
lockID: lockID, lockID: lockID,
eventstore: c.Eventstore, eventstore: c.Eventstore,
esV2: c.EventstoreV2,
locker: c.Locker, locker: c.Locker,
queue: make(chan *spooledHandler, len(c.ViewHandlers)), queue: make(chan *spooledHandler, len(c.ViewHandlers)),
workers: c.ConcurrentWorkers, workers: c.ConcurrentWorkers,

View File

@ -9,6 +9,8 @@ import (
"github.com/zitadel/logging" "github.com/zitadel/logging"
"github.com/zitadel/zitadel/internal/eventstore"
"github.com/zitadel/zitadel/internal/eventstore/handler"
v1 "github.com/zitadel/zitadel/internal/eventstore/v1" v1 "github.com/zitadel/zitadel/internal/eventstore/v1"
"github.com/zitadel/zitadel/internal/eventstore/v1/models" "github.com/zitadel/zitadel/internal/eventstore/v1/models"
"github.com/zitadel/zitadel/internal/eventstore/v1/query" "github.com/zitadel/zitadel/internal/eventstore/v1/query"
@ -16,13 +18,19 @@ import (
"github.com/zitadel/zitadel/internal/view/repository" "github.com/zitadel/zitadel/internal/view/repository"
) )
const systemID = "system" const (
systemID = "system"
schedulerSucceeded = eventstore.EventType("system.projections.scheduler.succeeded")
aggregateType = eventstore.AggregateType("system")
aggregateID = "SYSTEM"
)
type Spooler struct { type Spooler struct {
handlers []query.Handler handlers []query.Handler
locker Locker locker Locker
lockID string lockID string
eventstore v1.Eventstore eventstore v1.Eventstore
esV2 *eventstore.Eventstore
workers int workers int
queue chan *spooledHandler queue chan *spooledHandler
concurrentInstances int concurrentInstances int
@ -37,7 +45,9 @@ type spooledHandler struct {
locker Locker locker Locker
queuedAt time.Time queuedAt time.Time
eventstore v1.Eventstore eventstore v1.Eventstore
esV2 *eventstore.Eventstore
concurrentInstances int concurrentInstances int
succeededOnce bool
} }
func (s *Spooler) Start() { func (s *Spooler) Start() {
@ -57,7 +67,7 @@ func (s *Spooler) Start() {
} }
go func() { go func() {
for _, handler := range s.handlers { for _, handler := range s.handlers {
s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, concurrentInstances: s.concurrentInstances} s.queue <- &spooledHandler{Handler: handler, locker: s.locker, queuedAt: time.Now(), eventstore: s.eventstore, esV2: s.esV2, concurrentInstances: s.concurrentInstances}
} }
}() }()
} }
@ -68,6 +78,32 @@ func requeueTask(task *spooledHandler, queue chan<- *spooledHandler) {
queue <- task queue <- task
} }
func (s *spooledHandler) hasSucceededOnce(ctx context.Context) (bool, error) {
events, err := s.esV2.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
AddQuery().
AggregateTypes(aggregateType).
AggregateIDs(aggregateID).
EventTypes(schedulerSucceeded).
EventData(map[string]interface{}{
"name": s.ViewModel(),
}).
Builder(),
)
return len(events) > 0 && err == nil, err
}
func (s *spooledHandler) setSucceededOnce(ctx context.Context) error {
_, err := s.esV2.Push(ctx, &handler.ProjectionSucceededEvent{
BaseEvent: *eventstore.NewBaseEventForPush(ctx,
eventstore.NewAggregate(ctx, aggregateID, aggregateType, "v1"),
schedulerSucceeded,
),
Name: s.ViewModel(),
})
s.succeededOnce = err == nil
return err
}
func (s *spooledHandler) load(workerID string) { func (s *spooledHandler) load(workerID string) {
errs := make(chan error) errs := make(chan error)
defer func() { defer func() {
@ -86,8 +122,24 @@ func (s *spooledHandler) load(workerID string) {
hasLocked := s.lock(ctx, errs, workerID) hasLocked := s.lock(ctx, errs, workerID)
if <-hasLocked { if <-hasLocked {
if !s.succeededOnce {
var err error
s.succeededOnce, err = s.hasSucceededOnce(ctx)
if err != nil {
logging.WithFields("view", s.ViewModel()).OnError(err).Warn("initial lock failed for first schedule")
errs <- err
return
}
}
instanceIDQuery := models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("")
for { for {
ids, err := s.eventstore.InstanceIDs(ctx, models.NewSearchQuery().SetColumn(models.Columns_InstanceIDs).AddQuery().ExcludedInstanceIDsFilter("").SearchQuery()) if s.succeededOnce {
// since we have at least one successful run, we can restrict it to events not older than
// twice the requeue time (just to be sure not to miss an event)
instanceIDQuery = instanceIDQuery.CreationDateNewerFilter(time.Now().Add(-2 * s.MinimumCycleDuration()))
}
ids, err := s.eventstore.InstanceIDs(ctx, instanceIDQuery.SearchQuery())
if err != nil { if err != nil {
errs <- err errs <- err
break break
@ -97,12 +149,16 @@ func (s *spooledHandler) load(workerID string) {
if max > len(ids) { if max > len(ids) {
max = len(ids) max = len(ids)
} }
err = s.processInstances(ctx, workerID, ids[i:max]...) err = s.processInstances(ctx, workerID, ids[i:max])
if err != nil { if err != nil {
errs <- err errs <- err
} }
} }
if ctx.Err() == nil { if ctx.Err() == nil {
if !s.succeededOnce {
err = s.setSucceededOnce(ctx)
logging.WithFields("view", s.ViewModel()).OnError(err).Warn("unable to push first schedule succeeded")
}
errs <- nil errs <- nil
} }
break break
@ -111,16 +167,20 @@ func (s *spooledHandler) load(workerID string) {
<-ctx.Done() <-ctx.Done()
} }
func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids ...string) error { func (s *spooledHandler) processInstances(ctx context.Context, workerID string, ids []string) error {
for { for {
events, err := s.query(ctx, ids...) processCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
events, err := s.query(processCtx, ids)
if err != nil { if err != nil {
cancel()
return err return err
} }
if len(events) == 0 { if len(events) == 0 {
cancel()
return nil return nil
} }
err = s.process(ctx, events, workerID) err = s.process(processCtx, events, workerID, ids)
cancel()
if err != nil { if err != nil {
return err return err
} }
@ -139,7 +199,7 @@ func (s *spooledHandler) awaitError(cancel func(), errs chan error, workerID str
} }
} }
func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string) error { func (s *spooledHandler) process(ctx context.Context, events []*models.Event, workerID string, instanceIDs []string) error {
for i, event := range events { for i, event := range events {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -152,17 +212,17 @@ func (s *spooledHandler) process(ctx context.Context, events []*models.Event, wo
continue continue
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
return s.process(ctx, events[i:], workerID) return s.process(ctx, events[i:], workerID, instanceIDs)
} }
} }
} }
err := s.OnSuccess() err := s.OnSuccess(instanceIDs)
logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func") logging.WithFields("view", s.ViewModel(), "worker", workerID, "traceID", tracing.TraceIDFromCtx(ctx)).OnError(err).Warn("could not process on success func")
return err return err
} }
func (s *spooledHandler) query(ctx context.Context, instanceIDs ...string) ([]*models.Event, error) { func (s *spooledHandler) query(ctx context.Context, instanceIDs []string) ([]*models.Event, error) {
query, err := s.EventQuery(instanceIDs...) query, err := s.EventQuery(instanceIDs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -227,6 +287,6 @@ func HandleError(event *models.Event, failedErr error,
return failedErr return failedErr
} }
func HandleSuccess(updateSpoolerRunTimestamp func() error) error { func HandleSuccess(updateSpoolerRunTimestamp func([]string) error, instanceIDs []string) error {
return updateSpoolerRunTimestamp() return updateSpoolerRunTimestamp(instanceIDs)
} }

View File

@ -51,7 +51,7 @@ func (h *testHandler) Subscription() *v1.Subscription {
return nil return nil
} }
func (h *testHandler) EventQuery(instanceIDs ...string) (*models.SearchQuery, error) { func (h *testHandler) EventQuery(instanceIDs []string) (*models.SearchQuery, error) {
if h.queryError != nil { if h.queryError != nil {
return nil, h.queryError return nil, h.queryError
} }
@ -71,7 +71,7 @@ func (h *testHandler) OnError(event *models.Event, err error) error {
return err return err
} }
func (h *testHandler) OnSuccess() error { func (h *testHandler) OnSuccess([]string) error {
return nil return nil
} }
@ -129,6 +129,7 @@ func TestSpooler_process(t *testing.T) {
type args struct { type args struct {
timeout time.Duration timeout time.Duration
events []*models.Event events []*models.Event
instanceIDs []string
} }
tests := []struct { tests := []struct {
name string name string
@ -184,7 +185,7 @@ func TestSpooler_process(t *testing.T) {
start = time.Now() start = time.Now()
} }
if err := s.process(ctx, tt.args.events, "test"); (err != nil) != tt.wantErr { if err := s.process(ctx, tt.args.events, "test", tt.args.instanceIDs); (err != nil) != tt.wantErr {
t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr) t.Errorf("Spooler.process() error = %v, wantErr %v", err, tt.wantErr)
} }
if tt.fields.currentHandler.maxErrCount != tt.wantRetries { if tt.fields.currentHandler.maxErrCount != tt.wantRetries {

View File

@ -2,6 +2,8 @@ package migration
import ( import (
"context" "context"
errs "errors"
"time"
"github.com/zitadel/logging" "github.com/zitadel/logging"
@ -18,6 +20,10 @@ const (
aggregateID = "SYSTEM" aggregateID = "SYSTEM"
) )
var (
errMigrationAlreadyStarted = errs.New("already started")
)
type Migration interface { type Migration interface {
String() string String() string
Execute(context.Context) error Execute(context.Context) error
@ -32,7 +38,7 @@ type RepeatableMigration interface {
func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) { func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration) (err error) {
logging.Infof("verify migration %s", migration.String()) logging.Infof("verify migration %s", migration.String())
if should, err := shouldExec(ctx, es, migration); !should || err != nil { if should, err := checkExec(ctx, es, migration); !should || err != nil {
return err return err
} }
@ -52,6 +58,30 @@ func Migrate(ctx context.Context, es *eventstore.Eventstore, migration Migration
return pushErr return pushErr
} }
// checkExec ensures that only one setup step is done concurrently
// if a setup step is already started, it calls shouldExec after some time again
func checkExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (bool, error) {
timer := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return false, errors.ThrowInternal(nil, "MIGR-as3f7", "Errors.Internal")
case <-timer.C:
should, err := shouldExec(ctx, es, migration)
if err != nil {
if !errs.Is(err, errMigrationAlreadyStarted) {
return false, err
}
logging.WithFields("migration step", migration.String()).
Warn("migration already started, will check again in 5 seconds")
timer.Reset(5 * time.Second)
break
}
return should, nil
}
}
}
func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) { func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migration) (should bool, err error) {
events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent). events, err := es.Filter(ctx, eventstore.NewSearchQueryBuilder(eventstore.ColumnsEvent).
OrderAsc(). OrderAsc().
@ -90,7 +120,7 @@ func shouldExec(ctx context.Context, es *eventstore.Eventstore, migration Migrat
} }
if isStarted { if isStarted {
return false, nil return false, errMigrationAlreadyStarted
} }
repeatable, ok := migration.(RepeatableMigration) repeatable, ok := migration.(RepeatableMigration)
if !ok { if !ok {

View File

@ -36,8 +36,13 @@ func TokensByUserID(db *gorm.DB, table, userID, instanceID string) ([]*usr_model
Method: domain.SearchMethodEquals, Method: domain.SearchMethodEquals,
Value: instanceID, Value: instanceID,
} }
expirationQuery := &model.TokenSearchQuery{
Key: model.TokenSearchKeyExpiration,
Method: domain.SearchMethodGreaterThan,
Value: "now()",
}
query := repository.PrepareSearchQuery(table, usr_model.TokenSearchRequest{ query := repository.PrepareSearchQuery(table, usr_model.TokenSearchRequest{
Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery}, Queries: []*model.TokenSearchQuery{userIDQuery, instanceIDQuery, expirationQuery},
}) })
_, err := query(db, &tokens) _, err := query(db, &tokens)
return tokens, err return tokens, err

View File

@ -169,7 +169,7 @@ func LatestSequence(db *gorm.DB, table, viewName, instanceID string) (*CurrentSe
return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName) return nil, caos_errs.ThrowInternalf(err, "VIEW-9LyCB", "unable to get latest sequence of %s", viewName)
} }
func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs ...string) ([]*CurrentSequence, error) { func LatestSequences(db *gorm.DB, table, viewName string, instanceIDs []string) ([]*CurrentSequence, error) {
searchQueries := []sequenceSearchQuery{ searchQueries := []sequenceSearchQuery{
{key: sequenceSearchKey(SequenceSearchKeyViewName), value: viewName, method: domain.SearchMethodEquals}, {key: sequenceSearchKey(SequenceSearchKeyViewName), value: viewName, method: domain.SearchMethodEquals},
} }