Skip to content

Commit

Permalink
Fix invalid handling of glob collections for wildcard subscribers (#1022
Browse files Browse the repository at this point in the history
)

* Add watchAllResource subscriber

* Fix invalid handling of glob collections for wildcard subscribers

As the title says, things weren't wired through correctly for glob collections,
namely they were being ignored because they were triggering the check for
whtehre the cluster is being watched at all. This fixes that (and the fact that
the initial subscription did not respect the `useGlobCollections` flag). This
change has been unit tested and tested through in restli-resource-explorer, and
it works for both glob and non-glob.

* CHANGELOG

* try to make test less flaky

* Un-revert

* Fix CL date

* CL

* Make flaky tests run more
  • Loading branch information
PapaCharlie authored Sep 24, 2024
1 parent e0b2683 commit 4e391df
Show file tree
Hide file tree
Showing 8 changed files with 500 additions and 54 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.9] - 2024-09-24
- Fix invalid handling of glob collections for wildcard subscribers

## [29.58.8] - 2024-09-23
- Revert Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources.

Expand Down Expand Up @@ -5734,7 +5737,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.8...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.9...master
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
[29.58.8]: https://github.com/linkedin/rest.li/compare/v29.58.7...v29.58.8
[29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7
[29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.util.NamedThreadFactory;
import com.linkedin.test.util.retry.ThreeRetries;
import com.linkedin.test.util.retry.TenRetries;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.net.URI;
Expand Down Expand Up @@ -141,7 +141,7 @@ public void teardown()
* the requests sending from the clients should result in an even distribution. The total call count
* received by a single server should not deviate by more than 15% of the average.
*/
@Test(retryAnalyzer = ThreeRetries.class)
@Test(retryAnalyzer = TenRetries.class)
public void testBalancedLoadDistribution()
{
SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener();
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testBalancedLoadDistribution()
* After the update event is received by the ZK event subscriber. One request is required to actually trigger the
* load balancer state and hash ring changes.
*/
@Test(retryAnalyzer = ThreeRetries.class)
@Test(retryAnalyzer = TenRetries.class)
public void testD2WeightLessThanOne()
{
SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener();
Expand All @@ -218,11 +218,12 @@ public void testD2WeightLessThanOne()
throw new RuntimeException("Failed the test because thread was interrupted");
}

try {
try
{
// Change the D2 weight of server:2851 to 0.5
invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 0.5);
// Wait 5ms for the change to propagate
Thread.sleep(5);
// Wait 50ms for the change to propagate
Thread.sleep(50);
} catch (Exception e) {
fail("Failed to invoke d2 weight change jmx", e);
}
Expand Down Expand Up @@ -284,7 +285,7 @@ public void testD2WeightLessThanOne()
* And if we further increase the weight to 4.0. The host will receive 4x the traffic of the other hosts
* (with a tolerance of 15%).
*/
@Test(retryAnalyzer = ThreeRetries.class)
@Test(retryAnalyzer = TenRetries.class)
public void testD2WeightGreaterThanOne()
{
SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener();
Expand All @@ -304,11 +305,12 @@ public void testD2WeightGreaterThanOne()
throw new RuntimeException("Failed the test because thread was interrupted");
}

try {
try
{
// Change the D2 weight of server:2851 to 2.0
invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 2);
// Wait 5ms for the change to propagate
Thread.sleep(5);
// Wait 50ms for the change to propagate
Thread.sleep(50);
} catch (Exception e) {
fail("Failed to invoke d2 weight change jmx", e);
}
Expand Down Expand Up @@ -363,11 +365,12 @@ public void testD2WeightGreaterThanOne()
}
}

try {
try
{
// Change the D2 weight of server:2851 to 4.0
invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 4);
// Wait 5ms for the change to propagate
Thread.sleep(5);
// Wait 50ms for the change to propagate
Thread.sleep(50);
} catch (Exception e) {
fail("Failed to invoke d2 weight change jmx", e);
}
Expand Down Expand Up @@ -405,7 +408,7 @@ public void testD2WeightGreaterThanOne()
* 2. The host start receiving traffic and has a health score > 0.5.
* The host will then be kicked out of the recovery program and continue to recover/degrade using normal up/downStep.
*/
@Test(retryAnalyzer = ThreeRetries.class)
@Test(retryAnalyzer = TenRetries.class)
public void testHostMarkDownAndMarkUp()
{
SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener();
Expand All @@ -425,11 +428,12 @@ public void testHostMarkDownAndMarkUp()
throw new RuntimeException("Failed the test because thread was interrupted");
}

try {
try
{
// Mark down server:2851
invokeMarkDownJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""));
// Wait 5ms for the change to propagate
Thread.sleep(5);
// Wait 50ms for the change to propagate
Thread.sleep(50);
} catch (Exception e) {
fail("Failed to invoke d2 weight change jmx", e);
}
Expand Down Expand Up @@ -459,11 +463,12 @@ public void testHostMarkDownAndMarkUp()
throw new RuntimeException("Failed the test because thread was interrupted");
}

