Skip to content

Commit 4eedfbd

Browse files
committed
S3: GlobalPrefix and Recursive
1 parent 8b71881 commit 4eedfbd

File tree

2 files changed

+91
-4
lines changed

2 files changed

+91
-4
lines changed

backends/s3/s3.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ type Options struct {
5151
// CreateBucket tells us to try to create the bucket
5252
CreateBucket bool `yaml:"create_bucket"`
5353

54+
// GlobalPrefix is a prefix applied to all operations, allowing work within a prefix
55+
// seamlessly
56+
GlobalPrefix string `yaml:"global_prefix"`
57+
58+
// Recursive can be enabled to make List operations recurse into nested prefixes
59+
Recursive bool `yaml:"recursive"`
60+
5461
// EndpointURL can be set to something like "http://localhost:9000" when using Minio
5562
// or "https://s3.amazonaws.com" for AWS S3.
5663
EndpointURL string `yaml:"endpoint_url"`
@@ -110,6 +117,9 @@ type Backend struct {
110117
}
111118

112119
func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
120+
// Prepend global prefix
121+
prefix = b.prependGlobalPrefix(prefix)
122+
113123
if !b.opt.UseUpdateMarker {
114124
return b.doList(ctx, prefix)
115125
}
@@ -150,9 +160,12 @@ func (b *Backend) List(ctx context.Context, prefix string) (simpleblob.BlobList,
150160
func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobList, error) {
151161
var blobs simpleblob.BlobList
152162

163+
// Runes to strip from blob names for GlobalPrefix
164+
gpEndRune := len(b.opt.GlobalPrefix)
165+
153166
objCh := b.client.ListObjects(ctx, b.opt.Bucket, minio.ListObjectsOptions{
154167
Prefix: prefix,
155-
Recursive: false,
168+
Recursive: b.opt.Recursive,
156169
})
157170
for obj := range objCh {
158171
// Handle error returned by MinIO client
@@ -166,7 +179,14 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
166179
if obj.Key == UpdateMarkerFilename {
167180
continue
168181
}
169-
blobs = append(blobs, simpleblob.Blob{Name: obj.Key, Size: obj.Size})
182+
183+
// Strip global prefix from blob
184+
blobName := obj.Key
185+
if gpEndRune > 0 {
186+
blobName = blobName[gpEndRune:]
187+
}
188+
189+
blobs = append(blobs, simpleblob.Blob{Name: blobName, Size: obj.Size})
170190
}
171191

172192
// Minio appears to return them sorted, but maybe not all implementations
@@ -179,6 +199,9 @@ func (b *Backend) doList(ctx context.Context, prefix string) (simpleblob.BlobLis
179199
// Load retrieves the content of the object identified by name from S3 Bucket
180200
// configured in b.
181201
func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
202+
// Prepend global prefix
203+
name = b.prependGlobalPrefix(name)
204+
182205
metricCalls.WithLabelValues("load").Inc()
183206
metricLastCallTimestamp.WithLabelValues("load").SetToCurrentTime()
184207

@@ -199,6 +222,9 @@ func (b *Backend) Load(ctx context.Context, name string) ([]byte, error) {
199222
// Store sets the content of the object identified by name to the content
200223
// of data, in the S3 Bucket configured in b.
201224
func (b *Backend) Store(ctx context.Context, name string, data []byte) error {
225+
// Prepend global prefix
226+
name = b.prependGlobalPrefix(name)
227+
202228
info, err := b.doStore(ctx, name, data)
203229
if err != nil {
204230
return err
@@ -222,6 +248,9 @@ func (b *Backend) doStore(ctx context.Context, name string, data []byte) (minio.
222248
// Delete removes the object identified by name from the S3 Bucket
223249
// configured in b.
224250
func (b *Backend) Delete(ctx context.Context, name string) error {
251+
// Prepend global prefix
252+
name = b.prependGlobalPrefix(name)
253+
225254
if err := b.doDelete(ctx, name); err != nil {
226255
return err
227256
}
@@ -370,6 +399,12 @@ func convertMinioError(err error) error {
370399
return nil
371400
}
372401

402+
// prependGlobalPrefix prepends the GlobalPrefix to the name/prefix
403+
// passed as input
404+
func (b *Backend) prependGlobalPrefix(name string) string {
405+
return b.opt.GlobalPrefix + name
406+
}
407+
373408
func init() {
374409
simpleblob.RegisterBackend("s3", func(ctx context.Context, p simpleblob.InitParams) (simpleblob.Interface, error) {
375410
var opt Options

backends/s3/s3_test.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ func getBackend(ctx context.Context, t *testing.T) (b *Backend) {
5454
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
5555
defer cancel()
5656

57-
blobs, err := b.doList(ctx, "")
57+
blobs, err := b.List(ctx, "")
5858
if err != nil {
5959
t.Logf("Blobs list error: %s", err)
6060
return
6161
}
6262
for _, blob := range blobs {
63-
err := b.client.RemoveObject(ctx, b.opt.Bucket, blob.Name, minio.RemoveObjectOptions{})
63+
err := b.Delete(ctx, blob.Name)
6464
if err != nil {
6565
t.Logf("Object delete error: %s", err)
6666
}
@@ -101,3 +101,55 @@ func TestBackend_marker(t *testing.T) {
101101
assert.NoError(t, err)
102102
assert.EqualValues(t, b.lastMarker, markerFileContent)
103103
}
104+
105+
func TestBackend_globalprefix(t *testing.T) {
106+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
107+
defer cancel()
108+
109+
b := getBackend(ctx, t)
110+
b.opt.GlobalPrefix = "v5/"
111+
112+
tester.DoBackendTests(t, b)
113+
assert.Len(t, b.lastMarker, 0)
114+
}
115+
116+
func TestBackend_recursive(t *testing.T) {
117+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
118+
defer cancel()
119+
120+
b := getBackend(ctx, t)
121+
122+
// Starts empty
123+
ls, err := b.List(ctx, "")
124+
assert.NoError(t, err)
125+
assert.Len(t, ls, 0)
126+
127+
// Add items
128+
err = b.Store(ctx, "bar-1", []byte("bar1"))
129+
assert.NoError(t, err)
130+
err = b.Store(ctx, "bar-2", []byte("bar2"))
131+
assert.NoError(t, err)
132+
err = b.Store(ctx, "foo/bar-3", []byte("bar3"))
133+
assert.NoError(t, err)
134+
135+
// List all - no recursion (default)
136+
ls, err = b.List(ctx, "")
137+
assert.NoError(t, err)
138+
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/"})
139+
140+
// List all - recursive enabled
141+
b.opt.Recursive = true
142+
143+
ls, err = b.List(ctx, "")
144+
assert.NoError(t, err)
145+
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/bar-3"})
146+
147+
// List all - recursive disabled
148+
b.opt.Recursive = false
149+
150+
ls, err = b.List(ctx, "")
151+
assert.NoError(t, err)
152+
assert.Equal(t, ls.Names(), []string{"bar-1", "bar-2", "foo/"})
153+
154+
assert.Len(t, b.lastMarker, 0)
155+
}

0 commit comments

Comments
 (0)