From b740dc00c9f8c101211413bf27c326866da3e79f Mon Sep 17 00:00:00 2001 From: prataprc Date: Tue, 19 Apr 2016 20:41:00 +0530 Subject: [PATCH] Temporary fix for issue #2. --- TODO.rst | 2 +- rx.go | 8 ++++++++ stream.go | 3 +++ transport.go | 4 ++++ tx.go | 2 ++ 5 files changed, 18 insertions(+), 1 deletion(-) diff --git a/TODO.rst b/TODO.rst index 69e2e25..d63ff72 100644 --- a/TODO.rst +++ b/TODO.rst @@ -1,4 +1,3 @@ -* find a way to add SendHeartbeat() in verify/{client.go,server.go} * logs are commented, wrap them under log flag. * document reserved tags. * refactor verify and enable random and race. @@ -15,3 +14,4 @@ * try gofast on raspberry-pi. * support snappy compression. * add code coverage for verification. +* find a way to add SendHeartbeat() in verify/{client.go,server.go} diff --git a/rx.go b/rx.go index c5bfdce..d388b74 100644 --- a/rx.go +++ b/rx.go @@ -39,10 +39,12 @@ func (t *Transport) syncRx() { streamupdate := func(stream *Stream) { _, ok := livestreams[stream.opaque] if ok && stream.Rxch == nil { + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v ##%d stream closed ...\n" //log.Debugf(fmsg, t.logprefix, stream.opaque) delete(livestreams, stream.opaque) } else if stream.Rxch != nil { + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v ##%d stream started ...\n" //log.Verbosef(fmsg, t.logprefix, stream.opaque) livestreams[stream.opaque] = stream @@ -53,6 +55,7 @@ func (t *Transport) syncRx() { stream, streamok := livestreams[rxpkt.opaque] if streamok && rxpkt.finish { + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v ##%d stream closed by remote ...\n" //log.Debugf(fmsg, t.logprefix, stream.opaque) t.putstream(rxpkt.opaque, stream, false /*tellrx*/) @@ -60,11 +63,13 @@ func (t *Transport) syncRx() { atomic.AddUint64(&t.n_rxfin, 1) return } else if rxpkt.finish { + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v ##%d unknown stream-fin from remote ...\n" //log.Debugf(fmsg, t.logprefix, rxpkt.opaque) atomic.AddUint64(&t.n_mdrops, 1) return } + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v received msg %#v streamok:%v\n" //log.Debugf(fmsg, t.logprefix, rxpkt.msg, streamok) if streamok == false { // post, request, stream-start @@ -147,6 +152,7 @@ func (t *Transport) doRx() { if err != nil { break } + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v %v ; received pkt\n", t.logprefix, rxpkt) if t.putch(t.rxch, rxpkt) == false { break @@ -178,6 +184,7 @@ func (t *Transport) unframepkt( log.Errorf("%v\n", err) return } + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v doRx() io.ReadFull() first %v\n", t.logprefix, pad) // check cbor-prefix n = 3 @@ -203,6 +210,7 @@ func (t *Transport) unframepkt( return } atomic.AddUint64(&t.n_rxbyte, uint64(9+m)) + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v doRx() io.ReadFull() second %v\n", t.logprefix, packet[:ln]) // first tag is opaque diff --git a/stream.go b/stream.go index fedbea0..5de5e6c 100644 --- a/stream.go +++ b/stream.go @@ -13,8 +13,11 @@ type Stream struct { // constructor used for remote streams. func (t *Transport) newremotestream(opaque uint64) *Stream { stream := t.fromrxstrm() + + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v ##%d(remote:%v) stream created ...\n" //log.Verbosef(fmsg, t.logprefix, opaque, remote) + // reset all fields (it is coming from a pool) stream.transport, stream.remote, stream.opaque = t, true, opaque stream.Rxch = nil diff --git a/transport.go b/transport.go index bc78d11..1d8ef02 100644 --- a/transport.go +++ b/transport.go @@ -266,6 +266,8 @@ func (t *Transport) FlushPeriod(ms time.Duration) { if t.tx([]byte{} /*empty*/, true /*flush*/) != nil { return } + + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v flushed ... \n", t.logprefix) select { @@ -288,6 +290,8 @@ func (t *Transport) SendHeartbeat(ms time.Duration) { return } count++ + + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v posted heartbeat %v\n", t.logprefix, count) select { diff --git a/tx.go b/tx.go index 8714173..7ff5bc8 100644 --- a/tx.go +++ b/tx.go @@ -173,6 +173,7 @@ func (t *Transport) doTx() { } // send. if n > 0 { + //TODO: Issue #2, remove or prevent value escape to heap //fmsg := "%v doTx() socket write %v:%v\n" //log.Debugf(fmsg, t.logprefix, n, tcpwrite_buf[:n]) m, err = t.conn.Write(tcpwrite_buf[:n]) @@ -191,6 +192,7 @@ func (t *Transport) doTx() { arg.respch <- arg } } + //TODO: Issue #2, remove or prevent value escape to heap //log.Debugf("%v drained %v packets\n", t.logprefix, len(batch)) batch = batch[:0] // reset the batch }