Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/cloudflare/internal/test/instrumentation-test-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export function findSpanByName(state, name, filterFn = () => true) {
* @param {Array} expectedSpans - The expected spans to compare against
* @param {Object} options - Options for the test
* @param {Function} options.mapFn - Map function to transform spans before comparison (default: x => x)
* @param {Function} options.filterFn - Filter function for spans (default: filters out jsRpcSession)
* @param {Function} options.filterFn - Filter function for spans (default: filters out jsRpcSession and jsRpcCall)
* @param {string} options.testName - Name for the test (default: 'instrumentation')
* @param {boolean} options.logReceived - Log received spans for debugging (default: false)
*
Expand All @@ -185,7 +185,8 @@ export async function runInstrumentationTest(
) {
const {
mapFn = (x) => x,
filterFn = (span) => span.name !== 'jsRpcSession',
filterFn = (span) =>
span.name !== 'jsRpcSession' && span.name !== 'jsRpcCall',
testName = 'instrumentation',
logReceived = false,
} = options;
Expand Down
7 changes: 5 additions & 2 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -962,12 +962,14 @@ class FacetOutgoingFactory final: public Fetcher::OutgoingFactory {
name(kj::mv(name)),
getStartInfo(kj::mv(getStartInfo)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override {
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override {
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
kj::Maybe<TraceContextParent> spanParents;
auto client = context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
tracing.setTag("facet_name"_kjc, name.asPtr());
spanParents = tracing.getSpanParents();

// Lazily initialize actorChannel
if (actorChannel == kj::none) {
Expand All @@ -982,6 +984,7 @@ class FacetOutgoingFactory final: public Fetcher::OutgoingFactory {
{.inHouse = true,
.wrapMetrics = true,
.operationName = kj::ConstString("facet_subrequest"_kjc)}));
return {.client = kj::mv(client), .spanParents = kj::mv(spanParents)};
}

private:
Expand Down
21 changes: 15 additions & 6 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

namespace workerd::api {

kj::Own<WorkerInterface> LocalActorOutgoingFactory::newSingleUseClient(
Fetcher::OutgoingFactory::Result LocalActorOutgoingFactory::newSingleUseClient(
kj::Maybe<kj::String> cfStr) {
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
kj::Maybe<TraceContextParent> spanParents;
auto client = context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
tracing.setTag("objectId"_kjc, actorId.asPtr());
spanParents = tracing.getSpanParents();

// Lazily initialize actorChannel
if (actorChannel == kj::none) {
Expand All @@ -37,15 +39,18 @@ kj::Own<WorkerInterface> LocalActorOutgoingFactory::newSingleUseClient(
{.inHouse = true,
.wrapMetrics = true,
.operationName = kj::ConstString("durable_object_subrequest"_kjc)}));
return {.client = kj::mv(client), .spanParents = kj::mv(spanParents)};
}

kj::Own<WorkerInterface> GlobalActorOutgoingFactory::newSingleUseClient(
Fetcher::OutgoingFactory::Result GlobalActorOutgoingFactory::newSingleUseClient(
kj::Maybe<kj::String> cfStr) {
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
kj::Maybe<TraceContextParent> spanParents;
auto client = context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
tracing.setTag("objectId"_kjc, id->toString());
spanParents = tracing.getSpanParents();

// Lazily initialize actorChannel
if (actorChannel == kj::none) {
Expand All @@ -70,15 +75,18 @@ kj::Own<WorkerInterface> GlobalActorOutgoingFactory::newSingleUseClient(
{.inHouse = true,
.wrapMetrics = true,
.operationName = kj::ConstString("durable_object_subrequest"_kjc)}));
return {.client = kj::mv(client), .spanParents = kj::mv(spanParents)};
}

kj::Own<WorkerInterface> ReplicaActorOutgoingFactory::newSingleUseClient(
Fetcher::OutgoingFactory::Result ReplicaActorOutgoingFactory::newSingleUseClient(
kj::Maybe<kj::String> cfStr) {
auto& context = IoContext::current();

return context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
kj::Maybe<TraceContextParent> spanParents;
auto client = context.getMetrics().wrapActorSubrequestClient(context.getSubrequest(
[&](TraceContext& tracing, IoChannelFactory& ioChannelFactory) {
tracing.setTag("objectId"_kjc, actorId.asPtr());
spanParents = tracing.getSpanParents();

// Unlike in `GlobalActorOutgoingFactory`, we do not create this lazily, since our channel was
// already open prior to this DO starting up.
Expand All @@ -89,6 +97,7 @@ kj::Own<WorkerInterface> ReplicaActorOutgoingFactory::newSingleUseClient(
{.inHouse = true,
.wrapMetrics = true,
.operationName = kj::ConstString("durable_object_subrequest"_kjc)}));
return {.client = kj::mv(client), .spanParents = kj::mv(spanParents)};
}

jsg::Ref<Fetcher> ColoLocalActorNamespace::get(jsg::Lock& js, kj::String actorId) {
Expand Down
6 changes: 3 additions & 3 deletions src/workerd/api/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class GlobalActorOutgoingFactory final: public Fetcher::OutgoingFactory {
routingMode(routingMode),
version(kj::mv(version)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override;

private:
ChannelIdOrFactory channelIdOrFactory;
Expand All @@ -329,7 +329,7 @@ class LocalActorOutgoingFactory final: public Fetcher::OutgoingFactory {
: channelId(channelId),
actorId(kj::mv(actorId)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override;

private:
uint channelId;
Expand All @@ -348,7 +348,7 @@ class ReplicaActorOutgoingFactory final: public Fetcher::OutgoingFactory {
: actorChannel(kj::mv(channel)),
actorId(kj::mv(actorId)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override;

private:
kj::Own<IoChannelFactory::ActorChannel> actorChannel;
Expand Down
6 changes: 4 additions & 2 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
Expand Up @@ -781,11 +781,13 @@ class Container::TcpPortOutgoingFactory final: public Fetcher::OutgoingFactory {
headerTable(headerTable),
port(kj::mv(port)) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override {
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override {
// At present we have no use for `cfStr`.
return IoContext::current().getSubrequestNoChecks([&](auto& tracing, auto& channelFactory) {
auto client = IoContext::current().getSubrequestNoChecks(
[&](auto& tracing, auto& channelFactory) -> kj::Own<WorkerInterface> {
return kj::heap<TcpPortWorkerInterface>(byteStreamFactory, entropySource, headerTable, port);
}, {.inHouse = false, .wrapMetrics = false});
return {.client = kj::mv(client), .spanParents = kj::none};
}

private:
Expand Down
51 changes: 36 additions & 15 deletions src/workerd/api/http.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2118,10 +2118,19 @@ kj::Maybe<jsg::Ref<JsRpcProperty>> Fetcher::getRpcMethodInternal(jsg::Lock& js,
return js.alloc<JsRpcProperty>(JSG_THIS, kj::mv(name));
}

rpc::JsRpcTarget::Client Fetcher::getClientForOneCall(
kj::LiteralStringConst Fetcher::getRpcTargetKind() {
return "fetcher"_kjc;
}

JsRpcClientProvider::ClientForOneCall Fetcher::getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) {
auto& ioContext = IoContext::current();
auto worker = getClient(ioContext, kj::none, "jsRpcSession"_kjc);
// The "jsRpcSession" trace context is attached to the customEvent task below so
// it covers the whole session.
auto clientWithTracing = getClientWithTracing(ioContext, kj::none, "jsRpcSession"_kjc);
kj::Maybe<TraceContextParent> callSpanParents =
clientWithTracing.traceContext.map([](TraceContext& tc) { return tc.getSpanParents(); });
auto worker = kj::mv(clientWithTracing.client);
auto event = kj::heap<api::JsRpcSessionCustomEvent>(
JsRpcSessionCustomEvent::WORKER_RPC_EVENT_TYPE);

Expand All @@ -2132,12 +2141,14 @@ rpc::JsRpcTarget::Client Fetcher::getClientForOneCall(
// propagated the exception to any RPC calls that we're waiting on, so we even ignore errors
// here -- otherwise they'll end up logged as "uncaught exceptions" even if they were, in fact,
// caught elsewhere.
ioContext.addTask(worker->customEvent(kj::mv(event)).attach(kj::mv(worker)).then([](auto&&) {
}, [](kj::Exception&&) {}));
ioContext.addTask(
worker->customEvent(kj::mv(event))
.attach(kj::mv(worker), kj::mv(clientWithTracing.traceContext))
.then([](auto&&) {}, [](kj::Exception&&) {}));

// (Don't extend `path` because we're the root.)

return result;
return {.client = kj::mv(result), .callSpanParents = kj::mv(callSpanParents)};
}

void Fetcher::serialize(jsg::Lock& js, jsg::Serializer& serializer) {
Expand Down Expand Up @@ -2409,9 +2420,21 @@ kj::Own<WorkerInterface> Fetcher::getClient(

Fetcher::ClientWithTracing Fetcher::getClientWithTracing(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName) {
return buildClient(ioContext, kj::mv(cfStr), kj::mv(operationName));
}

Fetcher::ClientWithTracing Fetcher::wrapWithInnerSpan(
OutgoingFactory::Result result, kj::ConstString operationName) {
KJ_IF_SOME(parents, result.spanParents) {
return ClientWithTracing{kj::mv(result.client), parents.newChild(kj::mv(operationName))};
}
return ClientWithTracing{kj::mv(result.client), kj::none};
}

Fetcher::ClientWithTracing Fetcher::buildClient(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName) {
KJ_SWITCH_ONEOF(channelOrClientFactory) {
KJ_CASE_ONEOF(channel, uint) {
// For channels, create trace context
auto traceContext = ioContext.makeUserTraceSpan(kj::mv(operationName));
auto client = ioContext.getSubrequestChannel(channel, isInHouse, kj::mv(cfStr), traceContext);
return ClientWithTracing{kj::mv(client), kj::mv(traceContext)};
Expand All @@ -2431,17 +2454,15 @@ Fetcher::ClientWithTracing Fetcher::getClientWithTracing(
return ClientWithTracing{kj::mv(client), kj::mv(traceContext)};
}
KJ_CASE_ONEOF(outgoingFactory, IoOwn<OutgoingFactory>) {
// Outgoing factories are responsible for routing through getSubrequestNoChecks() (or
// getSubrequest()) internally if they create HTTP connections, to ensure external memory
// adjustment and other subrequest accounting are applied.
auto client = outgoingFactory->newSingleUseClient(kj::mv(cfStr));
return ClientWithTracing{kj::mv(client), kj::none};
// The factory creates its own outer dispatch span (e.g. durable_object_subrequest)
// and exposes it as `result.spanParents`. Nest our inner span under it so the trace
// tree shows `outerSpan -> operationName`.
auto result = outgoingFactory->newSingleUseClient(kj::mv(cfStr));
return wrapWithInnerSpan(kj::mv(result), kj::mv(operationName));
}
KJ_CASE_ONEOF(outgoingFactory, kj::Own<CrossContextOutgoingFactory>) {
// Same as OutgoingFactory above -- the factory is responsible for routing through
// getSubrequestNoChecks() internally.
auto client = outgoingFactory->newSingleUseClient(ioContext, kj::mv(cfStr));
return ClientWithTracing{kj::mv(client), kj::none};
auto result = outgoingFactory->newSingleUseClient(ioContext, kj::mv(cfStr));
return wrapWithInnerSpan(kj::mv(result), kj::mv(operationName));
}
}
KJ_UNREACHABLE;
Expand Down
31 changes: 24 additions & 7 deletions src/workerd/api/http.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,15 @@ class Fetcher: public JsRpcClientProvider {
// is almost the same thing.
class OutgoingFactory {
public:
virtual kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) = 0;
struct Result {
kj::Own<WorkerInterface> client;
// Parents of the dispatch-site span (e.g. durable_object_subrequest) that the
// caller can use to nest an inner operation span underneath. SpanParent holds an
// owning refcount on the underlying SpanObserver, so these are independently
// valid regardless of `client`'s lifetime. kj::none if no span was created.
kj::Maybe<TraceContextParent> spanParents;
};
virtual Result newSingleUseClient(kj::Maybe<kj::String> cfStr) = 0;

// Get a `SubrequestChannel` representing this Fetcher. This is used especially when the
// Fetcher is being passed to another isolate.
Expand All @@ -306,7 +314,7 @@ class Fetcher: public JsRpcClientProvider {
// IoContext::getSubrequestNoChecks() internally.
class CrossContextOutgoingFactory {
public:
virtual kj::Own<WorkerInterface> newSingleUseClient(
virtual OutgoingFactory::Result newSingleUseClient(
IoContext& context, kj::Maybe<kj::String> cfStr) = 0;

virtual kj::Own<IoChannelFactory::SubrequestChannel> getSubrequestChannel(IoContext& context) {
Expand All @@ -333,7 +341,7 @@ class Fetcher: public JsRpcClientProvider {
requiresHost(requiresHost),
isInHouse(isInHouse) {}

// Returns an `WorkerInterface` that is only valid for the lifetime of the current
// Returns a `WorkerInterface` that is only valid for the lifetime of the current
// `IoContext`.
kj::Own<WorkerInterface> getClient(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName);
Expand All @@ -344,8 +352,8 @@ class Fetcher: public JsRpcClientProvider {
kj::Maybe<TraceContext> traceContext;
};

// Get client and optionally create trace context, all in one call
ClientWithTracing getClientWithTracing(
// Get client and optionally create trace context, all in one call.
[[nodiscard]] ClientWithTracing getClientWithTracing(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName);

// Get a SubrequestChannel representing this Fetcher.
Expand Down Expand Up @@ -437,8 +445,9 @@ class Fetcher: public JsRpcClientProvider {
return getRpcMethod(js, kj::mv(name));
}

rpc::JsRpcTarget::Client getClientForOneCall(
jsg::Lock& js, kj::Vector<kj::StringPtr>& path) override;
ClientForOneCall getClientForOneCall(jsg::Lock& js, kj::Vector<kj::StringPtr>& path) override;

kj::LiteralStringConst getRpcTargetKind() override;

JSG_RESOURCE_TYPE(Fetcher, CompatibilityFlags::Reader flags) {
// WARNING: New JSG_METHODs on Fetcher must be gated via compatibility flag to prevent
Expand Down Expand Up @@ -520,6 +529,14 @@ class Fetcher: public JsRpcClientProvider {
JSG_SERIALIZABLE(rpc::SerializationTag::SERVICE_STUB);

private:
[[nodiscard]] ClientWithTracing buildClient(
IoContext& ioContext, kj::Maybe<kj::String> cfStr, kj::ConstString operationName);

// Wraps an OutgoingFactory result with an inner span nested under the factory's
// outer dispatch span (or under the request root if there is none).
[[nodiscard]] static ClientWithTracing wrapWithInnerSpan(
OutgoingFactory::Result result, kj::ConstString operationName);

kj::OneOf<uint,
IoOwn<IoChannelFactory::SubrequestChannel>,
kj::Own<CrossContextOutgoingFactory>,
Expand Down
9 changes: 6 additions & 3 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ class StreamOutgoingFactory final: public Fetcher::OutgoingFactory, public kj::R
httpClient(
kj::newHttpClient(headerTable, *this->stream, {.entropySource = entropySource})) {}

kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override;
Result newSingleUseClient(kj::Maybe<kj::String> cfStr) override;

private:
kj::Own<kj::AsyncIoStream> stream;
Expand Down Expand Up @@ -654,14 +654,17 @@ class StreamWorkerInterface final: public WorkerInterface {
kj::Own<StreamOutgoingFactory> factory;
};

kj::Own<WorkerInterface> StreamOutgoingFactory::newSingleUseClient(kj::Maybe<kj::String> cfStr) {
Fetcher::OutgoingFactory::Result StreamOutgoingFactory::newSingleUseClient(
kj::Maybe<kj::String> cfStr) {
JSG_ASSERT(stream.get() != nullptr, Error,
"Fetcher created from internalNewHttpClient can only be used once");
// Create a WorkerInterface that wraps the stream, routing through getSubrequestNoChecks to apply
// external memory adjustment for GC pressure.
return IoContext::current().getSubrequestNoChecks([&](auto& tracing, auto& channelFactory) {
auto client = IoContext::current().getSubrequestNoChecks(
[&](auto& tracing, auto& channelFactory) -> kj::Own<WorkerInterface> {
return kj::heap<StreamWorkerInterface>(kj::addRef(*this));
}, {.inHouse = false, .wrapMetrics = false});
return {.client = kj::mv(client), .spanParents = kj::none};
}

jsg::Promise<jsg::Ref<Fetcher>> SocketsModule::internalNewHttpClient(
Expand Down
10 changes: 10 additions & 0 deletions src/workerd/api/tests/actor-kv-test-tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ export const test = {
objectId:
'aa299662980ce671dbcb09a5d7ab26ab30e45465bcd12f263f2bdd7d5edd804a',
},
{
name: 'fetch',
closed: true,
'network.protocol.name': 'http',
'network.protocol.version': 'HTTP/1.1',
'http.request.method': 'GET',
'url.full': 'http://test.example/kv-test',
'http.response.status_code': 200n,
'http.response.body.size': 34n,
},
{ name: 'durable_object_storage_put', closed: true },
{ name: 'durable_object_storage_put', closed: true },
{ name: 'durable_object_storage_get', closed: true },
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/api/tests/sql-test-tail.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ export const test = {
durable_object_storage_get: 18,
durable_object_storage_transaction: 8,
durable_object_subrequest: 47,
// Inner spans nested under durable_object_subrequest: jsRpcSession for RPC
// dispatches (one per call, client-side only -- the server's jsrpc-typed
// onset is its equivalent), fetch for fetch dispatches.
jsRpcSession: 35,
fetch: 12,
// jsRpcCall: 35 client-side + 35 server-side.
jsRpcCall: 70,
durable_object_storage_deleteAll: 1,
createStringTable: 4,
runActorFunc: 4,
Expand Down
Loading
Loading