Skip to content

Commit

Permalink
Handles unhandled-addressed-messages by generating an OIR reply. (#798)
Browse files Browse the repository at this point in the history
This PR fixes a stadnards compliance issue. The standard requires that when an addressed message arrives for a node which is not equipped for handling said message (including any unknown but addressed MTI arriving), then an Optional IOnteraction Rejected (OIR) message has to be sent back to the originating node, identifying that this target node is not able to handle that message.

- Adds a "fallback handler" to the Dispatcher object. The fallback handler is invoked when no handler matched on the incoming message.
- Implements a handler that responds with an OIR for all local nodes. Registers this handler as the fallback handler for the MessageDIspatcher in IfCan and IfTcp. This OIR response will be suppressed for incoming OIR and TDE errors to avoid infinite loops of error messages. (In a normal setup OpenMRN does not listen to these messages. Without this case two OpenMRN nodes will have an infinite message loop chatting with each other sending OIR messages back and forth.)

Fixes some bugs exposed by this behavior.
- The TCP parser had forgotten to clear the message it was filling, causing messages to unexpectedly be marked as addressed messages.
- A number of tests had issues when OIR messages were generated, this PR fixes these issues as well. Sometimes the MTI was mistyped, sometimes a message was verified at the CAN level but not actually processed at the destination node.

===

* Adds fallback handler to the dispatcher object.

* Adds test for fallback handler feature.

* Adds helper functions to generate OIR payload.

* Adds handler for sending back OIR when an addressed but unhandled message arrives.

* Instantiates the flow for unhandled addressed messages in IfCan and IfTcp.

* Updates tests to correctly deal with generated OIR messages.

* Adds a NullErrorHandler so that OIR/TDE messages never generate an OIR reply.

* Adds debug printouts.

* Fixes MTI on messages sent in this test.

* Adds a test for generated OIRs.

* Adds missing Clear command to the TCP parser code.

* Fixes compile error in these macros.

* Adds handlers for stream messages coming into the two-node-stream setup.
This is needed to avoid generating OIR replies due to missing handlers.

* fix whitespace

* Adds documentation comment.

* Removes NullErrorHandler. Instead, prevents generating an OIR for any
incoming unhandled error message (OIR and TDE).

* Fix/add comments.
  • Loading branch information
balazsracz authored Sep 7, 2024
1 parent a7dd545 commit 5ad34b1
Show file tree
Hide file tree
Showing 16 changed files with 228 additions and 12 deletions.
21 changes: 21 additions & 0 deletions src/executor/Dispatcher.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,27 @@ TEST_F(DispatcherTest, TestMultiplehandlers)
wait();
}

TEST_F(DispatcherTest, TestFallbackHandler)
{
StrictMock<MockCanMessageHandler> h1;
f_.register_handler(&h1, 1, 0xFFUL);
StrictMock<MockCanMessageHandler> h2;
f_.register_handler(&h2, 257, 0x1FFFFFFFUL);
StrictMock<MockCanMessageHandler> hfb;
f_.register_fallback_handler(&hfb);

EXPECT_CALL(h1, handle_message(257, _));
EXPECT_CALL(h1, handle_message(1, _));
EXPECT_CALL(h2, handle_message(257, _));
send_message(257);
send_message(1);
wait();

EXPECT_CALL(hfb, handle_message(2, _));
send_message(2);
wait();
}

/*TEST_F(DispatcherTest, TestAsync)
{
StrictMock<MockCanMessageHandler> h1;
Expand Down
32 changes: 30 additions & 2 deletions src/executor/Dispatcher.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ protected:
/// is the handler to unregister from all instances.
void unregister_handler_all(UntypedHandler *handler);

/// Sets one handler to receive all messages that no other handler has
/// matched. May be called only once in the lifetime of a dispatcher
/// object. @param handler is the handler pointer for the fallback handler.
void register_fallback_handler(UntypedHandler *handler)
{
HASSERT(!fallbackHandler_);
fallbackHandler_ = handler;
}

/// Returns the current message's ID.
virtual ID get_message_id() = 0;

Expand Down Expand Up @@ -180,7 +189,10 @@ private:

protected:
/// If non-NULL we still need to call this handler.
UntypedHandler *lastHandlerToCall_;
UntypedHandler *lastHandlerToCall_{nullptr};
/// Handler to give all messages that were not matched by any other handler
/// registration.
UntypedHandler *fallbackHandler_{nullptr};
private:
/// Protects handler add / remove against iteration.
OSMutex lock_;
Expand Down Expand Up @@ -256,10 +268,19 @@ public:
}

/// Removes all instances of a handler from this dispatcher.
void unregister_handler_all(HandlerType *handler) {
void unregister_handler_all(HandlerType *handler)
{
Base::unregister_handler_all(handler);
}

/// Sets one handler to receive all messages that no other handler has
/// matched. May be called only once in the lifetime of a dispatcher
/// object. @param handler is the handler pointer for the fallback handler.
void register_fallback_handler(HandlerType *handler)
{
Base::register_fallback_handler(handler);
}

protected:
/// @return the identifier bits of the current message.
typename Base::ID get_message_id() OVERRIDE {
Expand Down Expand Up @@ -460,6 +481,13 @@ StateFlowBase::Action DispatchFlowBase<NUM_PRIO>::iteration_done()
{
send_transfer();
}
else if (fallbackHandler_)
{
// Nothing handled this message, and we have a fallbac handler
// registered. Gives the message to the fallback handler.
lastHandlerToCall_ = fallbackHandler_;
send_transfer();
}
return release_and_exit();
}

Expand Down
6 changes: 6 additions & 0 deletions src/openlcb/Convert.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ extern void append_error_to_buffer(uint16_t error_code, Payload *p);
extern void buffer_to_error(const Payload &payload, uint16_t *error_code,
uint16_t *mti, string *error_message);

/// Generates the payload for an OIR or TDE message.
/// @param error_code the 16-bit ErrorCodes value.
/// @param incoming_mti the MTI of the message that this error should refer
/// to.
extern Payload error_payload(uint16_t error_code, Defs::MTI incoming_mti);

/** A global class / variable for empty or not-yet-initialized payloads. */
extern string EMPTY_PAYLOAD;

