s3browser-backend/internal/dataloader.go

300 lines
7.1 KiB
Go

package s3browser
import (
"context"
"fmt"
"path/filepath"
"strings"
types "git.kapelle.org/niklas/s3browser/internal/types"
"github.com/graph-gophers/dataloader"
"github.com/minio/minio-go/v7"
log "github.com/sirupsen/logrus"
)
// listObjectsBatch batch func for calling s3.ListObjects()
// listObjectsBatch is the batch function backing the "listObjects" dataloader.
// Each key is a types.ID (bucket + prefix); it resolves to the slice of
// minio.ObjectInfo directly under that prefix (non-recursive).
func listObjectsBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("listObjectsBatch: ", k.Keys())
	s3Client, ok := c.Value("s3Client").(*minio.Client)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get s3Client from context"))
	}
	// One result per key, in key order, as the dataloader contract requires.
	results := make([]*dataloader.Result, 0, len(k))
	for _, v := range k {
		id := v.Raw().(types.ID)
		results = append(results, &dataloader.Result{
			Data:  listObjects(s3Client, id, false),
			Error: nil,
		})
	}
	return results
}
// listObjectsRecursiveBatch just like listObjectsBatch but with recursive set to true
// listObjectsRecursiveBatch is the batch function backing the
// "listObjectsRecursive" dataloader. Identical to listObjectsBatch except the
// listing descends into the whole subtree (recursive = true).
func listObjectsRecursiveBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("listObjectsRecursiveBatch: ", k.Keys())
	s3Client, ok := c.Value("s3Client").(*minio.Client)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get s3Client from context"))
	}
	// One result per key, in key order, as the dataloader contract requires.
	results := make([]*dataloader.Result, 0, len(k))
	for _, v := range k {
		id := v.Raw().(types.ID)
		results = append(results, &dataloader.Result{
			Data:  listObjects(s3Client, id, true),
			Error: nil,
		})
	}
	return results
}
// listObjects helper func for listObjectsBatch
// It drains the minio listing channel for the given bucket/prefix into a
// slice. Listing failures are delivered in-band: entries whose Err field is
// set are appended like any other, so callers must check obj.Err themselves.
// NOTE(review): this uses context.Background(), so cancelling the original
// request context does not stop an in-flight S3 listing — consider threading
// the caller's context through in a follow-up (requires a signature change).
func listObjects(s3Client *minio.Client, id types.ID, recursive bool) []minio.ObjectInfo {
	log.Debug("S3 call 'ListObjects': ", id)
	objectCh := s3Client.ListObjects(context.Background(), id.Bucket, minio.ListObjectsOptions{
		Prefix:    id.Key,
		Recursive: recursive,
	})
	result := make([]minio.ObjectInfo, 0)
	for obj := range objectCh {
		result = append(result, obj)
	}
	return result
}
// getFilesBatch batch func for getting all files in path. Uses "listObjects" dataloader
// getFilesBatch is the batch function backing the "getFiles" dataloader. For
// each key (a types.ID naming a bucket + directory prefix) it returns the
// []types.File directly in that path, delegating the actual S3 listing to the
// "listObjects" dataloader so repeated lookups share one call and its cache.
func getFilesBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("getFilesBatch: ", k.Keys())
	loader, ok := c.Value("loader").(map[string]*dataloader.Loader)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get loader from context"))
	}
	results := make([]*dataloader.Result, 0, len(k))
	for _, v := range k {
		id := v.Raw().(types.ID)
		thunk := loader["listObjects"].Load(c, id)
		objects, err := thunk()
		if err != nil {
			// Propagate the listing failure for this key instead of silently
			// returning an empty file list (and instead of panicking on the
			// type assertion below when objects is nil).
			results = append(results, &dataloader.Result{
				Data:  nil,
				Error: err,
			})
			continue
		}
		files := make([]types.File, 0)
		for _, obj := range objects.([]minio.ObjectInfo) {
			if obj.Err != nil {
				// A single bad entry should not invalidate the whole listing;
				// log it and keep the rest.
				log.Error("getFilesBatch listObjects entry: ", obj.Err)
				continue
			}
			// Keys ending in "/" are directory placeholders, handled by getDirsBatch.
			if strings.HasSuffix(obj.Key, "/") {
				continue
			}
			resultID := types.ID{
				Bucket: id.Bucket,
				Key:    obj.Key,
			}
			resultID.Normalize()
			files = append(files, types.File{
				ID:           resultID,
				Name:         filepath.Base(obj.Key),
				Size:         obj.Size,
				ContentType:  obj.ContentType,
				ETag:         obj.ETag,
				LastModified: obj.LastModified,
			})
		}
		results = append(results, &dataloader.Result{
			Data:  files,
			Error: nil,
		})
	}
	return results
}
// getFileBatch batch func for getting object info
// getFileBatch is the batch function backing the "getFile" dataloader. Each
// key resolves to a *types.File built from a StatObject call; per-key Stat
// failures are reported in that key's Result rather than failing the batch.
func getFileBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("getFileBatch: ", k.Keys())
	s3Client, ok := c.Value("s3Client").(*minio.Client)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get s3Client from context"))
	}
	results := make([]*dataloader.Result, 0, len(k))
	for _, v := range k {
		id := v.Raw().(types.ID)
		log.Debug("S3 call 'StatObject': ", v.String())
		// Use the batch context so a cancelled request aborts the Stat call
		// (previously context.Background() ignored cancellation).
		obj, err := s3Client.StatObject(c, id.Bucket, id.Key, minio.StatObjectOptions{})
		if err != nil {
			results = append(results, &dataloader.Result{
				Data:  nil,
				Error: err,
			})
			continue
		}
		resultID := types.ID{
			Bucket: id.Bucket,
			Key:    obj.Key,
		}
		resultID.Normalize()
		results = append(results, &dataloader.Result{
			Data: &types.File{
				ID: resultID,
				// Name was previously left empty here; populate it for
				// consistency with getFilesBatch.
				Name:         filepath.Base(obj.Key),
				Size:         obj.Size,
				ContentType:  obj.ContentType,
				ETag:         obj.ETag,
				LastModified: obj.LastModified,
			},
			Error: nil,
		})
	}
	return results
}
// getDirsBatch batch func for getting dirs in a path
// getDirsBatch is the batch function backing the "getDirs" dataloader. For
// each key it returns the []types.Directory entries directly in that path,
// delegating the S3 listing to the "listObjects" dataloader.
func getDirsBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("getDirsBatch: ", k.Keys())
	loader, ok := c.Value("loader").(map[string]*dataloader.Loader)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get loader from context"))
	}
	results := make([]*dataloader.Result, 0, len(k))
	for _, v := range k {
		id := v.Raw().(types.ID)
		thunk := loader["listObjects"].Load(c, id)
		objects, err := thunk()
		if err != nil {
			// Propagate the listing failure for this key instead of silently
			// returning an empty directory list (and instead of panicking on
			// the type assertion below when objects is nil).
			results = append(results, &dataloader.Result{
				Data:  nil,
				Error: err,
			})
			continue
		}
		dirs := make([]types.Directory, 0)
		for _, obj := range objects.([]minio.ObjectInfo) {
			if obj.Err != nil {
				// Log and skip broken entries; keep the rest of the listing.
				log.Error("getDirsBatch listObjects entry: ", obj.Err)
				continue
			}
			// Only keys ending in "/" represent directories.
			if !strings.HasSuffix(obj.Key, "/") {
				continue
			}
			resultID := types.ID{
				Bucket: id.Bucket,
				Key:    obj.Key,
			}
			resultID.Normalize()
			dirs = append(dirs, types.Directory{
				ID:   resultID,
				Name: filepath.Base(obj.Key),
			})
		}
		results = append(results, &dataloader.Result{
			Data:  dirs,
			Error: nil,
		})
	}
	return results
}
// handleLoaderError helper func when the whole batch failed
// handleLoaderError builds the batch result when the whole batch failed: it
// logs the error once and fans it out to every key, as the dataloader
// contract requires one Result per key.
func handleLoaderError(k dataloader.Keys, err error) []*dataloader.Result {
	log.Error(err.Error())
	// The result count is known up front; preallocate instead of growing.
	results := make([]*dataloader.Result, 0, len(k))
	for range k {
		results = append(results, &dataloader.Result{
			Data:  nil,
			Error: err,
		})
	}
	return results
}
// listBucketsBatch is the batch function backing the "listBuckets" dataloader.
// The bucket list is global (key-independent), so S3 is called once and the
// same result is shared across every key in the batch.
func listBucketsBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
	log.Debug("listBucketsBatch")
	s3Client, ok := c.Value("s3Client").(*minio.Client)
	if !ok {
		return handleLoaderError(k, fmt.Errorf("failed to get s3Client from context"))
	}
	buckets, err := s3Client.ListBuckets(c)
	if err != nil {
		return handleLoaderError(k, err)
	}
	bucketStrings := make([]string, 0, len(buckets))
	for _, v := range buckets {
		bucketStrings = append(bucketStrings, v.Name)
	}
	// One shared, read-only Result is safe to hand to every key.
	result := &dataloader.Result{
		Data:  bucketStrings,
		Error: nil,
	}
	results := make([]*dataloader.Result, 0, len(k))
	for range k {
		results = append(results, result)
	}
	return results
}
// createDataloader creates every dataloader used by the app and returns them
// keyed by name. Each loader gets its own TTL cache built from the config so
// eviction is independent per loader.
func createDataloader(config types.AppConfig) map[string]*dataloader.Loader {
	// Table-driven registration keeps the loader set in one place and avoids
	// six copies of the same NewBatchedLoader boilerplate.
	batchFns := map[string]dataloader.BatchFunc{
		"getFiles":             getFilesBatch,
		"getFile":              getFileBatch,
		"listObjects":          listObjectsBatch,
		"listObjectsRecursive": listObjectsRecursiveBatch,
		"getDirs":              getDirsBatch,
		"listBuckets":          listBucketsBatch,
	}
	loaderMap := make(map[string]*dataloader.Loader, len(batchFns))
	for name, fn := range batchFns {
		loaderMap[name] = dataloader.NewBatchedLoader(
			fn,
			dataloader.WithCache(newCache(config.CacheTTL, config.CacheCleanup)),
		)
	}
	return loaderMap
}