even more refactor
This commit is contained in:
135
internal/loader/batch.go
Normal file
135
internal/loader/batch.go
Normal file
@@ -0,0 +1,135 @@
|
||||
package loader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
types "git.kapelle.org/niklas/s3browser/internal/types"
|
||||
"github.com/graph-gophers/dataloader"
|
||||
"github.com/minio/minio-go/v7"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// listObjectsBatch batch func for calling s3.ListObjects()
|
||||
func listObjectsBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
|
||||
log.Debug("listObjectsBatch: ", k.Keys())
|
||||
var results []*dataloader.Result
|
||||
|
||||
s3Client, ok := c.Value("s3Client").(*minio.Client)
|
||||
|
||||
if !ok {
|
||||
return handleLoaderError(k, fmt.Errorf("Failed to get s3Client from context"))
|
||||
}
|
||||
|
||||
for _, v := range k {
|
||||
id := v.Raw().(types.ID)
|
||||
results = append(results, &dataloader.Result{
|
||||
Data: listObjects(s3Client, id, false),
|
||||
Error: nil,
|
||||
})
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// listObjectsRecursiveBatch just like listObjectsBatch but with recursive set to true
|
||||
func listObjectsRecursiveBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
|
||||
log.Debug("listObjectsRecursiveBatch: ", k.Keys())
|
||||
var results []*dataloader.Result
|
||||
|
||||
s3Client, ok := c.Value("s3Client").(*minio.Client)
|
||||
|
||||
if !ok {
|
||||
return handleLoaderError(k, fmt.Errorf("Failed to get s3Client from context"))
|
||||
}
|
||||
|
||||
for _, v := range k {
|
||||
id := v.Raw().(types.ID)
|
||||
results = append(results, &dataloader.Result{
|
||||
Data: listObjects(s3Client, id, true),
|
||||
Error: nil,
|
||||
})
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// listObjects helper func for listObjectsBatch
|
||||
func listObjects(s3Client *minio.Client, id types.ID, recursive bool) []minio.ObjectInfo {
|
||||
log.Debug("S3 call 'ListObjects': ", id)
|
||||
objectCh := s3Client.ListObjects(context.Background(), id.Bucket, minio.ListObjectsOptions{
|
||||
Prefix: id.Key,
|
||||
Recursive: recursive,
|
||||
})
|
||||
|
||||
result := make([]minio.ObjectInfo, 0)
|
||||
for obj := range objectCh {
|
||||
result = append(result, obj)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func listBucketsBatch(c context.Context, k dataloader.Keys) []*dataloader.Result {
|
||||
log.Debug("listBucketsBatch")
|
||||
var results []*dataloader.Result
|
||||
|
||||
s3Client, ok := c.Value("s3Client").(*minio.Client)
|
||||
|
||||
if !ok {
|
||||
return handleLoaderError(k, fmt.Errorf("Failed to get s3Client from context"))
|
||||
}
|
||||
|
||||
buckets, err := s3Client.ListBuckets(c)
|
||||
|
||||
if err != nil {
|
||||
return handleLoaderError(k, err)
|
||||
}
|
||||
|
||||
result := &dataloader.Result{
|
||||
Data: buckets,
|
||||
Error: nil,
|
||||
}
|
||||
|
||||
for range k {
|
||||
results = append(results, result)
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
func statObjectBatch(ctx context.Context, k dataloader.Keys) []*dataloader.Result {
|
||||
log.Debug("statObjectBatch")
|
||||
|
||||
var results []*dataloader.Result
|
||||
s3Client, ok := ctx.Value("s3Client").(*minio.Client)
|
||||
|
||||
if !ok {
|
||||
return handleLoaderError(k, fmt.Errorf("Failed to get s3Client from context"))
|
||||
}
|
||||
|
||||
for _, v := range k {
|
||||
id := v.Raw().(types.ID)
|
||||
stat, err := s3Client.StatObject(ctx, id.Bucket, id.Key, minio.GetObjectOptions{})
|
||||
results = append(results, &dataloader.Result{
|
||||
Data: stat,
|
||||
Error: err,
|
||||
})
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
|
||||
// handleLoaderError helper func when the whole batch failed
|
||||
func handleLoaderError(k dataloader.Keys, err error) []*dataloader.Result {
|
||||
log.Error(err.Error())
|
||||
var results []*dataloader.Result
|
||||
for range k {
|
||||
results = append(results, &dataloader.Result{
|
||||
Data: nil,
|
||||
Error: err,
|
||||
})
|
||||
}
|
||||
|
||||
return results
|
||||
}
|
||||
162
internal/loader/loader.go
Normal file
162
internal/loader/loader.go
Normal file
@@ -0,0 +1,162 @@
|
||||
package loader
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"git.kapelle.org/niklas/s3browser/internal/cache"
|
||||
"git.kapelle.org/niklas/s3browser/internal/helper"
|
||||
types "git.kapelle.org/niklas/s3browser/internal/types"
|
||||
"github.com/graph-gophers/dataloader"
|
||||
"github.com/minio/minio-go/v7"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Loader bundles the batched, TTL-cached dataloaders that front all S3
// calls made by the application (object listings, object stats, bucket
// listing).
type Loader struct {
	// listObjectsLoader batches non-recursive ListObjects calls, keyed by
	// the directory's types.ID.
	listObjectsLoader *dataloader.Loader
	// listObjectsRecursiveLoader is the recursive variant of
	// listObjectsLoader (same keys, recursive listing).
	listObjectsRecursiveLoader *dataloader.Loader
	// statObjectLoader batches StatObject calls, keyed by the object's
	// types.ID.
	statObjectLoader *dataloader.Loader
	// listBucketsLoader caches the global bucket list; loaded with a
	// single constant empty StringKey (see GetBuckets).
	listBucketsLoader *dataloader.Loader
}
|
||||
|
||||
func NewLoader(config types.AppConfig) *Loader {
|
||||
return &Loader{
|
||||
listObjectsLoader: dataloader.NewBatchedLoader(
|
||||
listObjectsBatch,
|
||||
dataloader.WithCache(cache.NewTTLCache(config.CacheTTL, config.CacheCleanup)),
|
||||
),
|
||||
listObjectsRecursiveLoader: dataloader.NewBatchedLoader(
|
||||
listObjectsRecursiveBatch,
|
||||
dataloader.WithCache(cache.NewTTLCache(config.CacheTTL, config.CacheCleanup)),
|
||||
),
|
||||
statObjectLoader: dataloader.NewBatchedLoader(
|
||||
statObjectBatch,
|
||||
dataloader.WithCache(cache.NewTTLCache(config.CacheTTL, config.CacheCleanup)),
|
||||
),
|
||||
listBucketsLoader: dataloader.NewBatchedLoader(
|
||||
listBucketsBatch,
|
||||
dataloader.WithCache(cache.NewTTLCache(config.CacheTTL, config.CacheCleanup)),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *Loader) GetFiles(ctx context.Context, path types.ID) ([]types.File, error) {
|
||||
thunk := l.listObjectsLoader.Load(ctx, path)
|
||||
objects, err := thunk()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var files []types.File
|
||||
|
||||
for _, obj := range objects.([]minio.ObjectInfo) {
|
||||
if obj.Err != nil {
|
||||
return nil, obj.Err
|
||||
} else if !strings.HasSuffix(obj.Key, "/") {
|
||||
files = append(files, *helper.ObjInfoToFile(obj, path.Bucket))
|
||||
}
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (l *Loader) GetFile(ctx context.Context, id types.ID) (*types.File, error) {
|
||||
thunk := l.statObjectLoader.Load(ctx, id)
|
||||
|
||||
result, err := thunk()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
objInfo, ok := result.(minio.ObjectInfo)
|
||||
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Failed to stats object")
|
||||
}
|
||||
|
||||
return helper.ObjInfoToFile(objInfo, id.Bucket), nil
|
||||
}
|
||||
|
||||
func (l *Loader) GetDirs(ctx context.Context, path types.ID) ([]types.Directory, error) {
|
||||
thunk := l.listObjectsLoader.Load(ctx, path)
|
||||
|
||||
result, err := thunk()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var dirs []types.Directory
|
||||
for _, obj := range result.([]minio.ObjectInfo) {
|
||||
if obj.Err != nil {
|
||||
return nil, obj.Err
|
||||
} else if strings.HasSuffix(obj.Key, "/") {
|
||||
resultID := types.ID{
|
||||
Bucket: path.Bucket,
|
||||
Key: obj.Key,
|
||||
}
|
||||
|
||||
resultID.Normalize()
|
||||
|
||||
dirs = append(dirs, types.Directory{
|
||||
ID: resultID,
|
||||
Name: filepath.Base(obj.Key),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return dirs, nil
|
||||
}
|
||||
|
||||
func (l *Loader) GetBuckets(ctx context.Context) ([]string, error) {
|
||||
thunk := l.listBucketsLoader.Load(ctx, dataloader.StringKey(""))
|
||||
|
||||
result, err := thunk()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bucketsInfo := result.([]minio.BucketInfo)
|
||||
var buckets []string
|
||||
for _, i := range bucketsInfo {
|
||||
buckets = append(buckets, i.Name)
|
||||
}
|
||||
|
||||
return buckets, nil
|
||||
}
|
||||
|
||||
func (l *Loader) GetFilesRecursive(ctx context.Context, path types.ID) ([]types.File, error) {
|
||||
thunk := l.listObjectsRecursiveLoader.Load(ctx, path)
|
||||
result, err := thunk()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objects := result.([]minio.ObjectInfo)
|
||||
|
||||
var files []types.File
|
||||
for _, obj := range objects {
|
||||
files = append(files, *helper.ObjInfoToFile(obj, path.Bucket))
|
||||
}
|
||||
|
||||
return files, nil
|
||||
}
|
||||
|
||||
func (l *Loader) InvalidateCacheForFile(ctx context.Context, id types.ID) {
|
||||
log.Debug("Clear cache for file:", id.String())
|
||||
parent := id.Parent()
|
||||
|
||||
l.listObjectsLoader.Clear(ctx, id)
|
||||
l.listObjectsRecursiveLoader.Clear(ctx, parent)
|
||||
}
|
||||
|
||||
func (l *Loader) InvalidateCacheForDir(ctx context.Context, path types.ID) {
|
||||
log.Debug("Clear cache for dir:", path.String())
|
||||
parent := helper.GetParentDir(path)
|
||||
|
||||
l.listBucketsLoader.Clear(ctx, path).Clear(ctx, parent)
|
||||
l.listObjectsRecursiveLoader.Clear(ctx, path).Clear(ctx, parent)
|
||||
}
|
||||
Reference in New Issue
Block a user