Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
c880fbf
Add more verbose asserts
FabianMeiswinkel Jun 25, 2026
a60a7c0
Potential fix for pull request finding
FabianMeiswinkel Jun 26, 2026
e9c3e35
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 26, 2026
157a4ad
Potential fix for pull request finding
FabianMeiswinkel Jun 26, 2026
3d34939
Fix NITs in tests
FabianMeiswinkel Jun 26, 2026
431ea68
Update PerPartitionCircuitBreakerE2ETests.java
FabianMeiswinkel Jun 26, 2026
8849c5a
Update ReadFeedStoredProceduresTest.java
FabianMeiswinkel Jun 26, 2026
19f800a
Update CosmosItemTest.java
FabianMeiswinkel Jun 26, 2026
1e5d841
Update StoredProcedureUpsertReplaceTest.java
FabianMeiswinkel Jun 26, 2026
6dabaae
Update FaultInjectionServerErrorRuleOnDirectTests.java
FabianMeiswinkel Jun 26, 2026
bff6574
Update FaultInjectionServerErrorRuleOnDirectTests.java
FabianMeiswinkel Jun 26, 2026
9ec325c
Update StoredProcedureQueryTest.java
FabianMeiswinkel Jun 26, 2026
42db971
Fix SP tests
FabianMeiswinkel Jun 26, 2026
525e1d4
More SP test fixes
FabianMeiswinkel Jun 26, 2026
e2037e5
Update CosmosContainerChangeFeedTest.java
FabianMeiswinkel Jun 26, 2026
2368d58
Fix test flakiness
FabianMeiswinkel Jun 26, 2026
0fa3305
Update CosmosSyncStoredProcTest.java
FabianMeiswinkel Jun 26, 2026
749a395
Fix test flakiness
FabianMeiswinkel Jun 26, 2026
b309f37
Update TestSuiteBase.java
FabianMeiswinkel Jun 26, 2026
b4aeb52
Changing test regions
FabianMeiswinkel Jun 26, 2026
97b4bb4
Update TestSuiteBase.java
FabianMeiswinkel Jun 26, 2026
3e44374
Update CosmosDiagnosticsTest.java
FabianMeiswinkel Jun 26, 2026
6fa7874
Update KafkaCosmosConnectContainer.java
FabianMeiswinkel Jun 26, 2026
8136a76
Fix test issues
FabianMeiswinkel Jun 27, 2026
b77fcc3
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 27, 2026
6fd160c
Update TestSuiteBase.java
FabianMeiswinkel Jun 29, 2026
f22b10e
Update ClientMetricsTest.java
FabianMeiswinkel Jun 29, 2026
6fd13bf
Update ClientMetricsTest.java
FabianMeiswinkel Jun 29, 2026
1c02d51
Update EndToEndTimeOutValidationTests.java
FabianMeiswinkel Jun 29, 2026
3dbb741
Update CosmosItemTest.java
FabianMeiswinkel Jun 29, 2026
eea3402
Fix test flakiness
FabianMeiswinkel Jun 29, 2026
44d52be
Address flakiness
FabianMeiswinkel Jun 29, 2026
e756565
Systemically fixing container creation race
FabianMeiswinkel Jun 29, 2026
4603395
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 29, 2026
dccadef
Update TestSuiteBase.java
FabianMeiswinkel Jun 29, 2026
fa7790d
Update TransactionalBatchAsyncContainerTest.java
FabianMeiswinkel Jun 29, 2026
d7c2457
Update EndToEndTimeOutValidationTests.java
FabianMeiswinkel Jun 29, 2026
da1ead6
Update EndToEndTimeOutValidationTests.java
FabianMeiswinkel Jun 29, 2026
89e2c15
Update EndToEndTimeOutValidationTests.java
FabianMeiswinkel Jun 29, 2026
e8110f9
Update CosmosItemTest.java
FabianMeiswinkel Jun 29, 2026
230ee23
Update CosmosBulkTest.java
FabianMeiswinkel Jun 29, 2026
102085c
Update AzureKeyCredentialTest.java
FabianMeiswinkel Jun 29, 2026
5807563
Update CosmosContainerOpenConnectionsAndInitCachesTest.java
FabianMeiswinkel Jun 29, 2026
f7e565a
Update OperationPoliciesTest.java
FabianMeiswinkel Jun 29, 2026
15a9e2e
Update ProactiveConnectionManagementTest.java
FabianMeiswinkel Jun 29, 2026
87ca96e
Fixed flaky tests
FabianMeiswinkel Jun 29, 2026
68ea1f9
Update CosmosItemContentResponseOnWriteTest.java
FabianMeiswinkel Jun 29, 2026
f40486a
Make getFeedRanges from tests more robust
FabianMeiswinkel Jun 29, 2026
22fe6ca
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 29, 2026
7c10d3b
Update GatewayAddressCacheTest.java
FabianMeiswinkel Jun 29, 2026
373be1a
Merge branch 'users/fabianm/Testfix' of https://github.com/FabianMeis…
FabianMeiswinkel Jun 29, 2026
e5cfbc5
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 29, 2026
056e266
Update CustomerWorkflowPartitionLevelCircuitBreakerTest.java
FabianMeiswinkel Jun 29, 2026
c337e8c
Merge branch 'users/fabianm/Testfix' of https://github.com/FabianMeis…
FabianMeiswinkel Jun 29, 2026
8a5e8a8
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 29, 2026
df536b4
Merge branch 'users/fabianm/Testfix' of https://github.com/FabianMeis…
FabianMeiswinkel Jun 29, 2026
67df9b9
Fix Test flakiness
FabianMeiswinkel Jun 29, 2026
6e5ebf3
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 29, 2026
26abb95
Update CosmosItemTest.java
FabianMeiswinkel Jun 30, 2026
3e19fbf
Increasing setup timeouts
FabianMeiswinkel Jun 30, 2026
9acafbc
More robustness imprvements
FabianMeiswinkel Jun 30, 2026
2ec1944
More robustness fixes
FabianMeiswinkel Jun 30, 2026
7047653
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 30, 2026
696318a
Update LocationCacheTest.java
FabianMeiswinkel Jun 30, 2026
9c5d1d7
Update FaultInjectionWithAvailabilityStrategyTestsBase.java
FabianMeiswinkel Jun 30, 2026
63086c6
Update TestSuiteBase.java
FabianMeiswinkel Jun 30, 2026
5cbec09
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jun 30, 2026
7678abf
Fix FI unit tests
FabianMeiswinkel Jun 30, 2026
b2c2334
Fixing test flakiness
FabianMeiswinkel Jul 1, 2026
ea9cd34
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jul 1, 2026
235f597
Update TestSuiteBase.java
FabianMeiswinkel Jul 1, 2026
71fa847
fix bulk test failures
FabianMeiswinkel Jul 1, 2026
4069dfa
Update FaultInjectionWithAvailabilityStrategyTestsBase.java
FabianMeiswinkel Jul 1, 2026
3aa3840
Update TestSuiteBase.java
FabianMeiswinkel Jul 1, 2026
c4724bf
Update OperationPoliciesTest.java
FabianMeiswinkel Jul 1, 2026
dadc344
Merge branch 'main' into users/fabianm/Testfix
FabianMeiswinkel Jul 1, 2026
7d31bf4
Fixes ReadManyByPK bug in Spark connector resulting in duplicates and…
FabianMeiswinkel Jul 1, 2026
22d755a
Updating changelogs
FabianMeiswinkel Jul 1, 2026
83ad534
Update CosmosReadManyByPartitionKeyReader.scala
FabianMeiswinkel Jul 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,41 @@

