From cdf648affc1283e5c14d72f20dd5afaa77994aaf Mon Sep 17 00:00:00 2001 From: HMH Date: Tue, 24 Sep 2024 20:25:35 +0200 Subject: [PATCH] Replace websockets with fastwebsockets. This includes an update of hyper, and a change of the protocol. Specifically, the websocket connection does not require a separate port but instead is done via /ws. Furthermore, the client does not poll for frames anymore, instead it sets a preferred frame rate. --- Cargo.lock | 520 +++++++-------------------- Cargo.toml | 9 +- src/config.rs | 10 +- src/gui.rs | 56 +-- src/main.rs | 14 +- src/protocol.rs | 18 +- src/video.rs | 4 +- src/web.rs | 332 +++++++++++------ src/websocket.rs | 744 +++++++++++++++++++-------------------- src/weylus.rs | 142 ++------ ts/lib.ts | 91 ++--- www/templates/index.html | 140 ++++---- 12 files changed, 914 insertions(+), 1166 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ebbe771f..78558e7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,6 +38,12 @@ version = "1.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "10f00e1f6e58a40e807377c75c6a7f97bf9044fab57816f2414e6f5f4499d7b8" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -49,15 +55,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "autocfg" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78" -dependencies = [ - "autocfg 1.3.0", -] - [[package]] name = "autocfg" version = "1.3.0" @@ -98,22 +95,9 @@ dependencies = [ [[package]] name = "base64" -version = "0.9.3" +version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "489d6c0ed21b11d038c31b6ceccca973e65d73ba3bd8ecb9a2babf5546164643" -dependencies = [ - "byteorder", - "safemem", -] - -[[package]] -name = "base64" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b25d992356d2eb0ed82172f5248873db5560c4721f564b13cb5193bda5e668e" -dependencies = [ - "byteorder", -] +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" [[package]] name = "bitflags" @@ -133,42 +117,15 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0d8c1fef690941d3e7788d328517591fecc684c084084702d6ff1641e993699a" -[[package]] -name = "block-buffer" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b" -dependencies = [ - "block-padding", - "byte-tools", - "byteorder", - "generic-array 0.12.4", -] - [[package]] name = "block-buffer" version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "generic-array 0.14.7", -] - -[[package]] -name = "block-padding" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa79dedbb091f449f1f39e53edf88d5dbe95f895dae6135a8d7b881fb5af73f5" -dependencies = [ - "byte-tools", + "generic-array", ] -[[package]] -name = "byte-tools" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7" - [[package]] name = "bytemuck" version = "1.18.0" @@ -232,15 +189,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "cloudabi" -version = "0.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" -dependencies = [ - "bitflags 1.3.2", -] - [[package]] name = "cmake" version = "0.1.51" @@ -397,7 +345,7 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ - "generic-array 0.14.7", + "generic-array", "typenum", ] @@ -432,22 +380,13 @@ dependencies = [ "byteorder", ] -[[package]] -name = "digest" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3d0c8c8752312f9713efd397ff63acb9f85585afbf179282e720e7704954dd5" -dependencies = [ - "generic-array 0.12.4", -] - [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", + "block-buffer", "crypto-common", ] @@ -484,10 +423,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] -name = "fake-simd" -version = "0.1.2" +name = "fastwebsockets" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" +checksum = "26da0c7b5cef45c521a6f9cdfffdfeb6c9f5804fbac332deb5ae254634c7a6be" +dependencies = [ + "base64", + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "pin-project", + "rand 0.8.5", + "sha1", + "simdutf8", + "thiserror", + "tokio", + "utf-8", +] [[package]] name = "fltk" @@ -539,15 +492,9 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456" dependencies = [ - "percent-encoding 2.3.1", + "percent-encoding", ] -[[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - [[package]] name = "futures-channel" version = "0.3.30" @@ -599,15 +546,6 @@ dependencies = [ "slab", ] -[[package]] -name = "generic-array" -version = "0.12.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" -dependencies = [ - "typenum", -] - [[package]] name = "generic-array" version = "0.14.7" @@ -615,7 +553,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", - "version_check 0.9.5", + "version_check", ] [[package]] @@ -838,15 +776,15 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.26" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +checksum = "524e8ac6999421f49a846c2d4411f337e53497d8ec55d67753beffa43c5d9205" dependencies = [ + "atomic-waker", "bytes", "fnv", "futures-core", "futures-sink", - "futures-util", "http", "indexmap", "slab", @@ -861,7 +799,7 @@ version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" dependencies = [ - "log 0.4.22", + "log", "pest", "pest_derive", "serde", @@ -913,9 +851,9 @@ checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "http" -version = "0.2.12" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", @@ -924,12 +862,24 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.6" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", + "futures-util", "http", + "http-body", "pin-project-lite", ] @@ -947,32 +897,12 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.10.16" +version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a0652d9a2609a968c14be1a9ea00bf4b1d64e2e1f53a1b51b6fff3a6e829273" -dependencies = [ - "base64 0.9.3", - "httparse", - "language-tags", - "log 0.3.9", - "mime", - "num_cpus", - "time", - "traitobject", - "typeable", - "unicase", - "url 1.7.2", -] - -[[package]] -name = "hyper" -version = "0.14.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" dependencies = [ "bytes", "futures-channel", - "futures-core", "futures-util", "h2", "http", @@ -981,22 +911,24 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "smallvec", "tokio", - "tower-service", - "tracing", "want", ] [[package]] -name = "idna" -version = "0.1.5" +name = "hyper-util" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38f09e0f0b1fb55fdee1f17470ad800da77af5186a1a76c026b679358b7e844e" +checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" dependencies = [ - "matches", - "unicode-bidi", - "unicode-normalization", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "pin-project-lite", + "tokio", ] [[package]] @@ -1084,12 +1016,6 @@ dependencies = [ "rayon", ] -[[package]] -name = "language-tags" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a91d884b6667cd606bb5a69aa0c99ba811a115fc68915e7056ec08a46e93199a" - [[package]] name = "lazy_static" version = "1.5.0" @@ -1121,15 +1047,6 @@ dependencies = [ "libc", ] -[[package]] -name = "log" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e19e8d5c34a3e0e2223db8e060f9e8264aeeb5c5fc64a4ee9965c062211c024b" -dependencies = [ - "log 0.4.22", -] - [[package]] name = "log" version = "0.4.22" @@ -1151,27 +1068,12 @@ dependencies = [ "libc", ] -[[package]] -name = "matches" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" - [[package]] name = "memchr" version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "mime" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba626b8a6de5da682e1caa06bdb42a335aee5a84db8e5046a3e8ab17ba0a3ae0" -dependencies = [ - "log 0.3.9", -] - [[package]] name = "miniz_oxide" version = "0.3.7" @@ -1250,7 +1152,7 @@ version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf" dependencies = [ - "autocfg 1.3.0", + "autocfg", "num-integer", "num-traits", ] @@ -1261,7 +1163,7 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c000134b5dbf44adc5cb772486d335293351644b801551abe8f75c84cfa4aef" dependencies = [ - "autocfg 1.3.0", + "autocfg", "num-integer", "num-traits", ] @@ -1272,7 +1174,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" dependencies = [ - "autocfg 1.3.0", + "autocfg", "num-integer", "num-traits", ] @@ -1293,7 +1195,7 @@ version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" dependencies = [ - "autocfg 1.3.0", + "autocfg", ] [[package]] @@ -1330,12 +1232,6 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" -[[package]] -name = "opaque-debug" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" - [[package]] name = "option-operations" version = "0.4.1" @@ -1357,12 +1253,6 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" -[[package]] -name = "percent-encoding" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" - [[package]] name = "percent-encoding" version = "2.3.1" @@ -1414,6 +1304,26 @@ dependencies = [ "sha2", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2 1.0.86", + "quote 1.0.37", + "syn 2.0.77", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -1523,7 +1433,7 @@ dependencies = [ "proc-macro2 1.0.86", "quote 1.0.37", "syn 1.0.109", - "version_check 0.9.5", + "version_check", ] [[package]] @@ -1534,7 +1444,7 @@ checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" dependencies = [ "proc-macro2 1.0.86", "quote 1.0.37", - "version_check 0.9.5", + "version_check", ] [[package]] @@ -1583,25 +1493,6 @@ dependencies = [ "proc-macro2 1.0.86", ] -[[package]] -name = "rand" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" -dependencies = [ - "autocfg 0.1.8", - "libc", - "rand_chacha 0.1.1", - "rand_core 0.4.2", - "rand_hc 0.1.0", - "rand_isaac", - "rand_jitter", - "rand_os", - "rand_pcg", - "rand_xorshift", - "winapi", -] - [[package]] name = "rand" version = "0.7.3" @@ -1612,17 +1503,18 @@ dependencies = [ "libc", "rand_chacha 0.2.2", "rand_core 0.5.1", - "rand_hc 0.2.0", + "rand_hc", ] [[package]] -name = "rand_chacha" -version = "0.1.1" +name = "rand" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ - "autocfg 0.1.8", - "rand_core 0.3.1", + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", ] [[package]] @@ -1636,20 +1528,15 @@ dependencies = [ ] [[package]] -name = "rand_core" +name = "rand_chacha" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ - "rand_core 0.4.2", + "ppv-lite86", + "rand_core 0.6.4", ] -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - [[package]] name = "rand_core" version = "0.5.1" @@ -1660,12 +1547,12 @@ dependencies = [ ] [[package]] -name = "rand_hc" -version = "0.1.0" +name = "rand_core" +version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "rand_core 0.3.1", + "getrandom 0.2.15", ] [[package]] @@ -1677,59 +1564,6 @@ dependencies = [ "rand_core 0.5.1", ] -[[package]] -name = "rand_isaac" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08" -dependencies = [ - "rand_core 0.3.1", -] - -[[package]] -name = "rand_jitter" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b" -dependencies = [ - "libc", - "rand_core 0.4.2", - "winapi", -] - -[[package]] -name = "rand_os" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071" -dependencies = [ - "cloudabi", - "fuchsia-cprng", - "libc", - "rand_core 0.4.2", - "rdrand", - "winapi", -] - -[[package]] -name = "rand_pcg" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44" -dependencies = [ - "autocfg 0.1.8", - "rand_core 0.4.2", -] - -[[package]] -name = "rand_xorshift" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c" -dependencies = [ - "rand_core 0.3.1", -] - [[package]] name = "rayon" version = "1.10.0" @@ -1750,15 +1584,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", -] - [[package]] name = "redox_users" version = "0.4.6" @@ -1782,12 +1607,6 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" -[[package]] -name = "safemem" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef703b7cb59335eae2eb93ceb664c0eb7ea6bf567079d843e09420219668e072" - [[package]] name = "scoped_threadpool" version = "0.1.9" @@ -1842,15 +1661,14 @@ dependencies = [ ] [[package]] -name = "sha-1" -version = "0.8.2" +name = "sha1" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d94d0bede923b3cea61f3f1ff57ff8cdfd77b400fb8f9998949e0cf04163df" +checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ - "block-buffer 0.7.3", - "digest 0.8.1", - "fake-simd", - "opaque-debug", + "cfg-if", + "cpufeatures", + "digest", ] [[package]] @@ -1861,7 +1679,7 @@ checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", + "digest", ] [[package]] @@ -1898,13 +1716,19 @@ dependencies = [ "libc", ] +[[package]] +name = "simdutf8" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a" + [[package]] name = "slab" version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" dependencies = [ - "autocfg 1.3.0", + "autocfg", ] [[package]] @@ -2056,17 +1880,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "tinyvec" version = "1.8.0" @@ -2176,12 +1989,6 @@ dependencies = [ "winnow 0.6.18", ] -[[package]] -name = "tower-service" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" - [[package]] name = "tracing" version = "0.1.40" @@ -2239,12 +2046,6 @@ dependencies = [ "tracing-serde", ] -[[package]] -name = "traitobject" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd1f82c56340fdf16f2a953d7bda4f8fdffba13d93b00844c25572110b26079" - [[package]] name = "try-lock" version = "0.2.5" @@ -2257,12 +2058,6 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5be21190ff5d38e8b4a2d3b6a3ae57f612cc39c96e83cedeaf7abc338a8bac4a" -[[package]] -name = "typeable" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1410f6f91f21d1612654e7cc69193b0334f909dcf2c790c4826254fbb86f8887" - [[package]] name = "typenum" version = "1.17.0" @@ -2275,15 +2070,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" -[[package]] -name = "unicase" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f4765f83163b74f957c797ad9253caf97f103fb064d3999aea9568d09fc8a33" -dependencies = [ - "version_check 0.1.5", -] - [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2323,17 +2109,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc72304796d0818e357ead4e000d19c9c174ab23dc11093ac919054d20a6a7fc" -[[package]] -name = "url" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd4e7c0d531266369519a4aa4f399d748bd37043b00bde1e4ff1f60a120b355a" -dependencies = [ - "idna 0.1.5", - "matches", - "percent-encoding 1.0.1", -] - [[package]] name = "url" version = "2.5.2" @@ -2341,10 +2116,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" dependencies = [ "form_urlencoded", - "idna 0.5.0", - "percent-encoding 2.3.1", + "idna", + "percent-encoding", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "valuable" version = "0.1.0" @@ -2357,12 +2138,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "852e951cb7832cb45cb1169900d19760cfa39b82bc0ea9c0e5a14ae88411c98b" -[[package]] -name = "version_check" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd" - [[package]] name = "version_check" version = "0.9.5" @@ -2384,65 +2159,37 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" -[[package]] -name = "websocket" -version = "0.26.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "723abe6b75286edc51d8ecabb38a2353f62a9e9b0588998b59111474f1dcd637" -dependencies = [ - "hyper 0.10.16", - "rand 0.6.5", - "unicase", - "url 1.7.2", - "websocket-base", -] - -[[package]] -name = "websocket-base" -version = "0.26.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49aec794b07318993d1db16156d5a9c750120597a5ee40c6b928d416186cb138" -dependencies = [ - "base64 0.10.1", - "bitflags 1.3.2", - "byteorder", - "rand 0.6.5", - "sha-1", -] - [[package]] name = "weylus" version = "0.11.4" dependencies = [ "autopilot", "bitflags 1.3.2", + "bytes", "cc", "core-foundation 0.9.4", "core-graphics 0.22.3", "dbus", "dirs", + "fastwebsockets", "fltk", "gstreamer", "gstreamer-app", "gstreamer-video", "handlebars", - "hyper 0.14.30", + "http-body-util", + "hyper", + "hyper-util", "image 0.22.5", "image 0.23.14", "num_cpus", - "percent-encoding 2.3.1", + "percent-encoding", "pnet_datalink", "qrcode", "serde", @@ -2453,8 +2200,7 @@ dependencies = [ "toml 0.5.11", "tracing", "tracing-subscriber", - "url 2.5.2", - "websocket", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1c027bee..1dc45eed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,10 +9,14 @@ description = "Use your iPad or Android tablet as graphic tablet." [dependencies] autopilot = { git = "https://github.com/H-M-H/autopilot-rs.git", rev = "63eed09c715bfb665bb23172a3930a528e11691c" } bitflags = "^1.3" +bytes = "1.7.1" dirs = "^4.0" +fastwebsockets = { version = "0.8.0", features = ["upgrade", "unstable-split"] } fltk = { version = "^1", features = ["no-pango"] } handlebars = "^4.1" -hyper = { version = "^0.14", features = ["server", "tcp", "http1", "http2"] } +http-body-util = "0.1.2" +hyper = { version = "^1.4", features = ["server", "http1", "http2"] } +hyper-util = { version = "0.1.8", features = ["tokio"] } image = { version = "^0.23", features = ["png"], default-features = false } image_autopilot = { package = "image", version = "0.22.5", features = [], default-features = false } percent-encoding = "2.1.0" @@ -21,12 +25,11 @@ serde = { version = "^1.0", features = ["derive"] } serde_json = "^1.0" signal-hook = "0.3.17" structopt = { version = "^0.3", features = ["color", "suggestions"], default-features = false } -tokio = { version = "^1", features = ["macros", "rt-multi-thread", "sync"] } +tokio = { version = "^1", features = ["fs", "macros", "rt-multi-thread", "sync"] } toml = "^0.5" tracing = "^0.1" tracing-subscriber = { version = "^0.3", features = ["ansi", "json"], default-features = false } url = "^2.2" -websocket = { version = "=0.26.2", features = ["sync"], default-features = false } [build-dependencies] cc = "^1.0" diff --git a/src/config.rs b/src/config.rs index 52d16c21..34879c39 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ -use std::fs; use std::net::IpAddr; +use std::{fs, path::PathBuf}; use serde::{Deserialize, Serialize}; use structopt::StructOpt; @@ -69,16 +69,16 @@ pub struct Config { help = "Use custom template of index.html to be served by Weylus." )] #[serde(skip)] - pub custom_index_html: Option, + pub custom_index_html: Option, #[structopt(long, help = "Use custom access.html to be served by Weylus.")] #[serde(skip)] - pub custom_access_html: Option, + pub custom_access_html: Option, #[structopt(long, help = "Use custom style.css to be served by Weylus.")] #[serde(skip)] - pub custom_style_css: Option, + pub custom_style_css: Option, #[structopt(long, help = "Use custom lib.js to be served by Weylus.")] #[serde(skip)] - pub custom_lib_js: Option, + pub custom_lib_js: Option, #[structopt(long, help = "Print shell completions for given shell.")] #[serde(skip)] diff --git a/src/gui.rs b/src/gui.rs index 84ff3463..328fb5ff 100644 --- a/src/gui.rs +++ b/src/gui.rs @@ -19,7 +19,7 @@ use fltk::{ use pnet_datalink as datalink; use crate::config::{write_config, Config}; -use crate::websocket::Ws2UiMessage; +use crate::web::Web2UiMessage::UInputInaccessible; pub fn run(config: &Config, log_receiver: mpsc::Receiver) { let width = 200; @@ -218,33 +218,29 @@ pub fn run(config: &Config, log_receiver: mpsc::Receiver) { config.try_mediafoundation = check_native_hw_accel.is_checked(); } } - if !weylus.start( - &config, - |_| {}, - |message| match message { - Ws2UiMessage::UInputInaccessible => { - let w = 500; - let h = 300; - let mut pop_up = Window::default() - .with_size(w, h) - .center_screen() - .with_label("Weylus - UInput inaccessible!"); - pop_up.set_xclass("weylus"); - - let buf = TextBuffer::default(); - let mut pop_up_text = TextDisplay::default().with_size(w, h); - pop_up_text.set_buffer(buf); - pop_up_text.wrap_mode(fltk::text::WrapMode::AtBounds, 5); - let mut buf = pop_up_text.buffer().unwrap(); - buf.set_text(std::include_str!("strings/uinput_error.txt")); - - pop_up.end(); - pop_up.make_modal(true); - pop_up.show(); - } - _ => {} - }, - ) { + if !weylus.start(&config, |message| match message { + UInputInaccessible => { + let w = 500; + let h = 300; + let mut pop_up = Window::default() + .with_size(w, h) + .center_screen() + .with_label("Weylus - UInput inaccessible!"); + pop_up.set_xclass("weylus"); + + let buf = TextBuffer::default(); + let mut pop_up_text = TextDisplay::default().with_size(w, h); + pop_up_text.set_buffer(buf); + pop_up_text.wrap_mode(fltk::text::WrapMode::AtBounds, 5); + let mut buf = pop_up_text.buffer().unwrap(); + buf.set_text(std::include_str!("strings/uinput_error.txt")); + + pop_up.end(); + pop_up.make_modal(true); + pop_up.show(); + } + _ => {} + }) { return Ok(()); } is_server_running = true; @@ -349,4 +345,8 @@ pub fn run(config: &Config, log_receiver: mpsc::Receiver) { but_toggle.set_callback(toggle_server); app.run().expect("Failed to run Gui!"); + + // TODO: Remove when https://github.com/fltk-rs/fltk-rs/issues/1480 is fixed + // this is required to drop the callback and do a graceful shutdown of the web server + but_toggle.set_callback(|_| ()); } diff --git a/src/main.rs b/src/main.rs index d8958446..9595858c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -72,15 +72,11 @@ fn main() { if conf.no_gui { let mut weylus = crate::weylus::Weylus::new(); - weylus.start( - &conf, - |_| {}, - |msg| { - if let crate::websocket::Ws2UiMessage::UInputInaccessible = msg { - warn!(std::include_str!("strings/uinput_error.txt")); - } - }, - ); + weylus.start(&conf, |msg| { + if let crate::web::Web2UiMessage::UInputInaccessible = msg { + warn!(std::include_str!("strings/uinput_error.txt")); + } + }); #[cfg(unix)] { let mut signals = Signals::new(TERM_SIGNALS).unwrap(); diff --git a/src/protocol.rs b/src/protocol.rs index d15e130c..d578b169 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -9,6 +9,7 @@ pub struct ClientConfiguration { pub max_width: usize, pub max_height: usize, pub client_name: Option, + pub frame_rate: f64, } #[derive(Serialize, Deserialize, Debug)] @@ -16,10 +17,6 @@ pub enum MessageInbound { PointerEvent(PointerEvent), WheelEvent(WheelEvent), KeyboardEvent(KeyboardEvent), - // request a video frame from the server - // like this the client can partially control the framerate by sending requests at some given - // rate. However, the server may drop a request if encoding is too slow. - TryGetFrame, GetCapturableList, Config(ClientConfiguration), } @@ -151,3 +148,16 @@ pub struct WheelEvent { pub dy: i32, pub timestamp: u64, } + +pub trait WeylusSender { + type Error: std::error::Error; + fn send_message(&mut self, message: MessageOutbound) -> Result<(), Self::Error>; + fn send_video(&mut self, bytes: &[u8]) -> Result<(), Self::Error>; +} + +pub trait WeylusReceiver: Iterator> { + type Error: std::error::Error; + fn recv_message(&mut self) -> Option> { + self.next() + } +} diff --git a/src/video.rs b/src/video.rs index 731812f4..984ded95 100644 --- a/src/video.rs +++ b/src/video.rs @@ -70,7 +70,7 @@ pub struct VideoEncoder { height_in: usize, width_out: usize, height_out: usize, - write_data: Box, + write_data: Box, start_time: Instant, } @@ -80,7 +80,7 @@ impl VideoEncoder { height_in: usize, width_out: usize, height_out: usize, - write_data: impl Fn(&[u8]) + 'static, + mut write_data: impl FnMut(&[u8]) + 'static, options: EncoderOptions, ) -> Result, CError> { let mut video_encoder = Box::new(Self { diff --git a/src/web.rs b/src/web.rs index c343f4ae..5fef7eb7 100644 --- a/src/web.rs +++ b/src/web.rs @@ -1,30 +1,51 @@ +use bytes::Bytes; +use fastwebsockets::upgrade; use handlebars::Handlebars; -use hyper::service::{make_service_fn, service_fn}; -use hyper::{server::conn::AddrStream, Body, Method, Request, Response, Server, StatusCode}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::body::Incoming; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; use serde::Serialize; use std::collections::HashMap; +use std::convert::Infallible; use std::net::SocketAddr; -use std::sync::mpsc; -use std::sync::mpsc::SendError; +use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use tokio::sync::mpsc as mpsc_tokio; +use std::time::Duration; +use tokio::net::TcpListener; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, warn}; +use crate::websocket::{weylus_websocket_channel, WeylusClientConfig, WeylusClientHandler}; + +#[derive(Debug)] +pub enum WebStartUpMessage { + Start, + Error, +} + +pub enum Web2UiMessage { + UInputInaccessible, +} + pub const INDEX_HTML: &str = std::include_str!("../www/templates/index.html"); pub const ACCESS_HTML: &str = std::include_str!("../www/static/access_code.html"); pub const STYLE_CSS: &str = std::include_str!("../www/static/style.css"); pub const LIB_JS: &str = std::include_str!("../www/static/lib.js"); #[derive(Serialize)] -struct WebConfig { +struct IndexTemplateContext { access_code: Option, - websocket_port: u16, uinput_enabled: bool, capture_cursor_enabled: bool, log_level: String, } -fn response_from_str(s: &str, content_type: &str) -> Response { +fn response_from_str(s: &str, content_type: &str) -> Response> { Response::builder() .status(StatusCode::OK) .header("content-type", content_type) @@ -32,7 +53,7 @@ fn response_from_str(s: &str, content_type: &str) -> Response { .unwrap() } -fn response_not_found() -> Response { +fn response_not_found() -> Response> { Response::builder() .status(StatusCode::NOT_FOUND) .header("content-type", "text/html; charset=utf-8") @@ -40,13 +61,13 @@ fn response_not_found() -> Response { .unwrap() } -fn response_from_path_or_default( - path: Option<&String>, +async fn response_from_path_or_default( + path: Option<&PathBuf>, default: &str, content_type: &str, -) -> Response { +) -> Response> { match path { - Some(path) => match std::fs::read_to_string(path) { + Some(path) => match tokio::fs::read_to_string(path).await { Ok(s) => response_from_str(&s, content_type), Err(err) => { warn!("Failed to load file: {}", err); @@ -59,14 +80,17 @@ fn response_from_path_or_default( async fn serve( addr: SocketAddr, - req: Request, + mut req: Request, context: Arc>, - _sender: mpsc::Sender, -) -> Result, hyper::Error> { + sender_ui: mpsc::Sender, + num_clients: Arc, + semaphore_websocket_shutdown: Arc, + notify_disconnect: Arc, +) -> Result>, hyper::Error> { debug!("Got request: {:?}", req); let mut authed = false; - if let Some(access_code) = &context.access_code { - if req.method() == Method::GET && req.uri().path() == "/" { + if let Some(access_code) = &context.web_config.access_code { + if req.method() == Method::GET && (req.uri().path() == "/" || req.uri().path() == "/ws") { use url::form_urlencoded; if let Some(query) = req.uri().query() { let params = form_urlencoded::parse(query.as_bytes()) @@ -84,27 +108,27 @@ async fn serve( authed = true; } if req.method() != Method::GET { - return Ok(response_not_found()); + return Ok(response_not_found().map(|r| r.boxed())); } match req.uri().path() { "/" => { if !authed { return Ok(response_from_path_or_default( - context.custom_access_html.as_ref(), + context.web_config.custom_access_html.as_ref(), ACCESS_HTML, "text/html; charset=utf-8", - )); + ) + .await + .map(|r| r.boxed())); } - debug!(address = ?addr, "Client connected."); - let config = WebConfig { - access_code: context.access_code.clone(), - websocket_port: context.ws_port, + let config = IndexTemplateContext { + access_code: context.web_config.access_code.clone(), uinput_enabled: cfg!(target_os = "linux"), capture_cursor_enabled: cfg!(not(target_os = "windows")), log_level: crate::log::get_log_level().to_string(), }; - let html = if let Some(path) = context.custom_index_html.as_ref() { + let html = if let Some(path) = context.web_config.custom_index_html.as_ref() { let mut reg = Handlebars::new(); if let Err(err) = reg.register_template_file("index", path) { warn!("Failed to register template from path: {}", err); @@ -117,63 +141,102 @@ async fn serve( }; match html { - Ok(html) => Ok(response_from_str(&html, "text/html; charset=utf-8")), + Ok(html) => { + Ok(response_from_str(&html, "text/html; charset=utf-8").map(|r| r.boxed())) + } Err(err) => { error!("Failed to render index template: {}", err); - Ok(response_not_found()) + Ok(response_not_found().map(|r| r.boxed())) } } } + "/ws" => { + if !authed { + return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body("unauthorized".to_string().boxed()) + .unwrap()); + } + + let (response, fut) = upgrade::upgrade(&mut req).unwrap(); + num_clients.fetch_add(1, Ordering::Relaxed); + + let config = context.weylus_client_config.clone(); + tokio::spawn(async move { + match fut.await { + Ok(ws) => { + let (sender, receiver) = + weylus_websocket_channel(ws, semaphore_websocket_shutdown); + std::thread::spawn(move || { + let client = WeylusClientHandler::new( + sender, + receiver, + || { + if let Err(err) = + sender_ui.blocking_send(Web2UiMessage::UInputInaccessible) + { + warn!( + "Failed to send message 'UInputInaccessible': {err}." + ); + } + }, + config, + ); + client.run(); + num_clients.fetch_sub(1, Ordering::Relaxed); + notify_disconnect.notify_waiters(); + }); + } + Err(err) => { + eprintln!("Error in websocket connection: {}", err); + num_clients.fetch_sub(1, Ordering::Relaxed); + notify_disconnect.notify_waiters(); + } + } + }); + + Ok(response.map(|r| r.boxed())) + } "/style.css" => Ok(response_from_path_or_default( - context.custom_style_css.as_ref(), + context.web_config.custom_style_css.as_ref(), STYLE_CSS, "text/css; charset=utf-8", - )), + ) + .await + .map(|r| r.boxed())), "/lib.js" => Ok(response_from_path_or_default( - context.custom_lib_js.as_ref(), + context.web_config.custom_lib_js.as_ref(), LIB_JS, "text/javascript; charset=utf-8", - )), - _ => Ok(response_not_found()), + ) + .await + .map(|r| r.boxed())), + _ => Ok(response_not_found().map(|r| r.boxed())), } } -#[derive(Debug)] -pub enum Ui2WebMessage { - Shutdown, -} -pub enum Web2UiMessage { - Start, - Error(String), -} - -fn log_send_error(res: Result<(), SendError>) { - if let Err(err) = res { - warn!("Webserver: Failed to send message to gui: {}", err); - } +#[derive(Clone)] +pub struct WebServerConfig { + pub bind_addr: SocketAddr, + pub access_code: Option, + pub custom_index_html: Option, + pub custom_access_html: Option, + pub custom_style_css: Option, + pub custom_lib_js: Option, } struct Context<'a> { - bind_addr: SocketAddr, - ws_port: u16, - access_code: Option, - custom_index_html: Option, - custom_access_html: Option, - custom_style_css: Option, - custom_lib_js: Option, + web_config: WebServerConfig, + weylus_client_config: WeylusClientConfig, templates: Handlebars<'a>, } pub fn run( - sender: mpsc::Sender, - receiver: mpsc_tokio::Receiver, - bind_addr: SocketAddr, - ws_port: u16, - access_code: Option, - custom_index_html: Option, - custom_access_html: Option, - custom_style_css: Option, - custom_lib_js: Option, + sender_ui: tokio::sync::mpsc::Sender, + sender_startup: oneshot::Sender, + notify_shutdown: Arc, + web_server_config: WebServerConfig, + weylus_client_config: WeylusClientConfig, ) -> std::thread::JoinHandle<()> { let mut templates = Handlebars::new(); templates @@ -181,61 +244,122 @@ pub fn run( .unwrap(); let context = Context { - bind_addr, - ws_port, - access_code, - custom_index_html, - custom_access_html, - custom_style_css, - custom_lib_js, + web_config: web_server_config, + weylus_client_config, templates, }; - std::thread::spawn(move || run_server(context, sender, receiver)) + std::thread::spawn(move || run_server(context, sender_ui, sender_startup, notify_shutdown)) } #[tokio::main] async fn run_server( context: Context<'static>, - sender: mpsc::Sender, - mut receiver: mpsc_tokio::Receiver, + sender_ui: tokio::sync::mpsc::Sender, + sender_startup: oneshot::Sender, + notify_shutdown: Arc, ) { - let addr = context.bind_addr; - let context = Arc::new(context); + let addr = context.web_config.bind_addr; - let sender = sender.clone(); - let sender2 = sender.clone(); - let service = make_service_fn(move |s: &AddrStream| { - let addr = s.remote_addr(); - let context = context.clone(); - let sender = sender.clone(); - async move { - Ok::<_, hyper::Error>(service_fn(move |req| { - let context = context.clone(); - serve(addr, req, context, sender.clone()) - })) - } - }); - let server = match Server::try_bind(&addr) { - Ok(builder) => builder.serve(service), + let listener = match TcpListener::bind(addr).await { + Ok(listener) => listener, Err(err) => { - log_send_error(sender2.send(Web2UiMessage::Error(format!( - "Failed to start webserver: {}", - err - )))); + error!("Failed to bind to socket: {err}."); + sender_startup.send(WebStartUpMessage::Error).unwrap(); return; } }; - let server = server.with_graceful_shutdown(async move { - loop { - match receiver.recv().await { - Some(Ui2WebMessage::Shutdown) => break, - None => break, + + sender_startup.send(WebStartUpMessage::Start).unwrap(); + + let context = Arc::new(context); + + let broadcast_shutdown = Arc::new(tokio::sync::Notify::new()); + + let num_clients = Arc::new(AtomicUsize::new(0)); + let notify_disconnect = Arc::new(tokio::sync::Notify::new()); + let semaphore_websocket_shutdown = Arc::new(tokio::sync::Semaphore::new(0)); + + loop { + let (tcp, remote_address) = tokio::select! { + res = listener.accept() => { + match res { + Ok(conn) => conn, + Err(err) => { + warn!("Connection failed: {err}."); + continue; + } + } + }, + _ = notify_shutdown.notified() => { + info!("Webserver is shutting down."); + broadcast_shutdown.notify_waiters(); + break; } + }; + + debug!(address = ?remote_address, "Client connected."); + + let io = TokioIo::new(tcp); + + let sender_ui = sender_ui.clone(); + let broadcast_shutdown = broadcast_shutdown.clone(); + let context = context.clone(); + let num_clients = num_clients.clone(); + let semaphore_websocket_shutdown = semaphore_websocket_shutdown.clone(); + let notify_disconnect = notify_disconnect.clone(); + + tokio::task::spawn(async move { + let conn = http1::Builder::new().serve_connection( + io, + service_fn({ + move |req| { + let context = context.clone(); + let num_clients = num_clients.clone(); + let semaphore_websocket_shutdown = semaphore_websocket_shutdown.clone(); + let notify_disconnect = notify_disconnect.clone(); + serve( + remote_address, + req, + context, + sender_ui.clone(), + num_clients, + semaphore_websocket_shutdown, + notify_disconnect, + ) + } + }), + ); + + let conn = conn.with_upgrades(); + + tokio::select! { + conn = conn => match conn { + Ok(_) => (), + Err(err) => { + warn!("Error polling connection ({remote_address}): {err}.") + } + }, + _ = broadcast_shutdown.notified() => { + info!("Closing connection to: {remote_address}."); + } + } + }); + } + + semaphore_websocket_shutdown.add_permits(num_clients.load(Ordering::Relaxed)); + + loop { + let remaining_clients = num_clients.load(Ordering::Relaxed); + if remaining_clients == 0 { + break; + } else { + debug!("Waiting for remaining clients ({remaining_clients}) to disconnect."); } - }); - info!("Webserver listening at {}...", addr); - log_send_error(sender2.send(Web2UiMessage::Start)); - if let Err(err) = server.await { - error!("Webserver exited error: {}", err) - }; + tokio::select! { + _ = notify_disconnect.notified() => (), + _ = tokio::time::sleep(Duration::from_secs(1)) => { + semaphore_websocket_shutdown.add_permits(num_clients.load(Ordering::Relaxed)); + }, + } + } } diff --git a/src/websocket.rs b/src/websocket.rs index 3fd7e1d7..c4060ab1 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,405 +1,141 @@ -use std::collections::HashMap; -use std::net::{SocketAddr, TcpStream}; -use std::sync::mpsc::{SendError, TryRecvError}; -use std::sync::{mpsc, Arc, Mutex}; -use std::thread::spawn; -use tracing::{debug, error, info, warn}; - -use websocket::sender::Writer; -use websocket::server::upgrade::{sync::Buffer as WsBuffer, WsUpgrade}; -use websocket::sync::Server; -use websocket::{Message, OwnedMessage, WebSocketError}; +use fastwebsockets::{FragmentCollectorRead, Frame, OpCode, WebSocket, WebSocketError}; +use hyper::upgrade::Upgraded; +use hyper_util::rt::TokioIo; +use std::convert::Infallible; +use std::sync::mpsc::{RecvTimeoutError, SendError}; +use std::sync::{mpsc, Arc}; +use std::thread::{spawn, JoinHandle}; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc::channel; +use tracing::{debug, error, warn}; use crate::capturable::{get_capturables, Capturable, Recorder}; use crate::input::device::{InputDevice, InputDeviceType}; use crate::protocol::{ - ClientConfiguration, KeyboardEvent, MessageInbound, MessageOutbound, PointerEvent, WheelEvent, + ClientConfiguration, KeyboardEvent, MessageInbound, MessageOutbound, PointerEvent, + WeylusReceiver, WeylusSender, WheelEvent, }; use crate::cerror::CErrorCode; use crate::video::{EncoderOptions, VideoEncoder}; -type WsWriter = Arc>>; -type WsClients = Arc>>; - -pub enum Ws2UiMessage { - Start, - UInputInaccessible, - Error(String), -} - -pub enum Ui2WsMessage { - Shutdown, -} - -#[derive(Clone)] -pub struct WsConfig { - pub address: SocketAddr, - pub access_code: Option, - pub encoder_options: EncoderOptions, - #[cfg(target_os = "linux")] - pub wayland_support: bool, -} - fn log_send_error(res: Result<(), SendError>) { if let Err(err) = res { warn!("Websocket: Failed to send message to ui: {}", err); } } -pub fn run( - sender: mpsc::Sender, - receiver: mpsc::Receiver, - config: WsConfig, -) -> std::thread::JoinHandle<()> { - spawn(move || { - let clients: WsClients = Arc::new(Mutex::new(HashMap::new())); - - let mut server = match Server::bind(config.address) { - Ok(s) => s, - Err(e) => { - log_send_error(sender.send(Ws2UiMessage::Error(format!( - "Failed binding to socket: {}", - e - )))); - return; - } - }; - - if let Err(err) = server.set_nonblocking(true) { - warn!( - "Could not set websocket to non-blocking, graceful shutdown may be impossible now: {}", - err - ); - } - - log_send_error(sender.send(Ws2UiMessage::Start)); - - loop { - std::thread::sleep(std::time::Duration::from_millis(10)); - - match receiver.try_recv() { - Err(TryRecvError::Disconnected) | Ok(Ui2WsMessage::Shutdown) => { - let clients = clients.lock().unwrap(); - for client in clients.values() { - let client = client.lock().unwrap(); - if let Err(err) = client.shutdown_all() { - error!("Could not shutdown websocket client: {}", err); - } - } - info!("Shutting down websocket: {}", config.address); - return; - } - _ => {} - } - match server.accept() { - Ok(request) => { - let clients = clients.clone(); - let config = config.clone(); - let sender = sender.clone(); - spawn(move || handle_connection(request, clients, config, sender)); - } - _ => {} - }; - } - }) -} - -fn handle_connection( - request: WsUpgrade>, - clients: WsClients, - config: WsConfig, - gui_sender: mpsc::Sender, -) { - let client = match request.accept() { - Ok(c) => c, - Err((_, err)) => { - warn!("Failed to accept client: {}", err); - return; - } - }; - - if let Err(err) = client.set_nonblocking(false) { - warn!("Failed to set client to blocking mode: {}", err); - } - - let peer_addr = match client.peer_addr() { - Ok(p) => p, - Err(err) => { - warn!("Failed to retrieve client address: {}", err); - return; - } - }; - - let (mut ws_receiver, ws_sender) = match client.split() { - Ok(s) => s, - Err(err) => { - warn!("Failed to setup connection: {}", err); - return; - } - }; - - let ws_sender = Arc::new(Mutex::new(ws_sender)); - - { - let mut clients = clients.lock().unwrap(); - clients.insert(peer_addr, ws_sender.clone()); - info!(address = ?peer_addr, "Client connected."); - } - - let mut ws_handler = WsHandler::new(ws_sender, config.clone(), gui_sender, peer_addr); - - let mut authed = config.access_code.is_none(); - let access_code = config.access_code.unwrap_or_else(|| "".into()); - for msg in ws_receiver.incoming_messages() { - match msg { - Ok(msg) => { - if !authed { - if let OwnedMessage::Text(pw) = &msg { - if pw == &access_code { - authed = true; - info!(address = ?peer_addr, "WS-Client authenticated!"); - } else { - warn!( - address = ?peer_addr, - access_code = %pw, - "Authentication failed, wrong access code", - ); - let mut clients = clients.lock().unwrap(); - clients.remove(&peer_addr); - return; - } - } - } else { - ws_handler.process(&msg); - } - if msg.is_close() { - let mut clients = clients.lock().unwrap(); - info!(address = ?peer_addr, "Client disconnected."); - clients.remove(&peer_addr); - return; - } - } - Err(err) => { - match err { - // this happens on calling shutdown, no need to log this - WebSocketError::NoDataAvailable => (), - _ => warn!("Error reading message from websocket, closing ({})", err), - } - - let mut clients = clients.lock().unwrap(); - clients.remove(&peer_addr); - return; - } - } - } -} - -fn send_msg(sender: &WsWriter, msg: &MessageOutbound) { - if let Err(err) = sender - .lock() - .unwrap() - .send_message(&Message::text(serde_json::to_string(msg).unwrap())) - { - warn!("Failed to send message to websocket: {}", err); - } -} - struct VideoConfig { capturable: Box, capture_cursor: bool, max_width: usize, max_height: usize, + frame_rate: f64, } enum VideoCommands { Start(VideoConfig), - TryGetFrame, } -fn handle_video(receiver: mpsc::Receiver, sender: WsWriter, config: WsConfig) { - let mut recorder: Option> = None; - let mut video_encoder: Option> = None; - - let mut max_width = 1920; - let mut max_height = 1080; - - loop { - // stop thread once the channel is closed - let mut msg = match receiver.recv() { - Ok(m) => m, - Err(_) => return, - }; - - // drop frames if the client is requesting frames at a higher rate than they can be - // produced here - if let VideoCommands::TryGetFrame = msg { - loop { - match receiver.try_recv() { - Err(mpsc::TryRecvError::Empty) => break, - Err(mpsc::TryRecvError::Disconnected) => return, - Ok(VideoCommands::TryGetFrame) => continue, - Ok(tmp_msg) => { - msg = tmp_msg; - break; - } - } - } - } - match msg { - VideoCommands::TryGetFrame => { - if recorder.is_none() { - warn!("Screen capture not initalized, can not send video frame!"); - continue; - } - let pixel_data = recorder.as_mut().unwrap().capture(); - if let Err(err) = pixel_data { - warn!("Error capturing screen: {}", err); - continue; - } - let pixel_data = pixel_data.unwrap(); - let (width_in, height_in) = pixel_data.size(); - let scale = - (max_width as f64 / width_in as f64).min(max_height as f64 / height_in as f64); - // limit video to 4K - let scale_max = (3840.0 / width_in as f64).min(2160.0 / height_in as f64); - let scale = scale.min(scale_max); - let mut width_out = width_in; - let mut height_out = height_in; - if scale < 1.0 { - width_out = (width_out as f64 * scale) as usize; - height_out = (height_out as f64 * scale) as usize; - } - // video encoder is not setup or setup for encoding the wrong size: restart it - if video_encoder.is_none() - || !video_encoder - .as_ref() - .unwrap() - .check_size(width_in, height_in, width_out, height_out) - { - send_msg(&sender, &MessageOutbound::NewVideo); - let sender = sender.clone(); - let res = VideoEncoder::new( - width_in, - height_in, - width_out, - height_out, - move |data| { - let msg = Message::binary(data); - if let Err(err) = sender.lock().unwrap().send_message(&msg) { - match err { - WebSocketError::IoError(err) => { - // ignore broken pipe errors as those are caused by - // intentionally shutting down the websocket - if err.kind() == std::io::ErrorKind::BrokenPipe { - debug!("Error sending video: {}", err); - } else { - warn!("Error sending video: {}", err); - } - } - _ => warn!("Error sending video: {}", err), - } - } - }, - config.encoder_options, - ); - match res { - Ok(r) => video_encoder = Some(r), - Err(e) => { - warn!("{}", e); - continue; - } - }; - } - let video_encoder = video_encoder.as_mut().unwrap(); - video_encoder.encode(pixel_data); - } - VideoCommands::Start(config) => { - #[allow(unused_assignments)] - { - // gstpipewire can not handle setting a pipeline's state to Null after another - // pipeline has been created and its state has been set to Play. - // This line makes sure that there always is only a single recorder and thus - // single pipeline in this thread by forcing rust to call the destructor of the - // current pipeline here, right before creating a new pipeline. - // See: https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/986 - // - // This shouldn't affect other Recorder trait objects. - recorder = None; - } - match config.capturable.recorder(config.capture_cursor) { - Ok(r) => { - recorder = Some(r); - max_width = config.max_width; - max_height = config.max_height; - send_msg(&sender, &MessageOutbound::ConfigOk); - } - Err(err) => { - warn!("Failed to init screen cast: {}!", err); - send_msg( - &sender, - &MessageOutbound::Error("Failed to init screen cast!".into()), - ) - } - } - } - } +fn send_message(sender: &mut S, message: MessageOutbound) +where + S: WeylusSender, +{ + if let Err(err) = sender.send_message(message) { + warn!("Failed to send message to client: {err}"); } } -struct WsHandler { - sender: WsWriter, +pub struct WeylusClientHandler { + sender: S, + receiver: Option, video_sender: mpsc::Sender, input_device: Option>, capturables: Vec>, - gui_sender: mpsc::Sender, - ws_config: WsConfig, + on_uinput_inaccessible: FnUInput, + config: WeylusClientConfig, #[cfg(target_os = "linux")] capture_cursor: bool, client_name: Option, - client_address: SocketAddr, + video_thread: JoinHandle<()>, +} + +#[derive(Clone, Copy)] +pub struct WeylusClientConfig { + pub encoder_options: EncoderOptions, + #[cfg(target_os = "linux")] + pub wayland_support: bool, } -impl WsHandler { - fn new( - sender: WsWriter, - config: WsConfig, - gui_sender: mpsc::Sender, - client_address: SocketAddr, - ) -> Self { +impl WeylusClientHandler { + pub fn new( + sender: S, + receiver: R, + on_uinput_inaccessible: FnUInput, + config: WeylusClientConfig, + ) -> Self + where + R: WeylusReceiver, + S: WeylusSender + Clone + Send + Sync + 'static, + { let (video_sender, video_receiver) = mpsc::channel::(); - { + let video_thread = { let sender = sender.clone(); - let config = config.clone(); // offload creating the videostream to another thread to avoid blocking the thread that // is receiving messages from the websocket - spawn(move || handle_video(video_receiver, sender, config)); - } + spawn(move || handle_video(video_receiver, sender, config.encoder_options)) + }; Self { sender, + receiver: Some(receiver), video_sender, input_device: None, capturables: vec![], - gui_sender, - ws_config: config, + on_uinput_inaccessible, + config, #[cfg(target_os = "linux")] capture_cursor: false, client_name: None, - client_address, + video_thread, } } - fn send_msg(&self, msg: &MessageOutbound) { - send_msg(&self.sender, msg) + pub fn run(mut self) + where + R: WeylusReceiver, + S: WeylusSender + Clone + Send + Sync + 'static, + FnUInput: Fn(), + { + for message in self.receiver.take().unwrap() { + match message { + Ok(message) => match message { + MessageInbound::PointerEvent(event) => self.process_pointer_event(&event), + MessageInbound::WheelEvent(event) => self.process_wheel_event(&event), + MessageInbound::KeyboardEvent(event) => self.process_keyboard_event(&event), + MessageInbound::GetCapturableList => self.send_capturable_list(), + MessageInbound::Config(config) => self.update_config(config), + }, + Err(err) => { + warn!("Failed to read message {err}!"); + self.send_message(MessageOutbound::Error( + "Failed to read message!".to_string(), + )); + } + } + } + + drop(self.video_sender); + if let Err(err) = self.video_thread.join() { + warn!("Failed to join video thread: {err:?}"); + } } - // Enqueue a request to send a new video frame. - // - // This does not do any further work in order not to block receiving messages. `handle_video` - // is resposible to do the actual work. - fn queue_try_send_video_frame(&mut self) { - self.video_sender.send(VideoCommands::TryGetFrame).unwrap(); + fn send_message(&mut self, message: MessageOutbound) + where + S: WeylusSender, + { + send_message(&mut self.sender, message) } fn process_wheel_event(&mut self, event: &WheelEvent) { @@ -431,21 +167,28 @@ impl WsHandler { } } - fn send_capturable_list(&mut self) { + fn send_capturable_list(&mut self) + where + S: WeylusSender, + { let mut windows = Vec::::new(); self.capturables = get_capturables( #[cfg(target_os = "linux")] - self.ws_config.wayland_support, + self.config.wayland_support, #[cfg(target_os = "linux")] self.capture_cursor, ); self.capturables.iter().for_each(|c| { windows.push(c.name()); }); - self.send_msg(&MessageOutbound::CapturableList(windows)); + self.send_message(MessageOutbound::CapturableList(windows)); } - fn setup(&mut self, config: ClientConfiguration) { + fn update_config(&mut self, config: ClientConfiguration) + where + S: WeylusSender, + FnUInput: Fn(), + { let client_name_changed = if self.client_name != config.client_name { self.client_name = config.client_name; true @@ -474,13 +217,9 @@ impl WsHandler { Err(e) => { error!("Failed to create uinput device: {}", e); if let CErrorCode::UInputNotAccessible = e.to_enum() { - if let Err(err) = - self.gui_sender.send(Ws2UiMessage::UInputInaccessible) - { - warn!("Failed to send message to gui thread: {}!", err); - } + (self.on_uinput_inaccessible)(); } - self.send_msg(&MessageOutbound::ConfigError( + self.send_message(MessageOutbound::ConfigError( "Failed to create uinput device!".to_string(), )); return; @@ -516,54 +255,279 @@ impl WsHandler { capture_cursor: config.capture_cursor, max_width: config.max_width, max_height: config.max_height, + frame_rate: config.frame_rate, })) .unwrap(); } else { error!("Got invalid id for capturable: {}", config.capturable_id); - self.send_msg(&MessageOutbound::ConfigError( + self.send_message(MessageOutbound::ConfigError( "Invalid id for capturable!".to_string(), )); } } +} - fn process(&mut self, message: &OwnedMessage) { - match message { - OwnedMessage::Text(s) => { - let message: Result = serde_json::from_str(s); - match message { - Ok(message) => { - if let MessageInbound::TryGetFrame = message { - } else { - debug!( - client_message = %s, - address = ?self.client_address, - "Got message from client." - ); - } - match message { - MessageInbound::WheelEvent(event) => { - self.process_wheel_event(&event); - } - MessageInbound::PointerEvent(event) => { - self.process_pointer_event(&event); +fn handle_video( + receiver: mpsc::Receiver, + mut sender: S, + encoder_options: EncoderOptions, +) { + const EFFECTIVE_INIFINIT: Duration = Duration::from_secs(3600 * 24 * 365 * 200); + + let mut recorder: Option> = None; + let mut video_encoder: Option> = None; + + let mut max_width = 1920; + let mut max_height = 1080; + let mut frame_duration = EFFECTIVE_INIFINIT; + let mut last_frame = Instant::now(); + + loop { + let now = Instant::now(); + let elapsed = now - last_frame; + let frames_passed = (elapsed.as_secs_f64() / frame_duration.as_secs_f64()) as u32; + let next_frame = last_frame + (frames_passed + 1) * frame_duration; + let timeout = next_frame - now; + last_frame = next_frame; + + if frames_passed > 0 { + debug!("Dropped {frames_passed} frame(s)!"); + } + + match receiver.recv_timeout(timeout) { + Ok(VideoCommands::Start(config)) => { + #[allow(unused_assignments)] + { + // gstpipewire can not handle setting a pipeline's state to Null after another + // pipeline has been created and its state has been set to Play. + // This line makes sure that there always is only a single recorder and thus + // single pipeline in this thread by forcing rust to call the destructor of the + // current pipeline here, right before creating a new pipeline. + // See: https://gitlab.freedesktop.org/pipewire/pipewire/-/issues/986 + // + // This shouldn't affect other Recorder trait objects. + recorder = None; + } + match config.capturable.recorder(config.capture_cursor) { + Ok(r) => { + recorder = Some(r); + max_width = config.max_width; + max_height = config.max_height; + send_message(&mut sender, MessageOutbound::ConfigOk); + } + Err(err) => { + warn!("Failed to init screen cast: {}!", err); + send_message( + &mut sender, + MessageOutbound::Error("Failed to init screen cast!".into()), + ) + } + } + last_frame = Instant::now(); + + // The Duration type can not handle infinity, if the frame rate is set to 0 we just + // set the duration between two frames to a very long one, which is effectively + // infinity. + let d = 1.0 / config.frame_rate; + frame_duration = if d.is_finite() { + Duration::from_secs_f64(d) + } else { + EFFECTIVE_INIFINIT + }; + frame_duration = frame_duration.min(EFFECTIVE_INIFINIT); + } + Err(RecvTimeoutError::Timeout) => { + if recorder.is_none() { + warn!("Screen capture not initalized, can not send video frame!"); + continue; + } + let pixel_data = recorder.as_mut().unwrap().capture(); + if let Err(err) = pixel_data { + warn!("Error capturing screen: {}", err); + continue; + } + let pixel_data = pixel_data.unwrap(); + let (width_in, height_in) = pixel_data.size(); + let scale = + (max_width as f64 / width_in as f64).min(max_height as f64 / height_in as f64); + // limit video to 4K + let scale_max = (3840.0 / width_in as f64).min(2160.0 / height_in as f64); + let scale = scale.min(scale_max); + let mut width_out = width_in; + let mut height_out = height_in; + if scale < 1.0 { + width_out = (width_out as f64 * scale) as usize; + height_out = (height_out as f64 * scale) as usize; + } + // video encoder is not setup or setup for encoding the wrong size: restart it + if video_encoder.is_none() + || !video_encoder + .as_ref() + .unwrap() + .check_size(width_in, height_in, width_out, height_out) + { + send_message(&mut sender, MessageOutbound::NewVideo); + let mut sender = sender.clone(); + let res = VideoEncoder::new( + width_in, + height_in, + width_out, + height_out, + move |data| { + if let Err(err) = sender.send_video(data) { + warn!("Failed to send video frame: {err}!"); } - MessageInbound::KeyboardEvent(event) => { - self.process_keyboard_event(&event); + }, + encoder_options, + ); + match res { + Ok(r) => video_encoder = Some(r), + Err(e) => { + warn!("{}", e); + continue; + } + }; + } + let video_encoder = video_encoder.as_mut().unwrap(); + video_encoder.encode(pixel_data); + } + // stop thread once the channel is closed + Err(RecvTimeoutError::Disconnected) => return, + }; + } +} + +pub struct WsWeylusReceiver { + recv: tokio::sync::mpsc::Receiver, +} + +impl Iterator for WsWeylusReceiver { + type Item = Result; + + fn next(&mut self) -> Option { + self.recv.blocking_recv().map(Ok) + } +} + +impl WeylusReceiver for WsWeylusReceiver { + type Error = Infallible; +} + +pub enum WsMessage { + Frame(Frame<'static>), + Video(Vec), + MessageOutbound(MessageOutbound), +} + +unsafe impl Send for WsMessage {} + +#[derive(Clone)] +pub struct WsWeylusSender { + sender: tokio::sync::mpsc::Sender, +} + +impl WeylusSender for WsWeylusSender { + type Error = tokio::sync::mpsc::error::SendError; + + fn send_message(&mut self, message: MessageOutbound) -> Result<(), Self::Error> { + self.sender + .blocking_send(WsMessage::MessageOutbound(message)) + } + + fn send_video(&mut self, bytes: &[u8]) -> Result<(), Self::Error> { + self.sender.blocking_send(WsMessage::Video(bytes.to_vec())) + } +} + +pub fn weylus_websocket_channel( + websocket: WebSocket>, + semaphore_shutdown: Arc, +) -> (WsWeylusSender, WsWeylusReceiver) { + let (rx, mut tx) = websocket.split(|ws| tokio::io::split(ws)); + + let mut rx = FragmentCollectorRead::new(rx); + + let (sender_inbound, receiver_inbound) = channel::(32); + let (sender_outbound, mut receiver_outbound) = channel::(32); + + { + let sender_outbound = sender_outbound.clone(); + tokio::spawn(async move { + let mut send_fn = |frame| async { + if let Err(err) = sender_outbound.send(WsMessage::Frame(frame)).await { + warn!("Failed to send websocket frame while receiving fragmented frame: {err}.") + }; + Ok(()) + }; + + loop { + let fut = rx.read_frame::<_, WebSocketError>(&mut send_fn); + + let frame = tokio::select! { + _ = semaphore_shutdown.acquire() => break, + frame = fut => frame.unwrap(), + }; + match frame.opcode { + OpCode::Close => break, + OpCode::Text => match serde_json::from_slice(&frame.payload) { + Ok(msg) => { + if let Err(err) = sender_inbound.send(msg).await { + warn!("Failed to forward inbound message to WeylusClientHandler: {err}."); } - MessageInbound::TryGetFrame => self.queue_try_send_video_frame(), - MessageInbound::GetCapturableList => self.send_capturable_list(), - MessageInbound::Config(config) => self.setup(config), } + Err(err) => warn!("Failed to parse message: {err}"), + }, + _ => {} + } + } + }); + } + + tokio::spawn(async move { + loop { + let msg = if let Some(msg) = receiver_outbound.recv().await { + msg + } else { + break; + }; + + match msg { + WsMessage::Frame(frame) => { + if let Err(err) = tx.write_frame(frame).await { + if let WebSocketError::ConnectionClosed = err { + break; + } + warn!("Failed to send frame: {err}"); } - Err(err) => { - warn!("Unable to parse message: {} ({})", s, err); - self.send_msg(&MessageOutbound::Error( - "Failed to parse message!".to_string(), - )); + } + WsMessage::Video(data) => { + if let Err(err) = tx.write_frame(Frame::binary(data.into())).await { + if let WebSocketError::ConnectionClosed = err { + break; + } + warn!("Failed to send video frame: {err}"); + } + } + WsMessage::MessageOutbound(msg) => { + let json_string = serde_json::to_string(&msg).unwrap(); + let data = json_string.as_bytes(); + if let Err(err) = tx.write_frame(Frame::text(data.into())).await { + if let WebSocketError::ConnectionClosed = err { + break; + } + warn!("Failed to send outbound message: {err}"); } } } - _ => (), } - } + }); + + ( + WsWeylusSender { + sender: sender_outbound, + }, + WsWeylusReceiver { + recv: receiver_inbound, + }, + ) } diff --git a/src/weylus.rs b/src/weylus.rs index d81acf35..96125ab6 100644 --- a/src/weylus.rs +++ b/src/weylus.rs @@ -1,29 +1,21 @@ use std::net::SocketAddr; -use std::sync::mpsc; -use tokio::sync::mpsc as mpsc_tokio; -use tracing::{error, warn}; +use std::sync::{mpsc, Arc}; +use tracing::error; use crate::config::Config; use crate::video::EncoderOptions; -use crate::web::{Ui2WebMessage, Web2UiMessage}; -use crate::websocket::{Ui2WsMessage, Ws2UiMessage, WsConfig}; - -struct Channels { - sender_ui2ws: mpsc::Sender, - sender_ui2web: mpsc_tokio::Sender, -} +use crate::web::{Web2UiMessage, WebServerConfig, WebStartUpMessage}; +use crate::websocket::WeylusClientConfig; pub struct Weylus { - channels: Option, - ws_thread: Option>, + notify_shutdown: Arc, web_thread: Option>, } impl Weylus { pub fn new() -> Self { Self { - channels: None, - ws_thread: None, + notify_shutdown: Arc::new(tokio::sync::Notify::new()), web_thread: None, } } @@ -32,11 +24,7 @@ impl Weylus { &mut self, config: &Config, mut on_web_message: impl FnMut(Web2UiMessage) + Send + 'static, - mut on_ws_message: impl FnMut(Ws2UiMessage) + Send + 'static, ) -> bool { - if self.channels.is_some() { - return false; - } let encoder_options = EncoderOptions { #[cfg(target_os = "linux")] try_vaapi: config.try_vaapi, @@ -59,68 +47,34 @@ impl Weylus { try_mediafoundation: false, }; - let ws_config = WsConfig { - address: SocketAddr::new(config.bind_address, config.websocket_port), - access_code: config.access_code.clone(), - encoder_options, - #[cfg(target_os = "linux")] - wayland_support: config.wayland_support, - }; - - let (sender_ui2ws, receiver_ui2ws) = mpsc::channel(); - let (sender_ui2web, receiver_ui2web) = mpsc_tokio::channel(100); - - let (sender_ws2ui, receiver_ws2ui) = mpsc::channel(); - let (sender_web2ui, receiver_web2ui) = mpsc::channel(); - - let ws_thread = crate::websocket::run(sender_ws2ui, receiver_ui2ws, ws_config); - match receiver_ws2ui.recv() { - Ok(Ws2UiMessage::Start) => {} - Ok(Ws2UiMessage::Error(err)) => { - error!("Failed to start websocket server: {}", err); - if ws_thread.join().is_err() { - error!("Websocketserver thread panicked."); - } - return false; - } - Ok(Ws2UiMessage::UInputInaccessible) => unreachable!(), - Err(err) => { - error!("Error communicating with websocketserver thread: {}", err); - if ws_thread.join().is_err() { - error!("Websocketserver thread panicked."); - } - return false; - } - } + let (sender_ui, mut receiver_ui) = tokio::sync::mpsc::channel(100); + let (sender_startup, receiver_startup) = tokio::sync::oneshot::channel(); let web_thread = crate::web::run( - sender_web2ui, - receiver_ui2web, - SocketAddr::new(config.bind_address, config.web_port), - config.websocket_port, - config.access_code.clone(), - config.custom_index_html.clone(), - config.custom_access_html.clone(), - config.custom_style_css.clone(), - config.custom_lib_js.clone(), + sender_ui, + sender_startup, + self.notify_shutdown.clone(), + WebServerConfig { + bind_addr: SocketAddr::new(config.bind_address, config.web_port), + access_code: config.access_code.clone(), + custom_index_html: config.custom_index_html.clone(), + custom_access_html: config.custom_access_html.clone(), + custom_style_css: config.custom_style_css.clone(), + custom_lib_js: config.custom_lib_js.clone(), + }, + WeylusClientConfig { + encoder_options, + #[cfg(target_os = "linux")] + wayland_support: config.wayland_support, + }, ); - match receiver_web2ui.recv() { - Ok(Web2UiMessage::Start) => (), - Ok(Web2UiMessage::Error(err)) => { - error!("Webserver error: {}", err); + + match receiver_startup.blocking_recv() { + Ok(WebStartUpMessage::Start) => (), + Ok(WebStartUpMessage::Error) => { if web_thread.join().is_err() { error!("Webserver thread panicked."); } - - if let Err(err) = sender_ui2ws.send(Ui2WsMessage::Shutdown) { - warn!( - "Failed to send shutdown command to websocketserver: {}", - err - ); - } - if ws_thread.join().is_err() { - error!("Websocketserver thread panicked."); - } return false; } Err(err) => { @@ -128,60 +82,24 @@ impl Weylus { if web_thread.join().is_err() { error!("Webserver thread panicked."); } - - if let Err(err) = sender_ui2ws.send(Ui2WsMessage::Shutdown) { - warn!( - "Failed to send shutdown command to websocketserver: {}", - err - ); - } - if ws_thread.join().is_err() { - error!("Websocketserver thread panicked."); - } return false; } } - self.ws_thread = Some(ws_thread); self.web_thread = Some(web_thread); - self.channels = Some(Channels { - sender_ui2ws, - sender_ui2web, - }); std::thread::spawn(move || { - for msg in receiver_web2ui.iter() { + while let Some(msg) = receiver_ui.blocking_recv() { on_web_message(msg); } }); - std::thread::spawn(move || { - for msg in receiver_ws2ui.iter() { - on_ws_message(msg); - } - }); true } pub fn stop(&mut self) { - if let Some(channels) = self.channels.as_mut() { - if let Err(err) = channels.sender_ui2ws.send(Ui2WsMessage::Shutdown) { - warn!( - "Failed to send shutdown command to websocketserver: {}", - err - ); - } - if let Err(err) = channels.sender_ui2web.try_send(Ui2WebMessage::Shutdown) { - warn!("Failed to send shutdown command to webserver: {}", err); - } - } + self.notify_shutdown.notify_one(); self.wait(); - self.channels = None; } fn wait(&mut self) { - if let Some(t) = self.ws_thread.take() { - if t.join().is_err() { - error!("Websocket thread panicked."); - } - } if let Some(t) = self.web_thread.take() { if t.join().is_err() { error!("Web thread panicked."); diff --git a/ts/lib.ts b/ts/lib.ts index 744b7c00..ebef111a 100644 --- a/ts/lib.ts +++ b/ts/lib.ts @@ -16,7 +16,7 @@ let last_fps_calc: number = performance.now(); let check_video: HTMLInputElement; -function run(access_code: string, websocket_port: number, level: string) { +function run(level: string) { window.onload = () => { log_pre = document.getElementById("log") as HTMLPreElement; log_pre.textContent = ""; @@ -39,7 +39,7 @@ function run(access_code: string, websocket_port: number, level: string) { } return false; }, true) - init(access_code, websocket_port) + init(); }; } @@ -53,12 +53,12 @@ function log(level: LogLevel, msg: string) { log_pre.textContent += LogLevel[level] + ": " + msg + "\n"; } -function frame_update_scale(x: number) { - return Math.pow(x / 100, 3); +function frame_rate_scale(x: number) { + return Math.pow(x / 100, 1.5); } -function frame_update_scale_inv(x: number) { - return 100 * Math.pow(x, 1 / 3); +function frame_rate_scale_inv(x: number) { + return 100 * Math.pow(x, 2 / 3); } @@ -101,8 +101,8 @@ class Settings { webSocket: WebSocket; checks: Map; capturable_select: HTMLSelectElement; - frame_update_limit_input: HTMLInputElement; - frame_update_limit_output: HTMLOutputElement; + frame_rate_input: HTMLInputElement; + frame_rate_output: HTMLOutputElement; scale_video_input: HTMLInputElement; scale_video_output: HTMLOutputElement; range_min_pressure: HTMLInputElement; @@ -115,16 +115,16 @@ class Settings { this.webSocket = webSocket; this.checks = new Map(); this.capturable_select = document.getElementById("window") as HTMLSelectElement; - this.frame_update_limit_input = document.getElementById("frame_update_limit") as HTMLInputElement; - this.frame_update_limit_input.min = frame_update_scale_inv(1).toString(); - this.frame_update_limit_input.max = frame_update_scale_inv(1000).toString(); - this.frame_update_limit_output = this.frame_update_limit_input.nextElementSibling as HTMLOutputElement; + this.frame_rate_input = document.getElementById("frame_rate") as HTMLInputElement; + this.frame_rate_input.min = frame_rate_scale_inv(0).toString(); + this.frame_rate_input.max = frame_rate_scale_inv(120).toString(); + this.frame_rate_output = this.frame_rate_input.nextElementSibling as HTMLOutputElement; this.scale_video_input = document.getElementById("scale_video") as HTMLInputElement; this.scale_video_output = this.scale_video_input.nextElementSibling as HTMLOutputElement; this.range_min_pressure = document.getElementById("min_pressure") as HTMLInputElement; this.client_name_input = document.getElementById("client_name") as HTMLInputElement; - this.frame_update_limit_input.oninput = (e) => { - this.frame_update_limit_output.value = Math.round(frame_update_scale(this.frame_update_limit_input.valueAsNumber)).toString(); + this.frame_rate_input.oninput = (e) => { + this.frame_rate_output.value = Math.round(frame_rate_scale(this.frame_rate_input.valueAsNumber)).toString(); } this.scale_video_input.oninput = (e) => { let [w, h] = calc_max_video_resolution(this.scale_video_input.valueAsNumber) @@ -189,7 +189,7 @@ class Settings { toggle_energysaving((e.target as HTMLInputElement).checked); }; - this.frame_update_limit_input.onchange = () => this.save_settings(); + this.frame_rate_input.onchange = () => this.save_settings(); this.range_min_pressure.onchange = () => this.save_settings(); // server @@ -198,6 +198,7 @@ class Settings { this.checks.get("capture_cursor").onchange = upd_server_config; this.scale_video_input.onchange = upd_server_config; this.client_name_input.onchange = upd_server_config; + this.frame_rate_input.onchange = upd_server_config; document.getElementById("refresh").onclick = () => this.webSocket.send('"GetCapturableList"'); this.capturable_select.onchange = () => this.send_server_config(); @@ -213,6 +214,7 @@ class Settings { let [w, h] = calc_max_video_resolution(this.scale_video_input.valueAsNumber); config["max_width"] = w; config["max_height"] = h; + config["frame_rate"] = frame_rate_scale(this.frame_rate_input.valueAsNumber); if (this.client_name_input.value) config["client_name"] = this.client_name_input.value; this.webSocket.send(JSON.stringify({ "Config": config })); @@ -222,7 +224,7 @@ class Settings { let settings = Object(null); for (const [key, elem] of this.checks.entries()) settings[key] = elem.checked; - settings["frame_update_limit"] = frame_update_scale(this.frame_update_limit_input.valueAsNumber).toString(); + settings["frame_rate"] = frame_rate_scale(this.frame_rate_input.valueAsNumber).toString(); settings["scale_video"] = this.scale_video_input.value; settings["min_pressure"] = this.range_min_pressure.value; settings["client_name"] = this.client_name_input.value; @@ -232,8 +234,8 @@ class Settings { load_settings() { let settings_string = localStorage.getItem("settings"); if (settings_string === null) { - this.frame_update_limit_input.value = frame_update_scale_inv(33).toString(); - this.frame_update_limit_output.value = (33).toString(); + this.frame_rate_input.value = frame_rate_scale_inv(30).toString(); + this.frame_rate_output.value = (30).toString(); let [w, h] = calc_max_video_resolution(this.scale_video_input.valueAsNumber) this.scale_video_output.value = w + "x" + h; return; @@ -244,12 +246,12 @@ class Settings { if (typeof settings[key] === "boolean") elem.checked = settings[key]; } - let upd_limit = settings["frame_update_limit"]; + let upd_limit = settings["frame_rate"]; if (upd_limit) - this.frame_update_limit_input.value = frame_update_scale_inv(upd_limit).toString(); + this.frame_rate_input.value = frame_rate_scale_inv(upd_limit).toString(); else - this.frame_update_limit_input.value = frame_update_scale_inv(33).toString(); - this.frame_update_limit_output.value = Math.round(frame_update_scale(this.frame_update_limit_input.valueAsNumber)).toString(); + this.frame_rate_input.value = frame_rate_scale_inv(30).toString(); + this.frame_rate_output.value = Math.round(frame_rate_scale(this.frame_rate_input.valueAsNumber)).toString(); let scale_video = settings["scale_video"]; if (scale_video) @@ -302,10 +304,6 @@ class Settings { return ptrs; } - frame_update_limit() { - return frame_update_scale(this.frame_update_limit_input.valueAsNumber) - } - toggle() { this.settings.classList.toggle("hide"); this.visible = !this.visible; @@ -315,7 +313,7 @@ class Settings { let current_selection = undefined; if (this.capturable_select.selectedOptions[0]) current_selection = this.capturable_select.selectedOptions[0].textContent; - let new_index; + let new_index: number; this.capturable_select.innerText = ""; window_names.forEach((name, i) => { let option = document.createElement("option"); @@ -728,27 +726,13 @@ class KeyboardHandler { } } -function frame_timer(webSocket: WebSocket) { - // Closing or closed, so no more frames - if (webSocket.readyState > webSocket.OPEN) - return; - +function frame_rate_stats() { let t = performance.now(); - if (t - last_fps_calc > 1500) { - let fps = Math.round(frame_count / (t - last_fps_calc) * 10000) / 10; - fps_out.value = fps.toString(); - frame_count = 0; - last_fps_calc = t; - } - - if (document.hidden) { - requestAnimationFrame(() => frame_timer(webSocket)); - return; - } - - if (webSocket.readyState === webSocket.OPEN && check_video.checked) - webSocket.send('"TryGetFrame"'); - setTimeout(() => frame_timer(webSocket), settings.frame_update_limit()); + let fps = Math.round(frame_count / (t - last_fps_calc) * 10000) / 10; + fps_out.value = fps.toString(); + frame_count = 0; + last_fps_calc = t; + setTimeout(() => frame_rate_stats(), 1500); } function handle_messages( @@ -854,12 +838,11 @@ function check_apis() { } } -function init(access_code: string, websocket_port: number) { +function init() { check_apis(); - let authed = false; let protocol = document.location.protocol == "https:" ? "wss://" : "ws://"; - let webSocket = new WebSocket(protocol + window.location.hostname + ":" + websocket_port); + let webSocket = new WebSocket(protocol + window.location.hostname + ":" + window.location.port + "/ws"); webSocket.binaryType = "arraybuffer"; settings = new Settings(webSocket); @@ -904,8 +887,7 @@ function init(access_code: string, websocket_port: number) { canvas.height = window.innerHeight * window.devicePixelRatio; let [w, h] = calc_max_video_resolution(settings.scale_video_input.valueAsNumber); settings.scale_video_output.value = w + "x" + h; - if (authed) - settings.send_server_config(); + settings.send_server_config(); } video.controls = false; video.onloadeddata = () => stretch_video(); @@ -914,7 +896,6 @@ function init(access_code: string, websocket_port: number) { if (!is_connected) { new KeyboardHandler(webSocket); new PointerHandler(webSocket); - frame_timer(webSocket); is_connected = true; } }, @@ -923,12 +904,10 @@ function init(access_code: string, websocket_port: number) { ); window.onunload = () => { webSocket.close(); } webSocket.onopen = function(event) { - if (access_code) - webSocket.send(access_code); - authed = true; webSocket.send('"GetCapturableList"'); settings.send_server_config(); } + frame_rate_stats(); } // object-fit: fill; <-- this is unfortunately not supported on iOS, so we use the following diff --git a/www/templates/index.html b/www/templates/index.html index 6c734456..83d8396d 100644 --- a/www/templates/index.html +++ b/www/templates/index.html @@ -1,72 +1,80 @@ - - - - - - Weylus - - - - - -
- - -
-
-
-
-

Settings

-
- - - -
-

Video

-
- - - - - - - - -
-

Input

-
- - - - - -
-
- -
-
- - -
-
- -
-
-
-
+ + + + + + Weylus + + + + + + +
+ + +
+
+
+
+

Settings

+
+ + + +
+

Video

+
+ + + + + + + + +
+

Input

+
+ + + + + +
+
+ +
+
+ + +
+
+ +
+
+
- +
+