diff --git a/examples/routing_service/udp_socket_adapter/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt similarity index 91% rename from examples/routing_service/udp_socket_adapter/CMakeLists.txt rename to examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt index 4181e5ae9..0fc9b611b 100644 --- a/examples/routing_service/udp_socket_adapter/CMakeLists.txt +++ b/examples/routing_service/udp_socket_adapter_dynamic/CMakeLists.txt @@ -1,5 +1,5 @@ # -# (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. # # RTI grants Licensee a license to use, modify, compile, and create derivative # works of the Software. Licensee has the right to distribute object form @@ -37,6 +37,8 @@ add_library(${PROJECT_NAME} "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" ) diff --git a/examples/routing_service/udp_socket_adapter_dynamic/README.md b/examples/routing_service/udp_socket_adapter_dynamic/README.md new file mode 100644 index 000000000..9ab2ece60 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/README.md @@ -0,0 +1,213 @@ +# Example Code: Routing Service C++11 Socket Adapter using Dynamic Data + +## Example Description + +This example shows how to implement a simple Routing Service Adapter plugin +in C++11 to receive data from a UDP socket using RTI Routing Service. + +This examples uses dynamic data API and there is no need to know the data type +information beforehand. + +The code in this directory provides the following components: + +- `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing +Service*. It responsible for creating and deleting connections. +- `src/SocketConnection` implements a connection. This component is +responsible for the creation and deletion of `StreamReaders`. +- `src/SocketInputDiscoveryStreamReader` implements the logic necessary to +propagate information about the discovered input streams (in this case +sockets) to the Routing Service. +- `src/SocketStreamReader` implements a `StreamReader` that reads sample +information from a UDP socket. +- `src/SocketStreamWriter` implements a `StreamWriter` that sends sample +information to a UDP socket. + + +For more details, please refer to the *RTI Routing Service SDK* documentation. + +## Building the C++ example + +In order to build this example, you need to define the variables +`CONNEXTDDS_DIR` and `CONNEXTDDS_ARCH`. You can do so by exporting them +manually, by sourcing the `rtisetenv` script for your architecture, or by +passing them to the `cmake` command as arguments: + +```bash +mkdir build +cd build +cmake -DCONNEXTDDS_DIR= \ # If not exported + -DCONNEXTDDS_ARCH= \ # If not exported + -DBUILD_SHARED_LIBS=ON|OFF \ # ON is preferred + -DCMAKE_BUILD_TYPE=Debug|Release .. +cmake --build . +cd .. +``` + +Example command for Windows: + +```bash +cmake .. -DCONNEXTDDS_DIR="%NDDSHOME%" -DCONNEXTDDS_ARCH=x64Win64VS2015 -DBUILD_SHARED_LIBS=ON -DCMAKE_BUILD_TYPE=Release -A x64 -G "Visual Studio 17 2022" +cd .. +``` + +**Note**: You do not need to define `CONNEXTDDS_ARCH` if you only have one +architecture target installed in your system. + +**Note**: When compiling on a Windows 64-bit machine you will need to add the +`-A x64` parameter to the call to CMake. + +**Note:** If you are using a multi-configuration generator, such as Visual +Studio Solutions, you can specify the configuration mode to build as follows: + +```bash +cmake --build . --config Release|Debug +``` + +Here is more information about generating +[Visual Studio Solutions for Windows using CMake](https://cmake.org/cmake/help/v3.16/generator/Visual%20Studio%2016%202019.html#platform-selection). + +**Note:** `BUILD_SHARED_LIBS` allows you to control if the generated library +for this example is a static or a dynamic shared library. The following +sections assume you are building a dynamic shared library. However, Routing +Service also supports static linking of adapters. To use this functionality +you would need to create an application that uses Routing Service as a library +component and statically links to this `SocketAdapter` library. + +### Cross-compilation + +When you need to cross-compile the example, the above +command will not work, the assigned compiler won't be the cross-compiler and +errors may happen when linking against the cross-compiled Connext binaries. +To fix this, you have to create a file with the architecture name and call +CMake with a specific flag called ``-DCMAKE_TOOLCHAIN_FILE``. +An example of the file to create with the toolchain settings (e.g. for an +ARM architectures): + +```cmake +set(CMAKE_SYSTEM_NAME Linux) +set(toolchain_path "/arm-bcm2708/gcc-linaro-arm-linux-gnueabihf-raspbian") +set(CMAKE_C_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-gcc") +set(CMAKE_CXX_COMPILER "${toolchain_path}/bin/arm-linux-gnueabihf-g++") +``` + +Then you can call CMake like this: + +```bash +cmake -DCONNEXTDDS_DIR= -DCMAKE_TOOLCHAIN_FILE= + -DCONNEXTDDS_ARCH= .. +``` + +## Running the C++ example + +To run the example, you just need to run the following commands from the top +level folder. This example has been written to allow easy experimentation with +the RTI DDSPing tool shipped with *RTI Connext DDS* installer bundle. If you wish +to create a real Routing Service adapter, you should modify the code and XML accordingly. + +There are 2 configurations (`-cfgName`) in the Routing Service XML file: + +- **SocketAdapterToDDS** - It reads data from a UDP socket using the +SocketAdapter and outputs it to DDS. You can visualize the ouptut by running: + +- **DDSToSocketAdapter** - It sends data from DDS to a UDP socket. You can +publish DDS data by running command: + + +To run Routing Service, you will need first to set up your environment as +follows. + +Before running the RTI Routing Service, you need to specify where the +`SocketAdapterCpp` library is located as shown below: + +Linux: + +```bash +$export RTI_LD_LIBRARY_PATH=$NDDSHOME/lib/: +``` + +Windows: + +```bash +set PATH=%NDDSHOME%/lib/; +``` + +The SocketAdapterCpp library will be in the `./build` folder. + +```bash +# From the build/ directory +$NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +``` + +Here is an output from a sample run: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) +``` + +Now you'll need to send data to the UDP sockets. By default, DDS Ping data is +expected on `127.0.0.1:10203`. You can change both the expected type and topic name +and the UDP socket configuration on `RsSocketAdapter.xml`. + +To run a simple test, run in different terminals: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + + +```bash + $NDDSHOME/bin/rtiddsping -publisher -domainId 0 +``` + +## Running a data-diode example + +You can configure a data-diode scenario by using two Routing Services instances; +- One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket +- The other using **SocketAdapterToDDS** configuration to convert back to DDS samples +``` + ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ + │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ + │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ + │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ + └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ +``` +To run this example in a local machine: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with configuration=SocketAdapterToDDS) +``` +And in a different terminal: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_dynamic/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +Using the default configuration from RsSocketAdapter.xml, you need to publish DDS Ping data +on domain id 0 and subscribe to DSS Ping data on domain id 1: + +```bash + $NDDSHOME/bin/rtiddsping -publisher -domainId 0 +``` + +```bash + $NDDSHOME/bin/rtiddsping -subscriber -domainId 1 +``` + +## Requirements + +To run this example you will need: + +- RTI Connext Professional version 6.0.0 or higher. +- CMake version 3.10 or higher. +- A target platform with support for RTI Routing Service and C++11. +- Python3. diff --git a/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml new file mode 100644 index 000000000..5a693012f --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/RsSocketAdapter.xml @@ -0,0 +1,105 @@ + + + + + + SocketAdapterCpp + SocketAdapter_create_adapter_plugin + + + + + + + + + + 1 + + + + + + IMMEDIATE + PingType + PingStream + + + + + + receive_address + 127.0.0.1 + + + + receive_port + 10203 + + + + + + ON_DOMAIN_MATCH + PingType + PingTopic + + + + + + + + + + + + + + 0 + + + + + + + ON_ROUTE_MATCH + PingType + PingStream + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10203 + + + + + + ON_DOMAIN_MATCH + PingType + PingTopic + + + + + + + + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx similarity index 95% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx index a649473f0..a73226590 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp similarity index 96% rename from examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp index aec7eee95..0042adae7 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketAdapter.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketAdapter.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx new file mode 100644 index 000000000..2b7eb7434 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.cxx @@ -0,0 +1,75 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketConnection.hpp" +#include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" +#include "SocketStreamWriter.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketConnection::SocketConnection( + StreamReaderListener *input_stream_discovery_listener, + StreamReaderListener *output_stream_discovery_listener, + const PropertySet &properties) + : input_discovery_reader_( + properties, + input_stream_discovery_listener) {}; + +StreamReader *SocketConnection::create_stream_reader( + Session *session, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) +{ + return new SocketStreamReader(this, info, properties, listener); +} + +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) + { + return new SocketStreamWriter(this, info, properties); +} + +void SocketConnection::delete_stream_reader(StreamReader *reader) +{ + SocketStreamReader *socket_reader = + dynamic_cast(reader); + socket_reader->shutdown_socket_reader_thread(); + delete reader; +} + +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = + dynamic_cast(writer); + delete writer; +} + +DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() +{ + return &input_discovery_reader_; +} + +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + +void SocketConnection::dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info) +{ + input_discovery_reader_.dispose(stream_info); +} diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp similarity index 81% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp index c79ccb00a..ae49f6c0c 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketConnection.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -37,13 +37,23 @@ class SocketConnection : public rti::routing::adapter::Connection { const rti::routing::PropertySet &properties, rti::routing::adapter::StreamReaderListener *listener) final; - // This function will also stop the receiving socket thread + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + void delete_stream_reader( rti::routing::adapter::StreamReader *reader) final; + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + rti::routing::adapter::DiscoveryStreamReader * input_stream_discovery_reader() final; + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + /** * @brief This function is called by the SocketStreamReader to indicate * that it's time to dispose the route. The dispose set by the diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx new file mode 100644 index 000000000..a55323e11 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.cxx @@ -0,0 +1,88 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketInputDiscoveryStreamReader.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketInputDiscoveryStreamReader::SocketInputDiscoveryStreamReader( + const PropertySet &, + StreamReaderListener *input_stream_discovery_listener) +{ + input_stream_discovery_listener_ = input_stream_discovery_listener; +} + +void SocketInputDiscoveryStreamReader::dispose( + const rti::routing::StreamInfo &stream_info) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + std::unique_ptr stream_info_disposed( + new StreamInfo( + stream_info.stream_name(), + stream_info.type_info().type_name())); + stream_info_disposed.get()->disposed(true); + + this->data_samples_.push_back(std::move(stream_info_disposed)); + input_stream_discovery_listener_->on_data_available(this); +} + +void SocketInputDiscoveryStreamReader::take( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + std::transform( + data_samples_.begin(), + data_samples_.end(), + std::back_inserter(stream), + [](const std::unique_ptr &element) { + return element.get(); + }); +} + +void SocketInputDiscoveryStreamReader::return_loan( + std::vector &stream) +{ + /** + * This guard is essential since the take() and return_loan() operations + * triggered by calling on_data_available() execute on an internal Routing + * Service thread. The custom dispose() operation doesn't run on that + * thread. Since the take() and return_loan() operations also need to access + * the data_samples_ list this protection is required. + */ + std::lock_guard guard(data_samples_mutex_); + + /** + * For discovery streams there will never be any outstanding return_loan(). + * Thus we can be sure that each take() will be followed by a call to + * return_loan(), before the next take() executes. + */ + this->data_samples_.erase( + data_samples_.begin(), + data_samples_.begin() + stream.size()); + stream.clear(); +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp new file mode 100644 index 000000000..7f2bdb373 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketInputDiscoveryStreamReader.hpp @@ -0,0 +1,61 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETDISCOVERYSTREAMREADER_HPP +#define SOCKETDISCOVERYSTREAMREADER_HPP + +#include +#include + +#include +#include + +/** + * This class implements a DiscoveryStreamReader, a special kind of StreamReader + * that provide discovery information about the available streams and their + * types. + */ + +class SocketInputDiscoveryStreamReader + : public rti::routing::adapter::DiscoveryStreamReader { +public: + SocketInputDiscoveryStreamReader( + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener); + + void take(std::vector &) final; + + void return_loan(std::vector &) final; + + /** + * @brief Custom operation defined to indicate disposing off an + * when the SocketStreamReader has finished reading from the socket. + * The SocketInputDiscoveryStreamReader will then create a new + * discovery sample indicating that the stream has been disposed. + * This will cause the Routing Service to start tearing down the Routes + * associated with having the corresponding + * and . + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose(const rti::routing::StreamInfo &stream_info); + +private: + std::mutex data_samples_mutex_; + std::vector> data_samples_; + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx new file mode 100644 index 000000000..b040e958b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.cxx @@ -0,0 +1,148 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif +#include "SocketStreamReader.hpp" +#include +#include +#include + + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +void SocketStreamReader::socket_reading_thread() +{ + while (!stop_thread_) { + int received_bytes = 0; + socket->receive_data( + received_buffer_, + &received_bytes, + BUFFER_MAX_SIZE); + + // Most likely received nothing or there was an error + // Not doing any error handling here + if (received_bytes <= 0) { + // Sleep for a small period of time to avoid busy waiting + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + { + std::lock_guard lock(buffer_mutex_); + received_buffers_.emplace(received_buffer_, received_buffer_ + received_bytes); + } + + reader_listener_->on_data_available(this); + } + + socket_connection_->dispose_discovery_stream(stream_info_); +} + +SocketStreamReader::SocketStreamReader( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties, + StreamReaderListener *listener) + : stop_thread_(false), + stream_info_(info.stream_name(), info.type_info().type_name()) +{ + socket_connection_ = connection; + reader_listener_ = listener; + adapter_type_ = static_cast(info.type_info().type_representation()); + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == RECEIVE_ADDRESS_STRING) { + receive_address_ = property.second; + } else if (property.first == RECEIVE_PORT_STRING) { + receive_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr( + new UdpSocket(receive_address_.c_str(), receive_port_)); + + socketreader_thread_ = + std::thread(&SocketStreamReader::socket_reading_thread, this); +} + +void SocketStreamReader::take( + std::vector &samples, + std::vector &infos) +{ + take_buffer_.clear(); + { + std::unique_lock lock(buffer_mutex_); + if (received_buffers_.empty()) { + // No data available + samples.clear(); + infos.clear(); + return; + } + take_buffer_ = std::move(received_buffers_.front()); + received_buffers_.pop(); + } + + dds::core::xtypes::DynamicData deserialized_sample(*adapter_type_); + rti::core::xtypes::from_cdr_buffer(deserialized_sample, take_buffer_); + + samples.resize(1); + infos.resize(1); + + std::unique_ptr sample(new DynamicData(*adapter_type_)); + *sample = deserialized_sample; + samples[0] = sample.release(); + + return; +} + +void SocketStreamReader::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +void SocketStreamReader::shutdown_socket_reader_thread() +{ + stop_thread_ = true; + socketreader_thread_.join(); +} + +SocketStreamReader::~SocketStreamReader() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp new file mode 100644 index 000000000..21517a9ff --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamReader.hpp @@ -0,0 +1,90 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMREADER_HPP +#define SOCKETSTREAMREADER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define RECEIVE_ADDRESS_STRING "receive_address" +#define RECEIVE_PORT_STRING "receive_port" + +/** + * @brief StreamReader implementation for UDP socket input in RTI Routing Service. + * + * SocketStreamReader is a specific implementation of rti::routing::adapter::DynamicDataStreamReader + * that receives data from a UDP socket and makes it available to RTI Routing Service as DynamicData samples. + * + * This class manages a background thread to continuously read UDP packets from a specified address and port, + * buffering received data for consumption by the Routing Service. It supports thread-safe queuing of incoming + * data, loaning and returning DynamicData samples, and clean shutdown of the reading thread. + * + */ + +class SocketStreamReader : public rti::routing::adapter::DynamicDataStreamReader { +public: + SocketStreamReader( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &, + rti::routing::adapter::StreamReaderListener *listener); + + void take( + std::vector &, + std::vector &) final; + + void return_loan( + std::vector &, + std::vector &) final; + + void shutdown_socket_reader_thread(); + + ~SocketStreamReader(); + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + void socket_reading_thread(); + + SocketConnection *socket_connection_; + rti::routing::adapter::StreamReaderListener *reader_listener_; + + std::unique_ptr socket; + + std::thread socketreader_thread_; + bool stop_thread_; + + std::ifstream input_socket_stream_; + std::string receive_address_; + int receive_port_; + char received_buffer_[BUFFER_MAX_SIZE]; + std::queue> received_buffers_; + std::mutex buffer_mutex_; + std::vector take_buffer_; + + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..8d08b0665 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.cxx @@ -0,0 +1,101 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + for (const auto sample : samples) { + serialization_buffer_.clear(); + rti::core::xtypes::to_cdr_buffer(serialization_buffer_, *sample); + // Send the serialized data + len = socket->send_data( + serialization_buffer_.data(), + serialization_buffer_.size(), + dest_address_.c_str(), + dest_port_); + } + + return len; +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..39fc32af8 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/SocketStreamWriter.hpp @@ -0,0 +1,76 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +/** + * @brief StreamWriter implementation for UDP socket output in RTI Routing Service. + * + * SocketStreamWriter is a specific implementation of rti::routing::adapter::DynamicDataStreamWriter + * that sends data to a UDP socket, making it available for external consumers outside DDS. + * + * This class is responsible for serializing DynamicData samples received from Routing Service + * and transmitting them as UDP packets to a specified destination address and port. + * It manages socket creation, serialization buffers, and the configuration of destination + * parameters via properties. + * + */ + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int write( + const std::vector &, + const std::vector &) final; + + ~SocketStreamWriter(); + + +private: + + SocketConnection *socket_connection_; + std::vector serialization_buffer_; + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx similarity index 85% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx index 3c1bf9f61..8da702a3d 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.cxx +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -97,4 +97,15 @@ void UdpSocket::receive_data( &client_addr_len); return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; } \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp similarity index 56% rename from examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp rename to examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp index bc0773f41..024429e1b 100644 --- a/examples/routing_service/udp_socket_adapter/src/UdpSocket.hpp +++ b/examples/routing_service/udp_socket_adapter_dynamic/src/UdpSocket.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -10,6 +10,9 @@ * use or inability to use the software. */ +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + #ifdef _WIN32 #include #include @@ -28,6 +31,20 @@ #pragma comment(lib, "ws2_32.lib") #endif +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + class UdpSocket { public: UdpSocket(const char* ip, int port); @@ -37,6 +54,12 @@ class UdpSocket { int* received_bytes, int size_of_original_buffer); + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + private: #ifdef _WIN32 SOCKET sockfd; @@ -48,3 +71,5 @@ class UdpSocket { void init_socket(); void bind_socket(const char* ip, int port); }; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt new file mode 100644 index 000000000..0fc9b611b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/CMakeLists.txt @@ -0,0 +1,58 @@ +# +# (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. +# +# RTI grants Licensee a license to use, modify, compile, and create derivative +# works of the Software. Licensee has the right to distribute object form +# only for use with RTI products. The Software is provided "as is", with no +# warranty of any type, including any warranty for fitness for any purpose. +# RTI is under no obligation to maintain or support the Software. RTI shall +# not be liable for any incidental or consequential damages arising out of the +# use or inability to use the software. +# +cmake_minimum_required(VERSION 3.11) +project(SocketAdapterCpp) + +# Find RTI Connext dependencies +list(APPEND CMAKE_MODULE_PATH + "${CMAKE_CURRENT_SOURCE_DIR}/../../../resources/cmake/Modules" +) +include(ConnextDdsConfigureCmakeUtils) +connextdds_configure_cmake_utils() + +find_package( + RTIConnextDDS "7.3.0" + REQUIRED + COMPONENTS + core + routing_service +) + +# It may not be necessary to include the hpp files +add_library(${PROJECT_NAME} + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketAdapter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketConnection.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketInputDiscoveryStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamReader.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/SocketStreamWriter.hpp" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.cxx" + "${CMAKE_CURRENT_SOURCE_DIR}/src/UdpSocket.hpp" +) + +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD 11) +set_property(TARGET ${PROJECT_NAME} PROPERTY CXX_STANDARD_REQUIRED ON) + +target_link_libraries(${PROJECT_NAME} + RTIConnextDDS::routing_service_infrastructure + RTIConnextDDS::cpp2_api +) + +# To differentiate between debug and release builds +set_target_properties(${PROJECT_NAME} + PROPERTIES + DEBUG_POSTFIX "d" +) diff --git a/examples/routing_service/udp_socket_adapter/README.md b/examples/routing_service/udp_socket_adapter_typed/README.md similarity index 64% rename from examples/routing_service/udp_socket_adapter/README.md rename to examples/routing_service/udp_socket_adapter_typed/README.md index e69bfb04a..e84e8988a 100644 --- a/examples/routing_service/udp_socket_adapter/README.md +++ b/examples/routing_service/udp_socket_adapter_typed/README.md @@ -5,6 +5,8 @@ This example shows how to implement a simple Routing Service Adapter plugin in C++11 to receive data from a UDP socket using RTI Routing Service. +This example requires including a Types.xml file with the data type information. + The code in this directory provides the following components: - `src/SocketAdapter` implements the plugin that is loaded by *RTI Routing @@ -16,8 +18,12 @@ propagate information about the discovered input streams (in this case sockets) to the Routing Service. - `src/SocketStreamReader` implements an `StreamReader` that reads sample information from a UDP socket. +- `src/SocketStreamWriter` implements an `StreamWriter` that sends sample +information to a UDP socket. - `test/send_shape_to_socket.py` implements a simple tester to send shape type data to a UDP socket. +- `test/receive_shape_from_socket.py` implements a simple tester to receive shape +type data from a UDP socket. For more details, please refer to the *RTI Routing Service SDK* documentation. @@ -101,7 +107,7 @@ the Shapes Demo shipped with *RTI Connext DDS* installer bundle. You will find some hardcoded references to ShapeType and Square. If you wish to create a real Routing Service adapter, you should modify the code and XML accordingly. -There is 1 configuration (`-cfgName`) in the Routing Service XML file: +There are 2 configurations (`-cfgName`) in the Routing Service XML file: - **SocketAdapterToDDS** - It reads data from a UDP socket using the SocketAdapter and outputs it to DDS. You can visualize the ouptut by @@ -111,6 +117,8 @@ subscribing to Squares in Shapes Demo or running: $NDDSHOME/bin/rtiddsspy -printSample ``` +- **DDSToSocketAdapter** - It sends data from DDS to a UDP socket. + To run Routing Service, you will need first to set up your environment as follows. @@ -139,7 +147,7 @@ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdap Here is an output from a sample run: ```bash -$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter/build/ +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ $ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS RTI Routing Service 7.3.0 executing (with name SocketAdapterToSocketAdapter) @@ -158,6 +166,62 @@ python3 test/send_shape_to_socket.py 127.0.0.1 10203 You can now open a Shapes Demo instance on domain 0 and subscribe to Squares. You should start receiving a red Square. +Alternatively, you can also execute the test to send UDP sockets from DDS data: + +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +And to test the UDP socket content run: +```bash +python3 test/read_shape_from_socket.py 10203 +``` + + +## Running a data-diode example + +You can configure a data-diode scenario by using two Routing Services instances; +- One using **DDSToSocketAdapter** configuration to publish DDS data over a one direction UDP socket +- The other using **SocketAdapterToDDS** configuration to convert back to DDS samples +``` + ┌───────────┐ ┌─────────────┐ ┌─────────────┐ ┌───────────┐ + │ Connext │ │ Routing │ ┌────────────────┐ │ Routing │ │ Connext │ + │ App ├─►│ Service ├───►│ UDP DATA DIODE ├──►│ Service ├─►│ App │ + │ │ │ DDS TO UDP │ └────────────────┘ │ UDP TO DDS │ │ │ + └───────────┘ └─────────────┘ └─────────────┘ └───────────┘ +``` +To run this example in a local machine: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName SocketAdapterToDDS +RTI Routing Service 7.3.0 executing (with configuration=SocketAdapterToDDS) +``` +And in a different terminal: +```bash +$export RTI_LD_LIBRARY_PATH=~/$NDDSHOME/lib/$CONNEXT_ARCH:~/udp_socket_adapter_typed/build/ + +$ $NDDSHOME/bin/rtiroutingservice -cfgFile RsSocketAdapter.xml -cfgName DDSToSocketAdapter +RTI Routing Service 7.3.0 executing (with configuration=DDSToSocketAdapter) +``` + +Using the default configuration from RsSocketAdapter.xml, you need to publish Squares +on domain id 0 and subscribe to Squares on domain id 1 using rtishapes demo: + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 0 +``` + +```bash +$ $NDDSHOME/bin/rtishapesdemo -domainId 1 +``` +Then start publishing and subscribing to the Square topic. +You should be able to see red squares in the subscriber application. +Keep in mind the shape color has been overwritten in the adapter for showcasing purposes. + ## Requirements To run this example you will need: diff --git a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml similarity index 58% rename from examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml rename to examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml index 757cf5948..15cfeb391 100644 --- a/examples/routing_service/udp_socket_adapter/RsSocketAdapter.xml +++ b/examples/routing_service/udp_socket_adapter_typed/RsSocketAdapter.xml @@ -28,13 +28,12 @@ - + - 0 - - - + 1 + + @@ -70,11 +69,61 @@ You could use Triangle or Circle if you also modify the other references to Square that are hardcoded in the adapter code --> Square - + - + + + + + + + 0 + + + + + + + ON_ROUTE_MATCH + ShapeType + Square + + + + send_address + 127.0.0.1 + + + send_port + 0 + + + dest_address + 127.0.0.1 + + + dest_port + 10203 + + + + + + ON_DOMAIN_MATCH + ShapeType + Square + + + + + + + + \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter/Types.xml b/examples/routing_service/udp_socket_adapter_typed/Types.xml similarity index 100% rename from examples/routing_service/udp_socket_adapter/Types.xml rename to examples/routing_service/udp_socket_adapter_typed/Types.xml diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx new file mode 100644 index 000000000..a73226590 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.cxx @@ -0,0 +1,49 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "SocketAdapter.hpp" +#include "SocketConnection.hpp" + +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketAdapter::SocketAdapter(PropertySet &properties) +{ +} + +Connection *SocketAdapter::create_connection( + rti::routing::adapter::detail::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::detail::StreamReaderListener + *output_stream_discovery_listener, + const PropertySet &properties) +{ + return new SocketConnection( + input_stream_discovery_listener, + output_stream_discovery_listener, + properties); +} + +void SocketAdapter::delete_connection(Connection *connection) +{ + /** + * Perform cleanup pertaining to the connection object here. + */ + delete connection; +} + +rti::config::LibraryVersion SocketAdapter::get_version() const +{ + return { 1, 0, 0, 'r' }; +} + +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DEF(SocketAdapter) diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp new file mode 100644 index 000000000..0042adae7 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketAdapter.hpp @@ -0,0 +1,52 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETADAPTER_HPP +#define SOCKETADAPTER_HPP + +#include +#include +#include +#include +#include + +/* + * This is the initialization of the RS adapter. For simplicity, this adapter + * only reads from UDP and writes to DDS, not the other way around + */ +class SocketAdapter : public rti::routing::adapter::AdapterPlugin { +public: + explicit SocketAdapter(rti::routing::PropertySet &); + + rti::routing::adapter::Connection *create_connection( + rti::routing::adapter::detail::StreamReaderListener *, + rti::routing::adapter::detail::StreamReaderListener *, + const rti::routing::PropertySet &) final; + + void delete_connection(rti::routing::adapter::Connection *connection) final; + + rti::config::LibraryVersion get_version() const; +}; + +/** + * This macro defines a C-linkage symbol that can be used as create function + * for plug-in registration through XML. + * + * The generated symbol has the name: + * + * \code + * SocketAdapterPlugin_create_adapter_plugin + * \endcode + */ +RTI_ADAPTER_PLUGIN_CREATE_FUNCTION_DECL(SocketAdapter) + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx similarity index 75% rename from examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx index 729ce9bdf..c8f24a6fd 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketConnection.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form @@ -12,6 +12,7 @@ #include "SocketConnection.hpp" #include "SocketStreamReader.hpp" +#include "SocketStreamWriter.hpp" using namespace rti::routing; using namespace rti::routing::adapter; @@ -33,6 +34,14 @@ StreamReader *SocketConnection::create_stream_reader( return new SocketStreamReader(this, info, properties, listener); } +StreamWriter *SocketConnection::create_stream_writer( + Session *session, + const StreamInfo &info, + const PropertySet &properties) +{ + return new SocketStreamWriter(this, info, properties); +} + void SocketConnection::delete_stream_reader(StreamReader *reader) { SocketStreamReader *socket_reader = @@ -40,12 +49,22 @@ void SocketConnection::delete_stream_reader(StreamReader *reader) socket_reader->shutdown_socket_reader_thread(); delete reader; } +void SocketConnection::delete_stream_writer(StreamWriter *writer) +{ + SocketStreamWriter *socket_writer = dynamic_cast(writer); + delete writer; +} DiscoveryStreamReader *SocketConnection::input_stream_discovery_reader() { return &input_discovery_reader_; } +DiscoveryStreamReader *SocketConnection::output_stream_discovery_reader() +{ + return nullptr; +} + void SocketConnection::dispose_discovery_stream( const rti::routing::StreamInfo &stream_info) { diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp new file mode 100644 index 000000000..ae49f6c0c --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketConnection.hpp @@ -0,0 +1,72 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETCONNECTION_HPP +#define SOCKETCONNECTION_HPP + +#include +#include + + #include "SocketInputDiscoveryStreamReader.hpp" + +/* + * This class creates the RS Connection, which is an access point to our + * example data domain (a UDP socket) + */ +class SocketConnection : public rti::routing::adapter::Connection { +public: + SocketConnection( + rti::routing::adapter::StreamReaderListener + *input_stream_discovery_listener, + rti::routing::adapter::StreamReaderListener + *output_stream_discovery_listener, + const rti::routing::PropertySet &properties); + + rti::routing::adapter::StreamReader *create_stream_reader( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties, + rti::routing::adapter::StreamReaderListener *listener) final; + + rti::routing::adapter::StreamWriter *create_stream_writer( + rti::routing::adapter::Session *session, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet &properties) final; + + void delete_stream_reader( + rti::routing::adapter::StreamReader *reader) final; + + void delete_stream_writer( + rti::routing::adapter::StreamWriter *writer) final; + + rti::routing::adapter::DiscoveryStreamReader * + input_stream_discovery_reader() final; + + rti::routing::adapter::DiscoveryStreamReader * + output_stream_discovery_reader() final; + + /** + * @brief This function is called by the SocketStreamReader to indicate + * that it's time to dispose the route. The dispose set by the + * SocketInputDiscoveryStreamReader starts the chain of cleanup procedure. + * + * @param stream_info \b in. Reference to a StreamInfo object which should + * be used when creating a new StreamInfo sample with disposed set to true + */ + void dispose_discovery_stream( + const rti::routing::StreamInfo &stream_info); + + private: + SocketInputDiscoveryStreamReader input_discovery_reader_; +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx similarity index 98% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx index 79c3dddad..ecfd9c68e 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp similarity index 97% rename from examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp index 48c75f337..5103762d9 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketInputDiscoveryStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketInputDiscoveryStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx similarity index 98% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx index dbc2675c3..efec348ad 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.cxx +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.cxx @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp similarity index 97% rename from examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp rename to examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp index 5ab1e65a6..b3adf6d48 100644 --- a/examples/routing_service/udp_socket_adapter/src/SocketStreamReader.hpp +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamReader.hpp @@ -1,5 +1,5 @@ /* - * (c) 2024 Copyright, Real-Time Innovations, Inc. All rights reserved. + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. * * RTI grants Licensee a license to use, modify, compile, and create derivative * works of the Software. Licensee has the right to distribute object form diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx new file mode 100644 index 000000000..90d66d14f --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.cxx @@ -0,0 +1,130 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include +#include +#include +#include +#include + +#include +#include +#ifdef _WIN32 + #include + #pragma comment(lib, "ws2_32.lib") +#else + #include + #include + #include + #include + #include +#endif + +#include +#include +#include +#include "SocketStreamWriter.hpp" +#include "SocketStreamReader.hpp" //use ShapeType from here +#include + +using namespace dds::core::xtypes; +using namespace rti::routing; +using namespace rti::routing::adapter; + +SocketStreamWriter::SocketStreamWriter( + SocketConnection *connection, + const StreamInfo &info, + const PropertySet &properties + ) + : stream_info_(info.stream_name(), info.type_info().type_name()) +{ + + socket_connection_ = connection; + + adapter_type_ = + static_cast(info.type_info().type_representation()); + + + // Parse the properties provided in the xml configuration file + for (const auto &property : properties) { + if (property.first == SEND_ADDRESS_STRING) { + send_address_ = property.second; + } + else if (property.first == SEND_PORT_STRING) { + send_port_ = std::stoi(property.second); + } + else if (property.first == DEST_ADDRESS_STRING) + { + dest_address_ = property.second; + } + else if (property.first == DEST_PORT_STRING) + { + dest_port_ = std::stoi(property.second); + } + } + + socket = std::unique_ptr(new UdpSocket( + send_address_.c_str(), + send_port_)); +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos) +{ + size_t len = 0; + + ShapeType shapes; + uint32_t tempObject=0; + + for (const auto sample : samples) { + //send sample out UDP interface + if (sample->member_exists_in_type("shapesize")) + { + shapes.shapesize = sample->value("shapesize"); + shapes.x = sample->value("x"); + shapes.y = sample->value("y"); + len = +socket->send_data((char*)&shapes, sizeof(shapes), dest_address_.c_str(), dest_port_); + } + else + { + Logger::instance().local("Received Sample that is not valid ShapeType"); + } + } + return len; +} + +int SocketStreamWriter::write( + const std::vector &samples, + const std::vector &infos, + const SelectorState &selector_state) +{ + int len; + len = write(samples, infos); + return len; +} + +void SocketStreamWriter::return_loan( + std::vector &samples, + std::vector &infos) +{ + for (int i = 0; i < samples.size(); ++i) { + delete samples[i]; + delete infos[i]; + } + samples.clear(); + infos.clear(); +} + +SocketStreamWriter::~SocketStreamWriter() +{ +} diff --git a/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp new file mode 100644 index 000000000..053a92810 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/SocketStreamWriter.hpp @@ -0,0 +1,84 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef SOCKETSTREAMWRITER_HPP +#define SOCKETSTREAMWRITER_HPP + +#include +#include +#include +#include + +#include "SocketConnection.hpp" +#include "UdpSocket.hpp" + +#include +#include + +#define BUFFER_MAX_SIZE 1024 +#define SEND_ADDRESS_STRING "send_address" +#define SEND_PORT_STRING "send_port" +#define DEST_ADDRESS_STRING "dest_address" +#define DEST_PORT_STRING "dest_port" + +class SocketStreamWriter : public rti::routing::adapter::DynamicDataStreamWriter { +public: + explicit SocketStreamWriter( + SocketConnection *connection, + const rti::routing::StreamInfo &info, + const rti::routing::PropertySet & + ); + + virtual int + write(const std::vector &, + const std::vector &) final; + + virtual int write( + const std::vector &, + const std::vector &, + const rti::routing::adapter::SelectorState &selector_state) final; + + virtual void return_loan( + std::vector &, + std::vector &) final; + + ~SocketStreamWriter(); + + +private: + /** + * @brief Function used by socketreader_thread_ to read samples from the + * socket. + */ + + + SocketConnection *socket_connection_; + + std::unique_ptr socket; + + int send_port_; + int dest_port_; + + std::string send_address_; + std::string dest_address_; + rti::routing::StreamInfo stream_info_; + dds::core::xtypes::DynamicType *adapter_type_; + + struct ShapeType { + int x; + int y; + int shapesize; + }; + +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx new file mode 100644 index 000000000..8da702a3d --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.cxx @@ -0,0 +1,111 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#include "UdpSocket.hpp" + +#include + + +UdpSocket::UdpSocket(const char *ip, int port) +{ +#ifdef _WIN32 + WSADATA wsaData; + if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0) { + throw dds::core::IllegalOperationError("WSAStartup failed"); + } +#endif + + // Socket initialization + init_socket(); + memset(&server_addr, 0, sizeof(server_addr)); + + // Using non-blocking sockets for easier thread management +#ifdef _WIN32 + unsigned long nonBlocking = 1; + if (ioctlsocket(sockfd, FIONBIO, &nonBlocking) != 0) { + std::cerr << "Error setting socket to non-blocking\n"; + closesocket(sockfd); + WSACleanup(); + throw dds::core::IllegalOperationError("ioctlsocket failed"); + } +#else + fcntl(sockfd, F_SETFL, O_NONBLOCK); +#endif + + // Bind the socket + bind_socket(ip, port); +} + +UdpSocket::~UdpSocket() +{ +#ifdef _WIN32 + closesocket(sockfd); + WSACleanup(); +#else + close(sockfd); +#endif +} + +void UdpSocket::init_socket() +{ + if ((sockfd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + throw dds::core::IllegalOperationError("Socket creation failed"); + } +} + +void UdpSocket::bind_socket(const char *ip, int port) +{ + server_addr.sin_family = AF_INET; + inet_pton(AF_INET, ip, &(server_addr.sin_addr)); + server_addr.sin_port = htons(port); + + if (bind(sockfd, + (const struct sockaddr *) &server_addr, + sizeof(server_addr)) + == -1) { + throw dds::core::IllegalOperationError("Bind failed"); + } +} + +void UdpSocket::receive_data( + char *received_buffer, + int *received_bytes, + int size_of_original_buffer) +{ + socklen_t len = sizeof(server_addr); + + socklen_t client_addr_len = sizeof(client_addr); + + /** Receive data.Since it's non-blocking, it will return right away most + * of the times + */ + *received_bytes = recvfrom( + sockfd, + received_buffer, + size_of_original_buffer, + 0, + (struct sockaddr *) &client_addr, + &client_addr_len); + + return; +} + +int UdpSocket::send_data(char* tx_buffer, int tx_length, const char* destAddr, int destPort) +{ + sockaddr_in dest_addr; + dest_addr.sin_family = AF_INET; + dest_addr.sin_port = htons(destPort); + dest_addr.sin_addr.s_addr = inet_addr(destAddr); + + size_t length = sendto(sockfd, tx_buffer, tx_length, 0, (struct sockaddr*)&dest_addr, sizeof(dest_addr)); + return (int)length; +} \ No newline at end of file diff --git a/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp new file mode 100644 index 000000000..024429e1b --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/src/UdpSocket.hpp @@ -0,0 +1,75 @@ +/* + * (c) 2025 Copyright, Real-Time Innovations, Inc. All rights reserved. + * + * RTI grants Licensee a license to use, modify, compile, and create derivative + * works of the Software. Licensee has the right to distribute object form + * only for use with RTI products. The Software is provided "as is", with no + * warranty of any type, including any warranty for fitness for any purpose. + * RTI is under no obligation to maintain or support the Software. RTI shall + * not be liable for any incidental or consequential damages arising out of the + * use or inability to use the software. + */ + +#ifndef UDPSOCKETUTILS_HPP +#define UDPSOCKETUTILS_HPP + +#ifdef _WIN32 + #include + #include +#else + #include + #include + #include + #include + #include + #include +#endif + +#include + +#ifdef _WIN32 + #pragma comment(lib, "ws2_32.lib") +#endif + +/** + * @brief Utility class for UDP socket communication in the RTI Routing Service UDP Socket Adapter. + * + * UdpSocket provides a lightweight abstraction for UDP socket communication, + * supporting both Windows and POSIX systems. It is designed to be used by the Routing Service + * UDP socket adapter to send and receive raw UDP packets as part of data bridging between + * external UDP sources and DDS. + * + * The class handles socket creation, binding to a specified IP address and port, and + * ensures non-blocking operation for efficient integration with multi-threaded applications. + * It provides methods for receiving data from any UDP client and for sending data to a + * specified destination address and port. + */ + +class UdpSocket { +public: + UdpSocket(const char* ip, int port); + ~UdpSocket(); + void receive_data( + char* received_buffer, + int* received_bytes, + int size_of_original_buffer); + + int send_data( + char* tx_buffer, + int tx_length, + const char* destAddr, + int destPort); + +private: +#ifdef _WIN32 + SOCKET sockfd; +#else + int sockfd; +#endif + struct sockaddr_in server_addr, client_addr; + + void init_socket(); + void bind_socket(const char* ip, int port); +}; + +#endif diff --git a/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py new file mode 100644 index 000000000..f29f078e5 --- /dev/null +++ b/examples/routing_service/udp_socket_adapter_typed/test/read_shape_from_socket.py @@ -0,0 +1,47 @@ +import socket +import sys +import struct + +# Receive and parse data from the socket +def receive_data(sock): + # Receive data from the socket + data, addr = sock.recvfrom(1024) # Buffer size of 1024 bytes + # Unpack the data as 3 int types (x, y, shapesize) + x, y, size = struct.unpack("iii", data) + return x, y, size, addr + + +def main(): + if len(sys.argv) != 2: + print( + "Usage: python3 read_shape_from_socket.py " + ) + return + + # Input arguments + port = int(sys.argv[1]) + + samples_received = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + # Bind the socket to listen on the specified port + sock.bind(('', port)) + print(f"Listening for shape data on port {port}...") + + while True: + # Receive data from the socket + x, y, size, addr = receive_data(sock) + + # Print the received shape data + samples_received += 1 + print(f"Sample #{samples_received} from {addr}: x={x}, y={y}, size={size}") + + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"Error: {e}") + + +if __name__ == "__main__": + main() diff --git a/examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py b/examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py similarity index 100% rename from examples/routing_service/udp_socket_adapter/test/send_shape_to_socket.py rename to examples/routing_service/udp_socket_adapter_typed/test/send_shape_to_socket.py