
Strange sequence of messages with WS-ReliableMessaging when some are dropped #627

Closed
ppalaga opened this issue Dec 2, 2022 · 12 comments

Comments

@ppalaga
Contributor

ppalaga commented Dec 2, 2022

The modification of the older WS-RM test in #626 produces the attached log.

In WsReliableMessagingTest, we call proxy.sayHello() twice and output some log messages in between:

        LOG.info("About to send Joe");
        Assertions.assertThat(seqSize(proxy.sayHello("Joe"), "Joe")).isEqualTo(1);
        LOG.info("Received Joe, about to send Tom");
        Assertions.assertThat(seqSize(proxy.sayHello("Tom"), "Tom")).isEqualTo(2);
        LOG.info("Received Tom");
  • The first call proxy.sayHello("Joe") goes as expected, the response is received and the assertions are satisfied
  • Then Received Joe, about to send Tom is logged
  • Then we go on to the second call with Tom
  • "MessageLossSimulator makes an even message 2 to get lost" is logged, as expected
  • But then, all of a sudden, the Joe request is sent once again and its response is received.

I find this unexpected. Only the second request with Tom should be resent.
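
The expectation stated above can be modelled in a few lines. This is a minimal sketch in plain Java, not CXF code: the class `RetransmissionSketch` and its methods are hypothetical names chosen for illustration. It assumes a sender that keeps every message until an acknowledgement for it arrives and resends whatever is still pending.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a reliable-messaging source: only unacknowledged messages are resent. */
final class RetransmissionSketch {

    /** Pending payloads keyed by message number; an entry is removed once acknowledged. */
    private final Map<Long, String> unacknowledged = new TreeMap<>();
    private long nextMessageNumber = 1;

    /** Registers an outgoing payload and returns the message number assigned to it. */
    long send(String payload) {
        long number = nextMessageNumber++;
        unacknowledged.put(number, payload);
        return number;
    }

    /** Called when an acknowledgement covering the given message number arrives. */
    void acknowledge(long messageNumber) {
        unacknowledged.remove(messageNumber);
    }

    /** Returns the payloads a retransmission timer would resend right now. */
    List<String> pendingResends() {
        return new ArrayList<>(unacknowledged.values());
    }

    public static void main(String[] args) {
        RetransmissionSketch source = new RetransmissionSketch();
        long joe = source.send("Joe");
        source.acknowledge(joe); // Joe's acknowledgement reached the sender
        source.send("Tom");      // Tom is dropped, so no acknowledgement arrives
        System.out.println(source.pendingResends()); // prints [Tom]
    }
}
```

Under this model, a resend of Joe would only happen if the sender never saw an acknowledgement for Joe, even though the application-level response was received.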

Could somebody please check the attached log output and/or my changes in the test and figure out what's wrong with the test? There are a number of warnings there that I do not understand.
wsrm.log

@ppalaga
Contributor Author

ppalaga commented Dec 2, 2022

@ffang this basically tries to mimic what you do in WSRMTest of CSB. Could you please have a look?

@ffang
Contributor

ffang commented Dec 5, 2022

Hi @ppalaga ,

The difference is that in your test case the WS-RM client doesn't open a decoupled endpoint (an embedded HTTP server) to receive out-of-band RM protocol messages, such as 202 partial responses, SequenceAcknowledgement, etc.

In your log you can find

2022-12-02 22:50:10,939 WARN  [org.apa.cxf.ws.rm.Proxy] (RMManager-Timer-2028313348) It is not possible to send out-of-band acknowledgments to the anonymous address.
An acknowledgement will be piggybacked on the next response.

This means the server is waiting for the client to resend the request; with the next response, the server returns an in-band SequenceAcknowledgement along with the sayHelloResponse to the client, like this:

2022-12-02 22:50:12,954 INFO  [org.apa.cxf.ser.Wsr.RESP_OUT] (executor-thread-0) RESP_OUT
    Address: http://localhost:8081/wsrm
    Content-Type: text/xml
    ExchangeId: 55931c5a-690c-48ca-ac1f-fc89fdb16372
    ServiceName: WsrmHelloService
    PortName: WsrmHelloServicePort
    PortTypeName: WsrmHelloService
    Headers: {}
    Payload: <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Header>
    <Action xmlns="http://www.w3.org/2005/08/addressing">https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/sayHelloResponse</Action>
    <MessageID xmlns="http://www.w3.org/2005/08/addressing">urn:uuid:c31d3323-6f17-4cce-80d5-eae485769074</MessageID>
    <To xmlns="http://www.w3.org/2005/08/addressing">http://www.w3.org/2005/08/addressing/anonymous</To>
    <RelatesTo xmlns="http://www.w3.org/2005/08/addressing">urn:uuid:fb6ac675-ae41-40c0-8074-750a810f9d2f</RelatesTo>
    <wsrm:Sequence xmlns:ns2="http://www.w3.org/2005/08/addressing" xmlns:wsrm="http://docs.oasis-open.org/ws-rx/wsrm/200702" soap:mustUnderstand="1">
      <wsrm:Identifier>urn:uuid:26eec5fb-f458-4d0a-8944-d6dc5685c156</wsrm:Identifier>
      <wsrm:MessageNumber>2</wsrm:MessageNumber>
    </wsrm:Sequence>
    <wsrm:SequenceAcknowledgement xmlns:ns2="http://www.w3.org/2005/08/addressing" xmlns:wsrm="http://docs.oasis-open.org/ws-rx/wsrm/200702">
      <wsrm:Identifier>urn:uuid:f8d8c87f-35f5-4677-8e7a-3bc2c0c2b679</wsrm:Identifier>
      <wsrm:AcknowledgementRange Upper="1" Lower="1"/>
      <wsrm:None/>
    </wsrm:SequenceAcknowledgement>
  </soap:Header>
  <soap:Body>
    <ns1:sayHelloResponse xmlns:ns1="https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm">
      <return>WS-ReliableMessaging Hello Joe! seqSize: 2</return>
    </ns1:sayHelloResponse>
  </soap:Body>
</soap:Envelope>

So you can see that this response also carries the in-band SequenceAcknowledgement.
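
To illustrate how a sender might interpret the wsrm:AcknowledgementRange in such a response, here is a minimal sketch in plain Java. `AckRangeSketch` and `Range` are hypothetical names, not CXF classes, and the logic is a simplification: every message number not covered by a range is treated as still unacknowledged.

```java
import java.util.ArrayList;
import java.util.List;

final class AckRangeSketch {

    /** One acknowledgement range with inclusive bounds, as in Lower="1" Upper="1". */
    static final class Range {
        final long lower;
        final long upper;

        Range(long lower, long upper) {
            this.lower = lower;
            this.upper = upper;
        }

        boolean covers(long messageNumber) {
            return lower <= messageNumber && messageNumber <= upper;
        }
    }

    /** Returns the message numbers in 1..highestSent that no range covers. */
    static List<Long> unacknowledged(long highestSent, List<Range> ranges) {
        List<Long> missing = new ArrayList<>();
        for (long n = 1; n <= highestSent; n++) {
            boolean acked = false;
            for (Range r : ranges) {
                if (r.covers(n)) {
                    acked = true;
                    break;
                }
            }
            if (!acked) {
                missing.add(n);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Range from the response above (Lower="1" Upper="1"), with two messages sent so far.
        System.out.println(unacknowledged(2, List.of(new Range(1, 1)))); // prints [2]
    }
}
```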

To get exactly the same behaviour as the test in CSB, you should make a change like this:

diff --git a/integration-tests/ws-rm/pom.xml b/integration-tests/ws-rm/pom.xml
index ca5729f..42f6300 100644
--- a/integration-tests/ws-rm/pom.xml
+++ b/integration-tests/ws-rm/pom.xml
@@ -29,7 +29,12 @@
             <groupId>io.quarkiverse.cxf</groupId>
             <artifactId>quarkus-cxf-rt-features-logging</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.cxf</groupId>
+            <artifactId>cxf-rt-transports-http-undertow</artifactId>
+            <version>${cxf.version}</version>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-test-derby</artifactId>
@@ -97,4 +102,4 @@
         </profile>
     </profiles>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/integration-tests/ws-rm/src/test/java/io/quarkiverse/cxf/it/ws/rm/server/WsReliableMessagingTest.java b/integration-tests/ws-rm/src/test/java/io/quarkiverse/cxf/it/ws/rm/server/WsReliableMessagingTest.java
index 0384ad1..206a136 100644
--- a/integration-tests/ws-rm/src/test/java/io/quarkiverse/cxf/it/ws/rm/server/WsReliableMessagingTest.java
+++ b/integration-tests/ws-rm/src/test/java/io/quarkiverse/cxf/it/ws/rm/server/WsReliableMessagingTest.java
@@ -7,6 +7,9 @@ import javax.xml.ws.Service;
 
 import org.apache.cxf.endpoint.Client;
 import org.apache.cxf.ext.logging.LoggingFeature;
+import org.apache.cxf.frontend.ClientProxy;
+import org.apache.cxf.transport.http.HTTPConduit;
+import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
 import org.apache.cxf.ws.addressing.WSAddressingFeature;
 import org.assertj.core.api.Assertions;
 import org.jboss.logging.Logger;
@@ -39,7 +42,12 @@ public class WsReliableMessagingTest {
 
         Client clientProxy = org.apache.cxf.frontend.ClientProxy.getClient(proxy);
         clientProxy.getOutInterceptors().add(new MessageLossSimulator());
-
+        String decoupledEndpoint = "http://localhost:9001"
+                + "/wsrm/decoupled_endpoint";
+        Client client = ClientProxy.getClient(proxy);
+        HTTPConduit hc = (HTTPConduit) (client.getConduit());
+        HTTPClientPolicy cp = hc.getClient();
+        cp.setDecoupledEndpoint(decoupledEndpoint);
         LOG.info("About to send Joe");
         Assertions.assertThat(seqSize(proxy.sayHello("Joe"), "Joe")).isEqualTo(1);
         LOG.info("Received Joe, about to send Tom");

Freeman

@ppalaga
Contributor Author

ppalaga commented Dec 6, 2022

That's very helpful, thanks a lot, @ffang!

So adding a decoupled endpoint should solve the issue. No problem, let's add it.
I wonder whether the decoupled endpoint at http://localhost:9001/wsrm/decoupled_endpoint is doing something special with the messages it receives?
I do not see any code in the CSB WSRMTest handling those messages. Maybe it is hidden in cxf-rt-transports-http-undertow?
I mean I'd like to add the decoupled endpoint in the Quarkus way (where Undertow 2.x is not available) and I am not sure what it means. Is it enough to add a dummy HTTP endpoint, e.g. using RESTEasy? Or do I need to look into cxf-rt-transports-http-undertow and port it to the Vert.x HTTP stack available on Quarkus?

@ffang
Contributor

ffang commented Dec 7, 2022

Hi @ppalaga ,

This is a good question.

I wonder whether the decoupled endpoint at http://localhost:9001/wsrm/decoupled_endpoint is doing something special with the messages it receives?

Yes, the decoupled endpoint (actually a CXF server created on the fly before the WS-RM sequence is created) controls the WS-RM workflow: creating the sequence, acknowledging the sequence, and triggering retransmission if necessary.

I do not see any code in the CSB WSRMTest handling those messages. Maybe it is hidden in cxf-rt-transports-http-undertow?

No, the related logic isn't in the CXF HTTP transport; it comes from the CXF WS-RM and WS-Addressing modules.

I mean I'd like to add the decoupled endpoint in the Quarkus way (where Undertow 2.x is not available) and I am not sure what it means. Is it enough to add a dummy HTTP endpoint, e.g. using RESTEasy? Or do I need to look into cxf-rt-transports-http-undertow and port it to the Vert.x HTTP stack available on Quarkus?

Since the decoupled endpoint controls the WS-RM workflow, it can't be a dummy HTTP endpoint. But using the platform servlet container is doable. Please see my commit here: apache/camel-spring-boot@808a046

It uses a relative address for decoupledEndpoint

String decoupledEndpoint = "/wsrm/decoupled_endpoint";

which means the CXF http-servlet transport will kick in and use the servlet container provided by the platform (in Spring Boot it's spring-boot-starter-web).

We also need a property to let CXF know the absolute HTTP address (the platform servlet container's port and context plus the relative address of the decoupledEndpoint), which can be used during the WS-RM conversation:

client.getBus().setProperty(WSAContextUtils.DECOUPLED_ENDPOINT_BASE_PROPERTY, 
                                       "http://localhost:" + port + "/services/wsrm/decoupled_endpoint");

This works in Spring Boot, but I believe it could be very similar in Quarkus, with the Vert.x HTTP stack as the servlet implementation.
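
How the absolute address is composed from the port, the servlet context and the relative decoupled endpoint path can be sketched as follows. This is plain Java for illustration only: `DecoupledEndpointAddressSketch` and `absoluteAddress` are hypothetical names, and the port `8081` and context `/services` are example values rather than something read from a Quarkus or Spring Boot API.

```java
import java.net.URI;

final class DecoupledEndpointAddressSketch {

    /** Joins host, port, servlet context path and relative endpoint path into one absolute URI. */
    static URI absoluteAddress(String host, int port, String contextPath, String relativePath) {
        String context = trimSlashes(contextPath);
        String relative = trimSlashes(relativePath);
        String path = (context.isEmpty() ? "" : "/" + context) + "/" + relative;
        return URI.create("http://" + host + ":" + port + path);
    }

    /** Removes leading and trailing slashes so the segments can be joined with exactly one slash. */
    private static String trimSlashes(String s) {
        return s.replaceAll("^/+|/+$", "");
    }

    public static void main(String[] args) {
        // Example values only; a real application would take them from its own configuration.
        System.out.println(absoluteAddress("localhost", 8081, "/services", "/wsrm/decoupled_endpoint"));
        // prints http://localhost:8081/services/wsrm/decoupled_endpoint
    }
}
```

The resulting string is the kind of value that would be passed as the second argument of the setProperty call shown above.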

Freeman

@ppalaga
Contributor Author

ppalaga commented Dec 8, 2022

Perfect, thanks for the explanation, @ffang!

ffang added a commit to ffang/quarkus-cxf-1 that referenced this issue Apr 20, 2023
ffang added a commit to ffang/quarkus-cxf-1 that referenced this issue Apr 24, 2023
…rtFactory while also implmenting WSDLEndpointFactory
ffang added a commit to ffang/quarkus-cxf-1 that referenced this issue Apr 25, 2023
ppalaga added a commit to ppalaga/quarkus-cxf that referenced this issue Aug 17, 2023
@ppalaga
Contributor Author

ppalaga commented Aug 17, 2023

@ffang I squashed and cherry-picked your commits from #835 into #626 ( 3b26150 ).

Some warnings are gone and my original test is passing. There are still some things unclear to me. Could you please help?

  1. The following warning can be found several times in the attached log WsReliableMessagingTest.txt:
WARN  [org.apa.cxf.ws.rm.RMOutInterceptor] (...) sequence not set for outbound message, skipped acknowledgement request

Does it point at something we need to fix?

And the same for

WARN  [org.apa.cxf.ws.add.ContextUtils] (RMManager-Timer-1648152878) WS-Addressing - failed to retrieve Message Addressing Properties from context

and also for

WARN  [org.apa.cxf.ws.add.soa.MAPCodec] (executor-thread-1) Failed to correlate message, aborting dispatch.
  2. This sequence of messages seems to be a good sign. It actually proves that the redelivery happens for the second message:
INFO  [org.apa.cxf.ws.rm.soa.RetransmissionQueueImpl] (RMManager-Timer-1648152878) Retransmitted message 1 in sequence urn:uuid:86847643-75fa-4902-9b89-57e73681dd7b
INFO  [org.apa.cxf.ws.rm.soa.RetransmissionQueueImpl] (RMManager-Timer-1648152878) WS-RM retransmission of message 2.

I wonder if there is a way to programmatically ensure that the redelivery is happening? If there is no API we could spy on, then we could make the assertions against the log file (I did something like that elsewhere).

  3. I no longer have any idea why I did the assertions about rmManager.getStore().getSourceSequences(endpointIdentifier).size(). Does it make any sense from your point of view?
    I wonder why there are two sequences for our endpointIdentifier?
    Should we perhaps better do some assertions against getCurrentMessageNr() of the sequences?

  4. So now apparently the decoupled endpoint is created under http://localhost:8081/services/wsrm/decoupled_endpoint. It is not clear to me which side of the exchange is creating it: client or server? Could you please explain the mechanism by which it is created?

WsReliableMessagingTest.txt

@ffang
Contributor

ffang commented Sep 1, 2023

Hi @ppalaga ,

Sorry for the late reply; I was on PTO and needed more time to catch up.

I just sent a PR (ppalaga#1) against your 221202-ws-rm branch to refactor the WS-RM test so that it simulates message loss the same way CXF does.

As for your questions:

WARN  [org.apa.cxf.ws.rm.RMOutInterceptor] (...) sequence not set for outbound message, skipped acknowledgement request

This is OK.

WARN  [org.apa.cxf.ws.add.ContextUtils] (RMManager-Timer-1648152878) WS-Addressing - failed to retrieve Message Addressing Properties from context.
WARN  [org.apa.cxf.ws.add.soa.MAPCodec] (executor-thread-1) Failed to correlate message, aborting dispatch.

These two warnings come from the test case itself and are more relevant to the extended RMStoreFeature; I don't see why we should use it in our test. We can test message loss the same way CXF itself does; please see the revised WsReliableMessagingTest for details.

I wonder if there is a way to programmatically ensure that the redelivery is happening? If there is no API we could spy on, then we could make the assertions against the log file (I did something like that elsewhere).

Yes, in the revised WsReliableMessagingTest we add extra in and out interceptors on the WS-RM client side to verify message loss and resending.

rmManager.getStore().getSourceSequences(endpointIdentifier).size(). Does it make any sense from your point of view?
I wonder why there are two sequences for our endpointIdentifier?
Should we perhaps better do some assertions against getCurrentMessageNr() of the sequences?

No idea here either; we can just use the approach in the revised WsReliableMessagingTest to verify the messages exchanged.

So now apparently the decoupled endpoint is created under http://localhost:8081/services/wsrm/decoupled_endpoint It is not clear to me which side of the exchange is creating it? - Client or server? Could you please explain the mechanism how it is created?

Yes, the decoupled endpoint is created and it's working now. The decoupled endpoint is created on the WS-RM client side and acts as a destination for WS-RM messages returned from the WS-RM server. The WS-RM exchange is initiated on the WS-RM client side: you can see in the log that the first message sent by the WS-RM client is the wsrm:CreateSequence message. Something like this:

<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Header>
    <Action xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing" soap:mustUnderstand="1">http://schemas.xmlsoap.org/ws/2005/02/rm/CreateSequence</Action>
    <MessageID xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing" soap:mustUnderstand="1">urn:uuid:4091227b-8001-49b4-ad2a-0c012bd942ea</MessageID>
    <To xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing" soap:mustUnderstand="1">http://localhost:8081/services/wsrm/HelloWorld</To>
    <ReplyTo xmlns="http://schemas.xmlsoap.org/ws/2004/08/addressing" soap:mustUnderstand="1">
      <Address>http://localhost:8081/services/wsrm/decoupled_endpoint</Address>
    </ReplyTo>
  </soap:Header>
  <soap:Body>
    <wsrm:CreateSequence xmlns:wsrm="http://schemas.xmlsoap.org/ws/2005/02/rm" xmlns:ns2="http://schemas.xmlsoap.org/ws/2004/08/addressing">
      <wsrm:AcksTo>
        <ns2:Address>http://localhost:8081/services/wsrm/decoupled_endpoint</ns2:Address>
      </wsrm:AcksTo>
      <wsrm:Expires>PT0S</wsrm:Expires>
      <wsrm:Offer>
        <wsrm:Identifier>urn:uuid:007e6dac-7cf9-46ed-9586-120e38985b88</wsrm:Identifier>
        <wsrm:Expires>PT0S</wsrm:Expires>
      </wsrm:Offer>
    </wsrm:CreateSequence>
  </soap:Body>
</soap:Envelope>
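
The CreateSequence message above names the decoupled endpoint in wsrm:AcksTo. As a rough illustration of how that address could be pulled out of a captured message for a test assertion, here is a minimal sketch using only the JDK's DOM parser. `AcksToSketch` is a hypothetical helper, not part of CXF, and the XML in `main` is a trimmed-down fragment of the message above rather than the full envelope.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class AcksToSketch {

    // Namespaces as they appear in the CreateSequence message above.
    static final String WSRM_NS = "http://schemas.xmlsoap.org/ws/2005/02/rm";
    static final String WSA_NS = "http://schemas.xmlsoap.org/ws/2004/08/addressing";

    /** Returns the text of wsrm:AcksTo/wsa:Address, or null if either element is absent. */
    static String acksToAddress(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // required for the namespace-based lookups below
            Document doc = factory.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
            NodeList acksTo = doc.getElementsByTagNameNS(WSRM_NS, "AcksTo");
            if (acksTo.getLength() == 0) {
                return null;
            }
            NodeList addresses = ((Element) acksTo.item(0)).getElementsByTagNameNS(WSA_NS, "Address");
            return addresses.getLength() == 0 ? null : addresses.item(0).getTextContent().trim();
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not parse XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<wsrm:CreateSequence xmlns:wsrm=\"" + WSRM_NS + "\" xmlns:ns2=\"" + WSA_NS + "\">"
                + "<wsrm:AcksTo><ns2:Address>http://localhost:8081/services/wsrm/decoupled_endpoint</ns2:Address></wsrm:AcksTo>"
                + "</wsrm:CreateSequence>";
        System.out.println(acksToAddress(xml));
        // prints http://localhost:8081/services/wsrm/decoupled_endpoint
    }
}
```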

In the revised WsReliableMessagingTest, please see the comments. For the WS-RM client, the expected outbound messages are:

        // Expected outbound:
        // CreateSequence
        // + 4 greetMe messages
        // + 2 resends

and the expected inbound messages are:

        // Expected inbound:
        // createSequenceResponse
        // + 4 greetMeResponse actions (to original or resent)
        // + partial(202) responses

WS-RM is surely tricky and complicated; I'm happy we can make it work in Quarkus. Please let me know if you have further questions.

Best Regards
Freeman

@ppalaga
Contributor Author

ppalaga commented Dec 9, 2023

Hi @ffang, thanks for the improvements and sorry for the delay!

I can confirm the test works well with your changes.

I had to adapt it a bit (in 9da33a4) due to changes that happened in between.

Then I simplified your code a bit in 11ca287 which still worked.

Finally, I wanted to move the client from the test to the application under test in

b0f41b3

so that we can be sure that it also works in native mode. But when I moved it, it stopped working. The sequence of in messages is different:

[ERROR] Failures: 
[ERROR]   WsReliableMessagingTest.testTwowayMessageLoss:75 
expected: 
  ["http://docs.oasis-open.org/ws-rx/wsrm/200702/CreateSequenceResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse"]
 but was: 
  ["http://docs.oasis-open.org/ws-rx/wsrm/200702/CreateSequenceResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse",
      "https://quarkiverse.github.io/quarkiverse-docs/quarkus-cxf/test/ws-rm/WsrmHelloService/helloResponse"]

I am not able to figure out why. I believe I moved the whole setup correctly:

@ffang could you please have a look, what makes the flow different?

@ppalaga
Contributor Author

ppalaga commented Dec 10, 2023

I have added two more things in

9ed973c

to better mimic the original test setup: add WSAddressingFeature to the client, have separate RMFeature instances for client and service. I believe both are needed, but they are still not enough to let the tests pass. The inRecorder still gets two more responses.

@ppalaga
Contributor Author

ppalaga commented Dec 10, 2023

To make the original test with all bus setup and client fail in the same way, it is enough to let the test use the same Bus as the application: ppalaga@1604701

I am far from understanding why this causes the inRecorder to capture more messages. It is set directly on the client https://github.com/ppalaga/quarkus-cxf/blob/i627-default-bus/integration-tests/ws-rm/src/test/java/io/quarkiverse/cxf/it/ws/rm/server/WsReliableMessagingTest.java#L72-L73 and should have nothing to do with the bus. @ffang could you please help me understand what is going on here? Does the addressing feature perhaps behave differently when the client and service are on the same bus?

@ffang
Contributor

ffang commented Dec 13, 2023

Hi @ppalaga ,

Thanks for the update and detailed info! Another very good question!

The difference you observed surely comes from the bus being shared between client and server, which won't happen in a real scenario! In CXF tests we are usually very careful about sharing a bus between the client and server; for complex scenarios we normally use different buses or even fork a process (in the same JVM or another JVM) for the server.

There is an extension concept in the CXF bus, and extensions are distinguished by class type:

public final <T> T getExtension(Class<T> extensionType) {

So there can't be multiple instances of a given type. If the client and server share the same bus, the extensions set separately by the client and the server can conflict with each other, and this can mess up the behaviour.

Back to this test case: IIRC, we actually use different RMFeature instances for the client (a new RMFeature instance created directly in the test code) and the server (from a CDI producer), and the fine-tuned AcknowledgementInterval and BaseRetransmissionInterval differ between server and client; they determine the complex message flows between client and server. That's why we need different buses for client and server.
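
The effect of keying extensions by class type can be shown with a toy registry. This is a minimal sketch in plain Java that only mirrors the shape of the getExtension signature quoted above; `ExtensionRegistrySketch` and `RmSettings` are hypothetical stand-ins, not CXF classes, and the interval values are made up.

```java
import java.util.HashMap;
import java.util.Map;

final class ExtensionRegistrySketch {

    /** Hypothetical stand-in for an RM configuration holder; not a CXF class. */
    static final class RmSettings {
        final long baseRetransmissionIntervalMs;

        RmSettings(long baseRetransmissionIntervalMs) {
            this.baseRetransmissionIntervalMs = baseRetransmissionIntervalMs;
        }
    }

    /** At most one extension instance per class type. */
    private final Map<Class<?>, Object> extensions = new HashMap<>();

    <T> void setExtension(T extension, Class<T> type) {
        extensions.put(type, extension); // a second call with the same type replaces the first
    }

    <T> T getExtension(Class<T> type) {
        return type.cast(extensions.get(type));
    }

    public static void main(String[] args) {
        ExtensionRegistrySketch sharedBus = new ExtensionRegistrySketch();
        sharedBus.setExtension(new RmSettings(4000), RmSettings.class); // "server" settings (made-up value)
        sharedBus.setExtension(new RmSettings(2000), RmSettings.class); // "client" settings overwrite them
        System.out.println(sharedBus.getExtension(RmSettings.class).baseRetransmissionIntervalMs); // prints 2000
    }
}
```

With two separate registries, each side would keep its own settings, which is the motivation for using separate buses for client and server.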

Best Regards
Freeman

ppalaga pushed a commit to ppalaga/quarkus-cxf that referenced this issue Dec 14, 2023
@ppalaga
Contributor Author

ppalaga commented Dec 15, 2023

Thanks a lot @ffang, another super-helpful answer!

The difference you observed surely comes from the bus being shared between client and server, which won't happen in a real scenario! In CXF tests we are usually very careful about sharing a bus between the client and server; for complex scenarios we normally use different buses or even fork a process (in the same JVM or another JVM) for the server.

There is an extension concept in the CXF bus, and extensions are distinguished by class type:

public final <T> T getExtension(Class<T> extensionType) {

So there can't be multiple instances of a given type. If the client and server share the same bus, the extensions set separately by the client and the server can conflict with each other, and this can mess up the behaviour.

Yeah, this explains a lot. Looking into RMFeature, I see that a lot of stuff is set on the RMManager obtained from the bus. Even if we have two separate features for the client and server, they clearly clash, because all happens on the same bus.

I wonder how limiting it is that Quarkus CXF currently supports just a single bus?

Based on your advice, I was able to refactor the test into two separate apps (server and client) that communicate with each other. It works both in JVM and native mode! - see #1139

I have filed some followups:
