
Commit cc6108a
Rework how large objects are segmented and properly obey SLO min size
This commit unifies the logic handling DLO and SLO segmentation, and adds more test cases that repeatedly write and seek large objects.

The minimum chunk size is now exposed and can also be used with DLOs when a large number of potentially small objects is not desired (at the cost of having to retrieve the old segment when writing over it). The minimum segment size for SLOs is also retrieved from a /info call and overrides chunkSize and minChunkSize if it is larger.

The ReadFrom method was dropped from the writer. The old implementation was sometimes incorrect because it used the number of written bytes to decide whether it had finished: if the last segment was exactly the chunk size, it would try to write one more segment, and that segment would be empty. One way to solve this would be adding a DELETE call whenever a zero-sized segment was written, but this is ugly and wasteful. Another option would be to buffer the read data in memory to find out if we're done, but this is exactly what io.Copy() already does for us, so I could simply drop the ReadFrom implementation.
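
The off-by-one that motivated dropping ReadFrom is easy to reproduce in isolation. The sketch below is illustrative only (chunkSize and naiveSegments are invented names, and io.Discard stands in for a real segment upload); it shows how a byte-count-driven loop necessarily opens one segment too many when the input length is an exact multiple of the chunk size:

package main

import (
	"fmt"
	"io"
	"strings"
)

const chunkSize = 4 // tiny on purpose; the real default is much larger

// naiveSegments mimics the dropped ReadFrom: create a segment, copy up
// to chunkSize bytes into it, and stop only after a short copy. The
// segment is created *before* we know how many bytes it will hold.
func naiveSegments(src io.Reader) []int64 {
	var sizes []int64
	for {
		n, _ := io.Copy(io.Discard, io.LimitReader(src, chunkSize))
		sizes = append(sizes, n)
		if n < chunkSize {
			return sizes
		}
	}
}

func main() {
	// 8 bytes with chunkSize 4: the loop cannot tell that the source is
	// exhausted after the second segment, so a third, empty one appears.
	fmt.Println(naiveSegments(strings.NewReader("12345678"))) // [4 4 0]
}

With only Write implemented, io.Copy buffers each read on the caller's side and never calls Write with data it did not actually read, so the empty trailing segment cannot occur.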
1 parent b15be6f commit cc6108a

5 files changed: +368 −377 lines

dlo.go

Lines changed: 4 additions & 152 deletions
@@ -1,45 +1,33 @@
 package swift
 
 import (
-	"bytes"
-	"crypto/md5"
-	"io"
 	"os"
-	"strconv"
 )
 
 // DynamicLargeObjectCreateFile represents an open dynamic large object
 type DynamicLargeObjectCreateFile struct {
-	LargeObjectCreateFile
+	largeObjectCreateFile
 }
 
-// Check it satisfies the interfaces
-var (
-	_ io.Writer     = &DynamicLargeObjectCreateFile{}
-	_ io.Seeker     = &DynamicLargeObjectCreateFile{}
-	_ io.Closer     = &DynamicLargeObjectCreateFile{}
-	_ io.ReaderFrom = &DynamicLargeObjectCreateFile{}
-)
-
 // DynamicLargeObjectCreateFile creates a dynamic large object
 // returning an object which satisfies io.Writer, io.Seeker, io.Closer
 // and io.ReaderFrom. The flags are as passed to the
 // LargeObjectCreate method.
-func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (*DynamicLargeObjectCreateFile, error) {
+func (c *Connection) DynamicLargeObjectCreateFile(opts *LargeObjectOpts) (LargeObjectFile, error) {
 	lo, err := c.LargeObjectCreate(opts)
 	if err != nil {
 		return nil, err
 	}
 
 	return &DynamicLargeObjectCreateFile{
-		LargeObjectCreateFile: *lo,
+		largeObjectCreateFile: *lo,
 	}, nil
 }
 
 // DynamicLargeObjectCreate creates or truncates an existing dynamic
 // large object returning a writeable object. This sets opts.Flags to
 // an appropriate value before calling DynamicLargeObjectCreateFile
-func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (*DynamicLargeObjectCreateFile, error) {
+func (c *Connection) DynamicLargeObjectCreate(opts *LargeObjectOpts) (LargeObjectFile, error) {
 	opts.Flags = os.O_TRUNC | os.O_CREATE
 	return c.DynamicLargeObjectCreateFile(opts)
 }
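
Both constructors now return the LargeObjectFile interface rather than a concrete *DynamicLargeObjectCreateFile, so DLO and SLO writers are interchangeable at call sites. A hedged usage sketch, assuming an authenticated swift.Connection and an io.Reader in scope; the MinChunkSize field is the knob the commit message says is now exposed, and the container/object names are made up:

// UploadDLO shows the intended call pattern after this change; conn is
// an authenticated swift.Connection and src any io.Reader.
func UploadDLO(conn *swift.Connection, src io.Reader) error {
	opts := &swift.LargeObjectOpts{
		Container:    "videos", // hypothetical container
		ObjectName:   "movie.mp4",
		ContentType:  "video/mp4",
		ChunkSize:    64 << 20, // 64 MiB segments
		MinChunkSize: 1 << 20,  // avoid a swarm of tiny segments on rewrites
	}
	out, err := conn.DynamicLargeObjectCreate(opts)
	if err != nil {
		return err
	}
	// LargeObjectFile satisfies io.Writer, io.Seeker and io.Closer, so a
	// plain io.Copy drives segmentation; no custom ReaderFrom is needed.
	if _, err := io.Copy(out, src); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}
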
@@ -84,142 +72,6 @@ func (c *Connection) createDLOManifest(container string, objectName string, pref
 	return nil
 }
 
-// Write satisfies the io.Writer interface
-func (file *DynamicLargeObjectCreateFile) Write(buf []byte) (int, error) {
-	reader := bytes.NewReader(buf)
-	n, err := file.ReadFrom(reader)
-	return int(n), err
-}
-
-// ReadFrom satisfies the io.ReaderFrom interface
-func (file *DynamicLargeObjectCreateFile) ReadFrom(reader io.Reader) (n int64, err error) {
-	var (
-		multi         io.Reader
-		paddingReader io.Reader
-		cursor        int64
-		currentLength int64
-	)
-
-	partNumber := 1
-	chunkSize := int64(file.chunkSize)
-	readers := []io.Reader{}
-	hash := md5.New()
-
-	file.segments, err = file.conn.getAllDLOSegments(file.segmentContainer, file.prefix)
-	if err != nil {
-		return 0, err
-	}
-
-	for _, segment := range file.segments {
-		currentLength += segment.Bytes
-	}
-
-	// First, we skip the existing segments that are not modified by this call
-	for i := range file.segments {
-		if file.filePos < cursor+file.segments[i].Bytes {
-			break
-		}
-		cursor += file.segments[i].Bytes
-		partNumber++
-		hash.Write([]byte(file.segments[i].Hash))
-	}
-
-	if file.filePos-cursor > 0 && min(currentLength, file.filePos)-cursor > 0 {
-		// Offset is inside the current segment: we need to read the data from
-		// the beginning of the segment to offset, for this we must ensure that
-		// the manifest is already written.
-		err = file.Flush()
-		if err != nil {
-			return 0, err
-		}
-		headers := make(Headers)
-		headers["Range"] = "bytes=" + strconv.FormatInt(cursor, 10) + "-" + strconv.FormatInt(min(currentLength, file.filePos)-1, 10)
-		currentSegment, _, err := file.conn.ObjectOpen(file.container, file.objectName, false, headers)
-		if err != nil {
-			return 0, err
-		}
-		defer currentSegment.Close()
-		paddingReader = currentSegment
-		readers = append(readers, io.LimitReader(paddingReader, min(currentLength, file.filePos)-cursor))
-	}
-
-	if paddingReader != nil {
-		readers = append(readers, io.LimitReader(paddingReader, file.filePos-cursor))
-	}
-	readers = append(readers, io.LimitReader(reader, chunkSize-(file.filePos-cursor)))
-	multi = io.MultiReader(readers...)
-
-	writeSegment := func() (finished bool, bytesRead int64, err error) {
-		segment := getSegment(file.prefix, partNumber)
-
-		currentSegment, err := file.conn.ObjectCreate(file.segmentContainer, segment, false, "", file.contentType, nil)
-		if err != nil {
-			return false, bytesRead, err
-		}
-
-		n, err := io.Copy(currentSegment, multi)
-		if err != nil {
-			return false, bytesRead, err
-		}
-
-		if n > 0 {
-			defer currentSegment.Close()
-			bytesRead += n - max(0, file.filePos-cursor)
-		}
-
-		if n < chunkSize {
-			// We wrote all the data
-			end := currentLength
-			if partNumber-1 < len(file.segments) {
-				end = cursor + file.segments[partNumber-1].Bytes
-			}
-			if cursor+n < end {
-				// Copy the end of the chunk
-				headers := make(Headers)
-				headers["Range"] = "bytes=" + strconv.FormatInt(cursor+n, 10) + "-" + strconv.FormatInt(end-1, 10)
-				f, _, err := file.conn.ObjectOpen(file.container, file.objectName, false, headers)
-				if err != nil {
-					return false, bytesRead, err
-				}
-
-				_, copyErr := io.Copy(currentSegment, f)
-
-				if err := f.Close(); err != nil {
-					return false, bytesRead, err
-				}
-
-				if copyErr != nil {
-					return false, bytesRead, copyErr
-				}
-			}
-
-			return true, bytesRead, nil
-		}
-
-		multi = io.LimitReader(reader, chunkSize)
-		cursor += chunkSize
-		partNumber++
-
-		return false, bytesRead, nil
-	}
-
-	finished := false
-	read := int64(0)
-	bytesRead := int64(0)
-	startPos := file.filePos
-	for finished == false {
-		finished, read, err = writeSegment()
-		bytesRead += read
-		file.filePos += read
-		if err != nil {
-			return bytesRead, err
-		}
-	}
-	file.currentLength = max(startPos+bytesRead, currentLength)
-
-	return bytesRead, nil
-}
-
 // Close satisfies the io.Closer interface
 func (file *DynamicLargeObjectCreateFile) Close() error {
 	return file.Flush()
 }