Skip to content

Commit

Permalink
Revert "Fix read/write to send header separately to match numbers. Fe…
Browse files Browse the repository at this point in the history
…d-to-Fed not done yet."

This reverts commit a3887e9.
  • Loading branch information
Jakio815 committed Feb 12, 2025
1 parent b92b737 commit 62bad67
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 48 deletions.
35 changes: 12 additions & 23 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// This function is called in notify_advance_grant_if_safe(), which is a long
// function. During this call, the network driver might close, causing the following write_to_netdrv
// to fail. Consider a failure here a soft failure and update the federate's status.
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) ||
write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) {
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) {
lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
Expand Down Expand Up @@ -109,8 +108,7 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// This function is called in notify_advance_grant_if_safe(), which is a long
// function. During this call, the network driver might close, causing the following write_to_netdrv
// to fail. Consider a failure here a soft failure and update the federate's status.
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) ||
write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) {
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) {
lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
Expand Down Expand Up @@ -167,8 +165,7 @@ void notify_downstream_next_event_tag(scheduling_node_t* e, tag_t tag) {
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_DNET, e->id, &tag);
}
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, 1, buffer) ||
write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length - 1, buffer + 1)) {
if (write_to_netdrv(((federate_info_t*)e)->fed_netdrv, message_length, buffer)) {
lf_print_error("RTI failed to send downstream next event tag to federate %d.", e->id);
e->state = NOT_CONNECTED;
} else {
Expand Down Expand Up @@ -239,17 +236,16 @@ void handle_port_absent_message(federate_info_t* sending_federate, unsigned char
}

// Forward the message.
write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex, "RTI failed to forward message to federate %d.",
federate_id);
write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size, buffer + 1, &rti_mutex,
write_to_netdrv_fail_on_error(fed->fed_netdrv, message_size + 1, buffer, &rti_mutex,
"RTI failed to forward message to federate %d.", federate_id);

LF_MUTEX_UNLOCK(&rti_mutex);
}

void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer) {
size_t header_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t);
read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size, &(buffer[1]), NULL,
size_t header_size = 1 + sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t);
// Read the header, minus the first byte which has already been read.
read_from_netdrv_fail_on_error(sending_federate->fed_netdrv, header_size - 1, &(buffer[1]), NULL,
"RTI failed to read the timed message header from remote federate.");
// Extract the header information. of the sender
uint16_t reactor_port_id;
Expand Down Expand Up @@ -335,9 +331,8 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_TAGGED_MSG, federate_id, &intended_tag);
}
write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, buffer, &rti_mutex,
"RTI failed to forward message header to federate %d.", federate_id);
write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read - 1, buffer + 1, &rti_mutex,

write_to_netdrv_fail_on_error(fed->fed_netdrv, bytes_read, buffer, &rti_mutex,
"RTI failed to forward message to federate %d.", federate_id);

// The message length may be longer than the buffer,
Expand Down Expand Up @@ -463,10 +458,7 @@ static void broadcast_stop_time_to_federates_locked() {
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_STOP_GRN, fed->enclave.id, &rti_remote->base.max_stop_tag);
}
write_to_netdrv_fail_on_error(fed->fed_netdrv, 1, outgoing_buffer, &rti_mutex,
"RTI failed to send MSG_TYPE_STOP_GRANTED message header to federate %d.",
fed->enclave.id);
write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH - 1, outgoing_buffer + 1, &rti_mutex,
write_to_netdrv_fail_on_error(fed->fed_netdrv, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, &rti_mutex,
"RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", fed->enclave.id);
}

Expand Down Expand Up @@ -586,11 +578,8 @@ void handle_stop_request_message(federate_info_t* fed) {
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_STOP_REQ, f->enclave.id, &rti_remote->base.max_stop_tag);
}
write_to_netdrv_fail_on_error(f->fed_netdrv, 1, stop_request_buffer, &rti_mutex,
"RTI failed to forward MSG_TYPE_STOP_REQUEST message header to federate %d.",
f->enclave.id);
write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH - 1, stop_request_buffer + 1,
&rti_mutex, "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.",
write_to_netdrv_fail_on_error(f->fed_netdrv, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, &rti_mutex,
"RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.",
f->enclave.id);
}
}
Expand Down
37 changes: 12 additions & 25 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,7 @@ static void send_time(unsigned char type, instant_t time) {
tracepoint_federate_to_rti(send_TIMESTAMP, _lf_my_fed_id, &tag);

LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex,
"Failed to send MSG_TYPE_TIMESTAMP header.");
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex,
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex,
"Failed to send time " PRINTF_TIME " to the RTI.", time - start_time);
LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex);
}
Expand All @@ -140,8 +138,7 @@ static void send_tag(unsigned char type, tag_t tag) {
trace_event_t event_type = (type == MSG_TYPE_NEXT_EVENT_TAG) ? send_NET : send_LTC;
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(event_type, _lf_my_fed_id, &tag);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex, "Failed to send tag header.");
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write - 1, buffer + 1, &lf_outbound_netdrv_mutex,
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, bytes_to_write, buffer, &lf_outbound_netdrv_mutex,
"Failed to send tag " PRINTF_TAG " to the RTI.", tag.time - start_time, tag.microstep);
LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex);
}
Expand Down Expand Up @@ -1420,9 +1417,7 @@ static void handle_stop_request_message() {

// Send the current logical time to the RTI.
LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, outgoing_buffer, &lf_outbound_netdrv_mutex,
"Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI.");
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1, outgoing_buffer + 1,
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_REPLY_LENGTH, outgoing_buffer,
&lf_outbound_netdrv_mutex,
"Failed to send the answer to MSG_TYPE_STOP_REQUEST to RTI.");
LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex);
Expand Down Expand Up @@ -1715,9 +1710,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
tracepoint_federate_to_rti(send_ADR_QR, _lf_my_fed_id, NULL);

LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex,
"Failed to send address query header for federate %d to RTI.", remote_federate_id);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t), buffer + 1, &lf_outbound_netdrv_mutex,
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(uint16_t) + 1, buffer, &lf_outbound_netdrv_mutex,
"Failed to send address query for federate %d to RTI.", remote_federate_id);
LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex);

Expand Down Expand Up @@ -1977,9 +1970,7 @@ void lf_create_server(int specified_port) {
tracepoint_federate_to_rti(send_ADR_AD, _lf_my_fed_id, NULL);

// No need for a mutex because we have the only handle on this network driver.
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, (unsigned char*)buffer, NULL,
"Failed to send address advertisement header.");
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t), (unsigned char*)buffer + 1, NULL,
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, sizeof(int32_t) + 1, (unsigned char*)buffer, NULL,
"Failed to send address advertisement.");

LF_PRINT_DEBUG("Sent port %d to the RTI.", server_port);
Expand Down Expand Up @@ -2211,8 +2202,7 @@ int lf_send_message(int message_type, unsigned short port, unsigned short federa
// Trace the event when tracing is enabled
tracepoint_federate_to_federate(send_P2P_MSG, _lf_my_fed_id, federate, NULL);

int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer);
result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1);
int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer);
if (result == 0) {
// Header sent successfully. Send the body.
result = write_to_netdrv_close_on_error(netdrv, length, message);
Expand Down Expand Up @@ -2420,8 +2410,7 @@ void lf_send_port_absent_to_federate(environment_t* env, interval_t additional_d
}

LF_MUTEX_LOCK(&lf_outbound_netdrv_mutex);
int result = write_to_netdrv_close_on_error(netdrv, 1, buffer);
result = write_to_netdrv_close_on_error(netdrv, message_length - 1, buffer + 1);
int result = write_to_netdrv_close_on_error(netdrv, message_length, buffer);
LF_MUTEX_UNLOCK(&lf_outbound_netdrv_mutex);

if (result != 0) {
Expand Down Expand Up @@ -2458,11 +2447,9 @@ int lf_send_stop_request_to_rti(tag_t stop_tag) {
}
// Trace the event when tracing is enabled
tracepoint_federate_to_rti(send_STOP_REQ, _lf_my_fed_id, &stop_tag);
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, 1, buffer, &lf_outbound_netdrv_mutex,
"Failed to send stop request header.");
write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH - 1, buffer + 1,
&lf_outbound_netdrv_mutex, "Failed to send stop time " PRINTF_TIME " to the RTI.",
stop_tag.time - start_time);

write_to_netdrv_fail_on_error(_fed.netdrv_to_RTI, MSG_TYPE_STOP_REQUEST_LENGTH, buffer, &lf_outbound_netdrv_mutex,
"Failed to send stop time " PRINTF_TIME " to the RTI.", stop_tag.time - start_time);

// Treat this sending as equivalent to having received a stop request from the RTI.
_fed.received_stop_request_from_rti = true;
Expand Down Expand Up @@ -2536,8 +2523,8 @@ int lf_send_tagged_message(environment_t* env, interval_t additional_delay, int
if (lf_tag_compare(_fed.last_DNET, current_message_intended_tag) > 0) {
_fed.last_DNET = current_message_intended_tag;
}
int result = write_to_netdrv_close_on_error(netdrv, 1, header_buffer);
result = write_to_netdrv_close_on_error(netdrv, header_length - 1, header_buffer + 1);

int result = write_to_netdrv_close_on_error(netdrv, header_length, header_buffer);
if (result == 0) {
// Header sent successfully. Send the body.
result = write_to_netdrv_close_on_error(netdrv, length, message);
Expand Down

0 comments on commit 62bad67

Please sign in to comment.