Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## [Unreleased](https://github.com/openfga/java-sdk/compare/v0.8.3...HEAD)
- fix: allow configuring maxParallelRequests for non-transaction writes (#187)

## v0.8.3

Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ Convenience `WriteTuples` and `DeleteTuples` methods are also available.

###### Non-transaction mode

The SDK will split the writes into separate requests and send them sequentially to avoid violating rate limits.
The SDK will split the writes into smaller transactions and send them with limited parallelization to avoid violating rate limits.

> Passing `ClientWriteOptions` with `.disableTransactions(true)` is required to use non-transaction mode.
> All other fields of `ClientWriteOptions` are optional.
Expand Down Expand Up @@ -570,7 +570,8 @@ var options = new ClientWriteOptions()
// You can rely on the model id set in the configuration or override it for this specific request
.authorizationModelId("01GXSA8YR785C4FYS3C0RTG7B1")
.disableTransactions(true)
.transactionChunkSize(5); // Maximum number of requests to be sent in a transaction in a particular chunk
.transactionChunkSize(5) // Maximum number of requests per transaction chunk, defaults to 1
.maxParallelRequests(5); // Max number of requests to issue in parallel, defaults to 10

var response = fgaClient.write(request, options).get();
```
Expand Down
55 changes: 44 additions & 11 deletions src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,9 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(

var options = writeOptions != null
? writeOptions
: new ClientWriteOptions().transactionChunkSize(DEFAULT_MAX_METHOD_PARALLEL_REQS);
: new ClientWriteOptions()
.transactionChunkSize(1)
.maxParallelRequests(DEFAULT_MAX_METHOD_PARALLEL_REQS);

if (options.getAdditionalHeaders() == null) {
options.additionalHeaders(new HashMap<>());
Expand All @@ -434,19 +436,50 @@ private CompletableFuture<ClientWriteResponse> writeNonTransaction(
return this.writeTransactions(storeId, emptyTransaction, writeOptions);
}

var futureResponse = this.writeTransactions(storeId, transactions.get(0), options);

for (int i = 1; i < transactions.size(); i++) {
final int index = i; // Must be final in this scope for closure.
int maxParallelRequests = options.getMaxParallelRequests() != null
? options.getMaxParallelRequests()
: DEFAULT_MAX_METHOD_PARALLEL_REQS;

// The resulting completable future of this chain will result in either:
// 1. The first exception thrown in a failed completion. Other thenCompose() will not be evaluated.
// 2. The final successful ClientWriteResponse.
futureResponse = futureResponse.thenCompose(
_response -> this.writeTransactions(storeId, transactions.get(index), options));
if (maxParallelRequests <= 1) {
var futureResponse = this.writeTransactions(storeId, transactions.get(0), options);
for (int i = 1; i < transactions.size(); i++) {
final int index = i;
futureResponse = futureResponse.thenCompose(
_response -> this.writeTransactions(storeId, transactions.get(index), options));
}
return futureResponse;
}

return futureResponse;
var executor = Executors.newScheduledThreadPool(maxParallelRequests);
var latch = new CountDownLatch(transactions.size());
var failure = new AtomicReference<Throwable>();
var lastResponse = new AtomicReference<ClientWriteResponse>();

Consumer<ClientWriteRequest> singleWriteRequest =
tx -> this.writeTransactions(storeId, tx, options).whenComplete((response, throwable) -> {
try {
if (throwable != null) {
failure.compareAndSet(null, throwable);
} else {
lastResponse.set(response);
}
} finally {
latch.countDown();
}
});

try {
transactions.forEach(tx -> executor.execute(() -> singleWriteRequest.accept(tx)));
latch.await();
if (failure.get() != null) {
return CompletableFuture.failedFuture(failure.get());
}
return CompletableFuture.completedFuture(lastResponse.get());
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
} finally {
executor.shutdown();
}
Comment on lines +453 to +482
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Critical: Only the last response is returned in parallel execution mode.

When executing multiple transactions in parallel, the current implementation only returns the last successful response (line 477), discarding results from all other transactions. This means:

  • The caller cannot determine which specific writes/deletes succeeded
  • The response doesn't accurately represent the complete operation
  • Information about partial successes is lost

Consider aggregating all responses or returning a composite response that includes results from all transactions.

🤖 Prompt for AI Agents
In src/main/java/dev/openfga/sdk/api/client/OpenFgaClient.java between lines 453
and 482, the code currently returns only the last successful response from
parallel transaction executions, losing information about other transaction
results. To fix this, modify the implementation to collect and aggregate all
individual ClientWriteResponse objects from each transaction into a composite
response or a list, and return this aggregated result instead of just the last
one. This ensures the caller receives complete information about all transaction
outcomes.

}

private <T> Stream<List<T>> chunksOf(int chunkSize, List<T> list) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class ClientWriteOptions implements AdditionalHeadersSupplier {
private String authorizationModelId;
private Boolean disableTransactions = false;
private int transactionChunkSize;
private Integer maxParallelRequests;

public ClientWriteOptions additionalHeaders(Map<String, String> additionalHeaders) {
this.additionalHeaders = additionalHeaders;
Expand Down Expand Up @@ -56,4 +57,13 @@ public ClientWriteOptions transactionChunkSize(int transactionChunkSize) {
public int getTransactionChunkSize() {
return transactionChunkSize > 0 ? transactionChunkSize : 1;
}

public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
this.maxParallelRequests = maxParallelRequests;
return this;
}
Comment on lines +61 to +64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add input validation for the setter method.

The setter accepts any Integer value including negative numbers, which could lead to invalid configurations. Consider adding validation to ensure only positive values are accepted.

 public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
+    if (maxParallelRequests != null && maxParallelRequests < 1) {
+        throw new IllegalArgumentException("maxParallelRequests must be greater than 0");
+    }
     this.maxParallelRequests = maxParallelRequests;
     return this;
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
this.maxParallelRequests = maxParallelRequests;
return this;
}
public ClientWriteOptions maxParallelRequests(Integer maxParallelRequests) {
if (maxParallelRequests != null && maxParallelRequests < 1) {
throw new IllegalArgumentException("maxParallelRequests must be greater than 0");
}
this.maxParallelRequests = maxParallelRequests;
return this;
}
🤖 Prompt for AI Agents
In src/main/java/dev/openfga/sdk/api/configuration/ClientWriteOptions.java
around lines 61 to 64, the setter method maxParallelRequests currently accepts
any Integer, including negative values. Add input validation to check if the
provided maxParallelRequests is positive (greater than zero) before setting the
field. If the value is invalid, throw an IllegalArgumentException with a clear
message indicating that only positive integers are allowed.


public Integer getMaxParallelRequests() {
return maxParallelRequests;
}
}
15 changes: 10 additions & 5 deletions src/test/java/dev/openfga/sdk/api/client/OpenFgaClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1216,8 +1216,10 @@ public void writeTest_nonTransaction() throws Exception {
ClientWriteRequest request = new ClientWriteRequest()
.writes(List.of(writeTuple, writeTuple, writeTuple, writeTuple, writeTuple))
.deletes(List.of(tuple, tuple, tuple, tuple, tuple));
ClientWriteOptions options =
new ClientWriteOptions().disableTransactions(true).transactionChunkSize(2);
ClientWriteOptions options = new ClientWriteOptions()
.disableTransactions(true)
.transactionChunkSize(2)
.maxParallelRequests(1);

// When
var response = fga.write(request, options).get();
Expand Down Expand Up @@ -1284,8 +1286,10 @@ public void writeTest_nonTransactionsWithFailure() {
.user(user)
.condition(DEFAULT_CONDITION))
.collect(Collectors.toList()));
ClientWriteOptions options =
new ClientWriteOptions().disableTransactions(true).transactionChunkSize(1);
ClientWriteOptions options = new ClientWriteOptions()
.disableTransactions(true)
.transactionChunkSize(1)
.maxParallelRequests(1);

// When
var execException = assertThrows(
Expand Down Expand Up @@ -2005,7 +2009,8 @@ public void shouldSplitBatchesSuccessfully(WireMockRuntimeInfo wireMockRuntimeIn
.correlationId("cor-3");
ClientBatchCheckRequest request = new ClientBatchCheckRequest().checks(List.of(item1, item2, item3));

ClientBatchCheckOptions options = new ClientBatchCheckOptions().maxBatchSize(2);
ClientBatchCheckOptions options =
new ClientBatchCheckOptions().maxBatchSize(2).maxParallelRequests(1);

// When
ClientBatchCheckResponse response = fga.batchCheck(request, options).join();
Expand Down