From d73444170d215da82d829cdbc631eefabfcfc211 Mon Sep 17 00:00:00 2001 From: jgmonroy Date: Fri, 1 Apr 2016 09:17:40 +0200 Subject: [PATCH] Merged conflicts --- AutoDocking_Laser/CDockingApp.cpp | 4 +- MQTTMosquitto/CMQTTMosquitto.cpp | 138 +++++++++++++++++++++--------- MQTTMosquitto/CMQTTMosquitto.h | 5 ++ MQTTMosquitto/Constants.cpp | 2 +- SpeechSynth/CSpeechSynth.cpp | 2 + 5 files changed, 109 insertions(+), 42 deletions(-) diff --git a/AutoDocking_Laser/CDockingApp.cpp b/AutoDocking_Laser/CDockingApp.cpp index 29b70ee..6ecd3b2 100644 --- a/AutoDocking_Laser/CDockingApp.cpp +++ b/AutoDocking_Laser/CDockingApp.cpp @@ -602,12 +602,12 @@ void CDockingApp::Obtener_puntos_plano() obs->getSensorPose(laser_pose); // Is laser rolled? - printf( "Laser Roll is: %.3f, diff=%.3f\n",laser_pose.roll(),std::abs(double(std::abs(laser_pose.roll())-PI) ) ); + //printf( "Laser Roll is: %.3f, diff=%.3f\n",laser_pose.roll(),std::abs(double(std::abs(laser_pose.roll())-PI) ) ); if( std::abs(double(std::abs(laser_pose.roll())-PI) ) < 0.01 ) { // Scans ar not from right to left!, but left to right! std::reverse(obs->scan.begin(),obs->scan.end()); - printf( "REVERSING LASER SCAN" ); + //printf( "REVERSING LASER SCAN" ); } diff --git a/MQTTMosquitto/CMQTTMosquitto.cpp b/MQTTMosquitto/CMQTTMosquitto.cpp index 98cd070..32e25b9 100644 --- a/MQTTMosquitto/CMQTTMosquitto.cpp +++ b/MQTTMosquitto/CMQTTMosquitto.cpp @@ -38,6 +38,7 @@ -------------------------------------------------------------- Robot_Name/ClientACK -->CLIENT_MQTT_ACK + Robot_Name/ServerACK -->(None, just for checking times) Robot_Name/TopologyCommand -GetTopology --> (Send Variable GRAPH in Topic Topology) @@ -84,8 +85,7 @@ */ -#include "CMQTTMosquitto.h" -//#include "mosquitto.h" +#include "CMQTTmosquitto.h" #include #include "Constants.h" @@ -100,7 +100,8 @@ using namespace mrpt; using namespace mrpt::utils; using namespace mrpt::obs; -CMQTTMosquitto::CMQTTMosquitto(): mosquittopp( ("Giraff_"+mrpt::system::dateTimeToString(mrpt::system::now())).c_str() ,false) +//Create a MQTT connection with a clean session (no logs on the broker) +CMQTTMosquitto::CMQTTMosquitto(): mosquittopp( ("Giraff_"+mrpt::system::dateTimeToString(mrpt::system::now())).c_str() ,true) { } @@ -114,6 +115,9 @@ bool CMQTTMosquitto::OnStartUp() // Read Module Parameters //------------------------ + //! @moos_param ACKRate The rate (Hz) of sendign the ACK msg to the client + ACKRate = m_ini.read_float("", "ACKRate", 2.0, false); + //! @moos_param localizationRate The rate (Hz) of sendign the localization to the Pilot client localizationRate = m_ini.read_float("","localizationRate",10.0,false); @@ -221,13 +225,19 @@ bool CMQTTMosquitto::OnStartUp() // Connect the Mosquitto Client on_connect(connect(broker_host.c_str(), broker_port, Constants::keepalive_secs)); //Subscribe to a list of topics - on_subscribe(NULL,2,NULL); + on_subscribe(NULL,0,NULL); //Init timeStamps timeStamp_last_loc = mrpt::system::now(); timeStamp_last_laser = mrpt::system::now(); - + timestampClientACK = 0; + timestampServerACK = 0; + timestampRobotACK = mrpt::system::now(); + //Init counters of ACK + counterClientACK = 0; + counterServerACK = 0; + counterRobotACK = 0; is_motion_command_set = false; //indicates (true/false) if a motion command is active (To control robot from client) return DoRegistrations(); @@ -294,7 +304,7 @@ bool CMQTTMosquitto::Iterate() // Connect the Mosquitto Client on_connect(connect(broker_host.c_str(), broker_port, Constants::keepalive_secs)); //Subscribe to a list of topics - on_subscribe(NULL,2,NULL); + on_subscribe(NULL,0,NULL); } @@ -306,8 +316,9 @@ bool CMQTTMosquitto::Iterate() { //compute time difference from last motion command double seconds = mrpt::system::timeDifference(time_last_motion_command,mrpt::system::now()); - if (seconds > 0.2) + if (seconds > 0.5) { + printf("[MQTT]: Motion command Timed-Out (500ms) - sending CANCEL_NAVIGATION\n "); //Cancel motion is_motion_command_set = false; //! @moos_publish CANCEL_NAVIGATION Cancel the current navigation and return control to client (Pilot) @@ -316,6 +327,20 @@ bool CMQTTMosquitto::Iterate() } + // Send robotACK + //--------------- + if (mrpt::system::timeDifference(timestampRobotACK, mrpt::system::now()) > (1 / ACKRate)) + { + // robotACK = "LastClientACK|RobotACK|lastServerACK" + // All ACK msgs has the format: "counter timestamp" + timestampRobotACK = mrpt::system::now(); + std::string message = format("%u %I64u|%u %I64u|%u %I64u", counterClientACK, timestampClientACK, counterRobotACK, timestampRobotACK, counterServerACK, timestampServerACK); + + on_publish(NULL, (broker_username + "/" + "RobotACK").c_str(), strlen(message.c_str()), message.c_str()); + counterRobotACK++; + } + + // Send robot STATUS //------------------- // LOCALIZATION + TOPOLOGICAL_PLACE + TOPOLOGICAL_DESTINY + CONTROL_MODE + BATTERY_V_FLOAT + Is_Charging + RANDOM_NAVIGATOR + PARKING + COLLABORATIVE @@ -642,30 +667,33 @@ void CMQTTMosquitto::on_connect (int rc) void CMQTTMosquitto::on_message(const mosquitto_message *message) { - char buf[51]; - memset(buf, 0, 51*sizeof(char)); - /* Copy N-1 bytes to ensure always 0 terminated. */ - memcpy(buf, message->payload, 50*sizeof(char)); - const char *aux; - aux=buf; + char * cstr = new char[message->payloadlen + 1]; + std::strcpy(cstr, (char*)message->payload); + std::string messageCommand = string(cstr); // Display incomming msg (except ClientACK) //----------------------- if( strcmp(message->topic, (broker_username + "/" +"ClientACK").c_str()) ) { std::cout << "[MQTTMosquitto]: " << message->topic << " : "; - printf(buf, 50); + printf("MSG: %s", messageCommand); printf("\n\n"); //if (message->payloadlen) // std::cout << ", payload text is " << message->payload << "\n"; //else // std::cout << ", payload is (null)\n"; - } - + } + + //First value is ALWAYS the message timeStamp: [tt|msg_content] + size_t pos = messageCommand.find("|"); + mrpt::system::TTimeStamp messageTimeStamp = std::stoull(messageCommand.substr(0, pos)); + messageCommand.erase(0, pos + 1); + printf("MSG_noTS = %s\n", messageCommand.c_str()); + // TOPOLOGY-COMMANDS if(!strcmp(message->topic, (broker_username + "/" +"TopologyCommand").c_str() )) { - string topologyMessage = string(aux); + string topologyMessage = messageCommand; //First element is the name of the action to perform in the topology graph size_t pos = topologyMessage.find(" "); @@ -739,7 +767,7 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) // NAVIGATION-COMMANDS else if(!strcmp(message->topic, (broker_username + "/" +"NavigationCommand").c_str() )) { - string pluginCommand = string(aux); + string pluginCommand = messageCommand; //First element is the name of the action to perform, then the parameters size_t pos = pluginCommand.find(" "); @@ -794,12 +822,16 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) // Motion v w else if(action == "Motion") { - //! @moos_publish MOTION_REQUEST v w A request to set the robot linear and angular speeds - m_Comms.Notify("MOTION_REQUEST", pluginCommand); + //check that motion msg is fresh + if (mrpt::system::timeDifference(timestampClientACK,messageTimeStamp)<=0.5) + { + //! @moos_publish MOTION_REQUEST v w A request to set the robot linear and angular speeds + m_Comms.Notify("MOTION_REQUEST", pluginCommand); - //Get timeStamp - time_last_motion_command = mrpt::system::now(); - is_motion_command_set = true; + //Get timeStamp to ensure consecutive motion commands have enough frequency rate + time_last_motion_command = mrpt::system::now(); + is_motion_command_set = true; + } } // RandomNavigator 0/1 @@ -869,7 +901,7 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) // MAPBUILDING-COMMANDS else if(!strcmp(message->topic, (broker_username + "/" +"MapBuildingCommand").c_str() )) { - string command = string(aux); + string command = messageCommand; //First element is the name of the action to perform size_t pos = command.find(" "); @@ -901,7 +933,7 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) // SESSION-COMMANDS (for SessionLogger module) else if(!strcmp(message->topic, (broker_username + "/" +"SessionCommand").c_str() )) { - string command = string(aux); + string command = messageCommand; //First element is the name of the action to perform size_t pos = command.find(" "); @@ -923,19 +955,36 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) // ClientACK else if(!strcmp(message->topic, (broker_username + "/" +"ClientACK").c_str() )) { - string ClientName = string(aux); - //! @moos_publish CLIENT_MQTT_ACK Flag to indicate that communication with client is alive. - m_Comms.Notify("CLIENT_MQTT_ACK",ClientName.c_str()); + m_Comms.Notify("CLIENT_MQTT_ACK", messageCommand.c_str()); + + // messageCommand = "ClientACK|RobotACK|ServerACK" + //All ACK msgs are = "counter timestamp". Extract the Client counter and timestamp + size_t pos = messageCommand.find("|"); + std::string clientACK = messageCommand.substr(0, pos); + //Separate counter from timestamp + pos = clientACK.find(" "); + counterClientACK = std::atoi(clientACK.substr(0, pos).c_str()); + timestampClientACK = std::stoull(clientACK.substr(pos + 1)); + } - //Echo ACK to the user - on_publish(NULL, (broker_username + "/" + "RobotACK").c_str(),5,"alive"); + // ServerACK + else if (!strcmp(message->topic, (broker_username + "/" + "ServerACK").c_str())) + { + // messageCommand = "ClientACK|RobotACK|ServerACK" + //All ACK msgs are = "counter timestamp". Extract the Server counter and timestamp + size_t pos = messageCommand.rfind("|"); + std::string serverACK = messageCommand.substr(pos+1); + //Separate counter from timestamp + pos = serverACK.find(" "); + counterServerACK = std::atoi(serverACK.substr(0, pos).c_str()); + timestampServerACK = std::stoull(serverACK.substr(pos + 1)); } // Debug //Debugf else if(!strcmp(message->topic, (broker_username + "/" +"Debug").c_str() )) { - string command = string(aux); + string command = messageCommand; //First element is the name of the Variable to publish size_t pos = command.find(" "); @@ -945,7 +994,7 @@ void CMQTTMosquitto::on_message(const mosquitto_message *message) } else if(!strcmp(message->topic, (broker_username + "/" +"Debugf").c_str() )) { - string command = string(aux); + string command = messageCommand; //First element is the name of the Variable to publish size_t pos = command.find(" "); @@ -960,13 +1009,13 @@ void CMQTTMosquitto::on_subscribe(uint16_t mid, int qos_count, const uint8_t *gr { for (int i = 0; i < Constants::topic_count; ++i) { - std::string GirafName_topic = broker_username + "/" + Constants::topics[i]; - std::cerr << "Subscribing to " << GirafName_topic << "\n"; - int rc=subscribe(NULL, GirafName_topic.c_str(), qos_count); + std::string topicName = broker_username + "/" + Constants::topics[i]; + std::cerr << "Subscribing to " << topicName << "\n"; + int rc = subscribe(NULL, topicName.c_str(), qos_count); if (rc) - std::cerr << "Error: failed to subscribe to " << GirafName_topic << ", subscribe returned " << rc << "\n"; + std::cerr << "Error: failed to subscribe to " << topicName << ", subscribe returned " << rc << "\n"; else - std::cerr << "Subscribed to " << GirafName_topic << ", OK!\n"; + std::cerr << "Subscribed to " << topicName << ", OK!\n"; } time_t rawtime; time(&rawtime); @@ -976,5 +1025,16 @@ void CMQTTMosquitto::on_subscribe(uint16_t mid, int qos_count, const uint8_t *gr void CMQTTMosquitto::on_publish(int *mid, const char *topic, int payloadlen, const void *payload) { - int n= publish(mid,topic,payloadlen,payload); -} + //Append "current_timestamp|" to all published messages, as part of the payload message + char * cstr = new char[payloadlen + 1]; + std::strcpy(cstr, (char*)payload ); + std::string sLoad = string(cstr); + uint64_t tt = (uint64_t)mrpt::system::now() / 10000; //timestamp in ms + std::string myPayload = format("%I64u|", tt) + sLoad; + + //Update vars + payloadlen = strlen(myPayload.c_str()); + + //Send message over MQTT with qos=0 + int n = publish(mid, topic, payloadlen, myPayload.c_str(),0,false); +} \ No newline at end of file diff --git a/MQTTMosquitto/CMQTTMosquitto.h b/MQTTMosquitto/CMQTTMosquitto.h index d02d26c..5b96f87 100644 --- a/MQTTMosquitto/CMQTTMosquitto.h +++ b/MQTTMosquitto/CMQTTMosquitto.h @@ -80,6 +80,11 @@ class CMQTTMosquitto :public COpenMORAApp,mosquittopp std::string TLS_path, TLS_CAfile_pem, TLS_CERTfile_pem, TLS_KEYfile_pem; bool is_motion_command_set; mrpt::system::TTimeStamp time_last_motion_command; + + /** To handle message synchronization*/ + float ACKRate; + mrpt::system::TTimeStamp timestampClientACK, timestampServerACK, timestampRobotACK; + int counterClientACK, counterServerACK, counterRobotACK; }; #endif diff --git a/MQTTMosquitto/Constants.cpp b/MQTTMosquitto/Constants.cpp index ba92e9b..04727fe 100644 --- a/MQTTMosquitto/Constants.cpp +++ b/MQTTMosquitto/Constants.cpp @@ -4,6 +4,6 @@ const char *Constants::host = "giraffplus.xlab.si"; const int Constants::port = 8883; const int Constants::keepalive_secs = 30; -const char *Constants::topics[] = {"TopologyCommand","NavigationCommand","MapBuildingCommand","SessionCommand","ClientACK","Debug","Debugf"}; +const char *Constants::topics[] = { "TopologyCommand", "NavigationCommand", "MapBuildingCommand", "SessionCommand", "ClientACK", "ServerACK", "Debug", "Debugf" }; const int Constants::topic_count = (sizeof(Constants::topics) / sizeof(char*)); const char *Constants::publish_topic = "hello_topic"; \ No newline at end of file diff --git a/SpeechSynth/CSpeechSynth.cpp b/SpeechSynth/CSpeechSynth.cpp index 7a6485e..928cfb4 100644 --- a/SpeechSynth/CSpeechSynth.cpp +++ b/SpeechSynth/CSpeechSynth.cpp @@ -109,6 +109,8 @@ bool CSpeechSynthApp::OnNewMail(MOOSMSG_LIST &NewMail) // Is it necessary to free the memory allocated???? // GlobalFree(hMem); + + m_Comms.Notify("VOICE_EVENT_DONE",""); } if( (i->GetName()=="SHUTDOWN") && (MOOSStrCmp(i->GetString(),"true")) )