-
Notifications
You must be signed in to change notification settings - Fork 4
/
store_remote.go
101 lines (79 loc) · 2.1 KB
/
store_remote.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package fsdup
import (
"context"
"errors"
"fmt"
"google.golang.org/grpc"
"heckel.io/fsdup/pb"
"sync"
)
type remoteChunkStore struct {
serverAddr string
client pb.HubClient
sync.Mutex
}
func NewRemoteChunkStore(serverAddr string) *remoteChunkStore {
return &remoteChunkStore{
serverAddr: serverAddr,
client: nil,
}
}
func (idx *remoteChunkStore) Stat(checksum []byte) error {
return nil
}
func (idx *remoteChunkStore) Write(checksum []byte, buffer []byte) error {
if err := idx.ensureConnected(); err != nil {
return err
}
_, err := idx.client.WriteChunk(context.Background(), &pb.WriteChunkRequest{
Checksum: checksum,
Data: buffer,
})
return err
}
func (idx *remoteChunkStore) ReadAt(checksum []byte, buffer []byte, offset int64) (int, error) {
if err := idx.ensureConnected(); err != nil {
return 0, err
}
response, err := idx.client.ReadChunk(context.Background(), &pb.ReadChunkRequest{
Checksum: checksum,
Offset: offset,
Length: int64(len(buffer)),
})
if err != nil {
return 0, err
}
if len(buffer) != len(response.Data) {
return 0, errors.New(fmt.Sprintf("unexpected chunk returned from server, expected %d bytes, but got %d",
len(buffer), len(response.Data)))
}
copied := copy(buffer, response.Data)
if copied != len(buffer) {
return copied, errors.New(fmt.Sprintf("could not copy entire response to buffer, only %d bytes copied, but %d bytes requested",
copied, len(buffer)))
}
return copied, nil
}
func (idx *remoteChunkStore) Remove(checksum []byte) error {
if err := idx.ensureConnected(); err != nil {
return err
}
_, err := idx.client.RemoveChunk(context.Background(), &pb.RemoveChunkRequest{
Checksum: checksum,
})
return err
}
func (idx *remoteChunkStore) ensureConnected() error {
idx.Lock()
defer idx.Unlock()
if idx.client != nil {
return nil
}
conn, err := grpc.Dial(idx.serverAddr, grpc.WithBlock(), grpc.WithInsecure(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(128 * 1024 * 1024), grpc.MaxCallRecvMsgSize(128 * 1024 * 1024))) // FIXME
if err != nil {
return err
}
idx.client = pb.NewHubClient(conn)
return nil
}