Expand Down
7 changes: 5 additions & 2 deletions src/openlcb/DatagramCan.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -407,10 +407,13 @@ TEST_F(AsyncDatagramTest, DoubleReply)
wait();
wait();
send_packet(":X19A2877CN022A00;"); // Received OK
send_packet(":X19A2877CN022A00;"); // Received OK dup
wait(); // will unregister handlers
send_packet_and_expect_response(
":X19A2877CN022A00;", ":X1906822AN077C10400A28;"); // Received OK dup
wait_for_notification();
// Releases client.
send_packet(":X19A2877CN022A00;"); // Received OK dup
send_packet_and_expect_response(
":X19A2877CN022A00;", ":X1906822AN077C10400A28;"); // Received OK dup
wait();
datagram_support_.client_allocator()->insert(c);
}
Expand Down
6 changes: 6 additions & 0 deletions src/openlcb/DatagramTcp.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ class TcpDatagramTestBase : public MultiTcpIfTest
protected:
TcpDatagramTestBase()
{
LOG(INFO, "Add client 0");
add_client(REMOTE_NODE_ID + 0);
LOG(INFO, "Add client 1");
add_client(REMOTE_NODE_ID + 1);
LOG(INFO, "Add node nc");
create_new_node(&nc_, TEST_NODE_ID, &ifTcp_);
LOG(INFO, "Add node nc2");
create_new_node(&nc2_, TEST_NODE_ID + 1, &ifTcp_);
LOG(INFO, "Add node n0");
create_new_node(&n0_, REMOTE_NODE_ID + 0, &clients_[0]->ifTcp_);
LOG(INFO, "Add node n1");
create_new_node(&n1_, REMOTE_NODE_ID + 1, &clients_[1]->ifTcp_);
}

Expand Down
9 changes: 9 additions & 0 deletions src/openlcb/If.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
*/

#include "openlcb/If.hxx"
#include "openlcb/Convert.hxx"

/// Ensures that the largest bucket in the main buffer pool is exactly the size
/// of a GenMessage.
Expand Down Expand Up @@ -131,6 +132,14 @@ void buffer_to_error(const Payload &payload, uint16_t *error_code,
}
}

Payload error_payload(uint16_t error_code, Defs::MTI incoming_mti)
{
Payload p(4, 0);
error_to_data(error_code, &p[0]);
error_to_data(incoming_mti, &p[2]);
return p;
}

