zitadel/internal/static/s3/minio.go
Florian Forster fa9f581d56
chore(v2): move to new org (#3499)
* chore: move to new org

* logging

* fix: org rename caos -> zitadel

Co-authored-by: adlerhurst <silvan.reusser@gmail.com>
2022-04-26 23:01:45 +00:00

183 lines
6.3 KiB
Go

package s3
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"github.com/minio/minio-go/v7"
"github.com/zitadel/logging"
"golang.org/x/sync/errgroup"
"github.com/zitadel/zitadel/internal/domain"
caos_errs "github.com/zitadel/zitadel/internal/errors"
"github.com/zitadel/zitadel/internal/static"
)
var _ static.Storage = (*Minio)(nil)
type Minio struct {
Client *minio.Client
Location string
BucketPrefix string
MultiDelete bool
}
func (m *Minio) PutObject(ctx context.Context, instanceID, location, resourceOwner, name, contentType string, objectType static.ObjectType, object io.Reader, objectSize int64) (*static.Asset, error) {
err := m.createBucket(ctx, instanceID, location)
if err != nil && !caos_errs.IsErrorAlreadyExists(err) {
return nil, err
}
bucketName := m.prefixBucketName(instanceID)
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
info, err := m.Client.PutObject(ctx, bucketName, objectName, object, objectSize, minio.PutObjectOptions{ContentType: contentType})
if err != nil {
return nil, caos_errs.ThrowInternal(err, "MINIO-590sw", "Errors.Assets.Object.PutFailed")
}
return &static.Asset{
InstanceID: info.Bucket,
ResourceOwner: resourceOwner,
Name: info.Key,
Hash: info.ETag,
Size: info.Size,
LastModified: info.LastModified,
Location: info.Location,
ContentType: contentType,
}, nil
}
func (m *Minio) GetObject(ctx context.Context, instanceID, resourceOwner, name string) ([]byte, func() (*static.Asset, error), error) {
bucketName := m.prefixBucketName(instanceID)
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
object, err := m.Client.GetObject(ctx, bucketName, objectName, minio.GetObjectOptions{})
if err != nil {
return nil, nil, caos_errs.ThrowInternal(err, "MINIO-VGDgv", "Errors.Assets.Object.GetFailed")
}
info := func() (*static.Asset, error) {
info, err := object.Stat()
if err != nil {
return nil, caos_errs.ThrowInternal(err, "MINIO-F96xF", "Errors.Assets.Object.GetFailed")
}
return m.objectToAssetInfo(instanceID, resourceOwner, info), nil
}
asset, err := io.ReadAll(object)
if err != nil {
return nil, nil, caos_errs.ThrowInternal(err, "MINIO-SFef1", "Errors.Assets.Object.GetFailed")
}
return asset, info, nil
}
func (m *Minio) GetObjectInfo(ctx context.Context, instanceID, resourceOwner, name string) (*static.Asset, error) {
bucketName := m.prefixBucketName(instanceID)
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
objectInfo, err := m.Client.StatObject(ctx, bucketName, objectName, minio.StatObjectOptions{})
if err != nil {
if errResp := minio.ToErrorResponse(err); errResp.StatusCode == http.StatusNotFound {
return nil, caos_errs.ThrowNotFound(err, "MINIO-Gdfh4", "Errors.Assets.Object.GetFailed")
}
return nil, caos_errs.ThrowInternal(err, "MINIO-1vySX", "Errors.Assets.Object.GetFailed")
}
return m.objectToAssetInfo(instanceID, resourceOwner, objectInfo), nil
}
func (m *Minio) RemoveObject(ctx context.Context, instanceID, resourceOwner, name string) error {
bucketName := m.prefixBucketName(instanceID)
objectName := fmt.Sprintf("%s/%s", resourceOwner, name)
err := m.Client.RemoveObject(ctx, bucketName, objectName, minio.RemoveObjectOptions{})
if err != nil {
return caos_errs.ThrowInternal(err, "MINIO-x85RT", "Errors.Assets.Object.RemoveFailed")
}
return nil
}
func (m *Minio) RemoveObjects(ctx context.Context, instanceID, resourceOwner string, objectType static.ObjectType) error {
bucketName := m.prefixBucketName(instanceID)
objectsCh := make(chan minio.ObjectInfo)
g := new(errgroup.Group)
var path string
switch objectType {
case static.ObjectTypeStyling:
path = domain.LabelPolicyPrefix + "/"
default:
return nil
}
g.Go(func() error {
defer close(objectsCh)
objects, cancel := m.listObjects(ctx, bucketName, resourceOwner, true)
for object := range objects {
if err := object.Err; err != nil {
cancel()
if errResp := minio.ToErrorResponse(err); errResp.StatusCode == http.StatusNotFound {
logging.WithFields("bucketName", bucketName, "path", path).Warn("list objects for remove failed with not found")
continue
}
return caos_errs.ThrowInternal(object.Err, "MINIO-WQF32", "Errors.Assets.Object.ListFailed")
}
objectsCh <- object
}
return nil
})
if m.MultiDelete {
for objError := range m.Client.RemoveObjects(ctx, bucketName, objectsCh, minio.RemoveObjectsOptions{GovernanceBypass: true}) {
return caos_errs.ThrowInternal(objError.Err, "MINIO-Sfdgr", "Errors.Assets.Object.RemoveFailed")
}
return g.Wait()
}
for objectInfo := range objectsCh {
if err := m.Client.RemoveObject(ctx, bucketName, objectInfo.Key, minio.RemoveObjectOptions{GovernanceBypass: true}); err != nil {
return caos_errs.ThrowInternal(err, "MINIO-GVgew", "Errors.Assets.Object.RemoveFailed")
}
}
return g.Wait()
}
func (m *Minio) createBucket(ctx context.Context, name, location string) error {
if location == "" {
location = m.Location
}
name = m.prefixBucketName(name)
exists, err := m.Client.BucketExists(ctx, name)
if err != nil {
logging.WithFields("bucketname", name).WithError(err).Error("cannot check if bucket exists")
return caos_errs.ThrowInternal(err, "MINIO-1b8fs", "Errors.Assets.Bucket.Internal")
}
if exists {
return caos_errs.ThrowAlreadyExists(nil, "MINIO-9n3MK", "Errors.Assets.Bucket.AlreadyExists")
}
err = m.Client.MakeBucket(ctx, name, minio.MakeBucketOptions{Region: location})
if err != nil {
return caos_errs.ThrowInternal(err, "MINIO-4m90d", "Errors.Assets.Bucket.CreateFailed")
}
return nil
}
func (m *Minio) listObjects(ctx context.Context, bucketName, prefix string, recursive bool) (<-chan minio.ObjectInfo, context.CancelFunc) {
ctxCancel, cancel := context.WithCancel(ctx)
return m.Client.ListObjects(ctxCancel, bucketName, minio.ListObjectsOptions{
Prefix: prefix,
Recursive: recursive,
}), cancel
}
func (m *Minio) objectToAssetInfo(bucketName string, resourceOwner string, object minio.ObjectInfo) *static.Asset {
return &static.Asset{
InstanceID: bucketName,
ResourceOwner: resourceOwner,
Name: object.Key,
Hash: object.ETag,
Size: object.Size,
LastModified: object.LastModified,
ContentType: object.ContentType,
}
}
func (m *Minio) prefixBucketName(name string) string {
return strings.ToLower(m.BucketPrefix + "-" + name)
}