try {
try
{
// Mark up server:2851
invokeMarkUpJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""));
// Wait 5ms for the change to propagate
Thread.sleep(5);
// Wait 50ms for the change to propagate
Thread.sleep(50);
} catch (Exception e) {
fail("Failed to invoke d2 weight change jmx", e);
}
Expand Down
116 changes: 110 additions & 6 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,91 @@ final void onChanged(ResourceUpdate update)
}
}

public static abstract class WildcardResourceWatcher
{
private final ResourceType _type;

/**
* Defining a private constructor means only classes that are defined in this file can extend this class (see
* {@link ResourceWatcher}).
*/
WildcardResourceWatcher(ResourceType type)
{
_type = type;
}

final ResourceType getType()
{
return _type;
}

/**
* Called when the resource discovery RPC encounters some transient error.
*/
public abstract void onError(Status error);

/**
* Called when the resource discovery RPC reestablishes connection.
*/
public abstract void onReconnect();

/**
* Called when a resource is added or updated.
* @param resourceName the name of the resource that was added or updated.
* @param update the new data {@link ResourceUpdate} for the resource.
*/
abstract void onChanged(String resourceName, ResourceUpdate update);

/**
* Called when a resource is removed.
* @param resourceName the name of the resource that was removed.
*/
public abstract void onRemoval(String resourceName);
}

public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher
{
public WildcardNodeResourceWatcher()
{
super(ResourceType.NODE);
}

/**
* Called when a node resource is added or updated.
* @param resourceName the resource name of the {@link NodeUpdate} that was added or updated.
* @param update the new data for the {@link NodeUpdate}, including D2 cluster and service information.
*/
public abstract void onChanged(String resourceName, NodeUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (NodeUpdate) update);
}
}

public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher
{
public WildcardD2URIMapResourceWatcher()
{
super(ResourceType.D2_URI_MAP);
}

/**
* Called when a {@link D2URIMapUpdate} resource is added or updated.
* @param resourceName the resource name of the {@link D2URIMapUpdate} map resource that was added or updated.
* like the /d2/uris/clusterName
* @param update the new data for the {@link D2URIMapUpdate} resource
*/
public abstract void onChanged(String resourceName, D2URIMapUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (D2URIMapUpdate) update);
}
}

public interface ResourceUpdate
{
boolean isValid();
Expand All @@ -109,7 +194,7 @@ public static final class NodeUpdate implements ResourceUpdate
_nodeData = nodeData;
}

XdsD2.Node getNodeData()
public XdsD2.Node getNodeData()
{
return _nodeData;
}
Expand Down Expand Up @@ -261,13 +346,32 @@ static ResourceType fromTypeUrl(String typeUrl)
* will always notify the given watcher of the current data if it is already present, even if the given watcher was
* already subscribed to said resource. However, the subscription will only be added once.
*/
abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);
public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);

abstract void startRpcStream();
/**
* Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher
* will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher
* will always notify the given watcher of the current data.
*/
public abstract void watchAllXdsResources(WildcardResourceWatcher watcher);

abstract void shutdown();
/**
* Initiates the RPC stream to the xDS server.
*/
public abstract void startRpcStream();

/**
* Shuts down the xDS client.
*/
public abstract void shutdown();

abstract String getXdsServerAuthority();
/**
* Returns the authority of the xDS server.
*/
public abstract String getXdsServerAuthority();

abstract public XdsClientJmx getXdsClientJmx();
/**
* Returns the JMX bean for the xDS client.
*/
public abstract XdsClientJmx getXdsClientJmx();
}
Loading

0 comments on commit 4e391df

Please sign in to comment.