Skip to content

Commit 3c5fcd0

Browse files
author
Soumo Sarkar
committed
Add hashing of local file path to translate while distributing across other servers
1 parent 8b1d14a commit 3c5fcd0

File tree

6 files changed

+143
-126
lines changed

6 files changed

+143
-126
lines changed

crypto.go

+37-44
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,33 @@ package main
33
import (
44
"crypto/aes"
55
"crypto/cipher"
6+
"crypto/md5"
67
"crypto/rand"
8+
"encoding/hex"
79
"io"
810
)
911

12+
func generateID() string {
13+
buf := make([]byte, 32)
14+
io.ReadFull(rand.Reader, buf)
15+
return hex.EncodeToString(buf)
16+
}
17+
18+
func hashKey(key string) string {
19+
hash := md5.Sum([]byte(key))
20+
return hex.EncodeToString(hash[:])
21+
}
22+
1023
func newEncryptionKey() []byte {
1124
keyBuf := make([]byte, 32)
1225
io.ReadFull(rand.Reader, keyBuf)
1326
return keyBuf
1427
}
1528

16-
func copyDecrypt(key []byte, src io.Reader, dst io.Writer) (int, error) {
17-
block, err := aes.NewCipher(key)
18-
if err != nil {
19-
return 0, err
20-
}
21-
22-
/*
23-
Read the IV from the given io.Reader
24-
in our case which should be the block.BlockSize bytes we read.
25-
*/
26-
iv := make([]byte, block.BlockSize())
27-
if _, err := src.Read(iv); err != nil {
28-
return 0, err
29-
}
30-
29+
func copyStream(stream cipher.Stream, BlockSize int, src io.Reader, dst io.Writer) (int, error) {
3130
var (
32-
buf = make([]byte, 32*1024)
33-
stream = cipher.NewCTR(block, iv)
34-
nw = block.BlockSize()
31+
buf = make([]byte, 32*1024)
32+
nw = BlockSize
3533
)
3634

3735
for {
@@ -55,6 +53,25 @@ func copyDecrypt(key []byte, src io.Reader, dst io.Writer) (int, error) {
5553
return nw, nil
5654
}
5755

56+
func copyDecrypt(key []byte, src io.Reader, dst io.Writer) (int, error) {
57+
block, err := aes.NewCipher(key)
58+
if err != nil {
59+
return 0, err
60+
}
61+
62+
/*
63+
Read the IV from the given io.Reader
64+
in our case which should be the block.BlockSize bytes we read.
65+
*/
66+
iv := make([]byte, block.BlockSize())
67+
if _, err := src.Read(iv); err != nil {
68+
return 0, err
69+
}
70+
71+
stream := cipher.NewCTR(block, iv)
72+
return copyStream(stream, block.BlockSize(), src, dst)
73+
}
74+
5875
func copyEncrypt(key []byte, src io.Reader, dst io.Writer) (int, error) {
5976
block, err := aes.NewCipher(key)
6077
if err != nil {
@@ -71,30 +88,6 @@ func copyEncrypt(key []byte, src io.Reader, dst io.Writer) (int, error) {
7188
return 0, err
7289
}
7390

74-
var (
75-
buf = make([]byte, 32*1024)
76-
stream = cipher.NewCTR(block, iv)
77-
nw = block.BlockSize()
78-
)
79-
80-
for {
81-
n, err := src.Read(buf)
82-
if n > 0 {
83-
stream.XORKeyStream(buf, buf[:n])
84-
n, err := dst.Write(buf[:n])
85-
if err != nil {
86-
return 0, err
87-
}
88-
nw += n
89-
}
90-
91-
if err == io.EOF {
92-
break
93-
}
94-
if err != nil {
95-
return 0, err
96-
}
97-
}
98-
99-
return nw, nil
91+
stream := cipher.NewCTR(block, iv)
92+
return copyStream(stream, block.BlockSize(), src, dst)
10093
}

main.go

+29-19
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package main
22

33
import (
44
"bytes"
5+
"fmt"
6+
"io"
57
"log"
68
"time"
79

@@ -36,29 +38,37 @@ func makeServer(listenAddr string, nodes ...string) *FileServer {
3638
func main() {
3739

3840
s1 := makeServer(":3000", "")
39-
s2 := makeServer(":4000", ":3000")
41+
s2 := makeServer(":7000", ":3000")
42+
s3 := makeServer(":5000", ":3000", ":7000")
4043

41-
go func() {
42-
log.Fatal(s1.Start())
43-
}()
44+
go func() { log.Fatal(s1.Start()) }()
45+
time.Sleep(500 * time.Millisecond)
46+
go func() { log.Fatal(s2.Start()) }()
4447

4548
time.Sleep(2 * time.Second)
4649

47-
go s2.Start()
50+
go s3.Start()
4851
time.Sleep(2 * time.Second)
4952

50-
data := bytes.NewReader([]byte("my big data file here!"))
51-
s2.Store("cool_picture.jpg", data)
52-
53-
// r, err := s2.Get("cool_picture.jpg")
54-
// if err != nil {
55-
// log.Fatal(err)
56-
// }
57-
//
58-
// b, err := io.ReadAll(r)
59-
// if err != nil {
60-
// log.Fatal(err)
61-
// }
62-
//
63-
// fmt.Println(string(b))
53+
for i := 0; i < 20; i++ {
54+
key := fmt.Sprintf("picture_%d.png", i)
55+
data := bytes.NewReader([]byte("my big data file here!"))
56+
s3.Store(key, data)
57+
58+
if err := s3.storage.Delete(s3.ID, key); err != nil {
59+
log.Fatal(err)
60+
}
61+
62+
r, err := s3.Get(key)
63+
if err != nil {
64+
log.Fatal(err)
65+
}
66+
67+
b, err := io.ReadAll(r)
68+
if err != nil {
69+
log.Fatal(err)
70+
}
71+
72+
fmt.Println(string(b))
73+
}
6474
}

p2p/tcp_transport_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ func TestTCPTransport(t *testing.T) {
1313
Decoder: &DefaultDecoder{},
1414
}
1515
tr := NewTCPTransport(opts)
16-
assert.Equal(t, tr.ListenAddr, ":3000")
16+
assert.Equal(t, tr.ListenAddress, ":3000")
1717

1818
assert.Nil(t, tr.ListenAndAccept())
1919
}

server.go

+33-29
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
)
1515

1616
type FileServerOptions struct {
17+
ID string
1718
EncKey []byte
1819
StorageRoot string
1920
PathTransformFunc PathTransformFunc
@@ -37,6 +38,10 @@ func NewFileServer(opts FileServerOptions) *FileServer {
3738
PathTransformFunc: opts.PathTransformFunc,
3839
}
3940

41+
if len(opts.ID) == 0 {
42+
opts.ID = generateID()
43+
}
44+
4045
return &FileServer{
4146
FileServerOptions: opts,
4247
storage: NewStorage(storageOpts),
@@ -45,16 +50,6 @@ func NewFileServer(opts FileServerOptions) *FileServer {
4550
}
4651
}
4752

48-
func (server *FileServer) stream(msg *Message) error {
49-
peers := []io.Writer{}
50-
for _, peer := range server.peers {
51-
peers = append(peers, peer)
52-
}
53-
54-
mw := io.MultiWriter(peers...)
55-
return gob.NewEncoder(mw).Encode(msg)
56-
}
57-
5853
func (server *FileServer) broadcast(msg *Message) error {
5954
/* encode the message */
6055
buf := new(bytes.Buffer)
@@ -78,26 +73,29 @@ type Message struct {
7873
}
7974

8075
type MessageStoreFile struct {
76+
ID string
8177
Key string
8278
Size int64
8379
}
8480

8581
type MessageGetFile struct {
82+
ID string
8683
Key string
8784
}
8885

8986
func (server *FileServer) Get(key string) (io.Reader, error) {
90-
if server.storage.Has(key) {
87+
if server.storage.Has(server.ID, key) {
9188
fmt.Printf("[%s] serving file (%s) from local disk\n", server.Transport.ListenAddr(), key)
92-
_, r, err := server.storage.Read(key)
89+
_, r, err := server.storage.Read(server.ID, key)
9390
return r, err
9491
}
9592

9693
fmt.Printf("[%s] don't have file (%s) locally, fetching from network...\n", server.Transport.ListenAddr(), key)
9794

9895
msg := Message{
9996
Payload: MessageGetFile{
100-
Key: key,
97+
ID: server.ID,
98+
Key: hashKey(key),
10199
},
102100
}
103101

@@ -109,10 +107,11 @@ func (server *FileServer) Get(key string) (io.Reader, error) {
109107

110108
for _, peer := range server.peers {
111109
/* first read the file size so we can limit the amount of bytes
112-
that wee need from the connection so it will not keep hanging */
110+
that we need from the connection so it will not keep hanging */
113111
var fileSize int64
114112
binary.Read(peer, binary.LittleEndian, &fileSize)
115-
n, err := server.storage.Write(key, io.LimitReader(peer, 22))
113+
114+
n, err := server.storage.writeDecrypt(server.EncKey, server.ID, key, io.LimitReader(peer, fileSize))
116115
if err != nil {
117116
return nil, err
118117
}
@@ -122,7 +121,7 @@ func (server *FileServer) Get(key string) (io.Reader, error) {
122121
peer.CloseStream()
123122
}
124123

125-
_, r, err := server.storage.Read(key)
124+
_, r, err := server.storage.Read(server.ID, key)
126125
return r, err
127126
}
128127

@@ -137,14 +136,15 @@ func (server *FileServer) Store(key string, r io.Reader) error {
137136
tee = io.TeeReader(r, fileBuffer)
138137
)
139138

140-
size, err := server.storage.Write(key, tee)
139+
size, err := server.storage.Write(server.ID, key, tee)
141140
if err != nil {
142141
return nil
143142
}
144143

145144
msg := Message{
146145
Payload: MessageStoreFile{
147-
Key: key,
146+
ID: server.ID,
147+
Key: hashKey(key),
148148
Size: size + 16,
149149
},
150150
}
@@ -155,17 +155,20 @@ func (server *FileServer) Store(key string, r io.Reader) error {
155155

156156
time.Sleep(time.Millisecond * 5)
157157

158-
// TODO: use a multiwriter here.
158+
peers := []io.Writer{}
159159
for _, peer := range server.peers {
160-
peer.Send([]byte{p2p.IncomingStream})
161-
n, err := copyEncrypt(server.EncKey, fileBuffer, peer)
162-
if err != nil {
163-
return err
164-
}
160+
peers = append(peers, peer)
161+
}
165162

166-
fmt.Println("received and written to disk: ", n)
163+
mw := io.MultiWriter(peers...)
164+
mw.Write([]byte{p2p.IncomingStream})
165+
n, err := copyEncrypt(server.EncKey, fileBuffer, mw)
166+
if err != nil {
167+
return err
167168
}
168169

170+
fmt.Printf("[%s] received and written (%d) bytes to disk\n", server.Transport.ListenAddr(), n)
171+
169172
return nil
170173
}
171174

@@ -219,13 +222,13 @@ func (server *FileServer) handleMessage(from string, msg *Message) error {
219222
}
220223

221224
func (server *FileServer) handleMessageGetFile(from string, msg MessageGetFile) error {
222-
if !server.storage.Has(msg.Key) {
225+
if !server.storage.Has(msg.ID, msg.Key) {
223226
return fmt.Errorf("[%s] need to serve file (%s) but it doesn't exist on disk", server.Transport.ListenAddr(), msg.Key)
224227
}
225228

226229
fmt.Printf("[%s] serving file (%s) over the network\n", server.Transport.ListenAddr(), msg.Key)
227230

228-
fileSize, r, err := server.storage.Read(msg.Key)
231+
fileSize, r, err := server.storage.Read(msg.ID, msg.Key)
229232
if err != nil {
230233
return err
231234
}
@@ -261,7 +264,7 @@ func (server *FileServer) handleMessageStoreFile(from string, msg MessageStoreFi
261264
return fmt.Errorf("peer (%s) could not be found in the peer list", from)
262265
}
263266

264-
n, err := server.storage.Write(msg.Key, io.LimitReader(peer, msg.Size))
267+
n, err := server.storage.Write(msg.ID, msg.Key, io.LimitReader(peer, msg.Size))
265268
if err != nil {
266269
return err
267270
}
@@ -280,7 +283,7 @@ func (server *FileServer) bootstrapNetwork() error {
280283
}
281284

282285
go func(addr string) {
283-
fmt.Println("attempting to connect with remote: ", addr)
286+
fmt.Printf("[%s] attempting to connect with remote %s\n", server.Transport.ListenAddr(), addr)
284287

285288
if err := server.Transport.Dial(addr); err != nil {
286289
log.Println("dial error: ", err)
@@ -292,6 +295,7 @@ func (server *FileServer) bootstrapNetwork() error {
292295
}
293296

294297
func (server *FileServer) Start() error {
298+
fmt.Printf("[%s] starting fileserver...\n", server.Transport.ListenAddr())
295299
if err := server.Transport.ListenAndAccept(); err != nil {
296300
return err
297301
}

0 commit comments

Comments
 (0)