From cff60ece938e590b34671ca172ef0e717b5169b8 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Mon, 24 Apr 2023 08:37:03 +0200 Subject: [PATCH 1/9] Add ISocket class in communication interfaces (#25) * Add communication interfaces project and ByteArray * Add tests for ByteArray * Add interface class for a Socket * Improvements from code review * Rename configure to open * Add README * Extend README.md Extend README with implementation example for derived classes * Typo --------- Co-authored-by: Enrico Eberhard <32450951+eeberhard@users.noreply.github.com> --- source/Dockerfile | 21 +++++ source/README.md | 77 +++++++++++++++++ .../communication_interfaces/CMakeLists.txt | 55 ++++++++++++ .../communication_interfaces/ByteArray.hpp | 83 +++++++++++++++++++ .../sockets/ISocket.hpp | 46 ++++++++++ .../src/ByteArray.cpp | 72 ++++++++++++++++ .../test/test_communication_interfaces.cpp | 6 ++ .../test/tests/test_byte_array.cpp | 26 ++++++ source/dev-server.sh | 34 ++++++++ source/install.sh | 69 +++++++++++++++ 10 files changed, 489 insertions(+) create mode 100644 source/Dockerfile create mode 100644 source/README.md create mode 100644 source/communication_interfaces/CMakeLists.txt create mode 100644 source/communication_interfaces/include/communication_interfaces/ByteArray.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp create mode 100644 source/communication_interfaces/src/ByteArray.cpp create mode 100644 source/communication_interfaces/test/test_communication_interfaces.cpp create mode 100644 source/communication_interfaces/test/tests/test_byte_array.cpp create mode 100755 source/dev-server.sh create mode 100644 source/install.sh diff --git a/source/Dockerfile b/source/Dockerfile new file mode 100644 index 0000000..706464c --- /dev/null +++ b/source/Dockerfile @@ -0,0 +1,21 @@ +FROM ghcr.io/aica-technology/control-libraries/development-dependencies as source-dependencies + +RUN apt-get update && apt-get install -y \ + libzmq3-dev \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +WORKDIR /tmp + +ARG CPPZMQ_VERSION=4.7.1 +RUN wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz +RUN tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz +WORKDIR /tmp/cppzmq-${CPPZMQ_VERSION} +RUN mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install + + +WORKDIR /tmp +ARG CONTROL_LIBRARIES_BRANCH=v7.0.0 +RUN git clone -b ${CONTROL_LIBRARIES_BRANCH} --depth 1 https://github.com/aica-technology/control-libraries.git +RUN cd control-libraries/source && ./install.sh --auto --no-controllers --no-dynamical-systems --no-robot-model + +RUN rm -rf /tmp/* diff --git a/source/README.md b/source/README.md new file mode 100644 index 0000000..c10218e --- /dev/null +++ b/source/README.md @@ -0,0 +1,77 @@ +## Socket Interface + +The `ISocket` class is an interface for simple socket communication, defining functions for opening a socket, +sending and receiving bytes, and closing the socket connection. + +This interface extends from `state_representation::ParameterMap`, which allows for adding and retrieving parameters for +the socket connection. Further, the interface utilizes the `ByteArray` class to conveniently send and receive data in +the form of a dynamic array of bytes. + +The `ISocket` class defines an `open()` method to perform configuration steps to open the socket for communication. +If opening the socket fails, an exception is thrown. The `close()` method is also provided to perform steps to disconnect +and close the socket communication. + +The functions `receive_bytes(ByteArray&)` and `send_bytes(const ByteArray&)` perform the read and write logic of the socket +respectively. + +### Implementing a derived socket class + +To use this class, create a subclass that inherits from it and implement its pure virtual functions. The pure virtual +functions are `open()`, `receive_bytes(ByteArray&)`, and `send_bytes(const ByteArray&)`. + +The `close()` function can optionally be overridden to perform steps to disconnect and close the socket communication. +If a derived class defines any cleanup behavior in `close()`, it should also be invoked statically and explicitly +in the destructor of the derived class. + +An example is given below. + +```c++ +// DerivedSocket.hpp +class DerivedSocket : ISocket { +public: + ~DerivedSocket() override; + + void open() override; + + bool receive_bytes(ByteArray& buffer) override; + + bool send_bytes(const ByteArray& buffer) override; + + void close() override; +} +``` + +```c++ +// DerivedSocket.cpp +DerivedSocket::~DerivedSocket() { + DerivedSocket::close(); +} + +void DerivedSocket::open() { + // Configure and open the socket +} + +bool DerivedSocket::receive_bytes(ByteArray& buffer) { + // Read the contents of the socket into the buffer and return true on success. Otherwise, return false. + return true; +} + +bool DerivedSocket::send_bytes(const ByteArray& buffer) { + // Write the contents of the buffer onto the socket and return true on success. Otherwise, return false. + return true; +} + +void DerivedSocket::close() { + // Perform clean-up steps here +} +``` + +## Byte Array + +The `ByteArray` class is a container for a dynamic array of bytes (i.e. `char`). It provides methods for loading and +unloading data types to and from the ByteArray. The class acts as an interface definition to raw data (in case the +underlying structure of the raw data changes). Its intended use is for socket communication. + +To use this class, create a `ByteArray` object and use its member functions to load and unload data to and from the +buffer. The default behavior is to append/remove data from the end of the buffer. The class provides methods to copy the +buffer to and from a std::string or a std::vector. diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt new file mode 100644 index 0000000..261167d --- /dev/null +++ b/source/communication_interfaces/CMakeLists.txt @@ -0,0 +1,55 @@ +cmake_minimum_required(VERSION 3.15) + +project(communication_interfaces VERSION 0.0.1) + +# FIXME turn off +option(BUILD_TESTING "Build tests." ON) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +if(BUILD_TESTING) + enable_testing() + find_package(GTest REQUIRED) +else() + find_package(GTest QUIET) +endif() + +include(GNUInstallDirs) + +find_package(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) +find_package(cppzmq 4.7.1 REQUIRED) + +include_directories(include) + +add_library(${PROJECT_NAME} SHARED + ${PROJECT_SOURCE_DIR}/src/ByteArray.cpp +) +target_include_directories(${PROJECT_NAME} PUBLIC include) +target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq state_representation) + +install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) + +install(TARGETS ${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin + ) + +if(BUILD_TESTING) + add_executable(test_${PROJECT_NAME} test/test_communication_interfaces.cpp) + file(GLOB_RECURSE MODULE_TEST_SOURCES test/tests test_*.cpp) + target_sources(test_${PROJECT_NAME} PRIVATE ${MODULE_TEST_SOURCES}) + target_link_libraries(test_${PROJECT_NAME} ${PROJECT_NAME} ${GTEST_LIBRARIES} pthread) + add_test(NAME test_${PROJECT_NAME} COMMAND test_${PROJECT_NAME}) +endif() diff --git a/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp b/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp new file mode 100644 index 0000000..05ddd0c --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include + +namespace communication_interfaces { + +/** + * @brief The ByteArray wraps a dynamic array of bytes (i.e. char). + * @details The ByteArray provides convenient methods for loading and unloading data types to and from the ByteArray. + * The class actas as an interface definition to raw data (in case the underlying structure of the raw data changes). + * Its intended use is for socket communication. + * By default, data is appended/removed from the end of the array using the load/unload methods. + */ +class ByteArray { +public: + /** + * @brief Default constructor to create an empty ByteArray + */ + ByteArray(); + + ~ByteArray() = default; + + /** + * @brief Clear the buffer + */ + void reset(); + + /** + * @brief Load a void* (treated as char*) into the buffer + * @param value The value to load + * @param size The number of bytes to load + * @return True on success, false otherwise + */ + bool load(const void* value, const std::size_t& size); + + /** + * @brief Unload a void* (treated as char*) from the buffer + * @param value The value to unload + * @param size The number of bytes to unload + * @return True on success, false otherwise + */ + bool unload(void* value, const std::size_t& size); + + /** + * @brief Copy the content of the buffer to a std::string + * @param value The string to copy to + */ + void copy_to(std::string& value) const; + + /** + * @brief Copy the content of the buffer to a std::vector of char + * @param value The vector to copy into + */ + void copy_to(std::vector& value) const; + + /** + * @brief Set the buffer content from a std::string + * @param value The string to copy from + */ + void copy_from(const std::string& value); + + /** + * @brief Set the ByteArray buffer from a std::vector of char + * @param value The vector to copy from + */ + void copy_from(const std::vector& value); + + /** + * @brief Get the current size of the buffer + */ + [[nodiscard]] unsigned int get_buffer_size() const; + + /** + * @brief Get the maximal size of the buffer + */ + [[nodiscard]] unsigned int get_max_buffer_size() const; + +private: + std::deque buffer_; +}; +}// namespace communication_interfaces \ No newline at end of file diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp new file mode 100644 index 0000000..84aaf27 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include + +#include "communication_interfaces/ByteArray.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Interface class to define functions required for simple socket communication + */ +class ISocket : public state_representation::ParameterMap { +public: + /** + * @brief Default constructor + */ + ISocket() = default; + + virtual ~ISocket() = default; + + /** + * @brief Perform configuration steps to open the socket for communication + * @throws Exception if opening fails (FIXME: specify exception) + */ + virtual void open() = 0; + + /** + * @brief Receive bytes from the socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + virtual bool receive_bytes(ByteArray& buffer) = 0; + + /** + * @brief Send bytes to the socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + virtual bool send_bytes(const ByteArray& buffer) = 0; + + /** + * @brief Perform steps to disconnect and close the socket communication + */ + virtual void close() {} +}; +}// namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/ByteArray.cpp b/source/communication_interfaces/src/ByteArray.cpp new file mode 100644 index 0000000..558e66f --- /dev/null +++ b/source/communication_interfaces/src/ByteArray.cpp @@ -0,0 +1,72 @@ +#include "communication_interfaces/ByteArray.hpp" + +namespace communication_interfaces { + +ByteArray::ByteArray() { + this->reset(); +} + +void ByteArray::reset() { + this->buffer_.clear(); +} + +bool ByteArray::load(const void* value, const size_t& size) { + if (value == nullptr) { + return false; + } + if (this->get_buffer_size() + size > this->get_max_buffer_size()) { + return false; + } + try { + char* byte_ptr = (char*)value; + this->buffer_.insert(this->buffer_.end(), byte_ptr, byte_ptr + size); + return true; + } catch (const std::exception&) { + return false; + } +} + +bool ByteArray::unload(void* value, const std::size_t& size) { + if (value == nullptr) { + return false; + } + if (size > this->get_buffer_size()) { + return false; + } + try { + std::copy(this->buffer_.end() - static_cast(size), this->buffer_.end(), (char*)value); + this->buffer_.erase(this->buffer_.end() - static_cast(size), this->buffer_.end()); + return true; + } catch (const std::exception&) { + return false; + } +} + +void ByteArray::copy_to(std::vector& value) const { + value.assign(this->buffer_.begin(), this->buffer_.end()); +} + +void ByteArray::copy_to(std::string& value) const { + std::vector tmp; + this->copy_to(tmp); + value = std::string(tmp.data(), tmp.size()); +} + +void ByteArray::copy_from(const std::vector& value) { + this->reset(); + this->load(value.data(), value.size()); +} + +void ByteArray::copy_from(const std::string& value) { + this->reset(); + this->load(value.c_str(), value.size()); +} + +unsigned int ByteArray::get_buffer_size() const { + return this->buffer_.size(); +} + +unsigned int ByteArray::get_max_buffer_size() const { + return this->buffer_.max_size(); +} +}// namespace communication_interfaces diff --git a/source/communication_interfaces/test/test_communication_interfaces.cpp b/source/communication_interfaces/test/test_communication_interfaces.cpp new file mode 100644 index 0000000..697a9d7 --- /dev/null +++ b/source/communication_interfaces/test/test_communication_interfaces.cpp @@ -0,0 +1,6 @@ +#include + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/source/communication_interfaces/test/tests/test_byte_array.cpp b/source/communication_interfaces/test/tests/test_byte_array.cpp new file mode 100644 index 0000000..4de17b6 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_byte_array.cpp @@ -0,0 +1,26 @@ +#include + +#include "communication_interfaces/ByteArray.hpp" + +using namespace communication_interfaces; + +TEST(TestByteArray, ByteArray) { + std::string message = "test"; + auto data = ByteArray(); + data.copy_from(message); + EXPECT_EQ(data.get_buffer_size(), message.size()); + std::string result; + data.copy_to(result); + EXPECT_STREQ(result.c_str(), message.c_str()); + + data.load(message.c_str(), message.size()); + EXPECT_EQ(data.get_buffer_size(), 2 * message.size()); + data.copy_to(result); + EXPECT_STREQ(result.c_str(), (message + message).c_str()); + char* raw_result = new char[message.size()]; + data.unload(raw_result, message.size()); + EXPECT_EQ(data.get_buffer_size(), message.size()); + + data.reset(); + EXPECT_EQ(data.get_buffer_size(), 0); +} diff --git a/source/dev-server.sh b/source/dev-server.sh new file mode 100755 index 0000000..4274567 --- /dev/null +++ b/source/dev-server.sh @@ -0,0 +1,34 @@ +#!/bin/bash +CONTROL_LIBRARIES_BRANCH=develop +REMOTE_SSH_PORT=4420 + +IMAGE_NAME=aica-technology/communication-interfaces +IMAGE_STAGE=source-dependencies + +BUILD_FLAGS=() + +HELP_MESSAGE="Usage: ./dev-server.sh [-r] [-v] + +Build a Docker container for remote development. +Options: + -r, --rebuild Rebuild the image with no cache. + -v, --verbose Show all the output of the Docker + build process. + -h, --help Show this help message." + +while [ "$#" -gt 0 ]; do + case "$1" in + -r|--rebuild) BUILD_FLAGS+=(--no-cache); shift 1;; + -v|--verbose) BUILD_FLAGS+=(--progress=plain); shift 1;; + -h|--help) echo "${HELP_MESSAGE}"; exit 0;; + *) echo "Unknown option: $1" >&2; echo "${HELP_MESSAGE}"; exit 1;; + esac +done + +BUILD_FLAGS+=(-t "${IMAGE_NAME}:${IMAGE_STAGE}") +BUILD_FLAGS+=(--target "${IMAGE_STAGE}") + +docker pull ghcr.io/aica-technology/control-libraries/development-dependencies || exit 1 +DOCKER_BUILDKIT=1 docker build "${BUILD_FLAGS[@]}" . || exit 1 + +aica-docker server "${IMAGE_NAME}:${IMAGE_STAGE}" -u developer -p "${REMOTE_SSH_PORT}" diff --git a/source/install.sh b/source/install.sh new file mode 100644 index 0000000..f7de910 --- /dev/null +++ b/source/install.sh @@ -0,0 +1,69 @@ +#!/bin/bash +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + +INSTALL_DESTINATION="/usr/local" +AUTO_INSTALL="" +CPPZMQ_VERSION=4.7.1 + +HELP_MESSAGE="Usage: [sudo] ./install.sh [OPTIONS] +An install script for the communication_interfaces library. +Options: + -y, --auto Suppress any input prompts and + automatically approve install steps. + -d, --dir [path] Configure the installation directory + (default: ${INSTALL_DESTINATION}). + -h, --help Show this help message." + +function install_cppzmq() { + if [ "${INSTALL_CPP}" == 1 ]; then + apt-get update && apt-get install "${AUTO_INSTALL}" libzmq3-dev || exit 1 + + mkdir -p "${SCRIPT_DIR}"/install + cd "${SCRIPT_DIR}"/install || exit 1 + wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz && + tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz && + rm cppzmq-${CPPZMQ_VERSION}.tar.gz + + cd "${SCRIPT_DIR}"/install/cppzmq-"${CPPZMQ_VERSION}" || exit 1 + mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install || exit 1 + ldconfig + fi + + cd "${SCRIPT_DIR}" && rm -rf "${SCRIPT_DIR}"/install || exit 1 +} + +install_communication_interfaces () { + cd "${SCRIPT_DIR}"/communication_interfaces && mkdir -p build && cd build || exit 1 + cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX="${INSTALL_DESTINATION}" .. || exit 1 + + make -j && make install || exit 1 + ldconfig + + cd "${SCRIPT_DIR}" || exit 1 +} + +while [ "$#" -gt 0 ]; do + case "$1" in + -y|--auto) AUTO_INSTALL="-y"; shift 1;; + -d|--dir) INSTALL_DESTINATION=$2; shift 2;; + -h|--help) echo "${HELP_MESSAGE}"; exit 0;; + -*) echo "Unknown option: $1" >&2 + echo "${FAIL_MESSAGE}" + exit 1;; + esac +done + +$(pkg-config --exists state_representation) || exit 1 +if [ "$?" == 1 ]; then + echo ">>> STATE REPRESENTATION LIBRARY NOT FOUND!" + echo ">>> Install state_representation from https://github.com/aica-technology/control-libraries/source" + exit 1 +fi + +echo ">>> INSTALLING ZMQ DEPENDENCIES" +install_cppzmq || exit 1 + +echo ">>> INSTALLING COMMUNICATION INTERFACES" +install_communication_interfaces || exit 1 + +echo ">>> DONE" From 5e6b9ed5bb1795220a3bf3a297a00978822e8fe6 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Thu, 27 Apr 2023 11:24:12 +0200 Subject: [PATCH 2/9] Add UDP server and client classes (#27) * Add UDPSocket * Add UDP server and client with tests * Avoid parameters as class property * Correct signature of recvfrom --- .../communication_interfaces/CMakeLists.txt | 6 +- .../SocketConfigurationException.hpp | 16 +++ .../sockets/ISocket.hpp | 5 +- .../sockets/UDPClient.hpp | 37 +++++ .../sockets/UDPServer.hpp | 40 ++++++ .../sockets/UDPSocket.hpp | 72 ++++++++++ .../src/sockets/UDPClient.cpp | 21 +++ .../src/sockets/UDPServer.cpp | 21 +++ .../src/sockets/UDPSocket.cpp | 126 ++++++++++++++++++ .../test/tests/test_udp_communication.cpp | 81 +++++++++++ 10 files changed, 422 insertions(+), 3 deletions(-) create mode 100644 source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp create mode 100644 source/communication_interfaces/src/sockets/UDPClient.cpp create mode 100644 source/communication_interfaces/src/sockets/UDPServer.cpp create mode 100644 source/communication_interfaces/src/sockets/UDPSocket.cpp create mode 100644 source/communication_interfaces/test/tests/test_udp_communication.cpp diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index 261167d..acc40e1 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -2,8 +2,7 @@ cmake_minimum_required(VERSION 3.15) project(communication_interfaces VERSION 0.0.1) -# FIXME turn off -option(BUILD_TESTING "Build tests." ON) +option(BUILD_TESTING "Build tests." OFF) # Default to C99 if(NOT CMAKE_C_STANDARD) @@ -34,6 +33,9 @@ include_directories(include) add_library(${PROJECT_NAME} SHARED ${PROJECT_SOURCE_DIR}/src/ByteArray.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/UDPSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/UDPClient.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/UDPServer.cpp ) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq state_representation) diff --git a/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp new file mode 100644 index 0000000..28f5f93 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace communication_interfaces::exceptions { + +/* + * @class SocketConfigurationException + * @brief Exception that is thrown when a socket configuration failed + */ +class SocketConfigurationException : public std::runtime_error { +public: + explicit SocketConfigurationException(const std::string& msg) : runtime_error(msg) {}; +}; +} // namespace communication_interfaces::exceptions diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp index 84aaf27..bec879a 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -16,11 +16,14 @@ class ISocket : public state_representation::ParameterMap { */ ISocket() = default; + /** + * @brief Default destructor + */ virtual ~ISocket() = default; /** * @brief Perform configuration steps to open the socket for communication - * @throws Exception if opening fails (FIXME: specify exception) + * @throws SocketConfigurationException if opening fails */ virtual void open() = 0; diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp new file mode 100644 index 0000000..50c7263 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp @@ -0,0 +1,37 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Class to define a UDP client + */ +class UDPClient : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket() + */ + UDPClient(); + + /** + * @copydoc UDPSocket::UDPSocket(const state_representation::ParameterInterfaceList&) + */ + explicit UDPClient(const state_representation::ParameterInterfaceList& parameters); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @copydoc ISocket::receive_bytes(ByteArray&) + */ + bool receive_bytes(ByteArray& buffer) override; + + /** + * @copydoc ISocket::send_bytes(const ByteArray&) + */ + bool send_bytes(const ByteArray& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp new file mode 100644 index 0000000..b51f51a --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Class to define a UDP server + */ +class UDPServer : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket() + */ + UDPServer(); + + /** + * @copydoc UDPSocket::UDPSocket(const state_representation::ParameterInterfaceList&) + */ + explicit UDPServer(const std::list>& parameters); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @copydoc ISocket::receive_bytes(ByteArray&) + */ + bool receive_bytes(ByteArray& buffer) override; + + /** + * @copydoc ISocket::send_bytes(const ByteArray&) + */ + bool send_bytes(const ByteArray& buffer) override; + +private: + sockaddr_in client_address_; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp new file mode 100644 index 0000000..1a5fdd5 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp @@ -0,0 +1,72 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Abstract class to define a generic UDP socket + */ +class UDPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling UDPSocket::close() + */ + ~UDPSocket() override; + + /** + * @brief Close the socket + */ + void close() override; + +protected: + /** + * @brief Add the parameters with no or default value to the map + */ + UDPSocket(); + + /** + * @brief Add and set the parameters with the parameters given as argument + * @param parameters The list of parameters + */ + explicit UDPSocket(const state_representation::ParameterInterfaceList& parameters); + + /** + * @brief Perform steps to open the socket on the desired IP/port, set reuse and timeout options and bind if desired. + * @param bind_socket If true, bind the socket (for a UDP server), no binding otherwise (for a UDP client) + */ + void open_socket(bool bind_socket); + + /** + * @brief Receive bytes from the socket + * @param address Reference to a sockaddr_in structure in which the sending address is to be stored + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + [[nodiscard]] bool recvfrom(sockaddr_in& address, ByteArray& buffer); + + /** + * @brief Send bytes to the socket + * @param address Reference to a sockaddr_in structure containing the destination address + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + [[nodiscard]] bool sendto(const sockaddr_in& address, const ByteArray& buffer) const; + + sockaddr_in server_address_; ///< Address of the UDP server + +private: + /** + * @brief Validate and set parameters + * @param parameter A parameter interface pointer + */ + void validate_and_set_parameter(const std::shared_ptr& parameter) override; + + std::shared_ptr> buffer_size_; ///< Maximal size of buffer to receive + + int server_fd_; ///< File descriptor of the socket + socklen_t addr_len_; ///< Length of the socket address +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPClient.cpp b/source/communication_interfaces/src/sockets/UDPClient.cpp new file mode 100644 index 0000000..26e6303 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPClient.cpp @@ -0,0 +1,21 @@ +#include "communication_interfaces/sockets/UDPClient.hpp" + +namespace communication_interfaces::sockets { + +UDPClient::UDPClient() : UDPSocket() {} + +UDPClient::UDPClient(const std::list>& parameters) : + UDPSocket(parameters) {} + +void UDPClient::open() { + this->open_socket(false); +} + +bool UDPClient::receive_bytes(ByteArray& buffer) { + return this->recvfrom(this->server_address_, buffer); +} + +bool UDPClient::send_bytes(const ByteArray& buffer) { + return this->sendto(this->server_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPServer.cpp b/source/communication_interfaces/src/sockets/UDPServer.cpp new file mode 100644 index 0000000..bead783 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPServer.cpp @@ -0,0 +1,21 @@ +#include "communication_interfaces/sockets/UDPServer.hpp" + +namespace communication_interfaces::sockets { + +UDPServer::UDPServer() : UDPSocket(), client_address_() {} + +UDPServer::UDPServer(const std::list>& parameters) : + UDPSocket(parameters), client_address_() {} + +void UDPServer::open() { + this->open_socket(true); +} + +bool UDPServer::receive_bytes(ByteArray& buffer) { + return this->recvfrom(this->client_address_, buffer); +} + +bool UDPServer::send_bytes(const ByteArray& buffer) { + return this->sendto(this->client_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPSocket.cpp b/source/communication_interfaces/src/sockets/UDPSocket.cpp new file mode 100644 index 0000000..3d9267a --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPSocket.cpp @@ -0,0 +1,126 @@ +#include "communication_interfaces/sockets/UDPSocket.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +using namespace state_representation; + +UDPSocket::UDPSocket() : + server_address_(), buffer_size_(std::make_shared>("buffer_size")), server_fd_(), addr_len_() { + this->parameters_.insert_or_assign("ip_address", std::make_shared>("ip_address")); + this->parameters_.insert_or_assign("port", std::make_shared>("port")); + this->parameters_.insert_or_assign("enable_reuse", std::make_shared>("enable_reuse", false)); + this->parameters_.insert_or_assign( + "timeout_duration_sec", std::make_shared>("timeout_duration_sec", 0.0)); + this->parameters_.insert(std::make_pair("buffer_size", this->buffer_size_)); +} + +UDPSocket::UDPSocket(const ParameterInterfaceList& parameters) : UDPSocket() { + this->set_parameters(parameters); +} + +UDPSocket::~UDPSocket() { + UDPSocket::close(); +} + +void UDPSocket::open_socket(bool bind_socket) { + if (this->buffer_size_->is_empty()) { + throw exceptions::SocketConfigurationException("Parameter 'buffer_size' is empty."); + } + + try { + this->addr_len_ = sizeof(this->server_address_); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = inet_addr(this->get_parameter_value("ip_address").c_str()); + this->server_address_.sin_port = htons(this->get_parameter_value("port")); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->server_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->get_parameter_value("enable_reuse")) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind_socket) { + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed."); + } + } + + auto timeout_duration_sec = this->get_parameter_value("timeout_duration_sec"); + if (timeout_duration_sec > 0.0) { + timeval timeout{}; + auto secs = std::floor(timeout_duration_sec); + timeout.tv_sec = static_cast(secs); + timeout.tv_usec = static_cast((timeout_duration_sec - secs) * 1e6); + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket timeout failed."); + } + } +} + +bool UDPSocket::recvfrom(sockaddr_in& address, ByteArray& buffer) { + std::vector local_buffer(this->buffer_size_->get_value()); + auto receive_length = ::recvfrom( + this->server_fd_, local_buffer.data(), this->buffer_size_->get_value(), 0, (sockaddr*) &(address), + &(this->addr_len_)); + if (receive_length < 0) { + return false; + } + local_buffer.at(receive_length) = 0; + buffer.reset(); + buffer.load(local_buffer.data(), receive_length); + return true; +} + +bool UDPSocket::sendto(const sockaddr_in& address, const ByteArray& buffer) const { + std::vector local_buffer; + buffer.copy_to(local_buffer); + int send_length = ::sendto( + this->server_fd_, local_buffer.data(), local_buffer.size(), 0, (sockaddr*) &(address), this->addr_len_); + return send_length == static_cast(local_buffer.size()); +} + +void UDPSocket::validate_and_set_parameter(const std::shared_ptr& parameter) { + // TODO remove once this check is included in `assert_parameter_valid` + if (this->parameters_.find(parameter->get_name()) == this->parameters_.end()) { + throw state_representation::exceptions::InvalidParameterException( + "Invalid parameter '" + parameter->get_name() + "' for class UDPSocket."); + } + this->assert_parameter_valid(parameter); + if (parameter->is_empty()) { + throw state_representation::exceptions::InvalidParameterException( + "Parameter '" + parameter->get_name() + "' cannot be empty."); + } + if (parameter->get_name() == "timeout_duration_sec" && parameter->get_parameter_value() < 0.0) { + throw state_representation::exceptions::InvalidParameterException( + "Parameter 'timeout_duration_sec' cannot be negative."); + } + if (parameter->get_name() == "buffer_size") { + int value = parameter->get_parameter_value(); + if (value < 0) { + throw state_representation::exceptions::InvalidParameterException( + "Parameter 'buffer_size' cannot be negative."); + } + this->parameters_.at(parameter->get_name())->set_parameter_value(value); + } else { + this->parameters_.insert_or_assign(parameter->get_name(), parameter); + } +} + +void UDPSocket::close() { + if (this->server_fd_ >= 0) { + ::close(this->server_fd_); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/tests/test_udp_communication.cpp b/source/communication_interfaces/test/tests/test_udp_communication.cpp new file mode 100644 index 0000000..d38b441 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_udp_communication.cpp @@ -0,0 +1,81 @@ +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" +#include "communication_interfaces/sockets/UDPClient.hpp" +#include "communication_interfaces/sockets/UDPServer.hpp" + +using namespace communication_interfaces; + +class TestUDPSockets : public ::testing::Test { +public: + TestUDPSockets() { + params_.emplace_back(state_representation::make_shared_parameter("ip_address", "127.0.0.1")); + params_.emplace_back(state_representation::make_shared_parameter("port", 5000)); + params_.emplace_back(state_representation::make_shared_parameter("buffer_size", 100)); + } + + std::list> params_; +}; + +TEST_F(TestUDPSockets, SendReceive) { + const std::string send_string = "Hello world!"; + + // Create server socket and bind it to a port + sockets::UDPServer server(this->params_); + ASSERT_NO_THROW(server.open()); + + // Create client socket + sockets::UDPClient client(this->params_); + ASSERT_NO_THROW(client.open()); + + // Send test message from client to server + ByteArray message; + message.copy_from(send_string); + ASSERT_TRUE(client.send_bytes(message)); + + // Receive message on server + message.reset(); + ASSERT_TRUE(server.receive_bytes(message)); + + // Convert received message to string and compare with sent message + std::string received_string; + message.copy_to(received_string); + EXPECT_STREQ(received_string.c_str(), send_string.c_str()); +} + +TEST_F(TestUDPSockets, Timeout) { + this->params_.emplace_back(state_representation::make_shared_parameter("timeout_duration_sec", 5.0)); + + // Create server socket and bind it to a port + sockets::UDPServer server(this->params_); + + // Try to receive a message from client, but expect timeout + ByteArray received_bytes; + EXPECT_FALSE(server.receive_bytes(received_bytes)); +} + +TEST_F(TestUDPSockets, PortReuse) { + // Create server socket and bind it to a port + sockets::UDPServer server1(this->params_); + server1.open(); + + // Try to create a second server socket and bind it to the same port (expect failure) + sockets::UDPServer server2(this->params_); + EXPECT_THROW(server2.open(), exceptions::SocketConfigurationException); +} + +TEST_F(TestUDPSockets, OpenClose) { + // Create and open server socket + sockets::UDPServer server(this->params_); + server.open(); + + // Close server socket + server.close(); + + // Create and open client socket + sockets::UDPClient client(this->params_); + client.open(); + + // Try to send a message from the closed server socket (expect failure) + EXPECT_FALSE(server.send_bytes(ByteArray())); +} From d6c0e9c8e36e86657c6b92a66efc26c8955ae98c Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Wed, 3 May 2023 08:30:05 +0200 Subject: [PATCH 3/9] Add ZMQ publisher and subscriber classes (#29) * Add ZMQSocket * Add ZMQ publisher and subscriber with tests * Add docstrings --- .../communication_interfaces/CMakeLists.txt | 3 + .../sockets/ZMQPublisher.hpp | 34 ++++++++ .../sockets/ZMQSocket.hpp | 69 +++++++++++++++ .../sockets/ZMQSubscriber.hpp | 34 ++++++++ .../src/sockets/ZMQPublisher.cpp | 20 +++++ .../src/sockets/ZMQSocket.cpp | 86 +++++++++++++++++++ .../src/sockets/ZMQSubscriber.cpp | 22 +++++ .../test/tests/test_zmq_communication.cpp | 48 +++++++++++ 8 files changed, 316 insertions(+) create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp create mode 100644 source/communication_interfaces/src/sockets/ZMQPublisher.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQSocket.cpp create mode 100644 source/communication_interfaces/src/sockets/ZMQSubscriber.cpp create mode 100644 source/communication_interfaces/test/tests/test_zmq_communication.cpp diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index acc40e1..f1ed2a6 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -36,6 +36,9 @@ add_library(${PROJECT_NAME} SHARED ${PROJECT_SOURCE_DIR}/src/sockets/UDPSocket.cpp ${PROJECT_SOURCE_DIR}/src/sockets/UDPClient.cpp ${PROJECT_SOURCE_DIR}/src/sockets/UDPServer.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisher.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSubscriber.cpp ) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq state_representation) diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp new file mode 100644 index 0000000..b68f654 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Class to define a ZMQ publisher + */ +class ZMQPublisher : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(const std::shared_ptr&) + */ + explicit ZMQPublisher(const std::shared_ptr& context); + + /** + * @copydoc ZMQSocket::ZMQSocket(const state_representation::ParameterInterfaceList&, const std::shared_ptr&) + */ + explicit ZMQPublisher( + const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context + ); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @brief This method throws a runtime error as receiving is not available for a ZMQ publisher + */ + bool receive_bytes(ByteArray& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp new file mode 100644 index 0000000..43a058b --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -0,0 +1,69 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Abstract class to define a generic ZMQ socket + */ +class ZMQSocket : public ISocket { +public: + /** + * @brief Close the socket by calling ZMQSocket::close() + */ + ~ZMQSocket() override; + + /** + * @brief Close the socket + */ + void close() override; + + /** + * @copydoc ISocket::receive_bytes(ByteArray&) + */ + bool receive_bytes(ByteArray& buffer) override; + + /** + * @copydoc ISocket::send_bytes(ByteArray&) + */ + bool send_bytes(const ByteArray& buffer) override; + +protected: + /** + * @brief Add the parameters with no or default value to the map + * @param context The ZMQ context to be used for the sockets + * (it's recommended to only use one context per application) + */ + explicit ZMQSocket(const std::shared_ptr& context); + + /** + * @brief Add and set the parameters with the parameters given as argument + * @param parameters The list of parameters + * @param context The ZMQ context to be used for the sockets + * (it's recommended to only use one context per application) + */ + ZMQSocket( + const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context + ); + + /** + * @brief Bind or connect the socket on the desired IP/port + */ + void open_socket(); + + std::shared_ptr context_; ///< ZMQ context + std::shared_ptr socket_; ///< ZMQ socket + +private: + /** + * @brief Validate and set parameters + * @param parameter A parameter interface pointer + */ + void validate_and_set_parameter(const std::shared_ptr& parameter) final; + + std::shared_ptr> wait_; ///< If false, send and receive are blocking +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp new file mode 100644 index 0000000..fc723cc --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Class to define a ZMQ subscriber + */ +class ZMQSubscriber : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(const std::shared_ptr&) + */ + explicit ZMQSubscriber(const std::shared_ptr& context); + + /** + * @copydoc ZMQSocket::ZMQSocket(const state_representation::ParameterInterfaceList&, const std::shared_ptr&) + */ + explicit ZMQSubscriber( + const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context + ); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @brief This method throws a runtime error as sending is not available for a ZMQ publisher + */ + bool send_bytes(const ByteArray& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp new file mode 100644 index 0000000..1beb768 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp @@ -0,0 +1,20 @@ +#include "communication_interfaces/sockets/ZMQPublisher.hpp" + +namespace communication_interfaces::sockets { + +using namespace state_representation; + +ZMQPublisher::ZMQPublisher(const std::shared_ptr& context) : ZMQSocket(context) {} + +ZMQPublisher::ZMQPublisher(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : + ZMQSocket(parameters, context) {} + +void ZMQPublisher::open() { + this->socket_ = std::make_shared(*this->context_, ZMQ_PUB); + this->open_socket(); +} + +bool ZMQPublisher::receive_bytes(ByteArray&) { + throw std::runtime_error("Receive not available for socket of type ZMQPublisher"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSocket.cpp b/source/communication_interfaces/src/sockets/ZMQSocket.cpp new file mode 100644 index 0000000..ccabb8f --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSocket.cpp @@ -0,0 +1,86 @@ +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +using namespace state_representation; + +ZMQSocket::ZMQSocket(const std::shared_ptr& context) : + context_(context), wait_(std::make_shared>("wait", false)) { + this->parameters_.insert_or_assign("ip_address", std::make_shared>("ip_address")); + this->parameters_.insert_or_assign("port", std::make_shared>("port")); + this->parameters_.insert_or_assign("bind_socket", std::make_shared>("bind_socket")); + this->parameters_.insert(std::make_pair("wait", this->wait_)); +} + +ZMQSocket::ZMQSocket(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : + ZMQSocket(context) { + this->set_parameters(parameters); +} + +ZMQSocket::~ZMQSocket() { + ZMQSocket::close(); +} + +void ZMQSocket::open_socket() { + try { + auto address = "tcp://" + this->get_parameter_value("ip_address") + ":" + + this->get_parameter_value("port"); + if (this->get_parameter_value("bind_socket")) { + this->socket_->bind(address); + } else { + this->socket_->connect(address); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } +} + +bool ZMQSocket::receive_bytes(ByteArray& buffer) { + zmq::recv_flags recv_flag = this->wait_->get_value() ? zmq::recv_flags::none : zmq::recv_flags::dontwait; + zmq::message_t message; + try { + auto received = this->socket_->recv(message, recv_flag); + if (received.has_value()) { + buffer.reset(); + buffer.load(message.data(), message.size()); + } + return received.has_value(); + } catch (const zmq::error_t&) { + return false; + } +} + +bool ZMQSocket::send_bytes(const ByteArray& buffer) { + zmq::send_flags send_flags = this->wait_->get_value() ? zmq::send_flags::none : zmq::send_flags::dontwait; + std::vector local_buffer; + buffer.copy_to(local_buffer); + zmq::message_t msg(local_buffer.begin(), local_buffer.end()); + try { + auto sent = this->socket_->send(msg, send_flags); + return sent.has_value(); + } catch (const zmq::error_t&) { + return false; + } +} + +void ZMQSocket::validate_and_set_parameter(const std::shared_ptr& parameter) { + this->assert_parameter_valid(parameter); + if (parameter->is_empty()) { + throw state_representation::exceptions::InvalidParameterException( + "Parameter '" + parameter->get_name() + "' cannot be empty."); + } + if (parameter->get_name() == "wait") { + this->parameters_.at(parameter->get_name())->set_parameter_value(parameter->get_parameter_value()); + } else { + this->parameters_.insert_or_assign(parameter->get_name(), parameter); + } +} + +void ZMQSocket::close() { + if (this->socket_ != nullptr && this->socket_->connected()) { + this->socket_->close(); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp new file mode 100644 index 0000000..86d0426 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp @@ -0,0 +1,22 @@ +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +using namespace state_representation; + +ZMQSubscriber::ZMQSubscriber(const std::shared_ptr& context) : ZMQSocket(context) {} + +ZMQSubscriber::ZMQSubscriber(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : + ZMQSocket(parameters, context) {} + +void ZMQSubscriber::open() { + this->socket_ = std::make_shared(*this->context_, ZMQ_SUB); + this->open_socket(); + this->socket_->set(zmq::sockopt::conflate, 1); + this->socket_->set(zmq::sockopt::subscribe, ""); +} + +bool ZMQSubscriber::send_bytes(const ByteArray&) { + throw std::runtime_error("Send not available for socket of type ZMQSubscriber"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp new file mode 100644 index 0000000..4cc5744 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -0,0 +1,48 @@ +#include +#include + +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +using namespace communication_interfaces; +using namespace std::chrono_literals; + +class TestZMQSockets : public ::testing::Test { +public: + TestZMQSockets() { + params_.emplace_back(state_representation::make_shared_parameter("ip_address", "127.0.0.1")); + params_.emplace_back(state_representation::make_shared_parameter("port", "5000")); + context_ = std::make_shared(1); + } + + std::shared_ptr context_; + state_representation::ParameterInterfaceList params_; +}; + +TEST_F(TestZMQSockets, SendReceive) { + const std::string send_string = "Hello world!"; + + this->params_.emplace_back(state_representation::make_shared_parameter("bind_socket", true)); + sockets::ZMQPublisher publisher(this->params_, this->context_); + + this->params_.pop_back(); + this->params_.emplace_back(state_representation::make_shared_parameter("bind_socket", false)); + sockets::ZMQSubscriber subscriber(this->params_, this->context_); + + publisher.open(); + subscriber.open(); + + ByteArray message; + message.copy_from(send_string); + for (int i = 0; i < 5; ++i) { + EXPECT_TRUE(publisher.send_bytes(message)); + usleep(10000); + } + message.reset(); + ASSERT_TRUE(subscriber.receive_bytes(message)); + std::string received_string; + message.copy_to(received_string); + EXPECT_STREQ(received_string.c_str(), send_string.c_str()); + publisher.close(); + subscriber.close(); +} From 89b9a9ed3f224c9971f0df60307db1704dc09bb5 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Fri, 5 May 2023 13:20:52 +0200 Subject: [PATCH 4/9] Add pkg-config files (#33) --- .../communication_interfaces/CMakeLists.txt | 51 +++++++++++++++++-- .../communication_interfaces-config.cmake.in | 6 +++ .../communication_interfaces.pc.in | 9 ++++ source/install.sh | 47 +++++++++++++---- 4 files changed, 99 insertions(+), 14 deletions(-) create mode 100644 source/communication_interfaces/communication_interfaces-config.cmake.in create mode 100644 source/communication_interfaces/communication_interfaces.pc.in diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index f1ed2a6..383d360 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 3.15) -project(communication_interfaces VERSION 0.0.1) +set (COMMUNICATION_INTERFACES_VERSION 0.0.1) +project(communication_interfaces VERSION ${COMMUNICATION_INTERFACES_VERSION}) option(BUILD_TESTING "Build tests." OFF) @@ -24,10 +25,22 @@ else() find_package(GTest QUIET) endif() +macro(add_project_dependency) + find_package(${ARGV}) + if(PKG_EXTERNAL_DEPS) + set(PKG_EXTERNAL_DEPS "${PKG_EXTERNAL_DEPS}, ${ARGV0} >= ${ARGV1}") + else() + set(PKG_EXTERNAL_DEPS "${ARGV0} >= ${ARGV1}") + endif() +endmacro() + include(GNUInstallDirs) +include(CMakePackageConfigHelpers) +include(FindPkgConfig) -find_package(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) -find_package(cppzmq 4.7.1 REQUIRED) +add_project_dependency(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) +add_project_dependency(clproto 7.0.0 REQUIRED) +add_project_dependency(cppzmq 4.7.1 REQUIRED) include_directories(include) @@ -58,3 +71,35 @@ if(BUILD_TESTING) target_link_libraries(test_${PROJECT_NAME} ${PROJECT_NAME} ${GTEST_LIBRARIES} pthread) add_test(NAME test_${PROJECT_NAME} COMMAND test_${PROJECT_NAME}) endif() + + +# generate the version file for the config file +write_basic_package_version_file( + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake" + VERSION "${PROJECT_VERSION}" + COMPATIBILITY SameMajorVersion +) + +# create config file +configure_package_config_file(${CMAKE_CURRENT_SOURCE_DIR}/${PROJECT_NAME}-config.cmake.in + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake" + INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME} + ) + +# install config files +install(FILES + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake" + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME} + ) + +if(${PKG_CONFIG_FOUND}) + set(PKG_NAME ${PROJECT_NAME}) + set(PKG_DESC "Communication Interfaces") + set(pkg_conf_file "${PROJECT_NAME}.pc") + configure_file("${pkg_conf_file}.in" "${CMAKE_BINARY_DIR}/${pkg_conf_file}" @ONLY) + install(FILES "${CMAKE_BINARY_DIR}/${pkg_conf_file}" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig/ COMPONENT pkgconfig) +else() + message(WARNING "Could not find pkg-config executable, skipping generation of pkg-config files.") +endif() diff --git a/source/communication_interfaces/communication_interfaces-config.cmake.in b/source/communication_interfaces/communication_interfaces-config.cmake.in new file mode 100644 index 0000000..e640599 --- /dev/null +++ b/source/communication_interfaces/communication_interfaces-config.cmake.in @@ -0,0 +1,6 @@ +@PACKAGE_INIT@ + +include(CMakeFindDependencyMacro) +find_dependency(state_representation) +find_dependency(clproto) +find_dependency(cppzmq) \ No newline at end of file diff --git a/source/communication_interfaces/communication_interfaces.pc.in b/source/communication_interfaces/communication_interfaces.pc.in new file mode 100644 index 0000000..a415858 --- /dev/null +++ b/source/communication_interfaces/communication_interfaces.pc.in @@ -0,0 +1,9 @@ +prefix=@CMAKE_INSTALL_PREFIX@ +exec_prefix=${prefix} +includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@ + +Name: @PKG_NAME@ +Description: @PKG_DESC@ +Version: @COMMUNICATION_INTERFACES_VERSION@ +Requires: @PKG_EXTERNAL_DEPS@ +Cflags: -I${includedir} diff --git a/source/install.sh b/source/install.sh index f7de910..fa58529 100644 --- a/source/install.sh +++ b/source/install.sh @@ -12,22 +12,26 @@ Options: automatically approve install steps. -d, --dir [path] Configure the installation directory (default: ${INSTALL_DESTINATION}). + --build-tests Build the unittest targets. + --clean Delete any previously installed header + files from ${INSTALL_DESTINATION}/include and any + shared library files from ${INSTALL_DESTINATION}/lib. + --cleandir [path] Delete any previously installed header + and library files from the specified path. -h, --help Show this help message." function install_cppzmq() { - if [ "${INSTALL_CPP}" == 1 ]; then - apt-get update && apt-get install "${AUTO_INSTALL}" libzmq3-dev || exit 1 + apt-get update && apt-get install "${AUTO_INSTALL}" libzmq3-dev || exit 1 - mkdir -p "${SCRIPT_DIR}"/install - cd "${SCRIPT_DIR}"/install || exit 1 - wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz && - tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz && - rm cppzmq-${CPPZMQ_VERSION}.tar.gz + mkdir -p "${SCRIPT_DIR}"/install + cd "${SCRIPT_DIR}"/install || exit 1 + wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz && + tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz && + rm cppzmq-${CPPZMQ_VERSION}.tar.gz - cd "${SCRIPT_DIR}"/install/cppzmq-"${CPPZMQ_VERSION}" || exit 1 - mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install || exit 1 - ldconfig - fi + cd "${SCRIPT_DIR}"/install/cppzmq-"${CPPZMQ_VERSION}" || exit 1 + mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install || exit 1 + ldconfig cd "${SCRIPT_DIR}" && rm -rf "${SCRIPT_DIR}"/install || exit 1 } @@ -42,10 +46,24 @@ install_communication_interfaces () { cd "${SCRIPT_DIR}" || exit 1 } +function uninstall() { + function delete_components() { + rm -r "${INSTALL_DESTINATION}"/include/communication_interfaces + rm -r "${INSTALL_DESTINATION}"/lib/libcommunication_interfaces*.so + } + + delete_components >/dev/null 2>&1 + + echo "Deleted any communication_interfaces artefacts from ${INSTALL_DESTINATION}." +} + while [ "$#" -gt 0 ]; do case "$1" in -y|--auto) AUTO_INSTALL="-y"; shift 1;; -d|--dir) INSTALL_DESTINATION=$2; shift 2;; + --build-tests) BUILD_TESTING="ON"; shift 1;; + --clean) uninstall; exit 0;; + --cleandir) INSTALL_DESTINATION=$2; uninstall; exit 0;; -h|--help) echo "${HELP_MESSAGE}"; exit 0;; -*) echo "Unknown option: $1" >&2 echo "${FAIL_MESSAGE}" @@ -60,6 +78,13 @@ if [ "$?" == 1 ]; then exit 1 fi +$(pkg-config --exists clproto) || exit 1 +if [ "$?" == 1 ]; then + echo ">>> CL PROTO LIBRARY NOT FOUND!" + echo ">>> Install clproto from https://github.com/aica-technology/control-libraries/protocol" + exit 1 +fi + echo ">>> INSTALLING ZMQ DEPENDENCIES" install_cppzmq || exit 1 From 1e63ac2a1309c1a33d60d8de387702e5c06940c3 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Thu, 25 May 2023 17:51:18 +0200 Subject: [PATCH 5/9] Remove state representation dependencies (#37) * Remove state representation from install scripts and CMake dependencies * Remove ParameterMap as base class of ISocket * Construct UDP socket with configuration struct * Update UDP client and server * Construct ZMQ socket with configuration struct * Update ZMQ subscriber and publisher * Update tests * Use UDP config with no constructor and initializer list * Make context part of the configuration struct * Update tests --- source/Dockerfile | 6 -- .../communication_interfaces/CMakeLists.txt | 4 +- .../communication_interfaces-config.cmake.in | 2 - .../sockets/ISocket.hpp | 4 +- .../sockets/UDPClient.hpp | 10 +-- .../sockets/UDPServer.hpp | 10 +-- .../sockets/UDPSocket.hpp | 33 ++++----- .../sockets/ZMQPublisher.hpp | 12 +--- .../sockets/ZMQSocket.hpp | 41 +++++------ .../sockets/ZMQSubscriber.hpp | 12 +--- .../src/sockets/UDPClient.cpp | 5 +- .../src/sockets/UDPServer.cpp | 5 +- .../src/sockets/UDPSocket.cpp | 70 ++++--------------- .../src/sockets/ZMQPublisher.cpp | 9 +-- .../src/sockets/ZMQSocket.cpp | 37 ++-------- .../src/sockets/ZMQSubscriber.cpp | 9 +-- .../test/tests/test_udp_communication.cpp | 23 +++--- .../test/tests/test_zmq_communication.cpp | 19 ++--- source/dev-server.sh | 1 - source/install.sh | 14 ---- 20 files changed, 91 insertions(+), 235 deletions(-) diff --git a/source/Dockerfile b/source/Dockerfile index 706464c..7c2b323 100644 --- a/source/Dockerfile +++ b/source/Dockerfile @@ -12,10 +12,4 @@ RUN tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz WORKDIR /tmp/cppzmq-${CPPZMQ_VERSION} RUN mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install - -WORKDIR /tmp -ARG CONTROL_LIBRARIES_BRANCH=v7.0.0 -RUN git clone -b ${CONTROL_LIBRARIES_BRANCH} --depth 1 https://github.com/aica-technology/control-libraries.git -RUN cd control-libraries/source && ./install.sh --auto --no-controllers --no-dynamical-systems --no-robot-model - RUN rm -rf /tmp/* diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index 383d360..0fff133 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -38,8 +38,6 @@ include(GNUInstallDirs) include(CMakePackageConfigHelpers) include(FindPkgConfig) -add_project_dependency(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) -add_project_dependency(clproto 7.0.0 REQUIRED) add_project_dependency(cppzmq 4.7.1 REQUIRED) include_directories(include) @@ -54,7 +52,7 @@ add_library(${PROJECT_NAME} SHARED ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSubscriber.cpp ) target_include_directories(${PROJECT_NAME} PUBLIC include) -target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq state_representation) +target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq) install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) diff --git a/source/communication_interfaces/communication_interfaces-config.cmake.in b/source/communication_interfaces/communication_interfaces-config.cmake.in index e640599..cc4e2c5 100644 --- a/source/communication_interfaces/communication_interfaces-config.cmake.in +++ b/source/communication_interfaces/communication_interfaces-config.cmake.in @@ -1,6 +1,4 @@ @PACKAGE_INIT@ include(CMakeFindDependencyMacro) -find_dependency(state_representation) -find_dependency(clproto) find_dependency(cppzmq) \ No newline at end of file diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp index bec879a..d4f19cd 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -1,7 +1,5 @@ #pragma once -#include - #include "communication_interfaces/ByteArray.hpp" namespace communication_interfaces::sockets { @@ -9,7 +7,7 @@ namespace communication_interfaces::sockets { /** * @brief Interface class to define functions required for simple socket communication */ -class ISocket : public state_representation::ParameterMap { +class ISocket { public: /** * @brief Default constructor diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp index 50c7263..8f11ac9 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp @@ -5,19 +5,15 @@ namespace communication_interfaces::sockets { /** + * @class UDPClient * @brief Class to define a UDP client */ class UDPClient : public UDPSocket { public: /** - * @copydoc UDPSocket::UDPSocket() + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) */ - UDPClient(); - - /** - * @copydoc UDPSocket::UDPSocket(const state_representation::ParameterInterfaceList&) - */ - explicit UDPClient(const state_representation::ParameterInterfaceList& parameters); + UDPClient(UDPSocketConfiguration configuration); /** * @copydoc ISocket::open() diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp index b51f51a..7cf23d6 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp @@ -5,19 +5,15 @@ namespace communication_interfaces::sockets { /** + * @class UDPServer * @brief Class to define a UDP server */ class UDPServer : public UDPSocket { public: /** - * @copydoc UDPSocket::UDPSocket() + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) */ - UDPServer(); - - /** - * @copydoc UDPSocket::UDPSocket(const state_representation::ParameterInterfaceList&) - */ - explicit UDPServer(const std::list>& parameters); + UDPServer(UDPSocketConfiguration configuration); /** * @copydoc ISocket::open() diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp index 1a5fdd5..9a2ca76 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp @@ -7,6 +7,19 @@ namespace communication_interfaces::sockets { /** + * @struct UDPSocketConfiguration + * @brief Configuration parameters for a UDP sockets + */ +struct UDPSocketConfiguration { + std::string ip_address; + int port; + int buffer_size; + bool enable_reuse = false; + double timeout_duration_sec = 0.0; +}; + +/** + * @class UDPSocket * @brief Abstract class to define a generic UDP socket */ class UDPSocket : public ISocket { @@ -23,15 +36,10 @@ class UDPSocket : public ISocket { protected: /** - * @brief Add the parameters with no or default value to the map + * @brief Constructor taking the configuration struct + * @param The configuration struct */ - UDPSocket(); - - /** - * @brief Add and set the parameters with the parameters given as argument - * @param parameters The list of parameters - */ - explicit UDPSocket(const state_representation::ParameterInterfaceList& parameters); + explicit UDPSocket(UDPSocketConfiguration configuration); /** * @brief Perform steps to open the socket on the desired IP/port, set reuse and timeout options and bind if desired. @@ -58,14 +66,7 @@ class UDPSocket : public ISocket { sockaddr_in server_address_; ///< Address of the UDP server private: - /** - * @brief Validate and set parameters - * @param parameter A parameter interface pointer - */ - void validate_and_set_parameter(const std::shared_ptr& parameter) override; - - std::shared_ptr> buffer_size_; ///< Maximal size of buffer to receive - + UDPSocketConfiguration config_; ///< Socket configuration struct int server_fd_; ///< File descriptor of the socket socklen_t addr_len_; ///< Length of the socket address }; diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp index b68f654..86e1ea6 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp @@ -5,21 +5,15 @@ namespace communication_interfaces::sockets { /** + * @class ZMQPublisher * @brief Class to define a ZMQ publisher */ class ZMQPublisher : public ZMQSocket { public: /** - * @copydoc ZMQSocket::ZMQSocket(const std::shared_ptr&) + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) */ - explicit ZMQPublisher(const std::shared_ptr& context); - - /** - * @copydoc ZMQSocket::ZMQSocket(const state_representation::ParameterInterfaceList&, const std::shared_ptr&) - */ - explicit ZMQPublisher( - const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context - ); + explicit ZMQPublisher(ZMQSocketConfiguration configuration); /** * @copydoc ISocket::open() diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp index 43a058b..c06dcf8 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -7,6 +7,19 @@ namespace communication_interfaces::sockets { /** + * @struct ZMQSocketConfiguration + * @brief Configuration parameters for a ZMQ socket + */ +struct ZMQSocketConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string port; + bool bind = true; + bool wait = true; +}; + +/** + * @class ZMQSocket * @brief Abstract class to define a generic ZMQ socket */ class ZMQSocket : public ISocket { @@ -32,38 +45,18 @@ class ZMQSocket : public ISocket { bool send_bytes(const ByteArray& buffer) override; protected: - /** - * @brief Add the parameters with no or default value to the map - * @param context The ZMQ context to be used for the sockets - * (it's recommended to only use one context per application) - */ - explicit ZMQSocket(const std::shared_ptr& context); - /** - * @brief Add and set the parameters with the parameters given as argument - * @param parameters The list of parameters - * @param context The ZMQ context to be used for the sockets - * (it's recommended to only use one context per application) + * @brief Constructor taking the configuration struct + * @param The configuration struct */ - ZMQSocket( - const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context - ); + ZMQSocket(ZMQSocketConfiguration configuration); /** * @brief Bind or connect the socket on the desired IP/port */ void open_socket(); - std::shared_ptr context_; ///< ZMQ context + ZMQSocketConfiguration config_; ///< Socket configuration struct std::shared_ptr socket_; ///< ZMQ socket - -private: - /** - * @brief Validate and set parameters - * @param parameter A parameter interface pointer - */ - void validate_and_set_parameter(const std::shared_ptr& parameter) final; - - std::shared_ptr> wait_; ///< If false, send and receive are blocking }; } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp index fc723cc..47e3327 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp @@ -5,21 +5,15 @@ namespace communication_interfaces::sockets { /** + * @class ZMQSubscriber * @brief Class to define a ZMQ subscriber */ class ZMQSubscriber : public ZMQSocket { public: /** - * @copydoc ZMQSocket::ZMQSocket(const std::shared_ptr&) + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) */ - explicit ZMQSubscriber(const std::shared_ptr& context); - - /** - * @copydoc ZMQSocket::ZMQSocket(const state_representation::ParameterInterfaceList&, const std::shared_ptr&) - */ - explicit ZMQSubscriber( - const state_representation::ParameterInterfaceList& parameters, const std::shared_ptr& context - ); + explicit ZMQSubscriber(ZMQSocketConfiguration configuration); /** * @copydoc ISocket::open() diff --git a/source/communication_interfaces/src/sockets/UDPClient.cpp b/source/communication_interfaces/src/sockets/UDPClient.cpp index 26e6303..3a8e4ed 100644 --- a/source/communication_interfaces/src/sockets/UDPClient.cpp +++ b/source/communication_interfaces/src/sockets/UDPClient.cpp @@ -2,10 +2,7 @@ namespace communication_interfaces::sockets { -UDPClient::UDPClient() : UDPSocket() {} - -UDPClient::UDPClient(const std::list>& parameters) : - UDPSocket(parameters) {} +UDPClient::UDPClient(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} void UDPClient::open() { this->open_socket(false); diff --git a/source/communication_interfaces/src/sockets/UDPServer.cpp b/source/communication_interfaces/src/sockets/UDPServer.cpp index bead783..9dbb265 100644 --- a/source/communication_interfaces/src/sockets/UDPServer.cpp +++ b/source/communication_interfaces/src/sockets/UDPServer.cpp @@ -2,10 +2,7 @@ namespace communication_interfaces::sockets { -UDPServer::UDPServer() : UDPSocket(), client_address_() {} - -UDPServer::UDPServer(const std::list>& parameters) : - UDPSocket(parameters), client_address_() {} +UDPServer::UDPServer(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} void UDPServer::open() { this->open_socket(true); diff --git a/source/communication_interfaces/src/sockets/UDPSocket.cpp b/source/communication_interfaces/src/sockets/UDPSocket.cpp index 3d9267a..e59461d 100644 --- a/source/communication_interfaces/src/sockets/UDPSocket.cpp +++ b/source/communication_interfaces/src/sockets/UDPSocket.cpp @@ -1,5 +1,6 @@ #include "communication_interfaces/sockets/UDPSocket.hpp" +#include #include #include @@ -7,20 +8,11 @@ namespace communication_interfaces::sockets { -using namespace state_representation; - -UDPSocket::UDPSocket() : - server_address_(), buffer_size_(std::make_shared>("buffer_size")), server_fd_(), addr_len_() { - this->parameters_.insert_or_assign("ip_address", std::make_shared>("ip_address")); - this->parameters_.insert_or_assign("port", std::make_shared>("port")); - this->parameters_.insert_or_assign("enable_reuse", std::make_shared>("enable_reuse", false)); - this->parameters_.insert_or_assign( - "timeout_duration_sec", std::make_shared>("timeout_duration_sec", 0.0)); - this->parameters_.insert(std::make_pair("buffer_size", this->buffer_size_)); -} - -UDPSocket::UDPSocket(const ParameterInterfaceList& parameters) : UDPSocket() { - this->set_parameters(parameters); +UDPSocket::UDPSocket(UDPSocketConfiguration configuration) : + server_address_(), config_(std::move(configuration)), server_fd_(), addr_len_() { + if (this->config_.buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } } UDPSocket::~UDPSocket() { @@ -28,15 +20,11 @@ UDPSocket::~UDPSocket() { } void UDPSocket::open_socket(bool bind_socket) { - if (this->buffer_size_->is_empty()) { - throw exceptions::SocketConfigurationException("Parameter 'buffer_size' is empty."); - } - try { this->addr_len_ = sizeof(this->server_address_); this->server_address_.sin_family = AF_INET; - this->server_address_.sin_addr.s_addr = inet_addr(this->get_parameter_value("ip_address").c_str()); - this->server_address_.sin_port = htons(this->get_parameter_value("port")); + this->server_address_.sin_addr.s_addr = inet_addr(this->config_.ip_address.c_str()); + this->server_address_.sin_port = htons(this->config_.port); } catch (const std::exception& ex) { throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); } @@ -45,7 +33,7 @@ void UDPSocket::open_socket(bool bind_socket) { if (this->server_fd_ < 0) { throw exceptions::SocketConfigurationException("Opening socket failed"); } - if (this->get_parameter_value("enable_reuse")) { + if (this->config_.enable_reuse) { const int opt_reuse = 1; if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); @@ -57,12 +45,12 @@ void UDPSocket::open_socket(bool bind_socket) { } } - auto timeout_duration_sec = this->get_parameter_value("timeout_duration_sec"); - if (timeout_duration_sec > 0.0) { + if (this->config_.timeout_duration_sec > 0.0 + && this->config_.timeout_duration_sec < std::numeric_limits::max()) { timeval timeout{}; - auto secs = std::floor(timeout_duration_sec); + auto secs = std::floor(this->config_.timeout_duration_sec); timeout.tv_sec = static_cast(secs); - timeout.tv_usec = static_cast((timeout_duration_sec - secs) * 1e6); + timeout.tv_usec = static_cast((this->config_.timeout_duration_sec - secs) * 1e6); if (setsockopt(this->server_fd_, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) { throw exceptions::SocketConfigurationException("Setting socket timeout failed."); } @@ -70,10 +58,9 @@ void UDPSocket::open_socket(bool bind_socket) { } bool UDPSocket::recvfrom(sockaddr_in& address, ByteArray& buffer) { - std::vector local_buffer(this->buffer_size_->get_value()); + std::vector local_buffer(this->config_.buffer_size); auto receive_length = ::recvfrom( - this->server_fd_, local_buffer.data(), this->buffer_size_->get_value(), 0, (sockaddr*) &(address), - &(this->addr_len_)); + this->server_fd_, local_buffer.data(), this->config_.buffer_size, 0, (sockaddr*) &(address), &(this->addr_len_)); if (receive_length < 0) { return false; } @@ -91,33 +78,6 @@ bool UDPSocket::sendto(const sockaddr_in& address, const ByteArray& buffer) cons return send_length == static_cast(local_buffer.size()); } -void UDPSocket::validate_and_set_parameter(const std::shared_ptr& parameter) { - // TODO remove once this check is included in `assert_parameter_valid` - if (this->parameters_.find(parameter->get_name()) == this->parameters_.end()) { - throw state_representation::exceptions::InvalidParameterException( - "Invalid parameter '" + parameter->get_name() + "' for class UDPSocket."); - } - this->assert_parameter_valid(parameter); - if (parameter->is_empty()) { - throw state_representation::exceptions::InvalidParameterException( - "Parameter '" + parameter->get_name() + "' cannot be empty."); - } - if (parameter->get_name() == "timeout_duration_sec" && parameter->get_parameter_value() < 0.0) { - throw state_representation::exceptions::InvalidParameterException( - "Parameter 'timeout_duration_sec' cannot be negative."); - } - if (parameter->get_name() == "buffer_size") { - int value = parameter->get_parameter_value(); - if (value < 0) { - throw state_representation::exceptions::InvalidParameterException( - "Parameter 'buffer_size' cannot be negative."); - } - this->parameters_.at(parameter->get_name())->set_parameter_value(value); - } else { - this->parameters_.insert_or_assign(parameter->get_name(), parameter); - } -} - void UDPSocket::close() { if (this->server_fd_ >= 0) { ::close(this->server_fd_); diff --git a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp index 1beb768..abaceff 100644 --- a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp +++ b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp @@ -2,15 +2,10 @@ namespace communication_interfaces::sockets { -using namespace state_representation; - -ZMQPublisher::ZMQPublisher(const std::shared_ptr& context) : ZMQSocket(context) {} - -ZMQPublisher::ZMQPublisher(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : - ZMQSocket(parameters, context) {} +ZMQPublisher::ZMQPublisher(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} void ZMQPublisher::open() { - this->socket_ = std::make_shared(*this->context_, ZMQ_PUB); + this->socket_ = std::make_shared(*this->config_.context, ZMQ_PUB); this->open_socket(); } diff --git a/source/communication_interfaces/src/sockets/ZMQSocket.cpp b/source/communication_interfaces/src/sockets/ZMQSocket.cpp index ccabb8f..57e58c3 100644 --- a/source/communication_interfaces/src/sockets/ZMQSocket.cpp +++ b/source/communication_interfaces/src/sockets/ZMQSocket.cpp @@ -4,20 +4,7 @@ namespace communication_interfaces::sockets { -using namespace state_representation; - -ZMQSocket::ZMQSocket(const std::shared_ptr& context) : - context_(context), wait_(std::make_shared>("wait", false)) { - this->parameters_.insert_or_assign("ip_address", std::make_shared>("ip_address")); - this->parameters_.insert_or_assign("port", std::make_shared>("port")); - this->parameters_.insert_or_assign("bind_socket", std::make_shared>("bind_socket")); - this->parameters_.insert(std::make_pair("wait", this->wait_)); -} - -ZMQSocket::ZMQSocket(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : - ZMQSocket(context) { - this->set_parameters(parameters); -} +ZMQSocket::ZMQSocket(ZMQSocketConfiguration configuration) : config_(std::move(configuration)) {} ZMQSocket::~ZMQSocket() { ZMQSocket::close(); @@ -25,9 +12,8 @@ ZMQSocket::~ZMQSocket() { void ZMQSocket::open_socket() { try { - auto address = "tcp://" + this->get_parameter_value("ip_address") + ":" - + this->get_parameter_value("port"); - if (this->get_parameter_value("bind_socket")) { + auto address = "tcp://" + this->config_.ip_address + ":" + this->config_.port; + if (this->config_.bind) { this->socket_->bind(address); } else { this->socket_->connect(address); @@ -38,7 +24,7 @@ void ZMQSocket::open_socket() { } bool ZMQSocket::receive_bytes(ByteArray& buffer) { - zmq::recv_flags recv_flag = this->wait_->get_value() ? zmq::recv_flags::none : zmq::recv_flags::dontwait; + zmq::recv_flags recv_flag = this->config_.wait ? zmq::recv_flags::none : zmq::recv_flags::dontwait; zmq::message_t message; try { auto received = this->socket_->recv(message, recv_flag); @@ -53,7 +39,7 @@ bool ZMQSocket::receive_bytes(ByteArray& buffer) { } bool ZMQSocket::send_bytes(const ByteArray& buffer) { - zmq::send_flags send_flags = this->wait_->get_value() ? zmq::send_flags::none : zmq::send_flags::dontwait; + zmq::send_flags send_flags = this->config_.wait ? zmq::send_flags::none : zmq::send_flags::dontwait; std::vector local_buffer; buffer.copy_to(local_buffer); zmq::message_t msg(local_buffer.begin(), local_buffer.end()); @@ -65,19 +51,6 @@ bool ZMQSocket::send_bytes(const ByteArray& buffer) { } } -void ZMQSocket::validate_and_set_parameter(const std::shared_ptr& parameter) { - this->assert_parameter_valid(parameter); - if (parameter->is_empty()) { - throw state_representation::exceptions::InvalidParameterException( - "Parameter '" + parameter->get_name() + "' cannot be empty."); - } - if (parameter->get_name() == "wait") { - this->parameters_.at(parameter->get_name())->set_parameter_value(parameter->get_parameter_value()); - } else { - this->parameters_.insert_or_assign(parameter->get_name(), parameter); - } -} - void ZMQSocket::close() { if (this->socket_ != nullptr && this->socket_->connected()) { this->socket_->close(); diff --git a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp index 86d0426..ca44bee 100644 --- a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp +++ b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp @@ -2,15 +2,10 @@ namespace communication_interfaces::sockets { -using namespace state_representation; - -ZMQSubscriber::ZMQSubscriber(const std::shared_ptr& context) : ZMQSocket(context) {} - -ZMQSubscriber::ZMQSubscriber(const ParameterInterfaceList& parameters, const std::shared_ptr& context) : - ZMQSocket(parameters, context) {} +ZMQSubscriber::ZMQSubscriber(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} void ZMQSubscriber::open() { - this->socket_ = std::make_shared(*this->context_, ZMQ_SUB); + this->socket_ = std::make_shared(*this->config_.context, ZMQ_SUB); this->open_socket(); this->socket_->set(zmq::sockopt::conflate, 1); this->socket_->set(zmq::sockopt::subscribe, ""); diff --git a/source/communication_interfaces/test/tests/test_udp_communication.cpp b/source/communication_interfaces/test/tests/test_udp_communication.cpp index d38b441..dcbaa34 100644 --- a/source/communication_interfaces/test/tests/test_udp_communication.cpp +++ b/source/communication_interfaces/test/tests/test_udp_communication.cpp @@ -9,23 +9,21 @@ using namespace communication_interfaces; class TestUDPSockets : public ::testing::Test { public: TestUDPSockets() { - params_.emplace_back(state_representation::make_shared_parameter("ip_address", "127.0.0.1")); - params_.emplace_back(state_representation::make_shared_parameter("port", 5000)); - params_.emplace_back(state_representation::make_shared_parameter("buffer_size", 100)); + config_ = {"127.0.0.1", 5000, 100}; } - std::list> params_; + sockets::UDPSocketConfiguration config_; }; TEST_F(TestUDPSockets, SendReceive) { const std::string send_string = "Hello world!"; // Create server socket and bind it to a port - sockets::UDPServer server(this->params_); + sockets::UDPServer server(this->config_); ASSERT_NO_THROW(server.open()); // Create client socket - sockets::UDPClient client(this->params_); + sockets::UDPClient client(this->config_); ASSERT_NO_THROW(client.open()); // Send test message from client to server @@ -44,10 +42,9 @@ TEST_F(TestUDPSockets, SendReceive) { } TEST_F(TestUDPSockets, Timeout) { - this->params_.emplace_back(state_representation::make_shared_parameter("timeout_duration_sec", 5.0)); - // Create server socket and bind it to a port - sockets::UDPServer server(this->params_); + this->config_.timeout_duration_sec = 5.0; + sockets::UDPServer server(this->config_); // Try to receive a message from client, but expect timeout ByteArray received_bytes; @@ -56,24 +53,24 @@ TEST_F(TestUDPSockets, Timeout) { TEST_F(TestUDPSockets, PortReuse) { // Create server socket and bind it to a port - sockets::UDPServer server1(this->params_); + sockets::UDPServer server1(this->config_); server1.open(); // Try to create a second server socket and bind it to the same port (expect failure) - sockets::UDPServer server2(this->params_); + sockets::UDPServer server2(this->config_); EXPECT_THROW(server2.open(), exceptions::SocketConfigurationException); } TEST_F(TestUDPSockets, OpenClose) { // Create and open server socket - sockets::UDPServer server(this->params_); + sockets::UDPServer server(this->config_); server.open(); // Close server socket server.close(); // Create and open client socket - sockets::UDPClient client(this->params_); + sockets::UDPClient client(this->config_); client.open(); // Try to send a message from the closed server socket (expect failure) diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp index 4cc5744..1f934de 100644 --- a/source/communication_interfaces/test/tests/test_zmq_communication.cpp +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -10,24 +10,19 @@ using namespace std::chrono_literals; class TestZMQSockets : public ::testing::Test { public: TestZMQSockets() { - params_.emplace_back(state_representation::make_shared_parameter("ip_address", "127.0.0.1")); - params_.emplace_back(state_representation::make_shared_parameter("port", "5000")); - context_ = std::make_shared(1); - } + auto context = std::make_shared(1); + config_ = {context, "127.0.0.1", "4000"}; + } - std::shared_ptr context_; - state_representation::ParameterInterfaceList params_; + sockets::ZMQSocketConfiguration config_; }; TEST_F(TestZMQSockets, SendReceive) { const std::string send_string = "Hello world!"; - this->params_.emplace_back(state_representation::make_shared_parameter("bind_socket", true)); - sockets::ZMQPublisher publisher(this->params_, this->context_); - - this->params_.pop_back(); - this->params_.emplace_back(state_representation::make_shared_parameter("bind_socket", false)); - sockets::ZMQSubscriber subscriber(this->params_, this->context_); + sockets::ZMQPublisher publisher(this->config_); + this->config_.bind = false; + sockets::ZMQSubscriber subscriber(this->config_); publisher.open(); subscriber.open(); diff --git a/source/dev-server.sh b/source/dev-server.sh index 4274567..47c0c15 100755 --- a/source/dev-server.sh +++ b/source/dev-server.sh @@ -1,5 +1,4 @@ #!/bin/bash -CONTROL_LIBRARIES_BRANCH=develop REMOTE_SSH_PORT=4420 IMAGE_NAME=aica-technology/communication-interfaces diff --git a/source/install.sh b/source/install.sh index fa58529..8bc904c 100644 --- a/source/install.sh +++ b/source/install.sh @@ -71,20 +71,6 @@ while [ "$#" -gt 0 ]; do esac done -$(pkg-config --exists state_representation) || exit 1 -if [ "$?" == 1 ]; then - echo ">>> STATE REPRESENTATION LIBRARY NOT FOUND!" - echo ">>> Install state_representation from https://github.com/aica-technology/control-libraries/source" - exit 1 -fi - -$(pkg-config --exists clproto) || exit 1 -if [ "$?" == 1 ]; then - echo ">>> CL PROTO LIBRARY NOT FOUND!" - echo ">>> Install clproto from https://github.com/aica-technology/control-libraries/protocol" - exit 1 -fi - echo ">>> INSTALLING ZMQ DEPENDENCIES" install_cppzmq || exit 1 From 3ced0bdd92482cd562c1c41933d0dd588719bb71 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Thu, 25 May 2023 22:15:17 +0200 Subject: [PATCH 6/9] Remove ByteArray (#40) * Update README * Remove ByteArray * Apply suggestions from code review Co-authored-by: Enrico Eberhard <32450951+eeberhard@users.noreply.github.com> --------- Co-authored-by: Enrico Eberhard <32450951+eeberhard@users.noreply.github.com> --- source/README.md | 40 ++++----- .../communication_interfaces/CMakeLists.txt | 1 - .../communication_interfaces/ByteArray.hpp | 83 ------------------- .../sockets/ISocket.hpp | 6 +- .../sockets/UDPClient.hpp | 8 +- .../sockets/UDPServer.hpp | 8 +- .../sockets/UDPSocket.hpp | 4 +- .../sockets/ZMQPublisher.hpp | 2 +- .../sockets/ZMQSocket.hpp | 8 +- .../sockets/ZMQSubscriber.hpp | 2 +- .../src/ByteArray.cpp | 72 ---------------- .../src/sockets/UDPClient.cpp | 4 +- .../src/sockets/UDPServer.cpp | 4 +- .../src/sockets/UDPSocket.cpp | 17 ++-- .../src/sockets/ZMQPublisher.cpp | 2 +- .../src/sockets/ZMQSocket.cpp | 17 ++-- .../src/sockets/ZMQSubscriber.cpp | 2 +- .../test/tests/test_byte_array.cpp | 26 ------ .../test/tests/test_udp_communication.cpp | 16 ++-- .../test/tests/test_zmq_communication.cpp | 12 +-- 20 files changed, 71 insertions(+), 263 deletions(-) delete mode 100644 source/communication_interfaces/include/communication_interfaces/ByteArray.hpp delete mode 100644 source/communication_interfaces/src/ByteArray.cpp delete mode 100644 source/communication_interfaces/test/tests/test_byte_array.cpp diff --git a/source/README.md b/source/README.md index c10218e..7ad135b 100644 --- a/source/README.md +++ b/source/README.md @@ -3,21 +3,19 @@ The `ISocket` class is an interface for simple socket communication, defining functions for opening a socket, sending and receiving bytes, and closing the socket connection. -This interface extends from `state_representation::ParameterMap`, which allows for adding and retrieving parameters for -the socket connection. Further, the interface utilizes the `ByteArray` class to conveniently send and receive data in -the form of a dynamic array of bytes. - The `ISocket` class defines an `open()` method to perform configuration steps to open the socket for communication. If opening the socket fails, an exception is thrown. The `close()` method is also provided to perform steps to disconnect and close the socket communication. -The functions `receive_bytes(ByteArray&)` and `send_bytes(const ByteArray&)` perform the read and write logic of the socket +The functions `receive_bytes(std::string&)` and `send_bytes(const std::string&)` perform the read and write logic of the socket respectively. ### Implementing a derived socket class To use this class, create a subclass that inherits from it and implement its pure virtual functions. The pure virtual -functions are `open()`, `receive_bytes(ByteArray&)`, and `send_bytes(const ByteArray&)`. +functions are `open()`, `receive_bytes(std::string&)`, and `send_bytes(const std::string&)`. + +Configuration parameters should be passed with a configuration struct, resulting in a single argument constructor. The `close()` function can optionally be overridden to perform steps to disconnect and close the socket communication. If a derived class defines any cleanup behavior in `close()`, it should also be invoked statically and explicitly @@ -27,15 +25,23 @@ An example is given below. ```c++ // DerivedSocket.hpp + +struct DerivedSocketConfig { + int param1; + double param2; +}; + class DerivedSocket : ISocket { public: + DerivedSocket(DerivedSocketConfig configuration); + ~DerivedSocket() override; void open() override; - bool receive_bytes(ByteArray& buffer) override; + bool receive_bytes(std::string& buffer) override; - bool send_bytes(const ByteArray& buffer) override; + bool send_bytes(const std::string& buffer) override; void close() override; } @@ -43,6 +49,10 @@ public: ```c++ // DerivedSocket.cpp +DerivedSocket::DerivedSocket(DerivedSocketConfig configuraiton) { + // save configuration parameters for later use +} + DerivedSocket::~DerivedSocket() { DerivedSocket::close(); } @@ -51,12 +61,12 @@ void DerivedSocket::open() { // Configure and open the socket } -bool DerivedSocket::receive_bytes(ByteArray& buffer) { +bool DerivedSocket::receive_bytes(std::string& buffer) { // Read the contents of the socket into the buffer and return true on success. Otherwise, return false. return true; } -bool DerivedSocket::send_bytes(const ByteArray& buffer) { +bool DerivedSocket::send_bytes(const std::string& buffer) { // Write the contents of the buffer onto the socket and return true on success. Otherwise, return false. return true; } @@ -65,13 +75,3 @@ void DerivedSocket::close() { // Perform clean-up steps here } ``` - -## Byte Array - -The `ByteArray` class is a container for a dynamic array of bytes (i.e. `char`). It provides methods for loading and -unloading data types to and from the ByteArray. The class acts as an interface definition to raw data (in case the -underlying structure of the raw data changes). Its intended use is for socket communication. - -To use this class, create a `ByteArray` object and use its member functions to load and unload data to and from the -buffer. The default behavior is to append/remove data from the end of the buffer. The class provides methods to copy the -buffer to and from a std::string or a std::vector. diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index 0fff133..9a156a2 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -43,7 +43,6 @@ add_project_dependency(cppzmq 4.7.1 REQUIRED) include_directories(include) add_library(${PROJECT_NAME} SHARED - ${PROJECT_SOURCE_DIR}/src/ByteArray.cpp ${PROJECT_SOURCE_DIR}/src/sockets/UDPSocket.cpp ${PROJECT_SOURCE_DIR}/src/sockets/UDPClient.cpp ${PROJECT_SOURCE_DIR}/src/sockets/UDPServer.cpp diff --git a/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp b/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp deleted file mode 100644 index 05ddd0c..0000000 --- a/source/communication_interfaces/include/communication_interfaces/ByteArray.hpp +++ /dev/null @@ -1,83 +0,0 @@ -#pragma once - -#include -#include -#include - -namespace communication_interfaces { - -/** - * @brief The ByteArray wraps a dynamic array of bytes (i.e. char). - * @details The ByteArray provides convenient methods for loading and unloading data types to and from the ByteArray. - * The class actas as an interface definition to raw data (in case the underlying structure of the raw data changes). - * Its intended use is for socket communication. - * By default, data is appended/removed from the end of the array using the load/unload methods. - */ -class ByteArray { -public: - /** - * @brief Default constructor to create an empty ByteArray - */ - ByteArray(); - - ~ByteArray() = default; - - /** - * @brief Clear the buffer - */ - void reset(); - - /** - * @brief Load a void* (treated as char*) into the buffer - * @param value The value to load - * @param size The number of bytes to load - * @return True on success, false otherwise - */ - bool load(const void* value, const std::size_t& size); - - /** - * @brief Unload a void* (treated as char*) from the buffer - * @param value The value to unload - * @param size The number of bytes to unload - * @return True on success, false otherwise - */ - bool unload(void* value, const std::size_t& size); - - /** - * @brief Copy the content of the buffer to a std::string - * @param value The string to copy to - */ - void copy_to(std::string& value) const; - - /** - * @brief Copy the content of the buffer to a std::vector of char - * @param value The vector to copy into - */ - void copy_to(std::vector& value) const; - - /** - * @brief Set the buffer content from a std::string - * @param value The string to copy from - */ - void copy_from(const std::string& value); - - /** - * @brief Set the ByteArray buffer from a std::vector of char - * @param value The vector to copy from - */ - void copy_from(const std::vector& value); - - /** - * @brief Get the current size of the buffer - */ - [[nodiscard]] unsigned int get_buffer_size() const; - - /** - * @brief Get the maximal size of the buffer - */ - [[nodiscard]] unsigned int get_max_buffer_size() const; - -private: - std::deque buffer_; -}; -}// namespace communication_interfaces \ No newline at end of file diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp index d4f19cd..2badfe7 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -1,6 +1,6 @@ #pragma once -#include "communication_interfaces/ByteArray.hpp" +#include namespace communication_interfaces::sockets { @@ -30,14 +30,14 @@ class ISocket { * @param buffer The buffer to fill with the received bytes * @return True if bytes were received, false otherwise */ - virtual bool receive_bytes(ByteArray& buffer) = 0; + virtual bool receive_bytes(std::string& buffer) = 0; /** * @brief Send bytes to the socket * @param buffer The buffer with the bytes to send * @return True if bytes were sent, false otherwise */ - virtual bool send_bytes(const ByteArray& buffer) = 0; + virtual bool send_bytes(const std::string& buffer) = 0; /** * @brief Perform steps to disconnect and close the socket communication diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp index 8f11ac9..e27ecff 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp @@ -21,13 +21,13 @@ class UDPClient : public UDPSocket { void open() override; /** - * @copydoc ISocket::receive_bytes(ByteArray&) + * @copydoc ISocket::receive_bytes(std::string&) */ - bool receive_bytes(ByteArray& buffer) override; + bool receive_bytes(std::string& buffer) override; /** - * @copydoc ISocket::send_bytes(const ByteArray&) + * @copydoc ISocket::send_bytes(const std::string&) */ - bool send_bytes(const ByteArray& buffer) override; + bool send_bytes(const std::string& buffer) override; }; } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp index 7cf23d6..ce39a46 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp @@ -21,14 +21,14 @@ class UDPServer : public UDPSocket { void open() override; /** - * @copydoc ISocket::receive_bytes(ByteArray&) + * @copydoc ISocket::receive_bytes(std::string&) */ - bool receive_bytes(ByteArray& buffer) override; + bool receive_bytes(std::string& buffer) override; /** - * @copydoc ISocket::send_bytes(const ByteArray&) + * @copydoc ISocket::send_bytes(const std::string&) */ - bool send_bytes(const ByteArray& buffer) override; + bool send_bytes(const std::string& buffer) override; private: sockaddr_in client_address_; diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp index 9a2ca76..f5bc55d 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp @@ -53,7 +53,7 @@ class UDPSocket : public ISocket { * @param buffer The buffer to fill with the received bytes * @return True if bytes were received, false otherwise */ - [[nodiscard]] bool recvfrom(sockaddr_in& address, ByteArray& buffer); + [[nodiscard]] bool recvfrom(sockaddr_in& address, std::string& buffer); /** * @brief Send bytes to the socket @@ -61,7 +61,7 @@ class UDPSocket : public ISocket { * @param buffer The buffer with the bytes to send * @return True if bytes were sent, false otherwise */ - [[nodiscard]] bool sendto(const sockaddr_in& address, const ByteArray& buffer) const; + [[nodiscard]] bool sendto(const sockaddr_in& address, const std::string& buffer) const; sockaddr_in server_address_; ///< Address of the UDP server diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp index 86e1ea6..1fdc38c 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp @@ -23,6 +23,6 @@ class ZMQPublisher : public ZMQSocket { /** * @brief This method throws a runtime error as receiving is not available for a ZMQ publisher */ - bool receive_bytes(ByteArray& buffer) override; + bool receive_bytes(std::string& buffer) override; }; } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp index c06dcf8..53f8500 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -35,14 +35,14 @@ class ZMQSocket : public ISocket { void close() override; /** - * @copydoc ISocket::receive_bytes(ByteArray&) + * @copydoc ISocket::receive_bytes(std::string&) */ - bool receive_bytes(ByteArray& buffer) override; + bool receive_bytes(std::string& buffer) override; /** - * @copydoc ISocket::send_bytes(ByteArray&) + * @copydoc ISocket::send_bytes(const std::string&) */ - bool send_bytes(const ByteArray& buffer) override; + bool send_bytes(const std::string& buffer) override; protected: /** diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp index 47e3327..fd4a62c 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp @@ -23,6 +23,6 @@ class ZMQSubscriber : public ZMQSocket { /** * @brief This method throws a runtime error as sending is not available for a ZMQ publisher */ - bool send_bytes(const ByteArray& buffer) override; + bool send_bytes(const std::string& buffer) override; }; } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/ByteArray.cpp b/source/communication_interfaces/src/ByteArray.cpp deleted file mode 100644 index 558e66f..0000000 --- a/source/communication_interfaces/src/ByteArray.cpp +++ /dev/null @@ -1,72 +0,0 @@ -#include "communication_interfaces/ByteArray.hpp" - -namespace communication_interfaces { - -ByteArray::ByteArray() { - this->reset(); -} - -void ByteArray::reset() { - this->buffer_.clear(); -} - -bool ByteArray::load(const void* value, const size_t& size) { - if (value == nullptr) { - return false; - } - if (this->get_buffer_size() + size > this->get_max_buffer_size()) { - return false; - } - try { - char* byte_ptr = (char*)value; - this->buffer_.insert(this->buffer_.end(), byte_ptr, byte_ptr + size); - return true; - } catch (const std::exception&) { - return false; - } -} - -bool ByteArray::unload(void* value, const std::size_t& size) { - if (value == nullptr) { - return false; - } - if (size > this->get_buffer_size()) { - return false; - } - try { - std::copy(this->buffer_.end() - static_cast(size), this->buffer_.end(), (char*)value); - this->buffer_.erase(this->buffer_.end() - static_cast(size), this->buffer_.end()); - return true; - } catch (const std::exception&) { - return false; - } -} - -void ByteArray::copy_to(std::vector& value) const { - value.assign(this->buffer_.begin(), this->buffer_.end()); -} - -void ByteArray::copy_to(std::string& value) const { - std::vector tmp; - this->copy_to(tmp); - value = std::string(tmp.data(), tmp.size()); -} - -void ByteArray::copy_from(const std::vector& value) { - this->reset(); - this->load(value.data(), value.size()); -} - -void ByteArray::copy_from(const std::string& value) { - this->reset(); - this->load(value.c_str(), value.size()); -} - -unsigned int ByteArray::get_buffer_size() const { - return this->buffer_.size(); -} - -unsigned int ByteArray::get_max_buffer_size() const { - return this->buffer_.max_size(); -} -}// namespace communication_interfaces diff --git a/source/communication_interfaces/src/sockets/UDPClient.cpp b/source/communication_interfaces/src/sockets/UDPClient.cpp index 3a8e4ed..52e3da4 100644 --- a/source/communication_interfaces/src/sockets/UDPClient.cpp +++ b/source/communication_interfaces/src/sockets/UDPClient.cpp @@ -8,11 +8,11 @@ void UDPClient::open() { this->open_socket(false); } -bool UDPClient::receive_bytes(ByteArray& buffer) { +bool UDPClient::receive_bytes(std::string& buffer) { return this->recvfrom(this->server_address_, buffer); } -bool UDPClient::send_bytes(const ByteArray& buffer) { +bool UDPClient::send_bytes(const std::string& buffer) { return this->sendto(this->server_address_, buffer); } } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPServer.cpp b/source/communication_interfaces/src/sockets/UDPServer.cpp index 9dbb265..d392b57 100644 --- a/source/communication_interfaces/src/sockets/UDPServer.cpp +++ b/source/communication_interfaces/src/sockets/UDPServer.cpp @@ -8,11 +8,11 @@ void UDPServer::open() { this->open_socket(true); } -bool UDPServer::receive_bytes(ByteArray& buffer) { +bool UDPServer::receive_bytes(std::string& buffer) { return this->recvfrom(this->client_address_, buffer); } -bool UDPServer::send_bytes(const ByteArray& buffer) { +bool UDPServer::send_bytes(const std::string& buffer) { return this->sendto(this->client_address_, buffer); } } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPSocket.cpp b/source/communication_interfaces/src/sockets/UDPSocket.cpp index e59461d..7ee0d88 100644 --- a/source/communication_interfaces/src/sockets/UDPSocket.cpp +++ b/source/communication_interfaces/src/sockets/UDPSocket.cpp @@ -1,8 +1,9 @@ #include "communication_interfaces/sockets/UDPSocket.hpp" #include -#include #include +#include +#include #include "communication_interfaces/exceptions/SocketConfigurationException.hpp" @@ -57,25 +58,21 @@ void UDPSocket::open_socket(bool bind_socket) { } } -bool UDPSocket::recvfrom(sockaddr_in& address, ByteArray& buffer) { +bool UDPSocket::recvfrom(sockaddr_in& address, std::string& buffer) { std::vector local_buffer(this->config_.buffer_size); auto receive_length = ::recvfrom( this->server_fd_, local_buffer.data(), this->config_.buffer_size, 0, (sockaddr*) &(address), &(this->addr_len_)); if (receive_length < 0) { return false; } - local_buffer.at(receive_length) = 0; - buffer.reset(); - buffer.load(local_buffer.data(), receive_length); + buffer.assign(local_buffer.data(), local_buffer.size()); return true; } -bool UDPSocket::sendto(const sockaddr_in& address, const ByteArray& buffer) const { - std::vector local_buffer; - buffer.copy_to(local_buffer); +bool UDPSocket::sendto(const sockaddr_in& address, const std::string& buffer) const { int send_length = ::sendto( - this->server_fd_, local_buffer.data(), local_buffer.size(), 0, (sockaddr*) &(address), this->addr_len_); - return send_length == static_cast(local_buffer.size()); + this->server_fd_, buffer.data(), buffer.size(), 0, (sockaddr*) &(address), this->addr_len_); + return send_length == static_cast(buffer.size()); } void UDPSocket::close() { diff --git a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp index abaceff..48c9486 100644 --- a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp +++ b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp @@ -9,7 +9,7 @@ void ZMQPublisher::open() { this->open_socket(); } -bool ZMQPublisher::receive_bytes(ByteArray&) { +bool ZMQPublisher::receive_bytes(std::string&) { throw std::runtime_error("Receive not available for socket of type ZMQPublisher"); } } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSocket.cpp b/source/communication_interfaces/src/sockets/ZMQSocket.cpp index 57e58c3..85f2c8d 100644 --- a/source/communication_interfaces/src/sockets/ZMQSocket.cpp +++ b/source/communication_interfaces/src/sockets/ZMQSocket.cpp @@ -23,14 +23,13 @@ void ZMQSocket::open_socket() { } } -bool ZMQSocket::receive_bytes(ByteArray& buffer) { +bool ZMQSocket::receive_bytes(std::string& buffer) { zmq::recv_flags recv_flag = this->config_.wait ? zmq::recv_flags::none : zmq::recv_flags::dontwait; zmq::message_t message; try { auto received = this->socket_->recv(message, recv_flag); if (received.has_value()) { - buffer.reset(); - buffer.load(message.data(), message.size()); + buffer = std::string(static_cast(message.data()), message.size()); } return received.has_value(); } catch (const zmq::error_t&) { @@ -38,14 +37,16 @@ bool ZMQSocket::receive_bytes(ByteArray& buffer) { } } -bool ZMQSocket::send_bytes(const ByteArray& buffer) { +bool ZMQSocket::send_bytes(const std::string& buffer) { zmq::send_flags send_flags = this->config_.wait ? zmq::send_flags::none : zmq::send_flags::dontwait; - std::vector local_buffer; - buffer.copy_to(local_buffer); - zmq::message_t msg(local_buffer.begin(), local_buffer.end()); + zmq::message_t msg(buffer.size()); + memcpy(msg.data(), buffer.data(), buffer.size()); try { auto sent = this->socket_->send(msg, send_flags); - return sent.has_value(); + if (!sent.has_value()) { + return false; + } + return *sent == buffer.size(); } catch (const zmq::error_t&) { return false; } diff --git a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp index ca44bee..e9a8d94 100644 --- a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp +++ b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp @@ -11,7 +11,7 @@ void ZMQSubscriber::open() { this->socket_->set(zmq::sockopt::subscribe, ""); } -bool ZMQSubscriber::send_bytes(const ByteArray&) { +bool ZMQSubscriber::send_bytes(const std::string&) { throw std::runtime_error("Send not available for socket of type ZMQSubscriber"); } } // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/tests/test_byte_array.cpp b/source/communication_interfaces/test/tests/test_byte_array.cpp deleted file mode 100644 index 4de17b6..0000000 --- a/source/communication_interfaces/test/tests/test_byte_array.cpp +++ /dev/null @@ -1,26 +0,0 @@ -#include - -#include "communication_interfaces/ByteArray.hpp" - -using namespace communication_interfaces; - -TEST(TestByteArray, ByteArray) { - std::string message = "test"; - auto data = ByteArray(); - data.copy_from(message); - EXPECT_EQ(data.get_buffer_size(), message.size()); - std::string result; - data.copy_to(result); - EXPECT_STREQ(result.c_str(), message.c_str()); - - data.load(message.c_str(), message.size()); - EXPECT_EQ(data.get_buffer_size(), 2 * message.size()); - data.copy_to(result); - EXPECT_STREQ(result.c_str(), (message + message).c_str()); - char* raw_result = new char[message.size()]; - data.unload(raw_result, message.size()); - EXPECT_EQ(data.get_buffer_size(), message.size()); - - data.reset(); - EXPECT_EQ(data.get_buffer_size(), 0); -} diff --git a/source/communication_interfaces/test/tests/test_udp_communication.cpp b/source/communication_interfaces/test/tests/test_udp_communication.cpp index dcbaa34..98aabe0 100644 --- a/source/communication_interfaces/test/tests/test_udp_communication.cpp +++ b/source/communication_interfaces/test/tests/test_udp_communication.cpp @@ -17,6 +17,7 @@ class TestUDPSockets : public ::testing::Test { TEST_F(TestUDPSockets, SendReceive) { const std::string send_string = "Hello world!"; + std::string receive_string; // Create server socket and bind it to a port sockets::UDPServer server(this->config_); @@ -27,18 +28,13 @@ TEST_F(TestUDPSockets, SendReceive) { ASSERT_NO_THROW(client.open()); // Send test message from client to server - ByteArray message; - message.copy_from(send_string); - ASSERT_TRUE(client.send_bytes(message)); + ASSERT_TRUE(client.send_bytes(send_string)); // Receive message on server - message.reset(); - ASSERT_TRUE(server.receive_bytes(message)); + ASSERT_TRUE(server.receive_bytes(receive_string)); // Convert received message to string and compare with sent message - std::string received_string; - message.copy_to(received_string); - EXPECT_STREQ(received_string.c_str(), send_string.c_str()); + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); } TEST_F(TestUDPSockets, Timeout) { @@ -47,7 +43,7 @@ TEST_F(TestUDPSockets, Timeout) { sockets::UDPServer server(this->config_); // Try to receive a message from client, but expect timeout - ByteArray received_bytes; + std::string received_bytes; EXPECT_FALSE(server.receive_bytes(received_bytes)); } @@ -74,5 +70,5 @@ TEST_F(TestUDPSockets, OpenClose) { client.open(); // Try to send a message from the closed server socket (expect failure) - EXPECT_FALSE(server.send_bytes(ByteArray())); + EXPECT_FALSE(server.send_bytes(std::string())); } diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp index 1f934de..bddfdb6 100644 --- a/source/communication_interfaces/test/tests/test_zmq_communication.cpp +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -19,6 +19,7 @@ class TestZMQSockets : public ::testing::Test { TEST_F(TestZMQSockets, SendReceive) { const std::string send_string = "Hello world!"; + std::string receive_string; sockets::ZMQPublisher publisher(this->config_); this->config_.bind = false; @@ -27,17 +28,12 @@ TEST_F(TestZMQSockets, SendReceive) { publisher.open(); subscriber.open(); - ByteArray message; - message.copy_from(send_string); for (int i = 0; i < 5; ++i) { - EXPECT_TRUE(publisher.send_bytes(message)); + EXPECT_TRUE(publisher.send_bytes(send_string)); usleep(10000); } - message.reset(); - ASSERT_TRUE(subscriber.receive_bytes(message)); - std::string received_string; - message.copy_to(received_string); - EXPECT_STREQ(received_string.c_str(), send_string.c_str()); + ASSERT_TRUE(subscriber.receive_bytes(receive_string)); + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); publisher.close(); subscriber.close(); } From 8dd9d53d322f785dcc79a983e7c99adb0b08f06f Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Fri, 26 May 2023 12:06:34 +0200 Subject: [PATCH 7/9] Abstract ZMQ publisher and subscriber into one class (#36) * Add class that combines ZMQ publisher and subscriber * Update tests --- .../communication_interfaces/CMakeLists.txt | 5 +- .../sockets/ZMQPublisherSubscriber.hpp | 68 +++++++++++++++++++ .../sockets/ZMQSocket.hpp | 2 +- .../src/sockets/ZMQPublisherSubscriber.cpp | 41 +++++++++++ .../test/tests/test_zmq_communication.cpp | 35 +++++++++- 5 files changed, 146 insertions(+), 5 deletions(-) create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp create mode 100644 source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index 9a156a2..a23e542 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.15) -set (COMMUNICATION_INTERFACES_VERSION 0.0.1) +set(COMMUNICATION_INTERFACES_VERSION 0.0.1) project(communication_interfaces VERSION ${COMMUNICATION_INTERFACES_VERSION}) option(BUILD_TESTING "Build tests." OFF) @@ -49,7 +49,8 @@ add_library(${PROJECT_NAME} SHARED ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSocket.cpp ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisher.cpp ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSubscriber.cpp -) + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisherSubscriber.cpp + ) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq) diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp new file mode 100644 index 0000000..fcadd94 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include "communication_interfaces/sockets/ISocket.hpp" +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct ZMQCombinedSocketsConfiguration + * @brief Configuration parameters for a the combination of a ZMQ Publisher and Pubscriber socket + */ +struct ZMQCombinedSocketsConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string publisher_port; + std::string subscriber_port; + bool bind_publisher = true; + bool bind_subscriber = true; + bool wait = false; +}; + +/** + * @brief A class that combines both a ZMQ Publisher and Subscriber socket into one single object + */ +class ZMQPublisherSubscriber : public ISocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration); + + /** + * @brief Close the sockets by calling ZMQPublisherSubscriber::close() + */ + ~ZMQPublisherSubscriber() override; + + /** + * @brief Open the internal ZMQ Publisher and Subscriber sockets for communication + * @throws SocketConfigurationException if opening fails + */ + void open() override; + + /** + * @brief Receive bytes from the internal ZMQ Subscriber socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @brief Send bytes with the internal ZMQ Publisher socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + bool send_bytes(const std::string& buffer) override; + + /** + * @brief Close the sockets + */ + void close() override; + +private: + std::shared_ptr pub_; ///< ZMQ publisher + std::shared_ptr sub_; ///< ZMQ subscriber +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp index 53f8500..433dcb2 100644 --- a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -15,7 +15,7 @@ struct ZMQSocketConfiguration { std::string ip_address; std::string port; bool bind = true; - bool wait = true; + bool wait = false; }; /** diff --git a/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp new file mode 100644 index 0000000..cdbf5f6 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp @@ -0,0 +1,41 @@ +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" + +namespace communication_interfaces::sockets { + +ZMQPublisherSubscriber::ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration) { + this->pub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.publisher_port, + configuration.bind_publisher, configuration.wait + })); + this->sub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.subscriber_port, + configuration.bind_subscriber, configuration.wait + })); +} + +ZMQPublisherSubscriber::~ZMQPublisherSubscriber() { + ZMQPublisherSubscriber::close(); +} + +void ZMQPublisherSubscriber::open() { + this->pub_->open(); + this->sub_->open(); +} + +bool ZMQPublisherSubscriber::receive_bytes(std::string& buffer) { + return this->sub_->receive_bytes(buffer); +} + +bool ZMQPublisherSubscriber::send_bytes(const std::string& buffer) { + return this->pub_->send_bytes(buffer); +} + +void ZMQPublisherSubscriber::close() { + this->pub_->close(); + this->sub_->close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp index bddfdb6..a939689 100644 --- a/source/communication_interfaces/test/tests/test_zmq_communication.cpp +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -3,6 +3,7 @@ #include "communication_interfaces/sockets/ZMQPublisher.hpp" #include "communication_interfaces/sockets/ZMQSubscriber.hpp" +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" using namespace communication_interfaces; using namespace std::chrono_literals; @@ -28,12 +29,42 @@ TEST_F(TestZMQSockets, SendReceive) { publisher.open(); subscriber.open(); - for (int i = 0; i < 5; ++i) { + while (!subscriber.receive_bytes(receive_string)) { EXPECT_TRUE(publisher.send_bytes(send_string)); usleep(10000); } - ASSERT_TRUE(subscriber.receive_bytes(receive_string)); EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); publisher.close(); subscriber.close(); } + +TEST_F(TestZMQSockets, SendReceiveCombined) { + const std::string server_send_string = "Hello client!"; + const std::string client_send_string = "Hello server!"; + std::string server_receive_string, client_receive_string; + + sockets::ZMQCombinedSocketsConfiguration server_config = {config_.context, config_.ip_address, "5001", "5002"}; + sockets::ZMQPublisherSubscriber server(server_config); + + sockets::ZMQCombinedSocketsConfiguration + client_config = {config_.context, config_.ip_address, "5002", "5001", false, false}; + sockets::ZMQPublisherSubscriber client(client_config); + + server.open(); + client.open(); + + while (!client.receive_bytes(client_receive_string)) { + EXPECT_TRUE(server.send_bytes(server_send_string)); + usleep(10000); + } + EXPECT_STREQ(client_receive_string.c_str(), server_send_string.c_str()); + + while (!server.receive_bytes(server_receive_string)) { + EXPECT_TRUE(client.send_bytes(client_send_string)); + usleep(10000); + } + EXPECT_STREQ(server_receive_string.c_str(), client_send_string.c_str()); + + server.close(); + client.close(); +} From c2600a16475c79177a40378bcc6ee75ffe822e5c Mon Sep 17 00:00:00 2001 From: Louis Brunner Date: Tue, 30 May 2023 09:23:02 +0100 Subject: [PATCH 8/9] feat(ci): add an automated build for network-interfaces that can be copied into a ros2-modulo-control image (#39) --- .dockerignore | 5 + .github/workflows/build-release.yaml | 53 +++++++++++ .github/workflows/manual-dispatch.yml | 13 ++- CMakeLists.txt | 19 ++++ Dockerfile.ci | 95 +++++++++++++++++++ cpp/CMakeLists.txt | 6 +- .../communication_interfaces/CMakeLists.txt | 4 +- 7 files changed, 187 insertions(+), 8 deletions(-) create mode 100644 .dockerignore create mode 100644 .github/workflows/build-release.yaml create mode 100644 CMakeLists.txt create mode 100644 Dockerfile.ci diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c9d111c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.github +/*.sh +/*.md +Dockerfile +LICENSE diff --git a/.github/workflows/build-release.yaml b/.github/workflows/build-release.yaml new file mode 100644 index 0000000..79a89d4 --- /dev/null +++ b/.github/workflows/build-release.yaml @@ -0,0 +1,53 @@ +name: Build and Push Multi-Arch Images + +on: + push: + branches: + - develop + - main + tags: + - "v*.*.*" + +jobs: + get-tag: + runs-on: ubuntu-latest + name: Get tag + outputs: + tag: ${{ steps.parse-tag.outputs.tag }} + steps: + - uses: actions/checkout@v3 + - uses: aica-technology/.github/.github/actions/docker-tag-from-git@v0.5.0 + id: parse-tag + + build: + needs: [get-tag] + strategy: + matrix: + arch: [amd64, arm64] + include: + - image: ubuntu-latest + - image: buildjet-2vcpu-ubuntu-2204-arm + arch: arm64 + + runs-on: ${{ matrix.image }} + name: Build and publish (${{ matrix.arch }}) + steps: + - uses: actions/checkout@v3 + - uses: aica-technology/.github/.github/actions/ghcr-build@v0.5.0 + with: + image_name: aica-technology/network-interfaces + image_tags: ${{ needs.get-tag.outputs.tag }}-${{ matrix.arch }} + dockerfile_path: Dockerfile.ci + token: ${{ secrets.GITHUB_TOKEN }} + + multi-arch: + runs-on: ubuntu-latest + name: Merge into a multi-arch image + needs: [get-tag, build] + steps: + - uses: aica-technology/.github/.github/actions/ghcr-manifest-merge@v0.5.0 + with: + image_name: aica-technology/network-interfaces + image_tags: ${{ needs.get-tag.outputs.tag }} + archs: amd64,arm64 + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/manual-dispatch.yml b/.github/workflows/manual-dispatch.yml index fb3eb40..84c3340 100644 --- a/.github/workflows/manual-dispatch.yml +++ b/.github/workflows/manual-dispatch.yml @@ -5,9 +5,12 @@ on: workflow_dispatch: inputs: cl_branch: - description: 'If set, the desired branch of control libraries' + description: "If set, the desired branch of control libraries" required: false - default: 'develop' + default: "develop" + tag: + description: "The tag to use" + required: true jobs: build-publish: @@ -19,7 +22,7 @@ jobs: - name: Build image run: | - IMAGE_NAME=network-interfaces:latest + IMAGE_NAME=network-interfaces:${{ inputs.tag }} docker build . \ --build-arg CONTROL_LIBRARIES_BRANCH=${{ inputs.cl_branch }} \ --tag ${IMAGE_NAME} @@ -31,7 +34,7 @@ jobs: - name: Push image run: | - IMAGE_NAME=network-interfaces:latest + IMAGE_NAME=network-interfaces:${{ inputs.tag }} docker tag ${IMAGE_NAME} ghcr.io/${{ github.repository_owner }}/${IMAGE_NAME} docker push ghcr.io/${{ github.repository_owner }}/${IMAGE_NAME} - shell: bash \ No newline at end of file + shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..2110e6c --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.15) +project(all_network_interfaces) + +include(FetchContent) +option(CPPZMQ_BUILD_TESTS OFF) +FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq/ + GIT_TAG v4.7.1 +) +FetchContent_MakeAvailable(cppzmq) + +add_subdirectory(cpp) +add_subdirectory(source/communication_interfaces) + +if(BUILD_TESTING) + # reexport the test target defined in the subdirectories + enable_testing() +endif() diff --git a/Dockerfile.ci b/Dockerfile.ci new file mode 100644 index 0000000..7c8e13a --- /dev/null +++ b/Dockerfile.ci @@ -0,0 +1,95 @@ +FROM ghcr.io/aica-technology/ros2-modulo-control:humble as base + +FROM base as apt-dependencies +RUN < /tmp/new-packages.txt +# Then we install apt packages like normal +sudo apt-get install -y ${PACKAGES} +# Finally we use dpkg to get all files installed by those packages and copy them to a new root +# - get list of files installed by all the packages +# - remove empty lines +# - sort +# - remove duplicates +# - copy files while keeping file hierarchy and preserving links as-is +# - remove "omitting directory" messages (we don't do recursive copy as we only want specific files) for cleaner output +xargs -a /tmp/new-packages.txt dpkg-query -L \ + | sed '/^$/d' | sort | uniq \ + | xargs -d "\n" cp --parents -dp -t /tmp/apt 2>&1 \ + | grep -v 'omitting directory' +# this root can then be copied to / to install everything globally or use LD_LIBRARY_PATH to use it locally +HEREDOC + +FROM base as python +COPY --chown=${USER} ./python /python +RUN \ + --mount=type=cache,target=${HOME}/.cache,id=pip-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + python3 -m pip install --prefix=/tmp/python /python + +FROM base as code +WORKDIR /src +COPY --from=apt-dependencies /tmp/apt / +COPY --chown=${USER} . /src + +FROM code as development +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ ${HOME}/.local/lib/python3.10/site-packages/ + +FROM code as build +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build \ + && cmake --build build + +FROM build as test +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build -DBUILD_TESTING=ON \ + && CTEST_OUTPUT_ON_FAILURE=1 cmake --build build --target all test +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ ${HOME}/.local/lib/python3.10/site-packages/ +RUN python3 -m unittest discover python/test --verbose + +FROM build as install +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build -DCMAKE_INSTALL_PREFIX=/tmp/net-ifaces \ + && cmake --build build --target all install + +FROM scratch as production +COPY --from=apt-dependencies /tmp/apt / +COPY --from=install /tmp/net-ifaces /usr/local +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ /home/ros2/.local/lib/python3.10/site-packages/ diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 520b4dd..a430ed5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -40,7 +40,9 @@ include(FindPkgConfig) add_project_dependency(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) add_project_dependency(clproto 7.0.0 REQUIRED) -add_project_dependency(cppzmq 4.7.1 REQUIRED) +if (NOT ${cppzmq_FOUND}) # provided by parent CMakeLists.txt (if used) + add_project_dependency(cppzmq 4.7.1 REQUIRED) +endif() include_directories(include) @@ -93,4 +95,4 @@ if(${PKG_CONFIG_FOUND}) DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig/ COMPONENT pkgconfig) else() message(WARNING "Could not find pkg-config executable, skipping generation of pkg-config files.") -endif() \ No newline at end of file +endif() diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index a23e542..ce9256e 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -38,7 +38,9 @@ include(GNUInstallDirs) include(CMakePackageConfigHelpers) include(FindPkgConfig) -add_project_dependency(cppzmq 4.7.1 REQUIRED) +if (NOT ${cppzmq_FOUND}) # provided by parent CMakeLists.txt (if used) + add_project_dependency(cppzmq 4.7.1 REQUIRED) +endif() include_directories(include) From 97ec4489917a9c3f58979faa64adf54aa4e00af0 Mon Sep 17 00:00:00 2001 From: Dominic Reber <71256590+domire8@users.noreply.github.com> Date: Tue, 30 May 2023 11:26:57 +0200 Subject: [PATCH 9/9] Add TCP server and client classes (#34) * Add TCP server and client classes * Update tests * Remove connect from TCP sockets --- .../communication_interfaces/CMakeLists.txt | 4 +- .../sockets/TCPClient.hpp | 34 +++++++++++ .../sockets/TCPServer.hpp | 46 +++++++++++++++ .../sockets/TCPSocket.hpp | 43 ++++++++++++++ .../src/sockets/TCPClient.cpp | 32 ++++++++++ .../src/sockets/TCPServer.cpp | 58 +++++++++++++++++++ .../src/sockets/TCPSocket.cpp | 38 ++++++++++++ .../test/tests/test_tcp_communication.cpp | 54 +++++++++++++++++ 8 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp create mode 100644 source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp create mode 100644 source/communication_interfaces/src/sockets/TCPClient.cpp create mode 100644 source/communication_interfaces/src/sockets/TCPServer.cpp create mode 100644 source/communication_interfaces/src/sockets/TCPSocket.cpp create mode 100644 source/communication_interfaces/test/tests/test_tcp_communication.cpp diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt index ce9256e..83d7500 100644 --- a/source/communication_interfaces/CMakeLists.txt +++ b/source/communication_interfaces/CMakeLists.txt @@ -52,7 +52,9 @@ add_library(${PROJECT_NAME} SHARED ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisher.cpp ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSubscriber.cpp ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisherSubscriber.cpp - ) + ${PROJECT_SOURCE_DIR}/src/sockets/TCPSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/TCPClient.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/TCPServer.cpp) target_include_directories(${PROJECT_NAME} PUBLIC include) target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq) diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp new file mode 100644 index 0000000..e4e7ed0 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPClientConfiguration + * @brief Configuration parameters for a TCP client + */ +struct TCPClientConfiguration { + std::string ip_address; + int port; + int buffer_size; +}; + +class TCPClient : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPClient(TCPClientConfiguration configuration); + + /** + * @copydoc ISocket::open() + * @details Connect the client socket to the server + */ + void open() override; + +private: + TCPClientConfiguration config_; ///< Socket configuration struct +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp new file mode 100644 index 0000000..78c8c2c --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPServerConfiguration + * @brief Configuration parameters for a TCP server + */ +struct TCPServerConfiguration { + int port; + int buffer_size; + bool enable_reuse; +}; + +class TCPServer : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPServer(TCPServerConfiguration configuration); + + /** + * @brief Close the sockets by calling TCPServer::close() + */ + ~TCPServer() override; + + /** + * @copydoc ISocket::open() + * @details Wait for connection requests from clients and accept new connections. This method blocks until a + * connection is established + */ + void open() override; + + /** + * @brief Close the sockets + */ + void close() override; + +private: + TCPServerConfiguration config_; ///< Socket configuration struct + int server_fd_; ///< File descriptor of the connected server socket +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp new file mode 100644 index 0000000..8ceba43 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Abstract class to define a generic TCP socket + * @details TCP is a connection-based communication protocol. Hence, TCP sockets need to implement an additional + * interface method `connect()`. + */ +class TCPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling TCPSocket::close() + */ + ~TCPSocket() override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool send_bytes(const std::string& buffer) override; + + /** + * @brief Close the socket + */ + void close() override; + +protected: + explicit TCPSocket(int buffer_size); + + sockaddr_in server_address_; ///< Address of the TCP server + int socket_fd_; ///< File descriptor of the socket + int buffer_size_; ///< Buffer size +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPClient.cpp b/source/communication_interfaces/src/sockets/TCPClient.cpp new file mode 100644 index 0000000..a8609d9 --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPClient.cpp @@ -0,0 +1,32 @@ +#include "communication_interfaces/sockets/TCPClient.hpp" + +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPClient::TCPClient(TCPClientConfiguration configuration) : TCPSocket(configuration.buffer_size), config_(configuration) {} + +void TCPClient::open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_port = htons(this->config_.port); + if (inet_pton(AF_INET, this->config_.ip_address.c_str(), &this->server_address_.sin_addr) <= 0) { + throw std::invalid_argument("IP Address not supported"); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->socket_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + + if (::connect(this->socket_fd_, (sockaddr*) &this->server_address_, sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Connecting client failed"); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPServer.cpp b/source/communication_interfaces/src/sockets/TCPServer.cpp new file mode 100644 index 0000000..6acdf6d --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPServer.cpp @@ -0,0 +1,58 @@ +#include "communication_interfaces/sockets/TCPServer.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPServer::TCPServer(TCPServerConfiguration configuration) : + TCPSocket(configuration.buffer_size), config_(configuration), server_fd_() { +} + +TCPServer::~TCPServer() { + TCPServer::close(); +} + +void TCPServer::open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = htonl(INADDR_ANY); + this->server_address_.sin_port = htons(this->config_.port); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + // open stream oriented socket with internet address + this->server_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->config_.enable_reuse) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed"); + } + // listen for only 1 request at a time + listen(this->server_fd_, 1); + // receive a request from client using accept + sockaddr_in new_socket_address{}; + socklen_t new_addr_len = sizeof(new_socket_address); + // accept, create a new socket descriptor to handle the new connection with client + this->socket_fd_ = accept(this->server_fd_, (sockaddr*) &new_socket_address, &new_addr_len); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Connecting server failed"); + } +} + +void TCPServer::close() { + ::close(this->server_fd_); + TCPSocket::close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPSocket.cpp b/source/communication_interfaces/src/sockets/TCPSocket.cpp new file mode 100644 index 0000000..6668b0a --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPSocket.cpp @@ -0,0 +1,38 @@ +#include "communication_interfaces/sockets/TCPSocket.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPSocket::TCPSocket(int buffer_size) : server_address_(), socket_fd_(), buffer_size_(buffer_size) { + if (buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } +} + +TCPSocket::~TCPSocket() { + TCPSocket::close(); +} + +bool TCPSocket::receive_bytes(std::string& buffer) { + std::vector local_buffer(this->buffer_size_); + auto receive_length = recv(this->socket_fd_, local_buffer.data(), this->buffer_size_, 0); + if (receive_length < 0) { + return false; + } + buffer.assign(local_buffer.data(), local_buffer.size()); + return true; +} + +bool TCPSocket::send_bytes(const std::string& buffer) { + int send_length = send(this->socket_fd_, buffer.data(), buffer.size(), 0); + return send_length == static_cast(buffer.size()); +} + +void TCPSocket::close() { + ::close(this->socket_fd_); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/tests/test_tcp_communication.cpp b/source/communication_interfaces/test/tests/test_tcp_communication.cpp new file mode 100644 index 0000000..b3f21c2 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_tcp_communication.cpp @@ -0,0 +1,54 @@ +#include +#include + +#include "communication_interfaces/sockets/TCPClient.hpp" +#include "communication_interfaces/sockets/TCPServer.hpp" + +using namespace communication_interfaces; + +class TestTCPSockets : public ::testing::Test { +public: + TestTCPSockets() { + server_ = std::make_shared(sockets::TCPServerConfiguration{6000, 50, true}); + client_ = std::make_shared(sockets::TCPClientConfiguration{"127.0.0.1", 6000, 50}); + } + + std::thread start_server() { + return std::thread([this] { this->serve(); }); + } + + void serve() const { + server_->open(); + std::string recv_message; + EXPECT_TRUE(server_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), client_message_.c_str()); + EXPECT_TRUE(server_->send_bytes(server_message_)); + } + + std::thread start_client() { + return std::thread([this] { this->client(); }); + } + + void client() const { + client_->open(); + EXPECT_TRUE(client_->send_bytes(client_message_)); + std::string recv_message; + EXPECT_TRUE(client_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), server_message_.c_str()); + } + + std::string client_message_ = "Hello server"; + std::string server_message_ = "Hello client"; + + std::shared_ptr client_; + std::shared_ptr server_; +}; + +TEST_F(TestTCPSockets, TestCommunication) { + std::thread server = start_server(); + usleep(100000); + std::thread client = start_client(); + + client.join(); + server.join(); +}