Skip to content

Commit

Permalink
Add asyncConnect and TimeOut to netty Transceiver
Browse files Browse the repository at this point in the history
Signed-off-by: Sébastien GLON <[email protected]>
  • Loading branch information
Sébastien GLON committed Aug 30, 2016
1 parent 185b8a4 commit 002ce83
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 141 deletions.
91 changes: 0 additions & 91 deletions Transceiver.go

This file was deleted.

14 changes: 6 additions & 8 deletions examples/flume/client.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
package main
import (
"github.com/sebglon/goavro"
"net"
"log"

"github.com/sebglon/goavro/transceiver/netty"
"time"
)

func main() {
//t.SkipNow()
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
transceiver,err := netty.NewTransceiver(netty.Config{AsyncConnect:true, NettyHost:"10.98.80.113"})
if err != nil {
log.Fatal(err)
}
defer conn.Close()

transceiver := goavro.NewNettyTransceiver(conn)
protocol, err := goavro.NewProtocol()
if err != nil {
log.Fatal(err)
Expand All @@ -30,12 +26,14 @@ func main() {
flumeRecord.Set("headers", headers)
flumeRecord.Set("body", []byte("2016-08-02 14:45:38|flume.composantTechnique_IS_UNDEFINED|flume.application_IS_UNDEFINED|flume.client_IS_UNDEFINED|flume.plateforme_IS_UNDEFINED|instance_IS_UNDEFINED|logname_IS_UNDEFINED|WARN |test.LogGenerator|test !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"))
requestor := goavro.NewRequestor(protocol, transceiver)


err = requestor.Request("append", flumeRecord)

if err != nil {
log.Fatal("Request: ", err)
}

time.Sleep(5 * time.Second)
err = requestor.Request("append", flumeRecord)

if err != nil {
Expand Down
1 change: 0 additions & 1 deletion protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func TestGetCodec(t *testing.T) {
t.Fatal(errFlume)
}
headers := make(map[string]interface{})
headers[AVRO_SCHEMA_LITERAL_HEADER] = stringSchema
headers["host_header"] = "127.0.0.1"
flumeRecord.Set("headers", headers)
flumeRecord.Set("body", []byte("test"))
Expand Down
30 changes: 17 additions & 13 deletions requestor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"log"
"github.com/sebglon/goavro/transceiver"
)

var REMOTE_HASHES map[string][]byte
Expand All @@ -20,7 +21,7 @@ var HANDSHAKE_REQUESTOR_READER Codec
type Requestor struct {
// Base class for the client side of protocol interaction.
local_protocol Protocol
transceiver Transceiver
transceiver transceiver.Transceiver
remote_protocol Protocol
remote_hash []byte
send_protocol bool
Expand All @@ -43,7 +44,7 @@ func init() {

}

func NewRequestor(localProto Protocol, transceiver Transceiver) *Requestor {
func NewRequestor(localProto Protocol, transceiver transceiver.Transceiver) *Requestor {
return &Requestor{
local_protocol: localProto,
transceiver: transceiver,
Expand All @@ -57,12 +58,12 @@ func NewRequestor(localProto Protocol, transceiver Transceiver) *Requestor {

func (a *Requestor) RemoteProtocol(proto Protocol) {
a.remote_protocol = proto
REMOTE_PROTOCOLS[a.transceiver.RemoteName()] = proto
REMOTE_PROTOCOLS[proto.Name] = proto
}

func (a *Requestor) RemoteHash(hash []byte) {
a.remote_hash = hash
REMOTE_HASHES[a.transceiver.RemoteName()] = hash
REMOTE_HASHES[a.remote_protocol.Name] = hash
}

func (a *Requestor) Request(message_name string, request_datum interface{}) error {
Expand Down Expand Up @@ -94,18 +95,21 @@ func (a *Requestor) Request(message_name string, request_datum interface{}) er
}
//buffer_decoder := bytes.NewBuffer(decoder)
// process the handshake and call response
ok, err := a.read_handshake_response(responses[0])
if err!=nil {
return err
}
a.send_handshake= !ok

if ok {
a.read_call_responseCode(responses[1])
if err!=nil {
if len(responses) >0 {
ok, err := a.read_handshake_response(responses[0])
if err != nil {
return err
}
// a.Request(message_name, request_datum)
a.send_handshake = !ok

if ok {
a.read_call_responseCode(responses[1])
if err != nil {
return err
}
// a.Request(message_name, request_datum)
}
}
return nil
}
Expand Down
52 changes: 25 additions & 27 deletions requestor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"bytes"
"reflect"
netty "github.com/sebglon/goavro/transceiver/netty"
)

func TestWrite_handshake_request(t *testing.T) {
Expand All @@ -16,7 +17,10 @@ func TestWrite_handshake_request(t *testing.T) {
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)
transceiver, err := netty.NewTransceiver(netty.Config{})
if err != nil {
t.Fatal(err)
}
protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -75,14 +79,12 @@ func TestRead_handshake_reponse(t *testing.T) {
t.Fatal(err)
}

rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)

transceiver, err := netty.NewTransceiver(netty.Config{})
if err != nil {
t.Fatal(err)
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)
protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
Expand All @@ -96,17 +98,21 @@ func TestRead_handshake_reponse(t *testing.T) {

}

type Conn struct {
bytes.Buffer
}
func (c *Conn) Close() error {
return nil
}

func TestWrite_call_request(t *testing.T) {
//t.SkipNow()
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)

transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -147,14 +153,10 @@ func TestWrite_call_request(t *testing.T) {

func TestWrite_call_requestHeader(t *testing.T) {
//t.SkipNow()
rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

transceiver := NewNettyTransceiver(conn)
protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
Expand All @@ -173,16 +175,12 @@ func TestWrite_call_requestHeader(t *testing.T) {
}

func TestRead_call_responseMessage(t *testing.T) {
//t.SkipNow()

rAddr, err := net.ResolveTCPAddr("tcp", "10.98.80.113:63001")
conn, err := net.DialTCP("tcp", nil, rAddr)
if err != nil {
t.Fatal(err)
}
defer conn.Close()

transceiver := NewNettyTransceiver(conn)
transceiver, err := netty.NewTransceiver(netty.Config{})
buf := &Conn{}
transceiver.Conn = buf

protocol, err := NewProtocol()
if err != nil {
t.Fatal(err)
Expand Down
11 changes: 11 additions & 0 deletions transceiver/Tranceiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package transceiver

import (
"bytes"
"io"
)

type Transceiver interface {
Transceive(request []bytes.Buffer) ([]io.Reader, error)

}
Loading

0 comments on commit 002ce83

Please sign in to comment.