void send_event(Node* src_node, uint64_t event_id)
{
auto *b = src_node->iface()->global_message_write_flow()->alloc();
Expand Down
1 change: 1 addition & 0 deletions src/openlcb/IfCan.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,7 @@ IfCan::IfCan(ExecutorBase *executor, CanHubFlow *device,
add_owned_flow(new AliasConflictHandler(this));
add_owned_flow(new FrameToGlobalMessageParser(this));
add_owned_flow(new VerifyNodeIdHandler(this));
add_owned_flow(new UnhandledAddressedMessageHandler(this));
add_owned_flow(new RemoteAliasCacheUpdater(this));
add_owned_flow(new AMEQueryHandler(this));
add_owned_flow(new AMEGlobalQueryHandler(this));
Expand Down
7 changes: 7 additions & 0 deletions src/openlcb/IfCan.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,13 @@ TEST_F(AsyncNodeTest, PassAddressedMessageToIfWithPayloadUnknownSource)
wait();
}

TEST_F(AsyncNodeTest, OIROnUnknownMTI)
{
send_packet_and_expect_response(":X19948210N022A;",
":X1906822AN021010400948;");
wait();
}

TEST_F(AsyncNodeTest, SendAddressedMessageToAlias)
{
static const NodeAlias alias = 0x210U;
Expand Down
56 changes: 56 additions & 0 deletions src/openlcb/IfImpl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,62 @@ private:
If::VNodeMap::Iterator it_;
#endif
};

/// Message handler that is registered as a fallback handler in the interface's
/// message dispatcher. This means that all incoming messages that were not
/// matching the MTI / mask of any existing instantiated handler will end up
/// here.
///
/// The standard requires that when an addressed message is not handled by a
/// node (e.g. because it does not know about the MTI at all), then an Optional
/// Interaction Rejected reply be sent to the originator. This flow generates
/// that OIR reply.
class UnhandledAddressedMessageHandler : public IncomingMessageStateFlow
{
public:
UnhandledAddressedMessageHandler(If *service)
: IncomingMessageStateFlow(service)
{
iface()->dispatcher()->register_fallback_handler(this);
}

/// Handler callback for incoming messages.
Action entry() override
{
if (!message()->data()->dstNode)
{
// Destination is not a local node.
return release_and_exit();
}
auto mti = message()->data()->mti;
if (mti == Defs::MTI_OPTIONAL_INTERACTION_REJECTED ||
mti == Defs::MTI_TERMINATE_DUE_TO_ERROR) {
// We don't generate an OIR for an incoming error report, as this
// generally would cause an infinite bouncing of error reports back
// and forth between two OpenMRN nodes.
return release_and_exit();
}
return allocate_and_call(
iface()->addressed_message_write_flow(), STATE(fill_oir));
}

/// Called after the message buffer allocation is complete. Fills in the
/// outgoing Optional Interaction Rejected message and sends it off to the
/// write flow.
Action fill_oir()
{
auto rb = get_buffer_deleter(
get_allocation_result(iface()->addressed_message_write_flow()));
GenMessage *inm = message()->data();
GenMessage *outm = rb->data();
outm->reset(Defs::MTI_OPTIONAL_INTERACTION_REJECTED,
inm->dstNode->node_id(), inm->src,
error_payload(Defs::ERROR_UNIMPLEMENTED, inm->mti));
iface()->addressed_message_write_flow()->send(rb.release());
return release_and_exit();
}
};

} // namespace openlcb

#endif // _OPENLCB_IFIMPL_HXX_
1 change: 1 addition & 0 deletions src/openlcb/IfTcp.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ IfTcp::IfTcp(NodeID gateway_node_id, HubFlow *device, int local_nodes_count)
, device_(device)
{
add_owned_flow(new VerifyNodeIdHandler(this));
add_owned_flow(new UnhandledAddressedMessageHandler(this));
seq_ = new ClockBaseSequenceNumberGenerator;
add_owned_flow(seq_);
auto filter = new LocalMessageFilter(this);
Expand Down
1 change: 1 addition & 0 deletions src/openlcb/IfTcpImpl.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public:
{
return false;
}
tgt->clear();
HASSERT((flags & FLAGS_CHAINING) == 0);
tgt->flagsDst = 0;
tgt->flagsSrc = 0;
Expand Down
56 changes: 56 additions & 0 deletions src/openlcb/MemoryConfigStream.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,23 @@ TEST_F(MemoryConfigTest, two_node)
twait();
clear_expect(true);

StrictMock<MockMessageHandler> handler;

expect_packet(":X1B22A225N20600000000228FF;");
expect_packet(":X1D22A225N43FFFFFFFF;");

otherIfCan_->dispatcher()->register_handler(
&handler, Defs::MTI_STREAM_INITIATE_REQUEST, Defs::MTI_EXACT);

// stream initiate request, SID 0x02 DID 0x43 buffer infinite
EXPECT_CALL(handler,
handle_message(Pointee(AllOf(Field(&GenMessage::mti,
Defs::MTI_STREAM_INITIATE_REQUEST),
Field(&GenMessage::dstNode, otherNode_.get()),
Field(&GenMessage::payload,
IsBufferNodeValueString(0xFFFF00000243ULL)) //,
)),
_));
expect_packet(":X19CC822AN0225FFFF00000243;");

// Read stream request, space 0x28, offset 2, length infinite, dst stream
Expand All @@ -274,6 +288,8 @@ TEST_F(MemoryConfigTest, two_node)

wait();
clear_expect(true);
Mock::VerifyAndClear(&handler);
otherIfCan_->dispatcher()->unregister_handler_all(&handler);
EXPECT_FALSE(datagramDoneBn_.is_done());

// stream initiate reply
Expand All @@ -299,11 +315,24 @@ TEST_F(MemoryConfigTest, two_node)
expect_packet(":X1F22522AN4302030405060708;");
expect_packet(":X1F22522AN43090A0B0C0D0E;");

otherIfCan_->dispatcher()->register_handler(
&handler, Defs::MTI_STREAM_COMPLETE, Defs::MTI_EXACT);

// stream complete, SID 02 DID 43 sent bytes 13
EXPECT_CALL(handler,
handle_message(
Pointee(AllOf(Field(&GenMessage::mti, Defs::MTI_STREAM_COMPLETE),
Field(&GenMessage::dstNode, otherNode_.get()),
Field(&GenMessage::payload,
IsBufferNodeValueString(0x02430000000D)) //,
)),
_));
expect_packet(":X198A822AN022502430000000D;");

twait();
clear_expect(true);
Mock::VerifyAndClear(&handler);
otherIfCan_->dispatcher()->unregister_handler_all(&handler);
EXPECT_TRUE(datagramDoneBn_.is_done());
}

