Skip to content

Idempotency key #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 132 additions & 0 deletions flake.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

64 changes: 64 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{
description = "Minimal rust wasm32-unknown-unknown example";

inputs = {
flake-utils.url = "github:numtide/flake-utils";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs.nixpkgs.follows = "nixpkgs";
};
nixpkgs.url = "nixpkgs/nixos-unstable";
nixpkgs-fixed = {
url = "github:NixOS/nixpkgs/8dfad603247387df1df4826b8bea58efc5d012d8";
};
};

outputs = { self, nixpkgs, nixpkgs-fixed, flake-utils, rust-overlay }:
flake-utils.lib.eachDefaultSystem (system:
let
overlays = [ rust-overlay.overlays.default ];
pkgs = import nixpkgs { inherit system overlays; };
pkgsFixed = import nixpkgs-fixed { inherit system overlays; };
rust = pkgs.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml;
wrangler = pkgsFixed.nodePackages_latest.wrangler;
inputs = [
rust
pkgs.rust-analyzer
pkgs.openssl
pkgs.zlib
pkgs.gcc
pkgs.pkg-config
pkgs.just
pkgs.wasm-pack
pkgs.wasm-bindgen-cli
pkgs.binaryen
pkgs.clang
pkgs.corepack_20
pkgs.nodejs_20
pkgs.worker-build
wrangler
];
in
{
defaultPackage = pkgs.rustPlatform.buildRustPackage {
src = ./.;

cargoLock = {
lockFile = ./Cargo.lock;
};

nativeBuildInputs = inputs;
};

devShell = pkgs.mkShell {
packages = inputs;
shellHook = ''
export LIBCLANG_PATH=${pkgs.libclang.lib}/lib/
export LD_LIBRARY_PATH=${pkgs.openssl}/lib:$LD_LIBRARY_PATH
export CC_wasm32_unknown_unknown=${pkgs.llvmPackages_14.clang-unwrapped}/bin/clang-14
export CFLAGS_wasm32_unknown_unknown="-I ${pkgs.llvmPackages_14.libclang.lib}/lib/clang/14.0.6/include/"
'';
};
}
);
}
5 changes: 5 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[toolchain]
channel = "nightly-2023-11-27"
components = ["rustfmt", "clippy"]
targets = [ "wasm32-unknown-unknown" ]
profile = "default"
35 changes: 0 additions & 35 deletions src/cloudflare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,41 +107,6 @@ async fn handle_server_stream(
empty_response()
}

// TODO
pub(crate) fn handle_mutiny_to_mutiny(
_req: Request,
_ctx: RouteContext<()>,
) -> worker::Result<Response> {
// For websocket compatibility
let pair = WebSocketPair::new()?;
let server = pair.server;
server.accept()?;
logger::debug("accepted websocket, about to spawn event stream");
wasm_bindgen_futures::spawn_local(async move {
let mut event_stream = server.events().expect("stream error");
logger::debug("spawned event stream, waiting for first message..");
while let Some(event) = event_stream.next().await {
if let Err(e) = event {
logger::error(&format!("error parsing some event: {e}"));
continue;
}
match event.expect("received error in websocket") {
WebsocketEvent::Message(msg) => {
if msg.text().is_none() {
continue;
};
// TODO parse msg when we support m2m here
}
WebsocketEvent::Close(_) => {
logger::debug("closing");
break;
}
}
}
});
Response::from_websocket(pair.client)
}

pub(crate) fn empty_response() -> worker::Result<Response> {
Response::empty()?.with_cors(&cors())
}
Expand Down
53 changes: 48 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,26 @@ use worker::{event, Context, Date, Env, Request};

// All imports for server specific things
#[cfg(feature = "server")]
use axum::{
async_trait,
extract::FromRequest,
extract::{Path, WebSocketUpgrade},
http::{Request, StatusCode},
TypedHeader,
};
#[cfg(feature = "server")]
use axum::{routing::get, Router};
#[cfg(feature = "server")]
use std::collections::HashMap;
#[cfg(feature = "server")]
use std::env;
#[cfg(feature = "server")]
use std::net::SocketAddr;
#[cfg(feature = "server")]
use std::sync::Arc;
#[cfg(feature = "server")]
use tokio::sync::Mutex;
#[cfg(feature = "server")]
use tower_http::trace::{DefaultMakeSpan, TraceLayer};

mod logger;
Expand All @@ -21,17 +35,48 @@ mod cloudflare;
#[cfg(feature = "server")]
mod server;

#[cfg(feature = "server")]
struct IdempotencyKey(Option<String>);

#[cfg(feature = "server")]
#[async_trait]
impl<'a, B> FromRequest<(), B> for IdempotencyKey
where
B: Send + 'static,
{
type Rejection = StatusCode;

async fn from_request(req: Request<B>, _: &()) -> Result<Self, Self::Rejection> {
let headers = req.headers().clone();
let key = headers
.get("Idempotency-Key")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());
Ok(IdempotencyKey(key))
}
}

/// Main function for running the program as a server
#[tokio::main]
#[cfg(feature = "server")]
async fn main() {
println!("Running ln-websocket-proxy");
tracing_subscriber::fmt::init();

let locks = Arc::new(Mutex::new(HashMap::new()));

let app = Router::new()
.route("/v1/:ip/:port", get(crate::server::ws_handler))
// TODO m2m
//.route("/v1/mutiny/:identifier", get(mutiny_ws_handler))
.route(
"/v1/:ip/:port",
get(
|path: Path<(String, String)>,
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
idempotency_key: IdempotencyKey| async move {
server::ws_handler(path, ws, user_agent, idempotency_key.0, locks.clone())
},
),
)
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().include_headers(true)),
Expand Down Expand Up @@ -68,8 +113,6 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> worker::Result<worke
// functionality and a `RouteContext` which you can use to and get route parameters and
// Environment bindings like KV Stores, Durable Objects, Secrets, and Variables.
router
// TODO this is for mutiny to mutiny, which is being ignored for now
.get("/v1/mutiny/:id", crate::cloudflare::handle_mutiny_to_mutiny)
.get("/v1/:ip/:port", crate::cloudflare::handle_ws_to_tcp)
.options("/*catchall", |_, _| crate::cloudflare::empty_response())
.run(req, env)
Expand Down
Loading