Skip to content

Commit

Permalink
add read Handshake Response
Browse files Browse the repository at this point in the history
  • Loading branch information
Sébastien GLON committed Aug 25, 2016
1 parent 4d3728f commit d02902a
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 17 deletions.
37 changes: 32 additions & 5 deletions Transceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"net"
"encoding/binary"
"fmt"
"os"
"io"
)

type Transceiver interface {
Transceive(request []bytes.Buffer) ([]byte, error)
Transceive(request []bytes.Buffer) ([]io.Reader, error)
RemoteName() string
SetRemoteName(string)
}
Expand All @@ -33,7 +35,7 @@ func (t NettyTransceiver) SetRemoteName(name string) {
t.remoteName = name
}

func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]byte, error){
func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]io.Reader, error){
nettyFrame := new(bytes.Buffer)
t.Pack(nettyFrame, requests)

Expand All @@ -42,11 +44,14 @@ func (t NettyTransceiver) Transceive(requests []bytes.Buffer) ([]byte, error){
if err!=nil {
return nil, fmt.Errorf("Fail to write on socket %v", err)
}
//sfmt.Fprintf(os.Stdout, "BufferSize %v", nettyFrame)

// Read Response
bodyBytes := make([]byte, 1024)
t.sock.Read(bodyBytes)
return bodyBytes, nil
_, err = t.sock.Read(bodyBytes)
if err!=nil {
return nil, fmt.Errorf("Fail to read on socket %v", err)
}
return t.Unpack(bodyBytes)
}

func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
Expand All @@ -67,4 +72,26 @@ func (t *NettyTransceiver) Pack(frame *bytes.Buffer, requests []bytes.Buffer) {
frame.Write(requestSize)
frame.Write(request.Bytes())
}
}

func (t *NettyTransceiver) Unpack(frame []byte) ([]io.Reader, error) {

nettyNumberFame := binary.BigEndian.Uint32(frame[4:8])
result := make([]io.Reader, nettyNumberFame)
startFrame := uint32(8)
i:=uint32(0)
for i < nettyNumberFame {


messageSize := uint32(binary.BigEndian.Uint32(frame[startFrame:startFrame+4]))
fmt.Fprintf(os.Stdout, "\nnettyNumberFrame %v %v ", startFrame, frame[startFrame:startFrame+4])
message := frame[startFrame+4:startFrame+4+messageSize]
fmt.Fprintf(os.Stdout, "\nmessage: %v", message)
startFrame = startFrame+4+messageSize
br := bytes.NewReader(message)
result[i] = br
i++
}

return result, nil
}
23 changes: 23 additions & 0 deletions Transceiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"bytes"
"reflect"
"io/ioutil"
)

func TestPack(t *testing.T) {
Expand All @@ -26,3 +27,25 @@ func TestPack(t *testing.T) {
t.Fatalf("Frame not equals to %x: %x / %x",expectedSize, frame.Len(), frame.Bytes())
}
}

func TestUnpack(t *testing.T) {
transceiver := new(NettyTransceiver)
frame := []byte("\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x01\x0a\x00\x00\x00\x01\x0b")
respons, err := transceiver.Unpack(frame)
if err != nil {
t.Fatalf("%v",err)
}

if len(respons)!=2 {
t.Fatalf("Number of reponse frame not equals %x / %x",2, len(respons))
}

var resp1 []byte
var resp2 []byte
resp1, _ =ioutil.ReadAll(respons[0])
respons[1].Read(resp2)
if !reflect.DeepEqual(resp1, []byte("\x0a")) && !reflect.DeepEqual(resp2, []byte("\x0b")) {
t.Fatalf("Reponse message not equals (0) %x/%x; (1) %x/%x","\x0a", respons[0], "\x0b", respons[1])
}

}
6 changes: 6 additions & 0 deletions ipc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,11 @@ func TestRequestor(t *testing.T) {
if err != nil {
t.Fatal("Request: ", err)
}

err = requestor.Request("append", flumeRecord)

if err != nil {
t.Fatal("Request: ", err)
}
}