Expand All @@ -314,9 +343,22 @@ TEST_F(MemoryConfigTest, length_limit)
twait();
clear_expect(true);

StrictMock<MockMessageHandler> handler;

expect_packet(":X1B22A225N20600000000228FF;");
expect_packet(":X1D22A225N4300000009;");

otherIfCan_->dispatcher()->register_handler(
&handler, Defs::MTI_STREAM_INITIATE_REQUEST, Defs::MTI_EXACT);
// stream initiate request, SID 0x02 DID 0x43 buffer infinite
EXPECT_CALL(handler,
handle_message(Pointee(AllOf(Field(&GenMessage::mti,
Defs::MTI_STREAM_INITIATE_REQUEST),
Field(&GenMessage::dstNode, otherNode_.get()),
Field(&GenMessage::payload,
IsBufferNodeValueString(0xFFFF00000243ULL)) //,
)),
_));
expect_packet(":X19CC822AN0225FFFF00000243;");

// Read stream request, space 0x28, offset 2, length 9, dst stream
Expand All @@ -325,6 +367,8 @@ TEST_F(MemoryConfigTest, length_limit)

wait();
clear_expect(true);
Mock::VerifyAndClear(&handler);
otherIfCan_->dispatcher()->unregister_handler_all(&handler);
EXPECT_FALSE(datagramDoneBn_.is_done());

// stream initiate reply
Expand All @@ -350,11 +394,23 @@ TEST_F(MemoryConfigTest, length_limit)
expect_packet(":X1F22522AN4302030405060708;");
expect_packet(":X1F22522AN43090A;");

otherIfCan_->dispatcher()->register_handler(
&handler, Defs::MTI_STREAM_COMPLETE, Defs::MTI_EXACT);
// stream complete, SID 02 DID 43 sent bytes 13
EXPECT_CALL(handler,
handle_message(
Pointee(AllOf(Field(&GenMessage::mti, Defs::MTI_STREAM_COMPLETE),
Field(&GenMessage::dstNode, otherNode_.get()),
Field(&GenMessage::payload,
IsBufferNodeValueString(0x024300000009)) //,
)),
_));
expect_packet(":X198A822AN0225024300000009;");

twait();
clear_expect(true);
Mock::VerifyAndClear(&handler);
otherIfCan_->dispatcher()->unregister_handler_all(&handler);
EXPECT_TRUE(datagramDoneBn_.is_done());
}

Expand Down
13 changes: 12 additions & 1 deletion src/openlcb/SNIPClient.cxxtest
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,22 @@ TEST_F(SNIPClientTest, remote)
TEST_F(SNIPClientTest, timeout)
{
long long start = os_get_time_monotonic();
auto b = invoke_flow(&client_, &nodeTwo_, NodeHandle(nodeTwo_.node_id()));
auto b = invoke_flow(&client_, &nodeTwo_, NodeHandle(NodeAlias(0x123)));
EXPECT_EQ(SNIPClientRequest::OPENMRN_TIMEOUT, b->data()->resultCode);
EXPECT_EQ(0u, b->data()->response.size());
long long time = os_get_time_monotonic() - start;
EXPECT_LT(MSEC_TO_NSEC(49), time);
twait();
}

TEST_F(SNIPClientTest, self_reject)
{
// Sending to self on nodeTwo will get an OIR rejection from the stack
// because there is no SNIP handler there.
auto b = invoke_flow(&client_, &nodeTwo_, NodeHandle(nodeTwo_.node_id()));
EXPECT_EQ(SNIPClientRequest::ERROR_REJECTED | Defs::ERROR_UNIMPLEMENTED,
b->data()->resultCode);
EXPECT_EQ(0u, b->data()->response.size());
}

TEST_F(SNIPClientTest, reject)
Expand Down
Loading

0 comments on commit 5ad34b1

Please sign in to comment.