package com.azure.cosmos.kafka.connect;

import com.azure.core.exception.ResourceNotFoundException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.connect.apiclient.Configuration;
import org.sourcelab.kafka.connect.apiclient.KafkaConnectClient;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorStatus;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ResourceNotFoundException;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class KafkaCosmosConnectContainer extends GenericContainer<KafkaCosmosConnectContainer> {
private static final Logger logger = LoggerFactory.getLogger(KafkaCosmosConnectContainer.class);
private static final int KAFKA_CONNECT_PORT = 8083;
private static final Duration KAFKA_CONNECT_REST_OPERATION_TIMEOUT = Duration.ofMinutes(2);
private static final Duration KAFKA_CONNECT_REST_RETRY_DELAY = Duration.ofMillis(500);
private static final int KAFKA_ADMIN_OPERATION_TIMEOUT_IN_SECONDS = 30;
private Properties producerProperties;
private Properties consumerProperties;
private AdminClient adminClient;
Expand Down Expand Up @@ -54,6 +66,10 @@ private void defaultConfig() {
// withEnv("CONNECT_LOG4J_LOGGERS", "org.apache.kafka=DEBUG,org.reflections=DEBUG,com.azure.cosmos.kafka=DEBUG");

withExposedPorts(KAFKA_CONNECT_PORT);
waitingFor(Wait.forHttp("/connectors")
.forPort(KAFKA_CONNECT_PORT)
.forStatusCode(200)
.withStartupTimeout(KAFKA_CONNECT_REST_OPERATION_TIMEOUT));
}

private Properties defaultConsumerConfig() {
Expand Down Expand Up @@ -158,13 +174,9 @@ public void registerConnector(String name, Map<String, String> config) {
KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));

logger.info("adding kafka connector {}", name);

try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ConnectorDefinition connectorDefinition = kafkaConnectClient.addConnector(newConnectorDefinition);
ConnectorDefinition connectorDefinition = executeWithKafkaConnectRestRetry(
"adding kafka connector " + name,
() -> kafkaConnectClient.addConnector(newConnectorDefinition));
logger.info("adding kafka connector completed with " + connectorDefinition);
}

