Skip to content

Commit

Permalink
Merge pull request kubevirt#2872 from rmohr/fix-vnc-reading
Browse files Browse the repository at this point in the history
Fix straming from websockets to tcp
  • Loading branch information
kubevirt-bot authored Nov 7, 2019
2 parents 5f495a7 + 8706b3f commit 0503e3c
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 19 deletions.
2 changes: 2 additions & 0 deletions staging/src/kubevirt.io/client-go/kubecli/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_test(
"vm_test.go",
"vmi_test.go",
"vmipreset_test.go",
"websocket_test.go",
],
embed = [":go_default_library"],
deps = [
Expand All @@ -27,6 +28,7 @@ go_test(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/rand:go_default_library",
],
)

Expand Down
2 changes: 1 addition & 1 deletion staging/src/kubevirt.io/client-go/kubecli/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"

virtv1 "kubevirt.io/kubevirt/pkg/api/v1"
virtv1 "kubevirt.io/client-go/api/v1"
)

var _ = Describe("Kubevirt VirtualMachine Client", func() {
Expand Down
48 changes: 30 additions & 18 deletions staging/src/kubevirt.io/client-go/kubecli/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

const (
WebsocketMessageBufferSize = 10240
wsFrameHeaderSize = 2 + 8 + 4 // Fixed header + length + mask (RFC 6455)
)

func NewUpgrader() *websocket.Upgrader {
Expand All @@ -55,24 +54,16 @@ func Dial(address string, tlsConfig *tls.Config) (*websocket.Conn, *http.Respons
return dialer.Dial(address, nil)
}

func Copy(dst *websocket.Conn, src *websocket.Conn) (written int64, err error) {
return copy(&binaryWriter{conn: dst}, &binaryReader{conn: src})
func Copy(dst *websocket.Conn, src *websocket.Conn) (int64, error) {
return io.Copy(dst.UnderlyingConn(), src.UnderlyingConn())
}

func CopyFrom(dst io.Writer, src *websocket.Conn) (written int64, err error) {
return copy(dst, &binaryReader{conn: src})
return io.Copy(dst, &binaryReader{conn: src})
}

func CopyTo(dst *websocket.Conn, src io.Reader) (written int64, err error) {
return copy(&binaryWriter{conn: dst}, src)
}

func copy(dst io.Writer, src io.Reader) (written int64, err error) {
// our websocket package has an issue where it truncates messages
// when the message+header is greater than the buffer size we allocate.
// thus, we copy in chunks of WebsocketMessageBufferSize-wsFrameHeaderSize
buf := make([]byte, WebsocketMessageBufferSize-wsFrameHeaderSize)
return io.CopyBuffer(dst, src, buf)
return io.Copy(&binaryWriter{conn: dst}, src)
}

type binaryWriter struct {
Expand All @@ -85,27 +76,48 @@ func (s *binaryWriter) Write(p []byte) (int, error) {
return 0, convert(err)
}
defer w.Close()
return w.Write(p)
n, err := w.Write(p)
return n, err
}

type binaryReader struct {
conn *websocket.Conn
conn *websocket.Conn
reader io.Reader
}

func (s *binaryReader) Read(p []byte) (int, error) {
var msgType int
var err error
for {
msgType, r, err := s.conn.NextReader()
if s.reader == nil {
msgType, s.reader, err = s.conn.NextReader()
} else {
msgType = websocket.BinaryMessage
}
if err != nil {
s.reader = nil
return 0, convert(err)
}

switch msgType {
case websocket.BinaryMessage:
n, err := r.Read(p)
n, readErr := s.reader.Read(p)
err = readErr
if err != nil {
s.reader = nil
if err == io.EOF {
if n == 0 {
continue
} else {
return n, nil
}
}
}
return n, convert(err)

case websocket.CloseMessage:
return 0, io.EOF
default:
s.reader = nil
}
}
}
Expand Down
92 changes: 92 additions & 0 deletions staging/src/kubevirt.io/client-go/kubecli/websocket_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package kubecli

import (
"crypto/sha256"
"fmt"
"hash"
"io"
"net/http"
"net/http/httptest"
"strings"

"github.com/gorilla/websocket"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/util/rand"
)

var _ = Describe("Websocket", func() {

Context("data proxied through our websocket proxy", func() {
var proxy *httptest.Server
var target *httptest.Server
var receivedDataHash hash.Hash
var done chan error
BeforeEach(func() {
done = make(chan error)
receivedDataHash = sha256.New()
target = newTargetServer(receivedDataHash, done)
proxy = newProxyServer(target)
})
AfterEach(func() {
proxy.Close()
target.Close()
})
It("should transfer arbitrary sized packets which are bigger and smaller than the websocket buffer", func() {
proxyCon := dial(proxy)
defer proxyCon.Close()
messages := [][]byte{
[]byte(rand.String(WebsocketMessageBufferSize - 10)),
[]byte(rand.String(WebsocketMessageBufferSize + 10)),
[]byte(rand.String(10)),
[]byte(rand.String(WebsocketMessageBufferSize*3 + 10)),
}

expectedDataHash := sha256.New()
writer := binaryWriter{conn: proxyCon}
for _, msg := range messages {
expectedDataHash.Write(msg)
_, err := writer.Write(msg)
Expect(err).ToNot(HaveOccurred())
}
err := proxyCon.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
Expect(err).ToNot(HaveOccurred(), "failed to write close message")
err = <-done
Expect(err).ToNot(HaveOccurred(), "target server did not receive a propler close message")
Expect(fmt.Sprintf("%x", expectedDataHash.Sum(nil))).To(Equal(fmt.Sprintf("%x", receivedDataHash.Sum(nil))))
})
})

})

func newTargetServer(writer io.Writer, done chan error) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer GinkgoRecover()
upgrader := NewUpgrader()
targetCon, err := upgrader.Upgrade(w, r, nil)
Expect(err).ToNot(HaveOccurred())
_, err = CopyFrom(writer, targetCon)
done <- err
}))
}

func newProxyServer(target *httptest.Server) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer GinkgoRecover()
upgrader := NewUpgrader()
src, err := upgrader.Upgrade(w, r, nil)
Expect(err).ToNot(HaveOccurred())
targetURL := "ws" + strings.TrimPrefix(target.URL, "http")
dst, _, err := Dial(targetURL, nil)
Expect(err).ToNot(HaveOccurred())
defer dst.Close()
_, _ = Copy(dst, src)
}))
}

func dial(proxy *httptest.Server) *websocket.Conn {
u := "ws" + strings.TrimPrefix(proxy.URL, "http")
proxyCon, _, err := Dial(u, nil)
Expect(err).ToNot(HaveOccurred())
return proxyCon
}

0 comments on commit 0503e3c

Please sign in to comment.