Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 54 additions & 9 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ kj::Own<IoChannelFactory::ActorClassChannel> DurableObjectClass::getChannel(IoCo
}

void DurableObjectClass::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
auto channel = getChannel(IoContext::current());
auto& ioctx = IoContext::current();
auto channel = getChannel(ioctx);
channel->requireAllowsTransfer();

KJ_IF_SOME(handler, serializer.getExternalHandler()) {
Expand All @@ -232,10 +233,34 @@ void DurableObjectClass::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
JSG_REQUIRE(FeatureFlags::get(js).getWorkerdExperimental(), DOMDataCloneError,
"DurableObjectClass serialization requires the 'experimental' compat flag.");

auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
rpcHandler.write([token = kj::mv(token)](rpc::JsValue::External::Builder builder) {
builder.setActorClassChannelToken(token);
});
KJ_SWITCH_ONEOF(channel->getTokenMaybeSync(IoChannelFactory::ChannelTokenUsage::RPC)) {
KJ_CASE_ONEOF(token, kj::Array<byte>) {
rpcHandler.write([token = kj::mv(token)](rpc::JsValue::External::Builder builder) {
builder.setActorClassChannelToken(token);
});
}
KJ_CASE_ONEOF(promise, kj::Promise<kj::Array<byte>>) {
// Token isn't available synchronously, so we have to send a promise.
auto paf = kj::newPromiseAndFulfiller<
rpc::JsValue::ExternalPusher::DelayedChannelToken::Client>();

// Arrange to send the token when it's ready.
ioctx.addTask(
promise.then([pusher = rpcHandler.getExternalPusher(),
fulfiller = kj::mv(paf.fulfiller)](kj::Array<byte> token) mutable {
auto req = pusher.pushDelayedChannelTokenRequest(
capnp::MessageSize{4 + token.size() / sizeof(capnp::word), 0});
req.setToken(token);
fulfiller->fulfill(req.send().getCap());
}));

// Write the promise for now.
rpcHandler.write(
[promise = kj::mv(paf.promise)](rpc::JsValue::External::Builder builder) mutable {
builder.setDelayedActorClassChannelToken(kj::mv(promise));
});
}
}
return;
}
// TODO(someday): structuredClone() should have special handling that just reproduces the same
Expand All @@ -246,7 +271,16 @@ void DurableObjectClass::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
// is temporary, anyone using this will lose their data later.
JSG_REQUIRE(FeatureFlags::get(js).getAllowIrrevocableStubStorage(), DOMDataCloneError,
"DurableObjectClass cannot be serialized in this context.");
serializer.writeLengthDelimited(channel->getToken(IoChannelFactory::ChannelTokenUsage::STORAGE));
KJ_SWITCH_ONEOF(channel->getTokenMaybeSync(IoChannelFactory::ChannelTokenUsage::STORAGE)) {
KJ_CASE_ONEOF(token, kj::Array<byte>) {
serializer.writeLengthDelimited(token);
}
KJ_CASE_ONEOF(promise, kj::Promise<kj::Array<byte>>) {
// TODO(stub-storage): Eventually we'll serialize by pointing to an external table.
KJ_UNIMPLEMENTED(
"tried to store ActorClassChannel whose token is not synchronously available");
}
}
}

jsg::Ref<DurableObjectClass> DurableObjectClass::deserialize(
Expand All @@ -273,10 +307,21 @@ jsg::Ref<DurableObjectClass> DurableObjectClass::deserialize(
"DurableObjectClass serialization requires the 'experimental' compat flag.");

auto external = rpcHandler.read();
KJ_REQUIRE(external.isActorClassChannelToken());
auto& ioctx = IoContext::current();
auto channel = ioctx.getIoChannelFactory().actorClassFromToken(
IoChannelFactory::ChannelTokenUsage::RPC, external.getActorClassChannelToken());
kj::Own<IoChannelFactory::ActorClassChannel> channel;

if (external.isDelayedActorClassChannelToken()) {
auto promise = ioctx.getExternalPusher()->unwrapDelayedChannelToken(
external.getDelayedActorClassChannelToken());
channel = ioctx.getIoChannelFactory().actorClassFromToken(
IoChannelFactory::ChannelTokenUsage::RPC, kj::mv(promise));
} else if (external.isActorClassChannelToken()) {
channel = ioctx.getIoChannelFactory().actorClassFromToken(
IoChannelFactory::ChannelTokenUsage::RPC, external.getActorClassChannelToken());
} else {
KJ_FAIL_REQUIRE("wrong external type for DurableObjectClass", external.which());
}

return js.alloc<DurableObjectClass>(ioctx.addObject(kj::mv(channel)));
}
}
Expand Down
91 changes: 12 additions & 79 deletions src/workerd/api/basics.c++
Original file line number Diff line number Diff line change
Expand Up @@ -572,56 +572,6 @@ class AbortTriggerRpcClient final {
rpc::AbortTrigger::Client client;
};

