Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion engine/packages/actor-kv/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::result::Result::Ok;
use anyhow::*;
use universaldb::prelude::*;

use rivet_runner_protocol as rp;
use rivet_runner_protocol::mk2 as rp;

use crate::key::KeyWrapper;

Expand Down
2 changes: 1 addition & 1 deletion engine/packages/actor-kv/src/key.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use rivet_runner_protocol as rp;
use rivet_runner_protocol::mk2 as rp;
use universaldb::tuple::{
Bytes, PackResult, TupleDepth, TuplePack, TupleUnpack, VersionstampOffset,
};
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/actor-kv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use entry::{EntryBaseKey, EntryBuilder, EntryMetadataKey, EntryValueChunkKey};
use futures_util::{StreamExt, TryStreamExt};
use gas::prelude::*;
use key::{KeyWrapper, ListKeyWrapper};
use rivet_runner_protocol as rp;
use rivet_runner_protocol::mk2 as rp;
use universaldb::prelude::*;
use universaldb::tuple::Subspace;
use utils::{validate_entries, validate_keys};
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/actor-kv/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::result::Result::Ok;

use anyhow::*;
use rivet_runner_protocol as rp;
use rivet_runner_protocol::mk2 as rp;

use crate::{
MAX_KEY_SIZE, MAX_KEYS, MAX_PUT_PAYLOAD_SIZE, MAX_STORAGE_SIZE, MAX_VALUE_SIZE, key::KeyWrapper,
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ thiserror.workspace = true
tokio-tungstenite.workspace = true
tokio.workspace = true
tracing.workspace = true
universaldb.workspace = true
universalpubsub.workspace = true
vbare.workspace = true
72 changes: 47 additions & 25 deletions engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use tokio_tungstenite::tungstenite::{
Message,
protocol::frame::{CloseFrame, coding::CloseCode},
};
use universaldb::utils::IsolationLevel::*;

use crate::shared_state::{InFlightRequestHandle, SharedState};

Expand All @@ -46,7 +47,7 @@ pub struct WebsocketPendingLimitReached;

#[derive(Debug)]
enum LifecycleResult {
ServerClose(protocol::ToServerWebSocketClose),
ServerClose(protocol::mk2::ToServerWebSocketClose),
ClientClose(Option<CloseFrame>),
Aborted,
}
Expand Down Expand Up @@ -153,10 +154,22 @@ impl CustomServeTrait for PegboardGateway {
.context("failed to read body")?
.to_bytes();

let mut stopped_sub = self
.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
.await?;
let udb = self.ctx.udb()?;
let runner_id = self.runner_id;
let (mut stopped_sub, runner_protocol_version) = tokio::try_join!(
self.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id)),
// Read runner protocol version
udb.run(|tx| async move {
tx.with_subspace(pegboard::keys::subspace());

tx.read(
&pegboard::keys::runner::ProtocolVersionKey::new(runner_id),
Serializable,
)
.await
})
)?;

// Build subject to publish to
let tunnel_subject =
Expand All @@ -169,12 +182,12 @@ impl CustomServeTrait for PegboardGateway {
..
} = self
.shared_state
.start_in_flight_request(tunnel_subject, request_id)
.start_in_flight_request(tunnel_subject, runner_protocol_version, request_id)
.await;

// Start request
let message = protocol::ToClientTunnelMessageKind::ToClientRequestStart(
protocol::ToClientRequestStart {
let message = protocol::mk2::ToClientTunnelMessageKind::ToClientRequestStart(
protocol::mk2::ToClientRequestStart {
actor_id: actor_id.clone(),
method,
path: self.path.clone(),
Expand All @@ -197,12 +210,12 @@ impl CustomServeTrait for PegboardGateway {
res = msg_rx.recv() => {
if let Some(msg) = res {
match msg {
protocol::ToServerTunnelMessageKind::ToServerResponseStart(
protocol::mk2::ToServerTunnelMessageKind::ToServerResponseStart(
response_start,
) => {
return anyhow::Ok(response_start);
}
protocol::ToServerTunnelMessageKind::ToServerResponseAbort => {
protocol::mk2::ToServerTunnelMessageKind::ToServerResponseAbort => {
tracing::warn!("request aborted");
return Err(ServiceUnavailable.build());
}
Expand Down Expand Up @@ -277,9 +290,6 @@ impl CustomServeTrait for PegboardGateway {
request_id: protocol::RequestId,
after_hibernation: bool,
) -> Result<Option<CloseFrame>> {
// Use the actor ID from the gateway instance
let actor_id = self.actor_id.to_string();

// Extract headers
let mut request_headers = HashableMap::new();
for (name, value) in headers {
Expand All @@ -288,10 +298,22 @@ impl CustomServeTrait for PegboardGateway {
}
}

let mut stopped_sub = self
.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id))
.await?;
let udb = self.ctx.udb()?;
let runner_id = self.runner_id;
let (mut stopped_sub, runner_protocol_version) = tokio::try_join!(
self.ctx
.subscribe::<pegboard::workflows::actor::Stopped>(("actor_id", self.actor_id)),
// Read runner protocol version
udb.run(|tx| async move {
tx.with_subspace(pegboard::keys::subspace());

tx.read(
&pegboard::keys::runner::ProtocolVersionKey::new(runner_id),
Serializable,
)
.await
})
)?;

// Build subject to publish to
let tunnel_subject =
Expand All @@ -304,7 +326,7 @@ impl CustomServeTrait for PegboardGateway {
new,
} = self
.shared_state
.start_in_flight_request(tunnel_subject.clone(), request_id)
.start_in_flight_request(tunnel_subject.clone(), runner_protocol_version, request_id)
.await;

ensure!(
Expand All @@ -317,9 +339,9 @@ impl CustomServeTrait for PegboardGateway {
true
} else {
// Send WebSocket open message
let open_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketOpen(
protocol::ToClientWebSocketOpen {
actor_id: actor_id.clone(),
let open_message = protocol::mk2::ToClientTunnelMessageKind::ToClientWebSocketOpen(
protocol::mk2::ToClientWebSocketOpen {
actor_id: self.actor_id.to_string(),
path: self.path.clone(),
headers: request_headers,
},
Expand All @@ -338,10 +360,10 @@ impl CustomServeTrait for PegboardGateway {
res = msg_rx.recv() => {
if let Some(msg) = res {
match msg {
protocol::ToServerTunnelMessageKind::ToServerWebSocketOpen(msg) => {
protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketOpen(msg) => {
return anyhow::Ok(msg);
}
protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => {
protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketClose(close) => {
tracing::warn!(?close, "websocket closed before opening");
return Err(WebSocketServiceUnavailable.build());
}
Expand Down Expand Up @@ -538,8 +560,8 @@ impl CustomServeTrait for PegboardGateway {
Ok(_) => (CloseCode::Normal.into(), None),
Err(_) => (CloseCode::Error.into(), Some("ws.downstream_closed".into())),
};
let close_message = protocol::ToClientTunnelMessageKind::ToClientWebSocketClose(
protocol::ToClientWebSocketClose {
let close_message = protocol::mk2::ToClientTunnelMessageKind::ToClientWebSocketClose(
protocol::mk2::ToClientWebSocketClose {
code: Some(close_code.into()),
reason: close_reason.map(|x| x.as_str().to_string()),
},
Expand Down
Loading
Loading