Adding large payload support for the standalone SDK #280

Open
bachuv wants to merge 5 commits into main from vabachu/java-large-payloads

bachuv (Contributor) commented Apr 13, 2026

Issue describing the changes in this PR

Adding large payload support for the standalone SDK

resolves #issue_for_this_pr

Pull request checklist

  • My changes do not require documentation changes
    • Otherwise: Documentation issue linked to PR
  • My changes are added to the CHANGELOG.md
  • I have added all required tests (Unit tests, E2E tests)

@bachuv bachuv requested a review from a team as a code owner April 13, 2026 16:11
Copilot AI review requested due to automatic review settings April 13, 2026 16:11
Copilot AI (Contributor) left a comment

Pull request overview

This PR adds “large payload” support to the standalone Durable Task Java SDK by introducing an Azure Blob Storage–backed payload externalization module and wiring gRPC interceptors + orchestrator-response chunking into the core client/worker to avoid gRPC message size limits.

Changes:

  • Introduces new :azure-blob-payloads module implementing payload externalization/resolution via a gRPC ClientInterceptor and an Azure Blob–backed PayloadStore.
  • Adds gRPC interceptor support to DurableTaskGrpcClientBuilder/DurableTaskGrpcWorkerBuilder, plus worker capability announcement and orchestrator completion chunking.
  • Adds a runnable sample and unit/integration tests covering token handling, interceptor behavior, and chunking behavior.
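
As a rough illustration of the externalize/resolve round trip described above, the following sketch swaps the Azure Blob store for an in-memory map. Only the `blob:v1:` prefix comes from the review discussion; the class name, the UUID-based token body, and the method signatures are assumptions made for this example and are not the PR's actual API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: an in-memory stand-in for a blob-backed payload store. */
final class InMemoryExternalizer {
    // Prefix mentioned in the review; the part after it is a made-up format.
    static final String TOKEN_PREFIX = "blob:v1:";

    private final Map<String, String> blobs = new ConcurrentHashMap<>();
    private final int thresholdBytes;

    InMemoryExternalizer(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    /** Outbound path: payloads above the threshold are stored and replaced by a token. */
    String maybeExternalize(String payload) {
        // Simple (allocating) size check, good enough for a sketch.
        if (payload.getBytes(StandardCharsets.UTF_8).length <= thresholdBytes) {
            return payload;
        }
        String id = UUID.randomUUID().toString();
        blobs.put(id, payload);
        return TOKEN_PREFIX + id;
    }

    /** Inbound path: tokens are swapped back for the stored payload. */
    String maybeResolve(String value) {
        if (!value.startsWith(TOKEN_PREFIX)) {
            return value;
        }
        String payload = blobs.get(value.substring(TOKEN_PREFIX.length()));
        if (payload == null) {
            throw new IllegalStateException("No stored payload for token " + value);
        }
        return payload;
    }
}
```

In the PR this logic sits inside a gRPC interceptor, so application code only ever sees the resolved payload.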

Reviewed changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| settings.gradle | Adds the new :azure-blob-payloads Gradle module to the build. |
| samples/src/main/java/io/durabletask/samples/LargePayloadSample.java | New sample demonstrating end-to-end large payload externalization with DTS + Azurite. |
| samples/build.gradle | Adds a runLargePayloadSample task and depends on :azure-blob-payloads. |
| internal/durabletask-protobuf/protos/orchestrator_service.proto | Updates protobuf contract (tags, rewind action, purge timeout). |
| internal/durabletask-protobuf/PROTO_SOURCE_COMMIT_HASH | Updates upstream proto source commit hash. |
| client/src/test/java/com/microsoft/durabletask/OrchestratorChunkingTest.java | Adds unit tests for worker chunking + action-size validation + capability announcement. |
| client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorkerBuilder.java | Adds interceptor registration, LP capability flag, and configurable chunk size. |
| client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java | Applies interceptors, announces LP capability, and implements orchestrator-response chunking/validation. |
| client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClientBuilder.java | Adds interceptor registration support to the client builder. |
| client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java | Applies registered interceptors to the client channel. |
| client/build.gradle | Adds grpc-inprocess for new in-process gRPC unit tests. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/PayloadStore.java | Introduces PayloadStore abstraction for out-of-band payload storage. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/PayloadStorageException.java | Defines exception type for permanent storage failures. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadWorkerExtensions.java | Adds worker-side helper methods to enable externalized payloads + capability flag. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadStorageOptions.java | Adds configuration options (threshold/max/container/auth/compression). |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadInterceptor.java | Implements interceptor that externalizes outbound payloads and resolves inbound tokens. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadClientExtensions.java | Adds client-side helper methods to enable externalized payloads. |
| azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java | Implements Azure Blob Storage payload store with optional gzip compression. |
| azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/PayloadTokenTest.java | Unit tests for token encode/decode and token detection. |
| azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadStorageOptionsTest.java | Unit tests for options defaults and validation behavior. |
| azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadInterceptorTest.java | Unit tests for request externalization + response resolution across message types. |
| azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadIntegrationTest.java | Integration tests requiring DTS emulator + Azurite for end-to-end validation. |
| azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreTest.java | Unit tests for blob upload/download/compression behavior using mocks. |
| azure-blob-payloads/spotbugs-exclude.xml | SpotBugs exclusions for the new module. |
| azure-blob-payloads/build.gradle | Build/test/spotbugs config and dependencies for the new module. |

Comment thread client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java Outdated
Comment thread samples/src/main/java/io/durabletask/samples/LargePayloadSample.java Outdated
YunchuWang (Member) left a comment

Code Review Summary

Change intent: Add large payload externalization support to the standalone Durable Task Java SDK via a new azure-blob-payloads module. Payloads exceeding a configurable threshold are transparently uploaded to Azure Blob Storage through a gRPC ClientInterceptor, with automatic orchestrator response chunking for oversized gRPC messages.

Overall risk: Moderate
Merge recommendation: Safe with fixes — two performance issues (F1, F2) should be addressed before merge.

Architecture

This is a well-designed refactoring that moves payload externalization from application-level (manual per-call-site handling) down to the transport layer (gRPC interceptor). Key strengths:

  • Transparency — application code (orchestrations, activities) is completely unaware of large payloads
  • Separation of concerns — externalization logic concentrated in one interceptor
  • Extensibility — addInterceptor API is generic, so future interceptors (encryption, compression) can be added
  • Cross-SDK alignment — chunking logic matches .NET SDK behavior
  • Test quality — comprehensive coverage with mock unit tests and Azurite-backed integration tests

Findings by Severity

  • High: 2 (F1, F2)
  • Medium: 3 (F3, F4, F5)
  • Low: 2 (F6, F7)
  • Nit: 2 (N1, N2)

Proto changes note (N1): The proto file includes unrelated additions (ActivityRequest.tags, RewindOrchestrationAction, PurgeInstanceFilter.timeout). These appear to be upstream proto sync. No functional impact, but worth noting in the PR description if intentional.

Sample note (N2): LargePayloadSample.java line 123 hardcodes the blob:v1: token prefix check as a defensive assertion. Acceptable for a sample, but will need updating if the token format ever changes.

    private static int utf8ByteLength(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }
}

[F1] High / P1 — utf8ByteLength allocates a large byte array on every call

s.getBytes(StandardCharsets.UTF_8).length allocates a full copy of the string as a byte array just to measure its length. maybeExternalize is called on every StringValue field in every gRPC message — an OrchestratorResponse with multiple actions can trigger this many times. For strings near the 900KB threshold, each call allocates ~900KB+ of garbage.

Impact: Significant GC pressure under high-throughput scenarios (many concurrent orchestrations returning large payloads).

Suggestion: Compute UTF-8 length by iterating chars — zero allocation:

private static int utf8ByteLength(String s) {
    int count = 0;
    for (int i = 0; i < s.length(); i++) {
        char c = s.charAt(i);
        if (c <= 0x7F) count++;
        else if (c <= 0x7FF) count += 2;
        else if (Character.isHighSurrogate(c)) { count += 4; i++; }
        else count += 3;
    }
    return count;
}
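
One caveat on the loop above: it counts 4 bytes for any high surrogate and skips the next char, while `String.getBytes(StandardCharsets.UTF_8)` encodes an unpaired surrogate as a single `?` byte. If the threshold check needs to agree with `getBytes` for malformed input, a slightly more defensive variant could look like the sketch below. The class name `Utf8Length` is hypothetical.

```java
/** Allocation-free UTF-8 length that matches String.getBytes(UTF_8), including unpaired surrogates. */
final class Utf8Length {
    private Utf8Length() {}

    static int utf8ByteLength(String s) {
        int count = 0;
        final int n = s.length();
        for (int i = 0; i < n; i++) {
            char c = s.charAt(i);
            if (c <= 0x7F) {
                count++;                       // ASCII
            } else if (c <= 0x7FF) {
                count += 2;                    // two-byte sequence
            } else if (Character.isHighSurrogate(c)
                    && i + 1 < n
                    && Character.isLowSurrogate(s.charAt(i + 1))) {
                count += 4;                    // valid surrogate pair: one 4-byte code point
                i++;                           // consume the low surrogate as well
            } else if (Character.isSurrogate(c)) {
                count++;                       // unpaired surrogate: getBytes emits '?' (one byte)
            } else {
                count += 3;                    // rest of the Basic Multilingual Plane
            }
        }
        return count;
    }
}
```

For well-formed strings both versions return the same value; the extra branches only matter when a payload contains broken surrogate pairs.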


return estimate.build().getSerializedSize();
}


[F2] High / P1 — estimateChunkSerializedSize clones and builds the entire protobuf on every iteration

The inner loop calls chunk.clone() ... .build().getSerializedSize() for each action added. This is O(N²) — each clone copies all previously-accumulated actions. For orchestrations with many actions (e.g., 100+ callActivity fan-outs), this creates significant latency.

Suggestion: Cache the envelope overhead size (computed once), then incrementally track size by adding each new action's getSerializedSize() + ~5 bytes varint overhead. Only do the expensive clone+build when the estimate is close to the limit (within 1% margin).
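
A minimal sketch of the incremental approach, written against plain integer sizes so it stays independent of the generated protobuf classes. It assumes the actions live in a repeated field with a one-byte tag (field number 15 or lower) and that the envelope size is a fixed upper bound computed once. `IncrementalChunker` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class IncrementalChunker {
    private IncrementalChunker() {}

    /** Bytes needed to encode a non-negative int as a protobuf varint. */
    static int varintSize(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            value >>>= 7;
            size++;
        }
        return size;
    }

    /** Wire cost of one length-delimited element: tag + length prefix + body. */
    static int elementCost(int serializedSize) {
        return 1 + varintSize(serializedSize) + serializedSize;
    }

    /** Groups action indexes into chunks, tracking the running size in O(N) overall. */
    static List<List<Integer>> chunk(List<Integer> actionSizes, int envelopeBytes, int maxChunkBytes) {
        List<List<Integer>> chunks = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        int running = envelopeBytes;
        for (int i = 0; i < actionSizes.size(); i++) {
            int cost = elementCost(actionSizes.get(i));
            if (envelopeBytes + cost > maxChunkBytes) {
                throw new IllegalArgumentException("Action " + i + " cannot fit in any chunk");
            }
            if (running + cost > maxChunkBytes) {
                // Current chunk is full: flush it and start a new one.
                chunks.add(current);
                current = new ArrayList<>();
                running = envelopeBytes;
            }
            current.add(i);
            running += cost;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }
}
```

With real messages, each entry in `actionSizes` would come from the action's `getSerializedSize()`, and a final exact size check on the built chunk can serve as a safety net if the envelope varies per chunk.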

        throw new PayloadStorageException(
            "Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
    }
}

[F3] Medium / P2 — createIfNotExists called on every upload

In steady state the container already exists, but every upload() call makes an extra HTTP request to check. Under high-frequency upload scenarios (multiple activities returning large payloads concurrently), this adds unnecessary latency and Azure Storage API calls.

Suggestion: Use an AtomicBoolean to skip after first confirmation:

private final AtomicBoolean containerVerified = new AtomicBoolean(false);

// In upload():
if (!containerVerified.get()) {
    containerClient.createIfNotExists();
    containerVerified.set(true);
}
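
The same idea as a self-contained sketch, with a `Runnable` standing in for `containerClient.createIfNotExists()` so it can run without the Azure SDK. `ContainerGuard` is a hypothetical name. The flag is set only after the call succeeds, so a failed attempt is retried on the next upload. Two threads may both see `false` and both issue the call, which should be harmless as long as the create operation is idempotent.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class ContainerGuard {
    private final AtomicBoolean containerVerified = new AtomicBoolean(false);
    private final Runnable createIfNotExists; // stand-in for the real storage call

    ContainerGuard(Runnable createIfNotExists) {
        this.createIfNotExists = createIfNotExists;
    }

    /** Runs the create call until it has succeeded once, then becomes a cheap flag check. */
    void ensureContainer() {
        if (containerVerified.get()) {
            return;
        }
        createIfNotExists.run();      // may throw; the flag then stays false
        containerVerified.set(true);  // reached only after a successful call
    }
}
```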

            .build())
        .build();
}
throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);

[F4] Medium / P2 — Non-permanent storage exceptions become RuntimeException, may leave orchestrations stuck

When externalizeOrchestratorResponse catches a non-permanent exception (e.g., transient network failure), it re-throws as RuntimeException. This propagates through gRPC sendMessage, causing the call to fail. The worker's startAndBlock loop only retries on StatusRuntimeException — other RuntimeExceptions from individual work item processing are not gracefully handled. The orchestration won't receive a completion (success or failure), potentially leaving it stuck until timeout.

Suggestion: Consider converting transient storage failures into a retriable orchestration failure (isNonRetriable=false), or wrap the completeOrchestratorTask call in the worker loop with a try-catch that abandons the work item on unexpected exceptions.
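
The second option could be sketched roughly as follows. The `Completion` interface and the `abandon` callback are hypothetical placeholders for whatever the worker uses to send the completion and to release the work item; the sketch only shows the control flow.

```java
final class WorkItemGuard {
    private WorkItemGuard() {}

    enum Outcome { COMPLETED, ABANDONED }

    /** Placeholder for the call that sends the orchestrator completion. */
    interface Completion {
        void send() throws Exception;
    }

    /** Sends the completion; on any unexpected failure, abandons the work item instead of losing it. */
    static Outcome completeOrAbandon(Completion completion, Runnable abandon) {
        try {
            completion.send();
            return Outcome.COMPLETED;
        } catch (Exception e) {
            // A real worker would log the exception here before releasing the work item.
            abandon.run();
            return Outcome.ABANDONED;
        }
    }
}
```

A real worker loop would probably still let `StatusRuntimeException` reach its existing retry logic and apply this fallback only to other exception types.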

}
}

private OrchestratorAction externalizeOrchestratorAction(OrchestratorAction action) {

[F5] Medium / P2 — externalizeOrchestratorAction uses independent if blocks for a protobuf oneof field

OrchestratorAction.orchestratorActionType is a protobuf oneof — exactly one case is active at a time. But this method checks all 6 cases with independent if blocks instead of else if or switch. Functionally correct (only one matches), but wasteful and inconsistent with resolveEventPayloads which already uses switch (e.getEventTypeCase()).

Suggestion: Use switch (action.getOrchestratorActionTypeCase()) for consistency and clarity.
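
For illustration, the switch shape might look like the sketch below. The enum is a hand-written placeholder for the generated oneof case enum, and its constant names and the returned labels are invented for this example. In generated protobuf Java code the oneof case enum also carries a "not set" constant, which the default branch covers here.

```java
final class ActionDispatch {
    private ActionDispatch() {}

    // Placeholder for a generated oneof case enum; these names are not taken from the PR.
    enum ActionCase { SCHEDULE_TASK, SEND_EVENT, COMPLETE_ORCHESTRATION, NOT_SET }

    /** Exactly one branch runs for the active case, mirroring how a oneof behaves. */
    static String handle(ActionCase activeCase) {
        switch (activeCase) {
            case SCHEDULE_TASK:
                return "externalize task input";
            case SEND_EVENT:
                return "externalize event data";
            case COMPLETE_ORCHESTRATION:
                return "externalize result";
            case NOT_SET:
            default:
                return "nothing to externalize";
        }
    }
}
```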

    }
    this.maxPayloadBytes = maxPayloadBytes;
    return this;
}

[F7] Low / P3 — No validation that maxPayloadBytes >= thresholdBytes

It's possible to set maxPayloadBytes smaller than thresholdBytes. In that case, any payload exceeding the threshold would also exceed the max, causing maybeExternalize to throw PayloadStorageException instead of externalizing — effectively disabling the feature with a misleading error message.

Suggestion: Validate maxPayloadBytes >= thresholdBytes in the LargePayloadInterceptor constructor or in the options setters.
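
A minimal sketch of such a check, using a hypothetical `PayloadLimits` holder in place of the real options class. Validating both values together at construction time avoids order-dependent setter checks.

```java
final class PayloadLimits {
    final int thresholdBytes;
    final int maxPayloadBytes;

    PayloadLimits(int thresholdBytes, int maxPayloadBytes) {
        if (thresholdBytes <= 0) {
            throw new IllegalArgumentException("thresholdBytes must be positive");
        }
        if (maxPayloadBytes < thresholdBytes) {
            // Without this check, every payload above the threshold would also exceed the maximum.
            throw new IllegalArgumentException(
                "maxPayloadBytes (" + maxPayloadBytes + ") must be >= thresholdBytes (" + thresholdBytes + ")");
        }
        this.thresholdBytes = thresholdBytes;
        this.maxPayloadBytes = maxPayloadBytes;
    }
}
```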

YunchuWang (Member) left a comment

Additional Notes

[F6] Low / P3 — maybeResolve (download path) has no error handling: payloadStore.download() in maybeResolve can throw if a blob is deleted or there's a network failure. Unlike the upload path (which has try-catch in externalizeActivityResponse/externalizeOrchestratorResponse), the download path has no protection. An exception here propagates through onMessage, breaking the entire getWorkItems streaming call. Consider adding try-catch around download calls to at least log the error and let the orchestration fail explicitly rather than silently disconnecting the stream.
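
One possible shape for a guarded download path is sketched below. The `Function<String, String>` stands in for `payloadStore.download()`, and `SafeResolver` and `Result` are hypothetical names. The point is that a failed download becomes a value the caller can turn into an explicit orchestration failure, instead of an exception escaping from `onMessage`.

```java
import java.util.function.Function;

final class SafeResolver {
    private SafeResolver() {}

    static final String TOKEN_PREFIX = "blob:v1:";

    /** Either a resolved value or an error description, never both. */
    static final class Result {
        final String value; // resolved payload (or pass-through value); null on failure
        final String error; // failure description; null on success

        private Result(String value, String error) {
            this.value = value;
            this.error = error;
        }

        boolean isFailure() {
            return error != null;
        }
    }

    /** Resolves a token through the download function, capturing failures instead of throwing. */
    static Result resolve(String value, Function<String, String> download) {
        if (value == null || !value.startsWith(TOKEN_PREFIX)) {
            return new Result(value, null); // not a token: pass through unchanged
        }
        try {
            return new Result(download.apply(value), null);
        } catch (RuntimeException e) {
            return new Result(null, "Failed to resolve payload token: " + e.getMessage());
        }
    }
}
```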



3 participants