namespace {
// The jsrpc handler that receives aborts from the remote and triggers them locally
//
// TODO(cleanup): This class has been copied to external-pusher.c++. The copy here can be
// deleted as soon as we've switched from StreamSink to ExternalPusher and can delete all the
// StreamSink-related code. For now I'm not trying to avoid duplication.
class AbortTriggerRpcServer final: public rpc::AbortTrigger::Server {
public:
AbortTriggerRpcServer(kj::Own<kj::PromiseFulfiller<void>> fulfiller,
kj::Own<AbortSignal::PendingReason>&& pendingReason)
: fulfiller(kj::mv(fulfiller)),
pendingReason(kj::mv(pendingReason)) {}

kj::Promise<void> abort(AbortContext abortCtx) override {
auto params = abortCtx.getParams();
auto reason = params.getReason().getV8Serialized();

pendingReason->getWrapped() = kj::heapArray(reason.asBytes());
fulfiller->fulfill();
return kj::READY_NOW;
}

kj::Promise<void> release(ReleaseContext releaseCtx) override {
released = true;
return kj::READY_NOW;
}

~AbortTriggerRpcServer() noexcept(false) {
if (pendingReason->getWrapped() != nullptr) {
// Already triggered
return;
}

if (!released) {
pendingReason->getWrapped() = JSG_KJ_EXCEPTION(FAILED, DOMAbortError,
"An AbortSignal received over RPC was implicitly aborted because the connection back to "
"its trigger was lost.");
}

// Always fulfill the promise in case the AbortSignal was waiting
fulfiller->fulfill();
}

private:
kj::Own<kj::PromiseFulfiller<void>> fulfiller;
kj::Own<AbortSignal::PendingReason> pendingReason;
bool released = false;
};
} // namespace

AbortSignal::AbortSignal(kj::Maybe<kj::Exception> exception,
jsg::Optional<jsg::JsRef<jsg::JsValue>> maybeReason,
Flag flag)
Expand Down Expand Up @@ -863,21 +813,16 @@ void AbortSignal::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
}

auto triggerCap = [&]() -> rpc::AbortTrigger::Client {
KJ_IF_SOME(pusher, externalHandler->getExternalPusher()) {
auto pipeline = pusher.pushAbortSignalRequest(capnp::MessageSize{2, 0}).sendForPipeline();
auto pipeline = externalHandler->getExternalPusher()
.pushAbortSignalRequest(capnp::MessageSize{2, 0})
.sendForPipeline();

externalHandler->write(
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortSignal(kj::mv(signal));
});
externalHandler->write(
[signal = pipeline.getSignal()](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortSignal(kj::mv(signal));
});

return pipeline.getTrigger();
} else {
return externalHandler
->writeStream([&](rpc::JsValue::External::Builder builder) mutable {
builder.setAbortTrigger();
}).castAs<rpc::AbortTrigger>();
}
return pipeline.getTrigger();
}();

auto& ioContext = IoContext::current();
Expand Down Expand Up @@ -914,24 +859,12 @@ jsg::Ref<AbortSignal> AbortSignal::deserialize(
auto& ioctx = IoContext::current();

auto reader = externalHandler->read();
if (reader.isAbortTrigger()) {
// Old-style StreamSink.
// TODO(cleanup): Remove this once the ExternalPusher autogate has rolled out.
auto paf = kj::newPromiseAndFulfiller<void>();
auto pendingReason = ioctx.addObject(kj::refcounted<PendingReason>());

externalHandler->setLastStream(
kj::heap<AbortTriggerRpcServer>(kj::mv(paf.fulfiller), kj::addRef(*pendingReason)));
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(paf.promise)));
signal->pendingReason = kj::mv(pendingReason);
} else {
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");
KJ_REQUIRE(reader.isAbortSignal(), "external table slot type does't match serialization tag");

auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());
auto resolvedSignal = ioctx.getExternalPusher()->unwrapAbortSignal(reader.getAbortSignal());

signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));
}
signal->rpcAbortPromise = ioctx.addObject(kj::heap(kj::mv(resolvedSignal.signal)));
signal->pendingReason = ioctx.addObject(kj::mv(resolvedSignal.reason));

return signal;
}
Expand Down
54 changes: 40 additions & 14 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -382,46 +382,65 @@ jsg::Promise<void> Container::interceptOutboundHttp(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);
return ioctx.awaitIo(js, interceptOutboundHttpImpl(*rpcClient, kj::mv(addr), kj::mv(channel)));
}

