Skip to content

Commit

Permalink
Temporary fix for issue #2.
Browse files Browse the repository at this point in the history
  • Loading branch information
prataprc committed Apr 19, 2016
1 parent e89b291 commit b740dc0
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 1 deletion.
2 changes: 1 addition & 1 deletion TODO.rst
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
* find a way to add SendHeartbeat() in verify/{client.go,server.go}
* logs are commented, wrap them under log flag.
* document reserved tags.
* refactor verify and enable random and race.
Expand All @@ -15,3 +14,4 @@
* try gofast on raspberry-pi.
* support snappy compression.
* add code coverage for verification.
* find a way to add SendHeartbeat() in verify/{client.go,server.go}
8 changes: 8 additions & 0 deletions rx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ func (t *Transport) syncRx() {
streamupdate := func(stream *Stream) {
_, ok := livestreams[stream.opaque]
if ok && stream.Rxch == nil {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v ##%d stream closed ...\n"
//log.Debugf(fmsg, t.logprefix, stream.opaque)
delete(livestreams, stream.opaque)
} else if stream.Rxch != nil {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v ##%d stream started ...\n"
//log.Verbosef(fmsg, t.logprefix, stream.opaque)
livestreams[stream.opaque] = stream
Expand All @@ -53,18 +55,21 @@ func (t *Transport) syncRx() {
stream, streamok := livestreams[rxpkt.opaque]

if streamok && rxpkt.finish {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v ##%d stream closed by remote ...\n"
//log.Debugf(fmsg, t.logprefix, stream.opaque)
t.putstream(rxpkt.opaque, stream, false /*tellrx*/)
delete(livestreams, rxpkt.opaque)
atomic.AddUint64(&t.n_rxfin, 1)
return
} else if rxpkt.finish {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v ##%d unknown stream-fin from remote ...\n"
//log.Debugf(fmsg, t.logprefix, rxpkt.opaque)
atomic.AddUint64(&t.n_mdrops, 1)
return
}
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v received msg %#v streamok:%v\n"
//log.Debugf(fmsg, t.logprefix, rxpkt.msg, streamok)
if streamok == false { // post, request, stream-start
Expand Down Expand Up @@ -147,6 +152,7 @@ func (t *Transport) doRx() {
if err != nil {
break
}
//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v %v ; received pkt\n", t.logprefix, rxpkt)
if t.putch(t.rxch, rxpkt) == false {
break
Expand Down Expand Up @@ -178,6 +184,7 @@ func (t *Transport) unframepkt(
log.Errorf("%v\n", err)
return
}
//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v doRx() io.ReadFull() first %v\n", t.logprefix, pad)
// check cbor-prefix
n = 3
Expand All @@ -203,6 +210,7 @@ func (t *Transport) unframepkt(
return
}
atomic.AddUint64(&t.n_rxbyte, uint64(9+m))
//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v doRx() io.ReadFull() second %v\n", t.logprefix, packet[:ln])

// first tag is opaque
Expand Down
3 changes: 3 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ type Stream struct {
// constructor used for remote streams.
func (t *Transport) newremotestream(opaque uint64) *Stream {
stream := t.fromrxstrm()

//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v ##%d(remote:%v) stream created ...\n"
//log.Verbosef(fmsg, t.logprefix, opaque, remote)

// reset all fields (it is coming from a pool)
stream.transport, stream.remote, stream.opaque = t, true, opaque
stream.Rxch = nil
Expand Down
4 changes: 4 additions & 0 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ func (t *Transport) FlushPeriod(ms time.Duration) {
if t.tx([]byte{} /*empty*/, true /*flush*/) != nil {
return
}

//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v flushed ... \n", t.logprefix)

select {
Expand All @@ -288,6 +290,8 @@ func (t *Transport) SendHeartbeat(ms time.Duration) {
return
}
count++

//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v posted heartbeat %v\n", t.logprefix, count)

select {
Expand Down
2 changes: 2 additions & 0 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (t *Transport) doTx() {
}
// send.
if n > 0 {
//TODO: Issue #2, remove or prevent value escape to heap
//fmsg := "%v doTx() socket write %v:%v\n"
//log.Debugf(fmsg, t.logprefix, n, tcpwrite_buf[:n])
m, err = t.conn.Write(tcpwrite_buf[:n])
Expand All @@ -191,6 +192,7 @@ func (t *Transport) doTx() {
arg.respch <- arg
}
}
//TODO: Issue #2, remove or prevent value escape to heap
//log.Debugf("%v drained %v packets\n", t.logprefix, len(batch))
batch = batch[:0] // reset the batch
}
Expand Down

0 comments on commit b740dc0

Please sign in to comment.