mirror of
https://github.com/zitadel/zitadel.git
synced 2025-08-11 19:07:30 +00:00
feat: store assets in database (#3290)
* feat: use database as asset storage * being only uploading assets if allowed * tests * fixes * cleanup after merge * renaming * various fixes * fix: change to repository event types and removed unused code * feat: set default features * error handling * error handling and naming * fix tests * fix tests * fix merge * rename
This commit is contained in:
@@ -1,9 +1,13 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
|
||||
"github.com/caos/zitadel/internal/errors"
|
||||
caos_errs "github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/static"
|
||||
)
|
||||
@@ -34,3 +38,15 @@ func (c *Config) NewStorage() (static.Storage, error) {
|
||||
MultiDelete: c.MultiDelete,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func NewStorage(_ *sql.DB, rawConfig map[string]interface{}) (static.Storage, error) {
|
||||
configData, err := json.Marshal(rawConfig)
|
||||
if err != nil {
|
||||
return nil, errors.ThrowInternal(err, "MINIO-Ef2f2", "could not map config")
|
||||
}
|
||||
c := new(Config)
|
||||
if err := json.Unmarshal(configData, c); err != nil {
|
||||
return nil, errors.ThrowInternal(err, "MINIO-GB4nw", "could not map config")
|
||||
}
|
||||
return c.NewStorage()
|
||||
}
|
||||
|
@@ -2,11 +2,10 @@ package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/caos/logging"
|
||||
"github.com/minio/minio-go/v7"
|
||||
@@ -14,8 +13,11 @@ import (
|
||||
|
||||
"github.com/caos/zitadel/internal/domain"
|
||||
caos_errs "github.com/caos/zitadel/internal/errors"
|
||||
"github.com/caos/zitadel/internal/static"
|
||||
)
|
||||
|
||||
var _ static.Storage = (*Minio)(nil)
|
||||
|
||||
type Minio struct {
|
||||
Client *minio.Client
|
||||
Location string
|
||||
@@ -23,129 +25,66 @@ type Minio struct {
|
||||
MultiDelete bool
|
||||
}
|
||||
|
||||
func (m *Minio) CreateBucket(ctx context.Context, name, location string) error {
|
||||
if location == "" {
|
||||
location = m.Location
|
||||
func (m *Minio) PutObject(ctx context.Context, instanceID, location, resourceOwner, name, contentType string, objectType static.ObjectType, object io.Reader, objectSize int64) (*static.Asset, error) {
|
||||
err := m.createBucket(ctx, instanceID, location)
|
||||
if err != nil && !caos_errs.IsErrorAlreadyExists(err) {
|
||||
return nil, err
|
||||
}
|
||||
name = m.prefixBucketName(name)
|
||||
exists, err := m.Client.BucketExists(ctx, name)
|
||||
if err != nil {
|
||||
logging.LogWithFields("MINIO-ADvf3", "bucketname", name).WithError(err).Error("cannot check if bucket exists")
|
||||
return caos_errs.ThrowInternal(err, "MINIO-1b8fs", "Errors.Assets.Bucket.Internal")
|
||||
}
|
||||
if exists {
|
||||
return caos_errs.ThrowAlreadyExists(nil, "MINIO-9n3MK", "Errors.Assets.Bucket.AlreadyExists")
|
||||
}
|
||||
err = m.Client.MakeBucket(ctx, name, minio.MakeBucketOptions{Region: location})
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "MINIO-4m90d", "Errors.Assets.Bucket.CreateFailed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Minio) ListBuckets(ctx context.Context) ([]*domain.BucketInfo, error) {
|
||||
infos, err := m.Client.ListBuckets(ctx)
|
||||
if err != nil {
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-390OP", "Errors.Assets.Bucket.ListFailed")
|
||||
}
|
||||
buckets := make([]*domain.BucketInfo, len(infos))
|
||||
for i, info := range infos {
|
||||
buckets[i] = &domain.BucketInfo{
|
||||
Name: info.Name,
|
||||
CreationDate: info.CreationDate,
|
||||
}
|
||||
}
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
func (m *Minio) RemoveBucket(ctx context.Context, name string) error {
|
||||
name = m.prefixBucketName(name)
|
||||
err := m.Client.RemoveBucket(ctx, name)
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "MINIO-338Hs", "Errors.Assets.Bucket.RemoveFailed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Minio) PutObject(ctx context.Context, bucketName, objectName, contentType string, object io.Reader, objectSize int64, createBucketIfNotExisting bool) (*domain.AssetInfo, error) {
|
||||
if createBucketIfNotExisting {
|
||||
err := m.CreateBucket(ctx, bucketName, "")
|
||||
if err != nil && !caos_errs.IsErrorAlreadyExists(err) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
bucketName := m.prefixBucketName(instanceID)
|
||||
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
|
||||
info, err := m.Client.PutObject(ctx, bucketName, objectName, object, objectSize, minio.PutObjectOptions{ContentType: contentType})
|
||||
if err != nil {
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-590sw", "Errors.Assets.Object.PutFailed")
|
||||
}
|
||||
return &domain.AssetInfo{
|
||||
Bucket: info.Bucket,
|
||||
Key: info.Key,
|
||||
ETag: info.ETag,
|
||||
Size: info.Size,
|
||||
LastModified: info.LastModified,
|
||||
Location: info.Location,
|
||||
VersionID: info.VersionID,
|
||||
return &static.Asset{
|
||||
InstanceID: info.Bucket,
|
||||
ResourceOwner: resourceOwner,
|
||||
Name: info.Key,
|
||||
Hash: info.ETag,
|
||||
Size: info.Size,
|
||||
LastModified: info.LastModified,
|
||||
Location: info.Location,
|
||||
ContentType: contentType,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *Minio) GetObjectInfo(ctx context.Context, bucketName, objectName string) (*domain.AssetInfo, error) {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
objectinfo, err := m.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
|
||||
func (m *Minio) GetObject(ctx context.Context, instanceID, resourceOwner, name string) ([]byte, func() (*static.Asset, error), error) {
|
||||
bucketName := m.prefixBucketName(instanceID)
|
||||
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
|
||||
object, err := m.Client.GetObject(ctx, bucketName, objectName, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, caos_errs.ThrowInternal(err, "MINIO-VGDgv", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
info := func() (*static.Asset, error) {
|
||||
info, err := object.Stat()
|
||||
if err != nil {
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-F96xF", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
return m.objectToAssetInfo(instanceID, resourceOwner, info), nil
|
||||
}
|
||||
asset, err := io.ReadAll(object)
|
||||
if err != nil {
|
||||
return nil, nil, caos_errs.ThrowInternal(err, "MINIO-SFef1", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
return asset, info, nil
|
||||
}
|
||||
|
||||
func (m *Minio) GetObjectInfo(ctx context.Context, instanceID, resourceOwner, name string) (*static.Asset, error) {
|
||||
bucketName := m.prefixBucketName(instanceID)
|
||||
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
|
||||
objectInfo, err := m.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
if errResp := minio.ToErrorResponse(err); errResp.StatusCode == http.StatusNotFound {
|
||||
return nil, caos_errs.ThrowNotFound(err, "MINIO-Gdfh4", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-1vySX", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
return m.objectToAssetInfo(bucketName, objectinfo), nil
|
||||
return m.objectToAssetInfo(instanceID, resourceOwner, objectInfo), nil
|
||||
}
|
||||
|
||||
func (m *Minio) GetObject(ctx context.Context, bucketName, objectName string) (io.Reader, func() (*domain.AssetInfo, error), error) {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
object, err := m.Client.GetObject(ctx, bucketName, objectName, minio.GetObjectOptions{})
|
||||
if err != nil {
|
||||
return nil, nil, caos_errs.ThrowInternal(err, "MINIO-VGDgv", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
info := func() (*domain.AssetInfo, error) {
|
||||
info, err := object.Stat()
|
||||
if err != nil {
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-F96xF", "Errors.Assets.Object.GetFailed")
|
||||
}
|
||||
return m.objectToAssetInfo(bucketName, info), nil
|
||||
}
|
||||
return object, info, nil
|
||||
}
|
||||
|
||||
func (m *Minio) GetObjectPresignedURL(ctx context.Context, bucketName, objectName string, expiration time.Duration) (*url.URL, error) {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
reqParams := make(url.Values)
|
||||
presignedURL, err := m.Client.PresignedGetObject(ctx, bucketName, objectName, expiration, reqParams)
|
||||
if err != nil {
|
||||
return nil, caos_errs.ThrowInternal(err, "MINIO-19Mp0", "Errors.Assets.Object.PresignedTokenFailed")
|
||||
}
|
||||
return presignedURL, nil
|
||||
}
|
||||
|
||||
func (m *Minio) ListObjectInfos(ctx context.Context, bucketName, prefix string, recursive bool) ([]*domain.AssetInfo, error) {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
assetInfos := make([]*domain.AssetInfo, 0)
|
||||
|
||||
objects, cancel := m.listObjects(ctx, bucketName, prefix, recursive)
|
||||
defer cancel()
|
||||
for object := range objects {
|
||||
if object.Err != nil {
|
||||
logging.LogWithFields("MINIO-wC8sd", "bucket-name", bucketName, "prefix", prefix).WithError(object.Err).Debug("unable to get object")
|
||||
return nil, caos_errs.ThrowInternal(object.Err, "MINIO-1m09S", "Errors.Assets.Object.ListFailed")
|
||||
}
|
||||
assetInfos = append(assetInfos, m.objectToAssetInfo(bucketName, object))
|
||||
}
|
||||
return assetInfos, nil
|
||||
}
|
||||
|
||||
func (m *Minio) RemoveObject(ctx context.Context, bucketName, objectName string) error {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
func (m *Minio) RemoveObject(ctx context.Context, instanceID, resourceOwner, name string) error {
|
||||
bucketName := m.prefixBucketName(instanceID)
|
||||
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
|
||||
err := m.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "MINIO-x85RT", "Errors.Assets.Object.RemoveFailed")
|
||||
@@ -153,19 +92,27 @@ func (m *Minio) RemoveObject(ctx context.Context, bucketName, objectName string)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Minio) RemoveObjects(ctx context.Context, bucketName, path string, recursive bool) error {
|
||||
bucketName = m.prefixBucketName(bucketName)
|
||||
func (m *Minio) RemoveObjects(ctx context.Context, instanceID, resourceOwner string, objectType static.ObjectType) error {
|
||||
bucketName := m.prefixBucketName(instanceID)
|
||||
objectsCh := make(chan minio.ObjectInfo)
|
||||
g := new(errgroup.Group)
|
||||
|
||||
var path string
|
||||
switch objectType {
|
||||
case static.ObjectTypeStyling:
|
||||
path = domain.LabelPolicyPrefix + "/"
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
defer close(objectsCh)
|
||||
objects, cancel := m.listObjects(ctx, bucketName, path, recursive)
|
||||
objects, cancel := m.listObjects(ctx, bucketName, resourceOwner, true)
|
||||
for object := range objects {
|
||||
if err := object.Err; err != nil {
|
||||
cancel()
|
||||
if errResp := minio.ToErrorResponse(err); errResp.StatusCode == http.StatusNotFound {
|
||||
logging.LogWithFields("MINIO-ss8va", "bucketName", bucketName, "path", path).Warn("list objects for remove failed with not found")
|
||||
logging.WithFields("bucketName", bucketName, "path", path).Warn("list objects for remove failed with not found")
|
||||
continue
|
||||
}
|
||||
return caos_errs.ThrowInternal(object.Err, "MINIO-WQF32", "Errors.Assets.Object.ListFailed")
|
||||
@@ -189,6 +136,26 @@ func (m *Minio) RemoveObjects(ctx context.Context, bucketName, path string, recu
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
func (m *Minio) createBucket(ctx context.Context, name, location string) error {
|
||||
if location == "" {
|
||||
location = m.Location
|
||||
}
|
||||
name = m.prefixBucketName(name)
|
||||
exists, err := m.Client.BucketExists(ctx, name)
|
||||
if err != nil {
|
||||
logging.WithFields("bucketname", name).WithError(err).Error("cannot check if bucket exists")
|
||||
return caos_errs.ThrowInternal(err, "MINIO-1b8fs", "Errors.Assets.Bucket.Internal")
|
||||
}
|
||||
if exists {
|
||||
return caos_errs.ThrowAlreadyExists(nil, "MINIO-9n3MK", "Errors.Assets.Bucket.AlreadyExists")
|
||||
}
|
||||
err = m.Client.MakeBucket(ctx, name, minio.MakeBucketOptions{Region: location})
|
||||
if err != nil {
|
||||
return caos_errs.ThrowInternal(err, "MINIO-4m90d", "Errors.Assets.Bucket.CreateFailed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Minio) listObjects(ctx context.Context, bucketName, prefix string, recursive bool) (<-chan minio.ObjectInfo, context.CancelFunc) {
|
||||
ctxCancel, cancel := context.WithCancel(ctx)
|
||||
|
||||
@@ -198,17 +165,15 @@ func (m *Minio) listObjects(ctx context.Context, bucketName, prefix string, recu
|
||||
}), cancel
|
||||
}
|
||||
|
||||
func (m *Minio) objectToAssetInfo(bucketName string, object minio.ObjectInfo) *domain.AssetInfo {
|
||||
return &domain.AssetInfo{
|
||||
Bucket: bucketName,
|
||||
Key: object.Key,
|
||||
ETag: object.ETag,
|
||||
Size: object.Size,
|
||||
LastModified: object.LastModified,
|
||||
VersionID: object.VersionID,
|
||||
Expiration: object.Expires,
|
||||
ContentType: object.ContentType,
|
||||
AutheticatedURL: m.Client.EndpointURL().String() + "/" + bucketName + "/" + object.Key,
|
||||
func (m *Minio) objectToAssetInfo(bucketName string, resourceOwner string, object minio.ObjectInfo) *static.Asset {
|
||||
return &static.Asset{
|
||||
InstanceID: bucketName,
|
||||
ResourceOwner: resourceOwner,
|
||||
Name: object.Key,
|
||||
Hash: object.ETag,
|
||||
Size: object.Size,
|
||||
LastModified: object.LastModified,
|
||||
ContentType: object.ContentType,
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user