2023-09-15 14:58:45 +00:00
package projection
import (
"context"
"time"
"github.com/zitadel/zitadel/internal/database"
2023-09-22 09:37:16 +00:00
zitadel_errors "github.com/zitadel/zitadel/internal/errors"
2023-09-15 14:58:45 +00:00
"github.com/zitadel/zitadel/internal/eventstore"
2023-10-19 10:19:10 +00:00
old_handler "github.com/zitadel/zitadel/internal/eventstore/handler"
"github.com/zitadel/zitadel/internal/eventstore/handler/v2"
2023-09-15 14:58:45 +00:00
"github.com/zitadel/zitadel/internal/repository/instance"
"github.com/zitadel/zitadel/internal/repository/quota"
)
const (
QuotasProjectionTable = "projections.quotas"
QuotaPeriodsProjectionTable = QuotasProjectionTable + "_" + quotaPeriodsTableSuffix
QuotaNotificationsTable = QuotasProjectionTable + "_" + quotaNotificationsTableSuffix
QuotaColumnID = "id"
QuotaColumnInstanceID = "instance_id"
QuotaColumnUnit = "unit"
QuotaColumnAmount = "amount"
QuotaColumnFrom = "from_anchor"
QuotaColumnInterval = "interval"
QuotaColumnLimit = "limit_usage"
quotaPeriodsTableSuffix = "periods"
QuotaPeriodColumnInstanceID = "instance_id"
QuotaPeriodColumnUnit = "unit"
QuotaPeriodColumnStart = "start"
QuotaPeriodColumnUsage = "usage"
quotaNotificationsTableSuffix = "notifications"
QuotaNotificationColumnInstanceID = "instance_id"
QuotaNotificationColumnUnit = "unit"
QuotaNotificationColumnID = "id"
QuotaNotificationColumnCallURL = "call_url"
QuotaNotificationColumnPercent = "percent"
QuotaNotificationColumnRepeat = "repeat"
QuotaNotificationColumnLatestDuePeriodStart = "latest_due_period_start"
QuotaNotificationColumnNextDueThreshold = "next_due_threshold"
)
const (
incrementQuotaStatement = ` INSERT INTO projections.quotas_periods ` +
` (instance_id, unit, start, usage) ` +
` VALUES ($1, $2, $3, $4) ON CONFLICT (instance_id, unit, start) ` +
` DO UPDATE SET usage = projections.quotas_periods.usage + excluded.usage RETURNING usage `
)
type quotaProjection struct {
2023-10-19 10:19:10 +00:00
handler * handler . Handler
client * database . DB
2023-09-15 14:58:45 +00:00
}
2023-10-19 10:19:10 +00:00
func newQuotaProjection ( ctx context . Context , config handler . Config ) * quotaProjection {
p := & quotaProjection {
client : config . Client ,
}
p . handler = handler . NewHandler ( ctx , & config , p )
return p
}
func ( * quotaProjection ) Name ( ) string {
return QuotasProjectionTable
}
func ( * quotaProjection ) Init ( ) * old_handler . Check {
return handler . NewMultiTableCheck (
handler . NewTable (
[ ] * handler . InitColumn {
handler . NewColumn ( QuotaColumnID , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaColumnInstanceID , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaColumnUnit , handler . ColumnTypeEnum ) ,
handler . NewColumn ( QuotaColumnAmount , handler . ColumnTypeInt64 , handler . Nullable ( ) ) ,
handler . NewColumn ( QuotaColumnFrom , handler . ColumnTypeTimestamp , handler . Nullable ( ) ) ,
handler . NewColumn ( QuotaColumnInterval , handler . ColumnTypeInterval , handler . Nullable ( ) ) ,
handler . NewColumn ( QuotaColumnLimit , handler . ColumnTypeBool , handler . Nullable ( ) ) ,
2023-09-15 14:58:45 +00:00
} ,
2023-10-19 10:19:10 +00:00
handler . NewPrimaryKey ( QuotaColumnInstanceID , QuotaColumnUnit ) ,
2023-09-15 14:58:45 +00:00
) ,
2023-10-19 10:19:10 +00:00
handler . NewSuffixedTable (
[ ] * handler . InitColumn {
handler . NewColumn ( QuotaPeriodColumnInstanceID , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaPeriodColumnUnit , handler . ColumnTypeEnum ) ,
handler . NewColumn ( QuotaPeriodColumnStart , handler . ColumnTypeTimestamp ) ,
handler . NewColumn ( QuotaPeriodColumnUsage , handler . ColumnTypeInt64 ) ,
2023-09-15 14:58:45 +00:00
} ,
2023-10-19 10:19:10 +00:00
handler . NewPrimaryKey ( QuotaPeriodColumnInstanceID , QuotaPeriodColumnUnit , QuotaPeriodColumnStart ) ,
2023-09-15 14:58:45 +00:00
quotaPeriodsTableSuffix ,
) ,
2023-10-19 10:19:10 +00:00
handler . NewSuffixedTable (
[ ] * handler . InitColumn {
handler . NewColumn ( QuotaNotificationColumnInstanceID , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaNotificationColumnUnit , handler . ColumnTypeEnum ) ,
handler . NewColumn ( QuotaNotificationColumnID , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaNotificationColumnCallURL , handler . ColumnTypeText ) ,
handler . NewColumn ( QuotaNotificationColumnPercent , handler . ColumnTypeInt64 ) ,
handler . NewColumn ( QuotaNotificationColumnRepeat , handler . ColumnTypeBool ) ,
handler . NewColumn ( QuotaNotificationColumnLatestDuePeriodStart , handler . ColumnTypeTimestamp , handler . Nullable ( ) ) ,
handler . NewColumn ( QuotaNotificationColumnNextDueThreshold , handler . ColumnTypeInt64 , handler . Nullable ( ) ) ,
2023-09-15 14:58:45 +00:00
} ,
2023-10-19 10:19:10 +00:00
handler . NewPrimaryKey ( QuotaNotificationColumnInstanceID , QuotaNotificationColumnUnit , QuotaNotificationColumnID ) ,
2023-09-15 14:58:45 +00:00
quotaNotificationsTableSuffix ,
) ,
)
}
2023-10-19 10:19:10 +00:00
func ( q * quotaProjection ) Reducers ( ) [ ] handler . AggregateReducer {
2023-09-15 14:58:45 +00:00
return [ ] handler . AggregateReducer {
{
Aggregate : instance . AggregateType ,
2023-10-19 10:19:10 +00:00
EventReducers : [ ] handler . EventReducer {
2023-09-15 14:58:45 +00:00
{
Event : instance . InstanceRemovedEventType ,
Reduce : q . reduceInstanceRemoved ,
} ,
} ,
} ,
{
Aggregate : quota . AggregateType ,
2023-10-19 10:19:10 +00:00
EventReducers : [ ] handler . EventReducer {
2023-09-15 14:58:45 +00:00
{
Event : quota . AddedEventType ,
2023-09-22 09:37:16 +00:00
Reduce : q . reduceQuotaSet ,
} ,
{
Event : quota . SetEventType ,
Reduce : q . reduceQuotaSet ,
2023-09-15 14:58:45 +00:00
} ,
{
Event : quota . RemovedEventType ,
Reduce : q . reduceQuotaRemoved ,
} ,
{
Event : quota . NotificationDueEventType ,
Reduce : q . reduceQuotaNotificationDue ,
} ,
{
Event : quota . NotifiedEventType ,
Reduce : q . reduceQuotaNotified ,
} ,
} ,
} ,
}
}
func ( q * quotaProjection ) reduceQuotaNotified ( event eventstore . Event ) ( * handler . Statement , error ) {
2023-10-19 10:19:10 +00:00
return handler . NewNoOpStatement ( event ) , nil
2023-09-15 14:58:45 +00:00
}
2023-09-22 09:37:16 +00:00
func ( q * quotaProjection ) reduceQuotaSet ( event eventstore . Event ) ( * handler . Statement , error ) {
e , err := assertEvent [ * quota . SetEvent ] ( event )
2023-09-15 14:58:45 +00:00
if err != nil {
return nil , err
}
2023-10-19 10:19:10 +00:00
var statements [ ] func ( e eventstore . Event ) handler . Exec
2023-09-15 14:58:45 +00:00
2023-09-22 09:37:16 +00:00
// 1. Insert or update quota if the event has not only notification changes
quotaConflictColumns := [ ] handler . Column {
handler . NewCol ( QuotaColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCol ( QuotaColumnUnit , e . Unit ) ,
}
quotaUpdateCols := make ( [ ] handler . Column , 0 , 4 + 1 + len ( quotaConflictColumns ) )
if e . Limit != nil {
quotaUpdateCols = append ( quotaUpdateCols , handler . NewCol ( QuotaColumnLimit , * e . Limit ) )
}
if e . Amount != nil {
quotaUpdateCols = append ( quotaUpdateCols , handler . NewCol ( QuotaColumnAmount , * e . Amount ) )
}
if e . From != nil {
quotaUpdateCols = append ( quotaUpdateCols , handler . NewCol ( QuotaColumnFrom , * e . From ) )
}
if e . ResetInterval != nil {
quotaUpdateCols = append ( quotaUpdateCols , handler . NewCol ( QuotaColumnInterval , * e . ResetInterval ) )
}
if len ( quotaUpdateCols ) > 0 {
// TODO: Add the quota ID to the primary key in a migration?
quotaUpdateCols = append ( quotaUpdateCols , handler . NewCol ( QuotaColumnID , e . Aggregate ( ) . ID ) )
quotaUpdateCols = append ( quotaUpdateCols , quotaConflictColumns ... )
2023-10-19 10:19:10 +00:00
statements = append ( statements , handler . AddUpsertStatement ( quotaConflictColumns , quotaUpdateCols ) )
2023-09-22 09:37:16 +00:00
}
// 2. Delete existing notifications
if e . Notifications == nil {
2023-10-19 10:19:10 +00:00
return handler . NewMultiStatement ( e , statements ... ) , nil
2023-09-22 09:37:16 +00:00
}
2023-10-19 10:19:10 +00:00
statements = append ( statements , handler . AddDeleteStatement (
2023-09-22 09:37:16 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaNotificationColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCond ( QuotaNotificationColumnUnit , e . Unit ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaNotificationsTableSuffix ) ,
2023-09-22 09:37:16 +00:00
) )
notifications := * e . Notifications
for i := range notifications {
notification := notifications [ i ]
2023-10-19 10:19:10 +00:00
statements = append ( statements , handler . AddCreateStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Column {
handler . NewCol ( QuotaNotificationColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCol ( QuotaNotificationColumnUnit , e . Unit ) ,
handler . NewCol ( QuotaNotificationColumnID , notification . ID ) ,
handler . NewCol ( QuotaNotificationColumnCallURL , notification . CallURL ) ,
handler . NewCol ( QuotaNotificationColumnPercent , notification . Percent ) ,
handler . NewCol ( QuotaNotificationColumnRepeat , notification . Repeat ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaNotificationsTableSuffix ) ,
2023-09-22 09:37:16 +00:00
) )
2023-09-15 14:58:45 +00:00
}
2023-10-19 10:19:10 +00:00
return handler . NewMultiStatement ( e , statements ... ) , nil
2023-09-15 14:58:45 +00:00
}
func ( q * quotaProjection ) reduceQuotaNotificationDue ( event eventstore . Event ) ( * handler . Statement , error ) {
e , err := assertEvent [ * quota . NotificationDueEvent ] ( event )
if err != nil {
return nil , err
}
2023-10-19 10:19:10 +00:00
return handler . NewUpdateStatement ( e ,
2023-09-15 14:58:45 +00:00
[ ] handler . Column {
handler . NewCol ( QuotaNotificationColumnLatestDuePeriodStart , e . PeriodStart ) ,
handler . NewCol ( QuotaNotificationColumnNextDueThreshold , e . Threshold + 100 ) , // next due_threshold is always the reached + 100 => percent (e.g. 90) in the next bucket (e.g. 190)
} ,
[ ] handler . Condition {
handler . NewCond ( QuotaNotificationColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCond ( QuotaNotificationColumnUnit , e . Unit ) ,
handler . NewCond ( QuotaNotificationColumnID , e . ID ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaNotificationsTableSuffix ) ,
2023-09-15 14:58:45 +00:00
) , nil
}
func ( q * quotaProjection ) reduceQuotaRemoved ( event eventstore . Event ) ( * handler . Statement , error ) {
e , err := assertEvent [ * quota . RemovedEvent ] ( event )
if err != nil {
return nil , err
}
2023-10-19 10:19:10 +00:00
return handler . NewMultiStatement (
2023-09-15 14:58:45 +00:00
e ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaPeriodColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCond ( QuotaPeriodColumnUnit , e . Unit ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaPeriodsTableSuffix ) ,
2023-09-15 14:58:45 +00:00
) ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaNotificationColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCond ( QuotaNotificationColumnUnit , e . Unit ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaNotificationsTableSuffix ) ,
2023-09-15 14:58:45 +00:00
) ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
handler . NewCond ( QuotaColumnUnit , e . Unit ) ,
} ,
) ,
) , nil
}
func ( q * quotaProjection ) reduceInstanceRemoved ( event eventstore . Event ) ( * handler . Statement , error ) {
// we only assert the event to make sure it is the correct type
e , err := assertEvent [ * instance . InstanceRemovedEvent ] ( event )
if err != nil {
return nil , err
}
2023-10-19 10:19:10 +00:00
return handler . NewMultiStatement (
2023-09-15 14:58:45 +00:00
e ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaPeriodColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaPeriodsTableSuffix ) ,
2023-09-15 14:58:45 +00:00
) ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaNotificationColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
} ,
2023-10-19 10:19:10 +00:00
handler . WithTableSuffix ( quotaNotificationsTableSuffix ) ,
2023-09-15 14:58:45 +00:00
) ,
2023-10-19 10:19:10 +00:00
handler . AddDeleteStatement (
2023-09-15 14:58:45 +00:00
[ ] handler . Condition {
handler . NewCond ( QuotaColumnInstanceID , e . Aggregate ( ) . InstanceID ) ,
} ,
) ,
) , nil
}
func ( q * quotaProjection ) IncrementUsage ( ctx context . Context , unit quota . Unit , instanceID string , periodStart time . Time , count uint64 ) ( sum uint64 , err error ) {
if count == 0 {
return 0 , nil
}
err = q . client . DB . QueryRowContext (
ctx ,
incrementQuotaStatement ,
instanceID , unit , periodStart , count ,
) . Scan ( & sum )
if err != nil {
2023-09-22 09:37:16 +00:00
return 0 , zitadel_errors . ThrowInternalf ( err , "PROJ-SJL3h" , "incrementing usage for unit %d failed for at least one quota period" , unit )
2023-09-15 14:58:45 +00:00
}
return sum , err
}