-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstore_swift.go
122 lines (93 loc) · 2.61 KB
/
store_swift.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package fsdup
import (
"bytes"
"errors"
"fmt"
"github.com/ncw/swift"
"sync"
)
// swiftChunkStore stores chunks as objects in a single OpenStack Swift
// container. Each chunk is addressed by the lowercase hex encoding of its
// checksum, which is used as the object name.
type swiftChunkStore struct {
// connection is the Swift client used for every object operation.
connection *swift.Connection
// container is the name of the Swift container holding the chunk objects.
container string
// chunkMap caches the hex checksums of chunks known to exist in the
// container (key: hex checksum string, value: true).
chunkMap sync.Map
}
// NewSwiftStore returns a chunk store that keeps its chunks as objects in the
// given container, accessed through connection. The store starts with an empty
// existence cache.
func NewSwiftStore(connection *swift.Connection, container string) *swiftChunkStore {
	// The zero value of the chunkMap field is an empty, ready-to-use map,
	// so only the two caller-supplied fields need to be set.
	store := new(swiftChunkStore)
	store.connection = connection
	store.container = container
	return store
}
// Stat reports whether the chunk identified by checksum exists in the store.
// It returns nil when the chunk exists and the underlying error otherwise
// (authentication failure or the error from the object lookup).
//
// Positive results are remembered in chunkMap, so repeated calls for a known
// chunk do not hit the network again.
func (idx *swiftChunkStore) Stat(checksum []byte) error {
	if authErr := idx.openConnection(); authErr != nil {
		return authErr
	}

	objectName := fmt.Sprintf("%x", checksum)
	if _, cached := idx.chunkMap.Load(objectName); cached {
		return nil
	}

	if _, _, lookupErr := idx.connection.Object(idx.container, objectName); lookupErr != nil {
		return lookupErr
	}

	// Only successful lookups are cached; failures are re-checked next time.
	idx.chunkMap.Store(objectName, true)
	return nil
}
// ReadAt reads exactly len(buffer) bytes of the chunk identified by checksum
// into buffer, starting at byte position offset within the chunk.
//
// It returns the number of bytes read, which is len(buffer) on success and 0
// on any failure. An error is returned when authentication fails, when the
// object request fails, or when the response does not contain exactly
// len(buffer) bytes. An empty buffer yields (0, nil) without issuing an
// object request.
func (idx *swiftChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64) (int, error) {
	if err := idx.openConnection(); err != nil {
		return 0, err
	}
	// A zero-length read cannot be expressed as a valid byte range
	// (the last position would precede the first), so answer it locally.
	if len(buffer) == 0 {
		return 0, nil
	}

	checksumStr := fmt.Sprintf("%x", checksum)

	// HTTP byte ranges are absolute and inclusive on both ends, so the last
	// requested position is offset+len-1, not len-1.
	lastByte := offset + int64(len(buffer)) - 1
	requestHeaders := make(swift.Headers)
	requestHeaders["Range"] = fmt.Sprintf("bytes=%d-%d", offset, lastByte)

	var responseBuffer bytes.Buffer
	_, err := idx.connection.ObjectGet(idx.container, checksumStr, &responseBuffer, false, requestHeaders)
	if err != nil {
		return 0, err
	}

	// errors.New is used (rather than fmt.Errorf) so the file's "errors"
	// import remains referenced.
	if responseBuffer.Len() != len(buffer) {
		return 0, errors.New(fmt.Sprintf("cannot read %d chunk bytes, response was %d bytes instead", len(buffer), responseBuffer.Len()))
	}

	// The lengths are equal at this point, so copy always fills buffer.
	return copy(buffer, responseBuffer.Bytes()), nil
}
// Write stores buffer as the chunk identified by checksum, unless the chunk is
// already present.
//
// Existence is probed with Stat first; any Stat failure is treated as "chunk
// missing" and leads to an upload attempt. After a successful upload the
// checksum is recorded in chunkMap. It returns an error when authentication or
// the upload fails.
func (idx *swiftChunkStore) Write(checksum []byte, buffer []byte) error {
	if authErr := idx.openConnection(); authErr != nil {
		return authErr
	}

	// Nothing to do when the chunk is already in the container.
	if idx.Stat(checksum) == nil {
		return nil
	}

	objectName := fmt.Sprintf("%x", checksum)
	// Re-check the cache before uploading: skip the upload if the chunk is
	// recorded as present by now.
	if _, known := idx.chunkMap.Load(objectName); known {
		return nil
	}

	uploadErr := idx.connection.ObjectPutBytes(idx.container, objectName, buffer, "application/x-fsdup-chunk")
	if uploadErr != nil {
		return uploadErr
	}

	idx.chunkMap.Store(objectName, true)
	return nil
}
// Remove deletes the chunk identified by checksum from the container and, once
// the deletion succeeded, drops it from the existence cache. If authentication
// or the deletion fails, the error is returned and the cache is left untouched.
func (idx *swiftChunkStore) Remove(checksum []byte) error {
	if authErr := idx.openConnection(); authErr != nil {
		return authErr
	}

	objectName := fmt.Sprintf("%x", checksum)
	if deleteErr := idx.connection.ObjectDelete(idx.container, objectName); deleteErr != nil {
		return deleteErr
	}

	idx.chunkMap.Delete(objectName)
	return nil
}
// openConnection makes sure the Swift connection is authenticated before it is
// used. It authenticates only when the connection reports that it is not yet
// authenticated and returns the authentication error, if any.
func (idx *swiftChunkStore) openConnection() error {
	if !idx.connection.Authenticated() {
		return idx.connection.Authenticate()
	}
	return nil
}