kj::Promise<void> Container::interceptOutboundHttpImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel) {
// Get a channel token for RPC usage, the container runtime can use this
// token later to redeem a Fetcher.
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
kj::Array<byte> token = co_await channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
{ auto drop = kj::mv(channel); } // no longer needed

auto req = rpcClient->setEgressHttpRequest();
auto req = rpcClient.setEgressHttpRequest();
req.setHostPort(addr);
req.setChannelToken(token);
return ioctx.awaitIo(js, req.sendIgnoringResult());
co_await req.send();
}

jsg::Promise<void> Container::interceptAllOutboundHttp(jsg::Lock& js, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
return ioctx.awaitIo(js, interceptAllOutboundHttpImpl(*rpcClient, kj::mv(channel)));
}

kj::Promise<void> Container::interceptAllOutboundHttpImpl(
rpc::Container::Client rpcClient, kj::Own<IoChannelFactory::SubrequestChannel> channel) {
auto token = co_await channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
{ auto drop = kj::mv(channel); } // no longer needed

// Register for all IPv4 and IPv6 addresses (on port 80)
auto reqV4 = rpcClient->setEgressHttpRequest();
auto reqV4 = rpcClient.setEgressHttpRequest();
reqV4.setHostPort("0.0.0.0/0"_kj);
reqV4.setChannelToken(token);

auto reqV6 = rpcClient->setEgressHttpRequest();
auto reqV6 = rpcClient.setEgressHttpRequest();
reqV6.setHostPort("::/0"_kj);
reqV6.setChannelToken(token);

return ioctx.awaitIo(js,
kj::joinPromisesFailFast(kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult())));
co_await kj::joinPromisesFailFast(
kj::arr(reqV4.sendIgnoringResult(), reqV6.sendIgnoringResult()));
}

jsg::Promise<void> Container::interceptOutboundHttps(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
return ioctx.awaitIo(js, interceptOutboundHttpsImpl(*rpcClient, kj::mv(addr), kj::mv(channel)));
}

auto req = rpcClient->setEgressHttpsRequest();
kj::Promise<void> Container::interceptOutboundHttpsImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel) {
auto token = co_await channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
{ auto drop = kj::mv(channel); } // no longer needed

auto req = rpcClient.setEgressHttpsRequest();
req.setHostPort(addr);
req.setChannelToken(token);

return ioctx.awaitIo(js, req.sendIgnoringResult());
co_await req.send();
}

jsg::Promise<jsg::Ref<ExecProcess>> Container::exec(
Expand Down Expand Up @@ -557,15 +576,22 @@ jsg::Promise<void> Container::interceptOutboundTcp(
jsg::Lock& js, kj::String addr, jsg::Ref<Fetcher> binding) {
auto& ioctx = IoContext::current();
auto channel = binding->getSubrequestChannel(ioctx);
return ioctx.awaitIo(js, interceptOutboundTcpImpl(*rpcClient, kj::mv(addr), kj::mv(channel)));
}

kj::Promise<void> Container::interceptOutboundTcpImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel) {

// Get a channel token for RPC usage, the container runtime can use this
// token later to redeem a Fetcher whose connect() handler processes the TCP stream.
auto token = channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
auto token = co_await channel->getToken(IoChannelFactory::ChannelTokenUsage::RPC);
{ auto drop = kj::mv(channel); } // no longer needed

auto req = rpcClient->setEgressTcpRequest();
auto req = rpcClient.setEgressTcpRequest();
req.setHostPort(addr);
req.setChannelToken(token);
return ioctx.awaitIo(js, req.sendIgnoringResult());
co_await req.send();
}

jsg::Promise<void> Container::monitor(jsg::Lock& js) {
Expand Down
14 changes: 14 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,20 @@ class Container: public jsg::Object {

class TcpPortWorkerInterface;
class TcpPortOutgoingFactory;

// These helpers are static since they will leave the IoContext on the first co_await, so we
// don't want them trying to access `rpcClient` via the `IoOwn`.
static kj::Promise<void> interceptOutboundHttpImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel);
static kj::Promise<void> interceptAllOutboundHttpImpl(
rpc::Container::Client rpcClient, kj::Own<IoChannelFactory::SubrequestChannel> channel);
static kj::Promise<void> interceptOutboundHttpsImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel);
static kj::Promise<void> interceptOutboundTcpImpl(rpc::Container::Client rpcClient,
kj::String addr,
kj::Own<IoChannelFactory::SubrequestChannel> channel);
};

#define EW_CONTAINER_ISOLATE_TYPES \
Expand Down
Loading
Loading