Expand Down Expand Up @@ -212,7 +224,9 @@ public void resumeConnector(String name) {

public ConnectorStatus getConnectorStatus(String name) {
KafkaConnectClient kafkaConnectClient = new KafkaConnectClient(new Configuration(getTarget()));
return kafkaConnectClient.getConnectorStatus(name);
return executeWithKafkaConnectRestRetry(
"getting kafka connector status " + name,
() -> kafkaConnectClient.getConnectorStatus(name));
}

public String getTarget() {
Expand All @@ -232,11 +246,91 @@ public Properties getConsumerProperties() {
}

public void createTopic(String topicName, int numPartitions) {
this.adminClient.createTopics(
Arrays.asList(new NewTopic(topicName, numPartitions, (short) replicationFactor)));
try {
this.adminClient.createTopics(
Arrays.asList(new NewTopic(topicName, numPartitions, (short) replicationFactor)))
.all()
.get(KAFKA_ADMIN_OPERATION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
logger.info("Creating topic {} succeeded.", topicName);
} catch (ExecutionException exception) {
if (exception.getCause() instanceof TopicExistsException) {
logger.info("Topic {} already exists.", topicName);
return;
}

throw new RuntimeException("Failed to create topic " + topicName, exception);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while creating topic " + topicName, exception);
} catch (TimeoutException exception) {
throw new RuntimeException("Timed out while creating topic " + topicName, exception);
}
}

public void deleteTopic(String topicName) {
this.adminClient.deleteTopics(Arrays.asList(topicName));
try {
this.adminClient.deleteTopics(Arrays.asList(topicName))
.all()
.get(KAFKA_ADMIN_OPERATION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
logger.info("Deleting topic {} succeeded.", topicName);
} catch (ExecutionException exception) {
if (exception.getCause() instanceof UnknownTopicOrPartitionException) {
logger.info("Topic {} not found.", topicName);
return;
}

logger.warn("Failed to delete topic {}", topicName, exception);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while deleting topic " + topicName, exception);
} catch (TimeoutException exception) {
logger.warn("Timed out while deleting topic {}", topicName, exception);
}
}

private <T> T executeWithKafkaConnectRestRetry(String operationName, Callable<T> operation) {
long deadlineNanos = System.nanoTime() + KAFKA_CONNECT_REST_OPERATION_TIMEOUT.toNanos();
int attempts = 0;
InvalidRequestException lastException = null;

while (System.nanoTime() < deadlineNanos) {
attempts++;
try {
return operation.call();
} catch (InvalidRequestException exception) {
if (!isTransientKafkaConnectRestNotFound(exception)) {
throw exception;
}

lastException = exception;
logger.warn(
"Kafka Connect REST returned transient Not Found while {} on attempt {}. Retrying.",
operationName,
attempts,
exception);
} catch (Exception exception) {
throw new RuntimeException("Failed while " + operationName, exception);
}

sleepBeforeKafkaConnectRestRetry(operationName);
}

throw new RuntimeException(
"Timed out after " + KAFKA_CONNECT_REST_OPERATION_TIMEOUT.getSeconds()
+ " seconds while " + operationName,
lastException);
}

private static boolean isTransientKafkaConnectRestNotFound(InvalidRequestException exception) {
return exception.getErrorCode() == 404;
}

private static void sleepBeforeKafkaConnectRestRetry(String operationName) {
try {
TimeUnit.MILLISECONDS.sleep(KAFKA_CONNECT_REST_RETRY_DELAY.toMillis());
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while " + operationName, exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.azure.cosmos.models.IncludedPath;
import com.azure.cosmos.models.IndexingPolicy;
import com.azure.cosmos.models.PartitionKeyDefinition;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.models.ThroughputProperties;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -35,9 +37,11 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Listeners({KafkaCosmosTestNGLogListener.class})
public class KafkaCosmosTestSuiteBase implements ITest {
Expand All @@ -46,6 +50,9 @@ public class KafkaCosmosTestSuiteBase implements ITest {

protected static final int SUITE_SETUP_TIMEOUT = 120000;
protected static final int SUITE_SHUTDOWN_TIMEOUT = 60000;
private static final int KAFKA_COSMOS_SUITE_SETUP_TIMEOUT = 10 * SUITE_SETUP_TIMEOUT;
private static final Duration CONTAINER_METADATA_MAX_WAIT = Duration.ofMinutes(2);
private static final Duration CONTAINER_METADATA_ATTEMPT_TIMEOUT = Duration.ofSeconds(10);

protected static final AzureKeyCredential credential;
protected static String databaseName;
Expand Down Expand Up @@ -89,7 +96,7 @@ protected static CosmosContainerProperties getSinglePartitionContainer(CosmosAsy
credential = new AzureKeyCredential(KafkaCosmosTestConfigurations.MASTER_KEY);
}

@BeforeSuite(groups = { "kafka", "kafka-integration" }, timeOut = SUITE_SETUP_TIMEOUT)
@BeforeSuite(groups = { "kafka", "kafka-integration" }, timeOut = KAFKA_COSMOS_SUITE_SETUP_TIMEOUT)
public void beforeSuite() {

logger.info("beforeSuite Started");
Expand Down Expand Up @@ -119,9 +126,11 @@ public void beforeSuite() {
options,
6000);
}

waitForCreatedContainersToBeQueryable();
}

@BeforeSuite(groups = { "kafka-emulator" }, timeOut = SUITE_SETUP_TIMEOUT)
@BeforeSuite(groups = { "kafka-emulator" }, timeOut = KAFKA_COSMOS_SUITE_SETUP_TIMEOUT)
public void beforeSuite_emulator() {

logger.info("beforeSuite Started");
Expand Down Expand Up @@ -151,6 +160,8 @@ public void beforeSuite_emulator() {
options,
6000);
}

waitForCreatedContainersToBeQueryable();
}

@BeforeSuite(groups = { "unit" }, timeOut = SUITE_SETUP_TIMEOUT)
Expand Down Expand Up @@ -227,6 +238,98 @@ private static String createCollection(
return cosmosContainerProperties.getId();
}

private static void waitForCreatedContainersToBeQueryable() {
try (CosmosAsyncClient probeClient = createGatewayHouseKeepingDocumentClient(true).buildAsyncClient()) {
waitForCreatedContainersToBeQueryable(
probeClient,
databaseName,
Arrays.asList(
multiPartitionContainerName,
multiPartitionContainerWithIdAsPartitionKeyName,
singlePartitionContainerName));
}
}

private static void waitForCreatedContainersToBeQueryable(
CosmosAsyncClient cosmosAsyncClient,
String databaseName,
List<String> expectedContainerNames) {

long deadlineNanos = System.nanoTime() + CONTAINER_METADATA_MAX_WAIT.toNanos();
int attempts = 0;
Throwable lastFailure = null;

while (System.nanoTime() < deadlineNanos) {
attempts++;
try {
List<String> visibleContainerNames = getVisibleContainerNames(
cosmosAsyncClient,
databaseName,
expectedContainerNames);

if (visibleContainerNames.containsAll(expectedContainerNames)) {
logger.info(
"Kafka test containers {} became queryable in database {} after {} attempt(s).",
expectedContainerNames,
databaseName,
attempts);
return;
}

lastFailure = new AssertionError(
"Expected containers " + expectedContainerNames + " but only found " + visibleContainerNames);
} catch (Exception exception) {
lastFailure = exception;
}

try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for Kafka test containers to become queryable.", exception);
}
}

throw new AssertionError(
"Kafka test containers " + expectedContainerNames + " were not queryable in database "
+ databaseName + " within " + CONTAINER_METADATA_MAX_WAIT.getSeconds() + " seconds after "
+ attempts + " attempt(s).",
lastFailure);
}

private static List<String> getVisibleContainerNames(
CosmosAsyncClient cosmosAsyncClient,
String databaseName,
List<String> expectedContainerNames) {

StringBuilder queryBuilder = new StringBuilder("SELECT * FROM c WHERE c.id IN (");
List<SqlParameter> parameters = new ArrayList<>();
for (int index = 0; index < expectedContainerNames.size(); index++) {
String parameterName = "@container" + index;
parameters.add(new SqlParameter(parameterName, expectedContainerNames.get(index)));
queryBuilder.append(parameterName);
if (index < expectedContainerNames.size() - 1) {
queryBuilder.append(", ");
}
}
queryBuilder.append(")");

List<CosmosContainerProperties> visibleContainers = cosmosAsyncClient
.getDatabase(databaseName)
.queryContainers(new SqlQuerySpec(queryBuilder.toString(), parameters))
.byPage()
.flatMapIterable(response -> response.getResults())
.collectList()
.block(CONTAINER_METADATA_ATTEMPT_TIMEOUT);

List<String> visibleContainerNames = new ArrayList<>();
for (CosmosContainerProperties visibleContainer : visibleContainers) {
visibleContainerNames.add(visibleContainer.getId());
}

return visibleContainerNames;
}

static protected CosmosContainerProperties getCollectionDefinitionWithRangeRangeIndex(boolean enableAllVersionsAndDeletesPolicy) {
return getCollectionDefinitionWithRangeRangeIndex(Collections.singletonList("/mypk"), enableAllVersionsAndDeletesPolicy);
}
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)
* Fixed an issue in the `readManyByPartitionKeys` API in the Spark connector which could result in duplicates and missing the first record. - See [PR 49694](https://github.com/Azure/azure-sdk-for-java/pull/49694)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)
* Fixed an issue in the `readManyByPartitionKeys` API in the Spark connector which could result in duplicates and missing the first record. - See [PR 49694](https://github.com/Azure/azure-sdk-for-java/pull/49694)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)
* Fixed an issue in the `readManyByPartitionKeys` API in the Spark connector which could result in duplicates and missing the first record. - See [PR 49694](https://github.com/Azure/azure-sdk-for-java/pull/49694)

#### Other Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#### Bugs Fixed
* Added a defensive guard in bounded change feed reads (with `endLsn`) that fails the Spark task with `IllegalStateException` when the underlying paginator stops before the latest continuation token has advanced to `endLsn`. - See [PR 49393](https://github.com/Azure/azure-sdk-for-java/pull/49393)
* Fixed an issue in the `readManyByPartitionKeys` API in the Spark connector which could result in duplicates and missing the first record. - See [PR 49694](https://github.com/Azure/azure-sdk-for-java/pull/49694)

#### Other Changes

Expand Down
Loading
Loading