36 changes: 24 additions & 12 deletions requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"log"
"os"
)

var REMOTE_HASHES map[string][]byte
Expand All @@ -25,6 +25,7 @@ type Requestor struct {
remote_protocol Protocol
remote_hash []byte
send_protocol bool
send_handshake bool
}
func init() {
var err error
Expand All @@ -49,7 +50,8 @@ func NewRequestor(localProto Protocol, transceiver Transceiver) *Requestor {
transceiver: transceiver,
// remote_protocol: nil,
// remote_hash: nil,
// send_protocol: nil,
send_protocol: false,
send_handshake: true,
}
}

Expand Down Expand Up @@ -86,25 +88,32 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er

// sen the handshake and call request; block until call response
buffer_writers := []bytes.Buffer{*frame1, *frame2}
decoder, err := a.transceiver.Transceive(buffer_writers)
responses, err := a.transceiver.Transceive(buffer_writers)

if err!=nil {
return err
}
buffer_decoder := bytes.NewBuffer(decoder)
//buffer_decoder := bytes.NewBuffer(decoder)
// process the handshake and call response
//ok, err := a.read_handshake_response(buffer_decoder)
fmt.Sprintf("Response %v", buffer_decoder)
//if err!=nil {
// return err
//} else if ok {
fmt.Fprintf(os.Stdout, "\nresponsee %#v", responses)
ok, err := a.read_handshake_response(responses[0])
if err!=nil {
return err
}
a.send_handshake= !ok

if ok {
// a.read_call_response(message_name, buffer_decoder)
//} else {
// a.Request(message_name, request_datum)
//}
}
return nil
}

func (a *Requestor) write_handshake_request( buffer io.Writer ) (err error) {
if !a.send_handshake {
return nil
}
local_hash :=a.local_protocol.MD5
remote_name := a.remote_protocol.Name
remote_hash := REMOTE_HASHES[remote_name]
Expand Down Expand Up @@ -180,11 +189,14 @@ func (a *Requestor) write_request(request_codec Codec, request_datum interface{}
}

func (a *Requestor) read_handshake_response(decoder io.Reader) (bool, error) {
resp, _ := ioutil.ReadAll(decoder)
if !a.send_handshake {
return true, nil
}

datum, err := HANDSHAKE_REQUESTOR_READER.Decode(decoder)
if err != nil {

return false,fmt.Errorf("Fail to decode %v with error %v", resp, err)
return false,fmt.Errorf("Fail to decode %v with error %v", decoder, err)
}

record, ok := datum.(*Record)
Expand Down
48 changes: 48 additions & 0 deletions requestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,54 @@ func TestWrite_handshake_request(t *testing.T) {

}

func TestRead_handshake_reponse(t *testing.T) {
codecHandshake, err := NewCodec(handshakeResponseshema)
if err != nil {
t.Fatal(err)
}
record, err := NewRecord(RecordSchema(handshakeResponseshema))
if err != nil {
t.Fatal(err)
}
record.Set("match", Enum{"match","BOTH"})
record.Set("serverProtocol", nil)
record.Set("serverHash", nil)
record.Set("meta", nil)

bb := new(bytes.Buffer)
err = codecHandshake.Encode(bb, record)
if err != nil {
t.Fatal(err)
}
t.Logf("Encode HandshakeResponse %v", bb.Bytes())


_, err = codecHandshake.Decode(bytes.NewReader(bb.Bytes()))
if err != nil {
t.Fatal(err)
}

rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)
protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
}
requestor := NewRequestor(protocol, transceiver)

_, err = requestor.read_handshake_response(bytes.NewReader(bb.Bytes()))
if err != nil {
t.Fatal(err)
}

}


func TestWrite_call_request(t *testing.T) {
//t.SkipNow()
Expand Down

0 comments on commit d02902a

Please sign in to comment.