-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathserver.go
113 lines (88 loc) · 2.79 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package fsdup
import (
"context"
_ "github.com/go-sql-driver/mysql"
"google.golang.org/grpc"
"heckel.io/fsdup/pb"
"net"
)
func ListenAndServe(address string, store ChunkStore, metaStore MetaStore) error {
srv := &server{
store: store,
metaStore: metaStore,
}
listener, err := net.Listen("tcp", address)
if err != nil {
return err
}
grpcServer := grpc.NewServer(grpc.MaxSendMsgSize(128 * 1024 * 1024), grpc.MaxRecvMsgSize(128 * 1024 * 1024)) // FIXME
pb.RegisterHubServer(grpcServer, srv)
return grpcServer.Serve(listener)
}
type server struct {
store ChunkStore
metaStore MetaStore
}
func (s *server) Diff(ctx context.Context, request *pb.DiffRequest) (*pb.DiffResponse, error) {
unknownChecksums := make([][]byte, 0)
for _, checksum := range request.Checksums {
if err := s.store.Stat(checksum); err != nil {
unknownChecksums = append(unknownChecksums, checksum)
}
}
return &pb.DiffResponse{
UnknownChecksums: unknownChecksums,
}, nil
}
func (s *server) WriteChunk(ctx context.Context, request *pb.WriteChunkRequest) (*pb.WriteChunkResponse, error) {
err := s.store.Write(request.Checksum, request.Data)
if err != nil {
return nil, err
}
return &pb.WriteChunkResponse{}, nil
}
func (s *server) ReadChunk(ctx context.Context, request *pb.ReadChunkRequest) (*pb.ReadChunkResponse, error) {
response := &pb.ReadChunkResponse{
Data: make([]byte, request.Length),
}
_, err := s.store.ReadAt(request.Checksum, response.Data, request.Offset)
if err != nil {
return nil, err
}
return response, nil
}
func (s *server) StatChunk(ctx context.Context, request *pb.StatChunkRequest) (*pb.StatChunkResponse, error) {
err := s.store.Stat(request.Checksum)
// FIXME this API is wrong
if err != nil {
return &pb.StatChunkResponse{Exists: false}, nil
} else {
return &pb.StatChunkResponse{Exists: true}, nil
}
}
func (s *server) RemoveChunk(ctx context.Context, request *pb.RemoveChunkRequest) (*pb.RemoveChunkResponse, error) {
debugf("server.RemoveChunk(%x)", request.Checksum)
err := s.store.Remove(request.Checksum)
if err != nil {
return nil, err
}
return &pb.RemoveChunkResponse{}, nil
}
func (s *server) WriteManifest(ctx context.Context, request *pb.WriteManifestRequest) (*pb.WriteManifestResponse, error) {
debugf("server.WriteManifest(%x)", request.Manifest.Id)
manifest, err := NewManifestFromProto(request.Manifest)
if err != nil {
return nil, err
}
if err := s.metaStore.WriteManifest(request.Id, manifest); err != nil {
return nil, err
}
return &pb.WriteManifestResponse{}, nil
}
func (s *server) ReadManifest(ctx context.Context, request *pb.ReadManifestRequest) (*pb.ReadManifestResponse, error) {
manifest, err := s.metaStore.ReadManifest(request.Id)
if err != nil {
return nil, err
}
return &pb.ReadManifestResponse{Manifest: manifest.Proto()}, nil
}