-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmeta_store_mysql.go
114 lines (96 loc) · 2.35 KB
/
meta_store_mysql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package fsdup
import (
"database/sql"
"encoding/hex"
"fmt"
)
// mysqlMetaStore is a metadata store that keeps manifests in a SQL database
// reached through a database/sql handle.
type mysqlMetaStore struct {
	// db is the database handle used for all queries and transactions.
	db *sql.DB
}

// Compile-time check that *mysqlMetaStore satisfies the MetaStore interface.
var _ MetaStore = (*mysqlMetaStore)(nil)
// NewMysqlMetaStore returns a metadata store backed by the database described
// by dataSource, a data source name such as "fsdup:fsdup@/fsdup".
//
// The handle is obtained from sql.Open using the driver name "mysql"; if
// sql.Open fails, its error is returned unchanged together with a nil store.
// NOTE(review): this file does not import a MySQL driver itself, so one is
// presumably registered elsewhere in the program - confirm.
//
// dataSource is passed verbatim to debugf, including any credentials embedded
// in it.
func NewMysqlMetaStore(dataSource string) (*mysqlMetaStore, error) {
	debugf("Creating MySQL metadata store using datasource %s\n", dataSource)

	handle, openErr := sql.Open("mysql", dataSource)
	if openErr != nil {
		return nil, openErr
	}

	store := &mysqlMetaStore{db: handle}
	return store, nil
}
// ReadManifest loads the manifest stored under manifestId from the "manifest"
// table and rebuilds it in memory.
//
// Rows are fetched in ascending "offset" order. The disk range of each slice
// is not read from the table; it is reconstructed by accumulating chunkLength
// across the rows, starting at 0. A NULL checksum column produces a slice
// whose checksum is nil.
//
// An error is returned when the query fails, a row cannot be scanned,
// kindFromString rejects the stored kind, the checksum is not valid hex, or
// iteration over the result set stops because of an error. In all of these
// cases the returned manifest is nil.
func (s *mysqlMetaStore) ReadManifest(manifestId string) (*manifest, error) {
	// FIXME (carried over from the original): the manifest is always created
	// with DefaultChunkSizeMaxBytes rather than a value read from the database.
	m := NewManifest(DefaultChunkSizeMaxBytes)

	rows, err := s.db.Query(
		"SELECT checksum, chunkOffset, chunkLength, kind FROM manifest WHERE manifestId = ? ORDER BY offset ASC",
		manifestId)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	diskfrom := int64(0)
	for rows.Next() {
		var chunkOffset, chunkLength int64
		var kindStr string
		var checksum sql.NullString

		if err := rows.Scan(&checksum, &chunkOffset, &chunkLength, &kindStr); err != nil {
			return nil, err
		}

		kind, err := kindFromString(kindStr)
		if err != nil {
			return nil, err
		}

		// The checksum column is nullable; leave checksumBytes nil for NULL.
		var checksumBytes []byte
		if checksum.Valid {
			checksumBytes, err = hex.DecodeString(checksum.String)
			if err != nil {
				return nil, err
			}
		}

		m.Add(&chunkSlice{
			checksum:  checksumBytes, // Can be nil!
			kind:      kind,
			diskfrom:  diskfrom,
			diskto:    diskfrom + chunkLength,
			chunkfrom: chunkOffset,
			chunkto:   chunkOffset + chunkLength,
			length:    chunkLength,
		})

		diskfrom += chunkLength
	}

	// rows.Next reports false both at the end of the result set and when
	// iteration fails, so the error must be checked explicitly; otherwise a
	// partially read manifest would be returned as if it were complete.
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return m, nil
}
// WriteManifest inserts one row into the "manifest" table for every offset
// returned by manifest.Offsets(), all tagged with manifestId.
//
// The inserts share a single transaction and a single prepared statement. The
// transaction is committed only after every insert succeeded; on any earlier
// return the deferred Rollback runs instead. Slices of kind kindSparse are
// stored with a NULL checksum, all other slices with the checksum formatted
// via "%x".
//
// The first error from Begin, Prepare, Exec or Commit is returned unchanged.
func (s *mysqlMetaStore) WriteManifest(manifestId string, manifest *manifest) error {
	tx, err := s.db.Begin()
	if err != nil {
		return err
	}
	// The result of the deferred Rollback is intentionally discarded.
	defer tx.Rollback()

	insert, err := tx.Prepare(`
INSERT INTO manifest
SET
manifestId = ?,
offset = ?,
checksum = ?,
chunkOffset = ?,
chunkLength = ?,
kind = ?
`)
	if err != nil {
		return err
	}
	// NOTE(review): the original author marked this deferred Close with
	// "danger!"; the reason is not visible in this file - confirm.
	defer insert.Close()

	for _, offset := range manifest.Offsets() {
		slice := manifest.Get(offset)

		// Pick the checksum argument first so the insert is issued in one place.
		var checksum interface{}
		if slice.kind == kindSparse {
			checksum = &sql.NullString{}
		} else {
			checksum = fmt.Sprintf("%x", slice.checksum)
		}

		if _, err := insert.Exec(manifestId, offset, checksum,
			slice.chunkfrom, slice.length, slice.kind.toString()); err != nil {
			return err
		}
	}

	return tx.Commit()
}