diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c9d111c --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +.github +/*.sh +/*.md +Dockerfile +LICENSE diff --git a/.github/workflows/build-release.yaml b/.github/workflows/build-release.yaml new file mode 100644 index 0000000..79a89d4 --- /dev/null +++ b/.github/workflows/build-release.yaml @@ -0,0 +1,53 @@ +name: Build and Push Multi-Arch Images + +on: + push: + branches: + - develop + - main + tags: + - "v*.*.*" + +jobs: + get-tag: + runs-on: ubuntu-latest + name: Get tag + outputs: + tag: ${{ steps.parse-tag.outputs.tag }} + steps: + - uses: actions/checkout@v3 + - uses: aica-technology/.github/.github/actions/docker-tag-from-git@v0.5.0 + id: parse-tag + + build: + needs: [get-tag] + strategy: + matrix: + arch: [amd64, arm64] + include: + - image: ubuntu-latest + - image: buildjet-2vcpu-ubuntu-2204-arm + arch: arm64 + + runs-on: ${{ matrix.image }} + name: Build and publish (${{ matrix.arch }}) + steps: + - uses: actions/checkout@v3 + - uses: aica-technology/.github/.github/actions/ghcr-build@v0.5.0 + with: + image_name: aica-technology/network-interfaces + image_tags: ${{ needs.get-tag.outputs.tag }}-${{ matrix.arch }} + dockerfile_path: Dockerfile.ci + token: ${{ secrets.GITHUB_TOKEN }} + + multi-arch: + runs-on: ubuntu-latest + name: Merge into a multi-arch image + needs: [get-tag, build] + steps: + - uses: aica-technology/.github/.github/actions/ghcr-manifest-merge@v0.5.0 + with: + image_name: aica-technology/network-interfaces + image_tags: ${{ needs.get-tag.outputs.tag }} + archs: amd64,arm64 + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/manual-dispatch.yml b/.github/workflows/manual-dispatch.yml index fb3eb40..84c3340 100644 --- a/.github/workflows/manual-dispatch.yml +++ b/.github/workflows/manual-dispatch.yml @@ -5,9 +5,12 @@ on: workflow_dispatch: inputs: cl_branch: - description: 'If set, the desired branch of control libraries' + description: "If set, the desired branch of control libraries" required: false - default: 'develop' + default: "develop" + tag: + description: "The tag to use" + required: true jobs: build-publish: @@ -19,7 +22,7 @@ jobs: - name: Build image run: | - IMAGE_NAME=network-interfaces:latest + IMAGE_NAME=network-interfaces:${{ inputs.tag }} docker build . \ --build-arg CONTROL_LIBRARIES_BRANCH=${{ inputs.cl_branch }} \ --tag ${IMAGE_NAME} @@ -31,7 +34,7 @@ jobs: - name: Push image run: | - IMAGE_NAME=network-interfaces:latest + IMAGE_NAME=network-interfaces:${{ inputs.tag }} docker tag ${IMAGE_NAME} ghcr.io/${{ github.repository_owner }}/${IMAGE_NAME} docker push ghcr.io/${{ github.repository_owner }}/${IMAGE_NAME} - shell: bash \ No newline at end of file + shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..2110e6c --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.15) +project(all_network_interfaces) + +include(FetchContent) +option(CPPZMQ_BUILD_TESTS OFF) +FetchContent_Declare( + cppzmq + GIT_REPOSITORY https://github.com/zeromq/cppzmq/ + GIT_TAG v4.7.1 +) +FetchContent_MakeAvailable(cppzmq) + +add_subdirectory(cpp) +add_subdirectory(source/communication_interfaces) + +if(BUILD_TESTING) + # reexport the test target defined in the subdirectories + enable_testing() +endif() diff --git a/Dockerfile.ci b/Dockerfile.ci new file mode 100644 index 0000000..7c8e13a --- /dev/null +++ b/Dockerfile.ci @@ -0,0 +1,95 @@ +FROM ghcr.io/aica-technology/ros2-modulo-control:humble as base + +FROM base as apt-dependencies +RUN < /tmp/new-packages.txt +# Then we install apt packages like normal +sudo apt-get install -y ${PACKAGES} +# Finally we use dpkg to get all files installed by those packages and copy them to a new root +# - get list of files installed by all the packages +# - remove empty lines +# - sort +# - remove duplicates +# - copy files while keeping file hierarchy and preserving links as-is +# - remove "omitting directory" messages (we don't do recursive copy as we only want specific files) for cleaner output +xargs -a /tmp/new-packages.txt dpkg-query -L \ + | sed '/^$/d' | sort | uniq \ + | xargs -d "\n" cp --parents -dp -t /tmp/apt 2>&1 \ + | grep -v 'omitting directory' +# this root can then be copied to / to install everything globally or use LD_LIBRARY_PATH to use it locally +HEREDOC + +FROM base as python +COPY --chown=${USER} ./python /python +RUN \ + --mount=type=cache,target=${HOME}/.cache,id=pip-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + python3 -m pip install --prefix=/tmp/python /python + +FROM base as code +WORKDIR /src +COPY --from=apt-dependencies /tmp/apt / +COPY --chown=${USER} . /src + +FROM code as development +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ ${HOME}/.local/lib/python3.10/site-packages/ + +FROM code as build +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build \ + && cmake --build build + +FROM build as test +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build -DBUILD_TESTING=ON \ + && CTEST_OUTPUT_ON_FAILURE=1 cmake --build build --target all test +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ ${HOME}/.local/lib/python3.10/site-packages/ +RUN python3 -m unittest discover python/test --verbose + +FROM build as install +RUN \ + --mount=type=cache,target=./build,id=cmake-${TARGETPLATFORM},uid=1000 \ + --mount=type=ssh,uid=1000 \ + cmake -B build -DCMAKE_INSTALL_PREFIX=/tmp/net-ifaces \ + && cmake --build build --target all install + +FROM scratch as production +COPY --from=apt-dependencies /tmp/apt / +COPY --from=install /tmp/net-ifaces /usr/local +COPY --from=python /tmp/python/local/lib/python3.10/dist-packages/ /home/ros2/.local/lib/python3.10/site-packages/ diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 520b4dd..a430ed5 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -40,7 +40,9 @@ include(FindPkgConfig) add_project_dependency(control_libraries 7.0.0 REQUIRED COMPONENTS state_representation) add_project_dependency(clproto 7.0.0 REQUIRED) -add_project_dependency(cppzmq 4.7.1 REQUIRED) +if (NOT ${cppzmq_FOUND}) # provided by parent CMakeLists.txt (if used) + add_project_dependency(cppzmq 4.7.1 REQUIRED) +endif() include_directories(include) @@ -93,4 +95,4 @@ if(${PKG_CONFIG_FOUND}) DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig/ COMPONENT pkgconfig) else() message(WARNING "Could not find pkg-config executable, skipping generation of pkg-config files.") -endif() \ No newline at end of file +endif() diff --git a/source/Dockerfile b/source/Dockerfile new file mode 100644 index 0000000..7c2b323 --- /dev/null +++ b/source/Dockerfile @@ -0,0 +1,15 @@ +FROM ghcr.io/aica-technology/control-libraries/development-dependencies as source-dependencies + +RUN apt-get update && apt-get install -y \ + libzmq3-dev \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +WORKDIR /tmp + +ARG CPPZMQ_VERSION=4.7.1 +RUN wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz +RUN tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz +WORKDIR /tmp/cppzmq-${CPPZMQ_VERSION} +RUN mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install + +RUN rm -rf /tmp/* diff --git a/source/README.md b/source/README.md new file mode 100644 index 0000000..7ad135b --- /dev/null +++ b/source/README.md @@ -0,0 +1,77 @@ +## Socket Interface + +The `ISocket` class is an interface for simple socket communication, defining functions for opening a socket, +sending and receiving bytes, and closing the socket connection. + +The `ISocket` class defines an `open()` method to perform configuration steps to open the socket for communication. +If opening the socket fails, an exception is thrown. The `close()` method is also provided to perform steps to disconnect +and close the socket communication. + +The functions `receive_bytes(std::string&)` and `send_bytes(const std::string&)` perform the read and write logic of the socket +respectively. + +### Implementing a derived socket class + +To use this class, create a subclass that inherits from it and implement its pure virtual functions. The pure virtual +functions are `open()`, `receive_bytes(std::string&)`, and `send_bytes(const std::string&)`. + +Configuration parameters should be passed with a configuration struct, resulting in a single argument constructor. + +The `close()` function can optionally be overridden to perform steps to disconnect and close the socket communication. +If a derived class defines any cleanup behavior in `close()`, it should also be invoked statically and explicitly +in the destructor of the derived class. + +An example is given below. + +```c++ +// DerivedSocket.hpp + +struct DerivedSocketConfig { + int param1; + double param2; +}; + +class DerivedSocket : ISocket { +public: + DerivedSocket(DerivedSocketConfig configuration); + + ~DerivedSocket() override; + + void open() override; + + bool receive_bytes(std::string& buffer) override; + + bool send_bytes(const std::string& buffer) override; + + void close() override; +} +``` + +```c++ +// DerivedSocket.cpp +DerivedSocket::DerivedSocket(DerivedSocketConfig configuraiton) { + // save configuration parameters for later use +} + +DerivedSocket::~DerivedSocket() { + DerivedSocket::close(); +} + +void DerivedSocket::open() { + // Configure and open the socket +} + +bool DerivedSocket::receive_bytes(std::string& buffer) { + // Read the contents of the socket into the buffer and return true on success. Otherwise, return false. + return true; +} + +bool DerivedSocket::send_bytes(const std::string& buffer) { + // Write the contents of the buffer onto the socket and return true on success. Otherwise, return false. + return true; +} + +void DerivedSocket::close() { + // Perform clean-up steps here +} +``` diff --git a/source/communication_interfaces/CMakeLists.txt b/source/communication_interfaces/CMakeLists.txt new file mode 100644 index 0000000..83d7500 --- /dev/null +++ b/source/communication_interfaces/CMakeLists.txt @@ -0,0 +1,107 @@ +cmake_minimum_required(VERSION 3.15) + +set(COMMUNICATION_INTERFACES_VERSION 0.0.1) +project(communication_interfaces VERSION ${COMMUNICATION_INTERFACES_VERSION}) + +option(BUILD_TESTING "Build tests." OFF) + +# Default to C99 +if(NOT CMAKE_C_STANDARD) + set(CMAKE_C_STANDARD 99) +endif() + +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 17) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +if(BUILD_TESTING) + enable_testing() + find_package(GTest REQUIRED) +else() + find_package(GTest QUIET) +endif() + +macro(add_project_dependency) + find_package(${ARGV}) + if(PKG_EXTERNAL_DEPS) + set(PKG_EXTERNAL_DEPS "${PKG_EXTERNAL_DEPS}, ${ARGV0} >= ${ARGV1}") + else() + set(PKG_EXTERNAL_DEPS "${ARGV0} >= ${ARGV1}") + endif() +endmacro() + +include(GNUInstallDirs) +include(CMakePackageConfigHelpers) +include(FindPkgConfig) + +if (NOT ${cppzmq_FOUND}) # provided by parent CMakeLists.txt (if used) + add_project_dependency(cppzmq 4.7.1 REQUIRED) +endif() + +include_directories(include) + +add_library(${PROJECT_NAME} SHARED + ${PROJECT_SOURCE_DIR}/src/sockets/UDPSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/UDPClient.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/UDPServer.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisher.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQSubscriber.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/ZMQPublisherSubscriber.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/TCPSocket.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/TCPClient.cpp + ${PROJECT_SOURCE_DIR}/src/sockets/TCPServer.cpp) +target_include_directories(${PROJECT_NAME} PUBLIC include) +target_link_libraries(${PROJECT_NAME} PUBLIC cppzmq) + +install(DIRECTORY include/ DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}) + +install(TARGETS ${PROJECT_NAME} + ARCHIVE DESTINATION lib + LIBRARY DESTINATION lib + RUNTIME DESTINATION bin + ) + +if(BUILD_TESTING) + add_executable(test_${PROJECT_NAME} test/test_communication_interfaces.cpp) + file(GLOB_RECURSE MODULE_TEST_SOURCES test/tests test_*.cpp) + target_sources(test_${PROJECT_NAME} PRIVATE ${MODULE_TEST_SOURCES}) + target_link_libraries(test_${PROJECT_NAME} ${PROJECT_NAME} ${GTEST_LIBRARIES} pthread) + add_test(NAME test_${PROJECT_NAME} COMMAND test_${PROJECT_NAME}) +endif() + + +# generate the version file for the config file +write_basic_package_version_file( + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake" + VERSION "${PROJECT_VERSION}" + COMPATIBILITY SameMajorVersion +) + +# create config file +configure_package_config_file(${CMAKE_CURRENT_SOURCE_DIR}/${PROJECT_NAME}-config.cmake.in + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake" + INSTALL_DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME} + ) + +# install config files +install(FILES + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}Config.cmake" + "${CMAKE_CURRENT_BINARY_DIR}/${PROJECT_NAME}ConfigVersion.cmake" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME} + ) + +if(${PKG_CONFIG_FOUND}) + set(PKG_NAME ${PROJECT_NAME}) + set(PKG_DESC "Communication Interfaces") + set(pkg_conf_file "${PROJECT_NAME}.pc") + configure_file("${pkg_conf_file}.in" "${CMAKE_BINARY_DIR}/${pkg_conf_file}" @ONLY) + install(FILES "${CMAKE_BINARY_DIR}/${pkg_conf_file}" + DESTINATION ${CMAKE_INSTALL_LIBDIR}/pkgconfig/ COMPONENT pkgconfig) +else() + message(WARNING "Could not find pkg-config executable, skipping generation of pkg-config files.") +endif() diff --git a/source/communication_interfaces/communication_interfaces-config.cmake.in b/source/communication_interfaces/communication_interfaces-config.cmake.in new file mode 100644 index 0000000..cc4e2c5 --- /dev/null +++ b/source/communication_interfaces/communication_interfaces-config.cmake.in @@ -0,0 +1,4 @@ +@PACKAGE_INIT@ + +include(CMakeFindDependencyMacro) +find_dependency(cppzmq) \ No newline at end of file diff --git a/source/communication_interfaces/communication_interfaces.pc.in b/source/communication_interfaces/communication_interfaces.pc.in new file mode 100644 index 0000000..a415858 --- /dev/null +++ b/source/communication_interfaces/communication_interfaces.pc.in @@ -0,0 +1,9 @@ +prefix=@CMAKE_INSTALL_PREFIX@ +exec_prefix=${prefix} +includedir=@CMAKE_INSTALL_FULL_INCLUDEDIR@ + +Name: @PKG_NAME@ +Description: @PKG_DESC@ +Version: @COMMUNICATION_INTERFACES_VERSION@ +Requires: @PKG_EXTERNAL_DEPS@ +Cflags: -I${includedir} diff --git a/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp new file mode 100644 index 0000000..28f5f93 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/exceptions/SocketConfigurationException.hpp @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace communication_interfaces::exceptions { + +/* + * @class SocketConfigurationException + * @brief Exception that is thrown when a socket configuration failed + */ +class SocketConfigurationException : public std::runtime_error { +public: + explicit SocketConfigurationException(const std::string& msg) : runtime_error(msg) {}; +}; +} // namespace communication_interfaces::exceptions diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp new file mode 100644 index 0000000..2badfe7 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ISocket.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include + +namespace communication_interfaces::sockets { + +/** + * @brief Interface class to define functions required for simple socket communication + */ +class ISocket { +public: + /** + * @brief Default constructor + */ + ISocket() = default; + + /** + * @brief Default destructor + */ + virtual ~ISocket() = default; + + /** + * @brief Perform configuration steps to open the socket for communication + * @throws SocketConfigurationException if opening fails + */ + virtual void open() = 0; + + /** + * @brief Receive bytes from the socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + virtual bool receive_bytes(std::string& buffer) = 0; + + /** + * @brief Send bytes to the socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + virtual bool send_bytes(const std::string& buffer) = 0; + + /** + * @brief Perform steps to disconnect and close the socket communication + */ + virtual void close() {} +}; +}// namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp new file mode 100644 index 0000000..e4e7ed0 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPClient.hpp @@ -0,0 +1,34 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPClientConfiguration + * @brief Configuration parameters for a TCP client + */ +struct TCPClientConfiguration { + std::string ip_address; + int port; + int buffer_size; +}; + +class TCPClient : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPClient(TCPClientConfiguration configuration); + + /** + * @copydoc ISocket::open() + * @details Connect the client socket to the server + */ + void open() override; + +private: + TCPClientConfiguration config_; ///< Socket configuration struct +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp new file mode 100644 index 0000000..78c8c2c --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPServer.hpp @@ -0,0 +1,46 @@ +#pragma once + +#include "communication_interfaces/sockets/TCPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct TCPServerConfiguration + * @brief Configuration parameters for a TCP server + */ +struct TCPServerConfiguration { + int port; + int buffer_size; + bool enable_reuse; +}; + +class TCPServer : public TCPSocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit TCPServer(TCPServerConfiguration configuration); + + /** + * @brief Close the sockets by calling TCPServer::close() + */ + ~TCPServer() override; + + /** + * @copydoc ISocket::open() + * @details Wait for connection requests from clients and accept new connections. This method blocks until a + * connection is established + */ + void open() override; + + /** + * @brief Close the sockets + */ + void close() override; + +private: + TCPServerConfiguration config_; ///< Socket configuration struct + int server_fd_; ///< File descriptor of the connected server socket +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp new file mode 100644 index 0000000..8ceba43 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/TCPSocket.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @brief Abstract class to define a generic TCP socket + * @details TCP is a connection-based communication protocol. Hence, TCP sockets need to implement an additional + * interface method `connect()`. + */ +class TCPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling TCPSocket::close() + */ + ~TCPSocket() override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool send_bytes(const std::string& buffer) override; + + /** + * @brief Close the socket + */ + void close() override; + +protected: + explicit TCPSocket(int buffer_size); + + sockaddr_in server_address_; ///< Address of the TCP server + int socket_fd_; ///< File descriptor of the socket + int buffer_size_; ///< Buffer size +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp new file mode 100644 index 0000000..e27ecff --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPClient.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class UDPClient + * @brief Class to define a UDP client + */ +class UDPClient : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) + */ + UDPClient(UDPSocketConfiguration configuration); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::send_bytes(const std::string&) + */ + bool send_bytes(const std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp new file mode 100644 index 0000000..ce39a46 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPServer.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include "communication_interfaces/sockets/UDPSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class UDPServer + * @brief Class to define a UDP server + */ +class UDPServer : public UDPSocket { +public: + /** + * @copydoc UDPSocket::UDPSocket(UDPSocketConfiguration) + */ + UDPServer(UDPSocketConfiguration configuration); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::send_bytes(const std::string&) + */ + bool send_bytes(const std::string& buffer) override; + +private: + sockaddr_in client_address_; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp new file mode 100644 index 0000000..f5bc55d --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/UDPSocket.hpp @@ -0,0 +1,73 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct UDPSocketConfiguration + * @brief Configuration parameters for a UDP sockets + */ +struct UDPSocketConfiguration { + std::string ip_address; + int port; + int buffer_size; + bool enable_reuse = false; + double timeout_duration_sec = 0.0; +}; + +/** + * @class UDPSocket + * @brief Abstract class to define a generic UDP socket + */ +class UDPSocket : public ISocket { +public: + /** + * @brief Close the socket by calling UDPSocket::close() + */ + ~UDPSocket() override; + + /** + * @brief Close the socket + */ + void close() override; + +protected: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit UDPSocket(UDPSocketConfiguration configuration); + + /** + * @brief Perform steps to open the socket on the desired IP/port, set reuse and timeout options and bind if desired. + * @param bind_socket If true, bind the socket (for a UDP server), no binding otherwise (for a UDP client) + */ + void open_socket(bool bind_socket); + + /** + * @brief Receive bytes from the socket + * @param address Reference to a sockaddr_in structure in which the sending address is to be stored + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + [[nodiscard]] bool recvfrom(sockaddr_in& address, std::string& buffer); + + /** + * @brief Send bytes to the socket + * @param address Reference to a sockaddr_in structure containing the destination address + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + [[nodiscard]] bool sendto(const sockaddr_in& address, const std::string& buffer) const; + + sockaddr_in server_address_; ///< Address of the UDP server + +private: + UDPSocketConfiguration config_; ///< Socket configuration struct + int server_fd_; ///< File descriptor of the socket + socklen_t addr_len_; ///< Length of the socket address +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp new file mode 100644 index 0000000..1fdc38c --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisher.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class ZMQPublisher + * @brief Class to define a ZMQ publisher + */ +class ZMQPublisher : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) + */ + explicit ZMQPublisher(ZMQSocketConfiguration configuration); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @brief This method throws a runtime error as receiving is not available for a ZMQ publisher + */ + bool receive_bytes(std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp new file mode 100644 index 0000000..fcadd94 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQPublisherSubscriber.hpp @@ -0,0 +1,68 @@ +#pragma once + +#include "communication_interfaces/sockets/ISocket.hpp" +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct ZMQCombinedSocketsConfiguration + * @brief Configuration parameters for a the combination of a ZMQ Publisher and Pubscriber socket + */ +struct ZMQCombinedSocketsConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string publisher_port; + std::string subscriber_port; + bool bind_publisher = true; + bool bind_subscriber = true; + bool wait = false; +}; + +/** + * @brief A class that combines both a ZMQ Publisher and Subscriber socket into one single object + */ +class ZMQPublisherSubscriber : public ISocket { +public: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + explicit ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration); + + /** + * @brief Close the sockets by calling ZMQPublisherSubscriber::close() + */ + ~ZMQPublisherSubscriber() override; + + /** + * @brief Open the internal ZMQ Publisher and Subscriber sockets for communication + * @throws SocketConfigurationException if opening fails + */ + void open() override; + + /** + * @brief Receive bytes from the internal ZMQ Subscriber socket + * @param buffer The buffer to fill with the received bytes + * @return True if bytes were received, false otherwise + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @brief Send bytes with the internal ZMQ Publisher socket + * @param buffer The buffer with the bytes to send + * @return True if bytes were sent, false otherwise + */ + bool send_bytes(const std::string& buffer) override; + + /** + * @brief Close the sockets + */ + void close() override; + +private: + std::shared_ptr pub_; ///< ZMQ publisher + std::shared_ptr sub_; ///< ZMQ subscriber +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp new file mode 100644 index 0000000..433dcb2 --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSocket.hpp @@ -0,0 +1,62 @@ +#pragma once + +#include + +#include "communication_interfaces/sockets/ISocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @struct ZMQSocketConfiguration + * @brief Configuration parameters for a ZMQ socket + */ +struct ZMQSocketConfiguration { + std::shared_ptr context; + std::string ip_address; + std::string port; + bool bind = true; + bool wait = false; +}; + +/** + * @class ZMQSocket + * @brief Abstract class to define a generic ZMQ socket + */ +class ZMQSocket : public ISocket { +public: + /** + * @brief Close the socket by calling ZMQSocket::close() + */ + ~ZMQSocket() override; + + /** + * @brief Close the socket + */ + void close() override; + + /** + * @copydoc ISocket::receive_bytes(std::string&) + */ + bool receive_bytes(std::string& buffer) override; + + /** + * @copydoc ISocket::send_bytes(const std::string&) + */ + bool send_bytes(const std::string& buffer) override; + +protected: + /** + * @brief Constructor taking the configuration struct + * @param The configuration struct + */ + ZMQSocket(ZMQSocketConfiguration configuration); + + /** + * @brief Bind or connect the socket on the desired IP/port + */ + void open_socket(); + + ZMQSocketConfiguration config_; ///< Socket configuration struct + std::shared_ptr socket_; ///< ZMQ socket +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp new file mode 100644 index 0000000..fd4a62c --- /dev/null +++ b/source/communication_interfaces/include/communication_interfaces/sockets/ZMQSubscriber.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +namespace communication_interfaces::sockets { + +/** + * @class ZMQSubscriber + * @brief Class to define a ZMQ subscriber + */ +class ZMQSubscriber : public ZMQSocket { +public: + /** + * @copydoc ZMQSocket::ZMQSocket(ZMQSocketConfiguration) + */ + explicit ZMQSubscriber(ZMQSocketConfiguration configuration); + + /** + * @copydoc ISocket::open() + */ + void open() override; + + /** + * @brief This method throws a runtime error as sending is not available for a ZMQ publisher + */ + bool send_bytes(const std::string& buffer) override; +}; +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPClient.cpp b/source/communication_interfaces/src/sockets/TCPClient.cpp new file mode 100644 index 0000000..a8609d9 --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPClient.cpp @@ -0,0 +1,32 @@ +#include "communication_interfaces/sockets/TCPClient.hpp" + +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPClient::TCPClient(TCPClientConfiguration configuration) : TCPSocket(configuration.buffer_size), config_(configuration) {} + +void TCPClient::open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_port = htons(this->config_.port); + if (inet_pton(AF_INET, this->config_.ip_address.c_str(), &this->server_address_.sin_addr) <= 0) { + throw std::invalid_argument("IP Address not supported"); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->socket_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + + if (::connect(this->socket_fd_, (sockaddr*) &this->server_address_, sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Connecting client failed"); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPServer.cpp b/source/communication_interfaces/src/sockets/TCPServer.cpp new file mode 100644 index 0000000..6acdf6d --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPServer.cpp @@ -0,0 +1,58 @@ +#include "communication_interfaces/sockets/TCPServer.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPServer::TCPServer(TCPServerConfiguration configuration) : + TCPSocket(configuration.buffer_size), config_(configuration), server_fd_() { +} + +TCPServer::~TCPServer() { + TCPServer::close(); +} + +void TCPServer::open() { + try { + bzero((char*) &this->server_address_, sizeof(this->server_address_)); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = htonl(INADDR_ANY); + this->server_address_.sin_port = htons(this->config_.port); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + // open stream oriented socket with internet address + this->server_fd_ = socket(AF_INET, SOCK_STREAM, 0); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->config_.enable_reuse) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(this->server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed"); + } + // listen for only 1 request at a time + listen(this->server_fd_, 1); + // receive a request from client using accept + sockaddr_in new_socket_address{}; + socklen_t new_addr_len = sizeof(new_socket_address); + // accept, create a new socket descriptor to handle the new connection with client + this->socket_fd_ = accept(this->server_fd_, (sockaddr*) &new_socket_address, &new_addr_len); + if (this->socket_fd_ < 0) { + throw exceptions::SocketConfigurationException("Connecting server failed"); + } +} + +void TCPServer::close() { + ::close(this->server_fd_); + TCPSocket::close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/TCPSocket.cpp b/source/communication_interfaces/src/sockets/TCPSocket.cpp new file mode 100644 index 0000000..6668b0a --- /dev/null +++ b/source/communication_interfaces/src/sockets/TCPSocket.cpp @@ -0,0 +1,38 @@ +#include "communication_interfaces/sockets/TCPSocket.hpp" + +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +TCPSocket::TCPSocket(int buffer_size) : server_address_(), socket_fd_(), buffer_size_(buffer_size) { + if (buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } +} + +TCPSocket::~TCPSocket() { + TCPSocket::close(); +} + +bool TCPSocket::receive_bytes(std::string& buffer) { + std::vector local_buffer(this->buffer_size_); + auto receive_length = recv(this->socket_fd_, local_buffer.data(), this->buffer_size_, 0); + if (receive_length < 0) { + return false; + } + buffer.assign(local_buffer.data(), local_buffer.size()); + return true; +} + +bool TCPSocket::send_bytes(const std::string& buffer) { + int send_length = send(this->socket_fd_, buffer.data(), buffer.size(), 0); + return send_length == static_cast(buffer.size()); +} + +void TCPSocket::close() { + ::close(this->socket_fd_); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPClient.cpp b/source/communication_interfaces/src/sockets/UDPClient.cpp new file mode 100644 index 0000000..52e3da4 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPClient.cpp @@ -0,0 +1,18 @@ +#include "communication_interfaces/sockets/UDPClient.hpp" + +namespace communication_interfaces::sockets { + +UDPClient::UDPClient(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} + +void UDPClient::open() { + this->open_socket(false); +} + +bool UDPClient::receive_bytes(std::string& buffer) { + return this->recvfrom(this->server_address_, buffer); +} + +bool UDPClient::send_bytes(const std::string& buffer) { + return this->sendto(this->server_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPServer.cpp b/source/communication_interfaces/src/sockets/UDPServer.cpp new file mode 100644 index 0000000..d392b57 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPServer.cpp @@ -0,0 +1,18 @@ +#include "communication_interfaces/sockets/UDPServer.hpp" + +namespace communication_interfaces::sockets { + +UDPServer::UDPServer(UDPSocketConfiguration configuration) : UDPSocket(std::move(configuration)) {} + +void UDPServer::open() { + this->open_socket(true); +} + +bool UDPServer::receive_bytes(std::string& buffer) { + return this->recvfrom(this->client_address_, buffer); +} + +bool UDPServer::send_bytes(const std::string& buffer) { + return this->sendto(this->client_address_, buffer); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/UDPSocket.cpp b/source/communication_interfaces/src/sockets/UDPSocket.cpp new file mode 100644 index 0000000..7ee0d88 --- /dev/null +++ b/source/communication_interfaces/src/sockets/UDPSocket.cpp @@ -0,0 +1,83 @@ +#include "communication_interfaces/sockets/UDPSocket.hpp" + +#include +#include +#include +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +UDPSocket::UDPSocket(UDPSocketConfiguration configuration) : + server_address_(), config_(std::move(configuration)), server_fd_(), addr_len_() { + if (this->config_.buffer_size <= 0) { + throw exceptions::SocketConfigurationException("Configuration parameter 'buffer_size' has to be greater than 0."); + } +} + +UDPSocket::~UDPSocket() { + UDPSocket::close(); +} + +void UDPSocket::open_socket(bool bind_socket) { + try { + this->addr_len_ = sizeof(this->server_address_); + this->server_address_.sin_family = AF_INET; + this->server_address_.sin_addr.s_addr = inet_addr(this->config_.ip_address.c_str()); + this->server_address_.sin_port = htons(this->config_.port); + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } + + this->server_fd_ = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (this->server_fd_ < 0) { + throw exceptions::SocketConfigurationException("Opening socket failed"); + } + if (this->config_.enable_reuse) { + const int opt_reuse = 1; + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt_reuse, sizeof(opt_reuse)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket option (enable reuse) failed"); + } + } + if (bind_socket) { + if (bind(this->server_fd_, (sockaddr*) &(this->server_address_), sizeof(server_address_)) != 0) { + throw exceptions::SocketConfigurationException("Binding socket failed."); + } + } + + if (this->config_.timeout_duration_sec > 0.0 + && this->config_.timeout_duration_sec < std::numeric_limits::max()) { + timeval timeout{}; + auto secs = std::floor(this->config_.timeout_duration_sec); + timeout.tv_sec = static_cast(secs); + timeout.tv_usec = static_cast((this->config_.timeout_duration_sec - secs) * 1e6); + if (setsockopt(this->server_fd_, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) != 0) { + throw exceptions::SocketConfigurationException("Setting socket timeout failed."); + } + } +} + +bool UDPSocket::recvfrom(sockaddr_in& address, std::string& buffer) { + std::vector local_buffer(this->config_.buffer_size); + auto receive_length = ::recvfrom( + this->server_fd_, local_buffer.data(), this->config_.buffer_size, 0, (sockaddr*) &(address), &(this->addr_len_)); + if (receive_length < 0) { + return false; + } + buffer.assign(local_buffer.data(), local_buffer.size()); + return true; +} + +bool UDPSocket::sendto(const sockaddr_in& address, const std::string& buffer) const { + int send_length = ::sendto( + this->server_fd_, buffer.data(), buffer.size(), 0, (sockaddr*) &(address), this->addr_len_); + return send_length == static_cast(buffer.size()); +} + +void UDPSocket::close() { + if (this->server_fd_ >= 0) { + ::close(this->server_fd_); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQPublisher.cpp b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp new file mode 100644 index 0000000..48c9486 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisher.cpp @@ -0,0 +1,15 @@ +#include "communication_interfaces/sockets/ZMQPublisher.hpp" + +namespace communication_interfaces::sockets { + +ZMQPublisher::ZMQPublisher(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} + +void ZMQPublisher::open() { + this->socket_ = std::make_shared(*this->config_.context, ZMQ_PUB); + this->open_socket(); +} + +bool ZMQPublisher::receive_bytes(std::string&) { + throw std::runtime_error("Receive not available for socket of type ZMQPublisher"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp new file mode 100644 index 0000000..cdbf5f6 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQPublisherSubscriber.cpp @@ -0,0 +1,41 @@ +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" + +namespace communication_interfaces::sockets { + +ZMQPublisherSubscriber::ZMQPublisherSubscriber(ZMQCombinedSocketsConfiguration configuration) { + this->pub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.publisher_port, + configuration.bind_publisher, configuration.wait + })); + this->sub_ = std::make_shared( + ZMQSocketConfiguration( + { + configuration.context, configuration.ip_address, configuration.subscriber_port, + configuration.bind_subscriber, configuration.wait + })); +} + +ZMQPublisherSubscriber::~ZMQPublisherSubscriber() { + ZMQPublisherSubscriber::close(); +} + +void ZMQPublisherSubscriber::open() { + this->pub_->open(); + this->sub_->open(); +} + +bool ZMQPublisherSubscriber::receive_bytes(std::string& buffer) { + return this->sub_->receive_bytes(buffer); +} + +bool ZMQPublisherSubscriber::send_bytes(const std::string& buffer) { + return this->pub_->send_bytes(buffer); +} + +void ZMQPublisherSubscriber::close() { + this->pub_->close(); + this->sub_->close(); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSocket.cpp b/source/communication_interfaces/src/sockets/ZMQSocket.cpp new file mode 100644 index 0000000..85f2c8d --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSocket.cpp @@ -0,0 +1,60 @@ +#include "communication_interfaces/sockets/ZMQSocket.hpp" + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" + +namespace communication_interfaces::sockets { + +ZMQSocket::ZMQSocket(ZMQSocketConfiguration configuration) : config_(std::move(configuration)) {} + +ZMQSocket::~ZMQSocket() { + ZMQSocket::close(); +} + +void ZMQSocket::open_socket() { + try { + auto address = "tcp://" + this->config_.ip_address + ":" + this->config_.port; + if (this->config_.bind) { + this->socket_->bind(address); + } else { + this->socket_->connect(address); + } + } catch (const std::exception& ex) { + throw exceptions::SocketConfigurationException("Socket configuration failed: " + std::string(ex.what())); + } +} + +bool ZMQSocket::receive_bytes(std::string& buffer) { + zmq::recv_flags recv_flag = this->config_.wait ? zmq::recv_flags::none : zmq::recv_flags::dontwait; + zmq::message_t message; + try { + auto received = this->socket_->recv(message, recv_flag); + if (received.has_value()) { + buffer = std::string(static_cast(message.data()), message.size()); + } + return received.has_value(); + } catch (const zmq::error_t&) { + return false; + } +} + +bool ZMQSocket::send_bytes(const std::string& buffer) { + zmq::send_flags send_flags = this->config_.wait ? zmq::send_flags::none : zmq::send_flags::dontwait; + zmq::message_t msg(buffer.size()); + memcpy(msg.data(), buffer.data(), buffer.size()); + try { + auto sent = this->socket_->send(msg, send_flags); + if (!sent.has_value()) { + return false; + } + return *sent == buffer.size(); + } catch (const zmq::error_t&) { + return false; + } +} + +void ZMQSocket::close() { + if (this->socket_ != nullptr && this->socket_->connected()) { + this->socket_->close(); + } +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp new file mode 100644 index 0000000..e9a8d94 --- /dev/null +++ b/source/communication_interfaces/src/sockets/ZMQSubscriber.cpp @@ -0,0 +1,17 @@ +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" + +namespace communication_interfaces::sockets { + +ZMQSubscriber::ZMQSubscriber(ZMQSocketConfiguration configuration) : ZMQSocket(std::move(configuration)) {} + +void ZMQSubscriber::open() { + this->socket_ = std::make_shared(*this->config_.context, ZMQ_SUB); + this->open_socket(); + this->socket_->set(zmq::sockopt::conflate, 1); + this->socket_->set(zmq::sockopt::subscribe, ""); +} + +bool ZMQSubscriber::send_bytes(const std::string&) { + throw std::runtime_error("Send not available for socket of type ZMQSubscriber"); +} +} // namespace communication_interfaces::sockets diff --git a/source/communication_interfaces/test/test_communication_interfaces.cpp b/source/communication_interfaces/test/test_communication_interfaces.cpp new file mode 100644 index 0000000..697a9d7 --- /dev/null +++ b/source/communication_interfaces/test/test_communication_interfaces.cpp @@ -0,0 +1,6 @@ +#include + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/source/communication_interfaces/test/tests/test_tcp_communication.cpp b/source/communication_interfaces/test/tests/test_tcp_communication.cpp new file mode 100644 index 0000000..b3f21c2 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_tcp_communication.cpp @@ -0,0 +1,54 @@ +#include +#include + +#include "communication_interfaces/sockets/TCPClient.hpp" +#include "communication_interfaces/sockets/TCPServer.hpp" + +using namespace communication_interfaces; + +class TestTCPSockets : public ::testing::Test { +public: + TestTCPSockets() { + server_ = std::make_shared(sockets::TCPServerConfiguration{6000, 50, true}); + client_ = std::make_shared(sockets::TCPClientConfiguration{"127.0.0.1", 6000, 50}); + } + + std::thread start_server() { + return std::thread([this] { this->serve(); }); + } + + void serve() const { + server_->open(); + std::string recv_message; + EXPECT_TRUE(server_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), client_message_.c_str()); + EXPECT_TRUE(server_->send_bytes(server_message_)); + } + + std::thread start_client() { + return std::thread([this] { this->client(); }); + } + + void client() const { + client_->open(); + EXPECT_TRUE(client_->send_bytes(client_message_)); + std::string recv_message; + EXPECT_TRUE(client_->receive_bytes(recv_message)); + EXPECT_STREQ(recv_message.c_str(), server_message_.c_str()); + } + + std::string client_message_ = "Hello server"; + std::string server_message_ = "Hello client"; + + std::shared_ptr client_; + std::shared_ptr server_; +}; + +TEST_F(TestTCPSockets, TestCommunication) { + std::thread server = start_server(); + usleep(100000); + std::thread client = start_client(); + + client.join(); + server.join(); +} diff --git a/source/communication_interfaces/test/tests/test_udp_communication.cpp b/source/communication_interfaces/test/tests/test_udp_communication.cpp new file mode 100644 index 0000000..98aabe0 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_udp_communication.cpp @@ -0,0 +1,74 @@ +#include + +#include "communication_interfaces/exceptions/SocketConfigurationException.hpp" +#include "communication_interfaces/sockets/UDPClient.hpp" +#include "communication_interfaces/sockets/UDPServer.hpp" + +using namespace communication_interfaces; + +class TestUDPSockets : public ::testing::Test { +public: + TestUDPSockets() { + config_ = {"127.0.0.1", 5000, 100}; + } + + sockets::UDPSocketConfiguration config_; +}; + +TEST_F(TestUDPSockets, SendReceive) { + const std::string send_string = "Hello world!"; + std::string receive_string; + + // Create server socket and bind it to a port + sockets::UDPServer server(this->config_); + ASSERT_NO_THROW(server.open()); + + // Create client socket + sockets::UDPClient client(this->config_); + ASSERT_NO_THROW(client.open()); + + // Send test message from client to server + ASSERT_TRUE(client.send_bytes(send_string)); + + // Receive message on server + ASSERT_TRUE(server.receive_bytes(receive_string)); + + // Convert received message to string and compare with sent message + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); +} + +TEST_F(TestUDPSockets, Timeout) { + // Create server socket and bind it to a port + this->config_.timeout_duration_sec = 5.0; + sockets::UDPServer server(this->config_); + + // Try to receive a message from client, but expect timeout + std::string received_bytes; + EXPECT_FALSE(server.receive_bytes(received_bytes)); +} + +TEST_F(TestUDPSockets, PortReuse) { + // Create server socket and bind it to a port + sockets::UDPServer server1(this->config_); + server1.open(); + + // Try to create a second server socket and bind it to the same port (expect failure) + sockets::UDPServer server2(this->config_); + EXPECT_THROW(server2.open(), exceptions::SocketConfigurationException); +} + +TEST_F(TestUDPSockets, OpenClose) { + // Create and open server socket + sockets::UDPServer server(this->config_); + server.open(); + + // Close server socket + server.close(); + + // Create and open client socket + sockets::UDPClient client(this->config_); + client.open(); + + // Try to send a message from the closed server socket (expect failure) + EXPECT_FALSE(server.send_bytes(std::string())); +} diff --git a/source/communication_interfaces/test/tests/test_zmq_communication.cpp b/source/communication_interfaces/test/tests/test_zmq_communication.cpp new file mode 100644 index 0000000..a939689 --- /dev/null +++ b/source/communication_interfaces/test/tests/test_zmq_communication.cpp @@ -0,0 +1,70 @@ +#include +#include + +#include "communication_interfaces/sockets/ZMQPublisher.hpp" +#include "communication_interfaces/sockets/ZMQSubscriber.hpp" +#include "communication_interfaces/sockets/ZMQPublisherSubscriber.hpp" + +using namespace communication_interfaces; +using namespace std::chrono_literals; + +class TestZMQSockets : public ::testing::Test { +public: + TestZMQSockets() { + auto context = std::make_shared(1); + config_ = {context, "127.0.0.1", "4000"}; + } + + sockets::ZMQSocketConfiguration config_; +}; + +TEST_F(TestZMQSockets, SendReceive) { + const std::string send_string = "Hello world!"; + std::string receive_string; + + sockets::ZMQPublisher publisher(this->config_); + this->config_.bind = false; + sockets::ZMQSubscriber subscriber(this->config_); + + publisher.open(); + subscriber.open(); + + while (!subscriber.receive_bytes(receive_string)) { + EXPECT_TRUE(publisher.send_bytes(send_string)); + usleep(10000); + } + EXPECT_STREQ(receive_string.c_str(), send_string.c_str()); + publisher.close(); + subscriber.close(); +} + +TEST_F(TestZMQSockets, SendReceiveCombined) { + const std::string server_send_string = "Hello client!"; + const std::string client_send_string = "Hello server!"; + std::string server_receive_string, client_receive_string; + + sockets::ZMQCombinedSocketsConfiguration server_config = {config_.context, config_.ip_address, "5001", "5002"}; + sockets::ZMQPublisherSubscriber server(server_config); + + sockets::ZMQCombinedSocketsConfiguration + client_config = {config_.context, config_.ip_address, "5002", "5001", false, false}; + sockets::ZMQPublisherSubscriber client(client_config); + + server.open(); + client.open(); + + while (!client.receive_bytes(client_receive_string)) { + EXPECT_TRUE(server.send_bytes(server_send_string)); + usleep(10000); + } + EXPECT_STREQ(client_receive_string.c_str(), server_send_string.c_str()); + + while (!server.receive_bytes(server_receive_string)) { + EXPECT_TRUE(client.send_bytes(client_send_string)); + usleep(10000); + } + EXPECT_STREQ(server_receive_string.c_str(), client_send_string.c_str()); + + server.close(); + client.close(); +} diff --git a/source/dev-server.sh b/source/dev-server.sh new file mode 100755 index 0000000..47c0c15 --- /dev/null +++ b/source/dev-server.sh @@ -0,0 +1,33 @@ +#!/bin/bash +REMOTE_SSH_PORT=4420 + +IMAGE_NAME=aica-technology/communication-interfaces +IMAGE_STAGE=source-dependencies + +BUILD_FLAGS=() + +HELP_MESSAGE="Usage: ./dev-server.sh [-r] [-v] + +Build a Docker container for remote development. +Options: + -r, --rebuild Rebuild the image with no cache. + -v, --verbose Show all the output of the Docker + build process. + -h, --help Show this help message." + +while [ "$#" -gt 0 ]; do + case "$1" in + -r|--rebuild) BUILD_FLAGS+=(--no-cache); shift 1;; + -v|--verbose) BUILD_FLAGS+=(--progress=plain); shift 1;; + -h|--help) echo "${HELP_MESSAGE}"; exit 0;; + *) echo "Unknown option: $1" >&2; echo "${HELP_MESSAGE}"; exit 1;; + esac +done + +BUILD_FLAGS+=(-t "${IMAGE_NAME}:${IMAGE_STAGE}") +BUILD_FLAGS+=(--target "${IMAGE_STAGE}") + +docker pull ghcr.io/aica-technology/control-libraries/development-dependencies || exit 1 +DOCKER_BUILDKIT=1 docker build "${BUILD_FLAGS[@]}" . || exit 1 + +aica-docker server "${IMAGE_NAME}:${IMAGE_STAGE}" -u developer -p "${REMOTE_SSH_PORT}" diff --git a/source/install.sh b/source/install.sh new file mode 100644 index 0000000..8bc904c --- /dev/null +++ b/source/install.sh @@ -0,0 +1,80 @@ +#!/bin/bash +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + +INSTALL_DESTINATION="/usr/local" +AUTO_INSTALL="" +CPPZMQ_VERSION=4.7.1 + +HELP_MESSAGE="Usage: [sudo] ./install.sh [OPTIONS] +An install script for the communication_interfaces library. +Options: + -y, --auto Suppress any input prompts and + automatically approve install steps. + -d, --dir [path] Configure the installation directory + (default: ${INSTALL_DESTINATION}). + --build-tests Build the unittest targets. + --clean Delete any previously installed header + files from ${INSTALL_DESTINATION}/include and any + shared library files from ${INSTALL_DESTINATION}/lib. + --cleandir [path] Delete any previously installed header + and library files from the specified path. + -h, --help Show this help message." + +function install_cppzmq() { + apt-get update && apt-get install "${AUTO_INSTALL}" libzmq3-dev || exit 1 + + mkdir -p "${SCRIPT_DIR}"/install + cd "${SCRIPT_DIR}"/install || exit 1 + wget https://github.com/zeromq/cppzmq/archive/v${CPPZMQ_VERSION}.tar.gz -O cppzmq-${CPPZMQ_VERSION}.tar.gz && + tar -xzf cppzmq-${CPPZMQ_VERSION}.tar.gz && + rm cppzmq-${CPPZMQ_VERSION}.tar.gz + + cd "${SCRIPT_DIR}"/install/cppzmq-"${CPPZMQ_VERSION}" || exit 1 + mkdir build && cd build && cmake .. -DCPPZMQ_BUILD_TESTS=OFF && make -j install || exit 1 + ldconfig + + cd "${SCRIPT_DIR}" && rm -rf "${SCRIPT_DIR}"/install || exit 1 +} + +install_communication_interfaces () { + cd "${SCRIPT_DIR}"/communication_interfaces && mkdir -p build && cd build || exit 1 + cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX="${INSTALL_DESTINATION}" .. || exit 1 + + make -j && make install || exit 1 + ldconfig + + cd "${SCRIPT_DIR}" || exit 1 +} + +function uninstall() { + function delete_components() { + rm -r "${INSTALL_DESTINATION}"/include/communication_interfaces + rm -r "${INSTALL_DESTINATION}"/lib/libcommunication_interfaces*.so + } + + delete_components >/dev/null 2>&1 + + echo "Deleted any communication_interfaces artefacts from ${INSTALL_DESTINATION}." +} + +while [ "$#" -gt 0 ]; do + case "$1" in + -y|--auto) AUTO_INSTALL="-y"; shift 1;; + -d|--dir) INSTALL_DESTINATION=$2; shift 2;; + --build-tests) BUILD_TESTING="ON"; shift 1;; + --clean) uninstall; exit 0;; + --cleandir) INSTALL_DESTINATION=$2; uninstall; exit 0;; + -h|--help) echo "${HELP_MESSAGE}"; exit 0;; + -*) echo "Unknown option: $1" >&2 + echo "${FAIL_MESSAGE}" + exit 1;; + esac +done + +echo ">>> INSTALLING ZMQ DEPENDENCIES" +install_cppzmq || exit 1 + +echo ">>> INSTALLING COMMUNICATION INTERFACES" +install_communication_interfaces || exit 1 + +echo ">>> DONE"