2.2. Monitoring ROS 2 with Fast DDS Statistics Backend¶
2.2.1. Background¶
Vulcanexus integrates Fast DDS Statistics Backend, which is a useful tool for monitoring and studying a ROS 2 network since ROS 2 relies on the DDS specification to communicate the different nodes. This tool will be used to create a monitoring application tailored to the user’s needs. Therefore, this tutorial provides a step by step tutorial on how to create your first application as a ROS 2 package to monitor your ROS 2 deployment.
This tutorial will use the demo_nodes_cpp
package, available in the Vulcanexus Desktop distribution. First, a ROS 2 talker
is launched and then a listener
node is started in the same machine.
At this point, the monitor application created using the Fast DDS Statistics Backend library will be deployed to record and present the overall communication performance.
2.2.2. Prerequisites¶
It is required to have previously installed Vulcanexus using one of the following installation methods:
Ensure that the Vulcanexus installation includes Vulcanexus Tools (either vulcanexus-jazzy-desktop
, vulcanexus-jazzy-tools
, or vulcanexus-jazzy-base
).
2.2.3. Creating the monitor package and application¶
In this section the monitoring application is created from scratch, covering the creation of the working environment and the development of the source code.
2.2.3.1. Creating the application workspace¶
The application workspace will have the following structure at the end of the project.
ros2_ws
└── src
└── monitor_tutorial
├── src
└── monitor.cpp
├── CMakeLists.txt
└── package.xml
Let’s create the ROS 2 workspace and package by running the following commands:
mkdir -p ros2_ws/src
cd ros2_ws/src
ros2 pkg create --build-type ament_cmake monitor_tutorial --dependencies fastcdr fastrtps fastdds_statistics_backend
You will now have a new folder within your workspace src directory called monitor_tutorial.
2.2.3.2. Writing the monitor application¶
From the ros_ws/src/monitor_tutorial/src directory in the workspace, run the following command to download the monitor.cpp file.
wget -O monitor.cpp \
https://raw.githubusercontent.com/eProsima/vulcanexus/main/code/monitor_tutorial/src/monitor.cpp
This is the C++ source code for the application. This source code can also be found here.
// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/**
* @file monitor.cpp
*
*/
#include <chrono>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <fastdds_statistics_backend/listener/DomainListener.hpp>
#include <fastdds_statistics_backend/StatisticsBackend.hpp>
#include <fastdds_statistics_backend/types/EntityId.hpp>
#include <fastdds_statistics_backend/types/types.hpp>
using namespace eprosima::statistics_backend;
class Monitor
{
public:
Monitor(
uint32_t domain,
uint32_t n_bins,
uint32_t t_interval)
: domain_(domain)
, n_bins_(n_bins)
, t_interval_(t_interval)
, monitor_id_(EntityId::invalid())
, topic_id_(EntityId::invalid())
{
}
~Monitor()
{
StatisticsBackend::stop_monitor(monitor_id_);
}
bool init()
{
monitor_id_ = StatisticsBackend::init_monitor(domain_);
if (!monitor_id_.is_valid())
{
std::cout << "Error creating monitor" << std::endl;
return 1;
}
StatisticsBackend::set_physical_listener(&physical_listener_);
return true;
}
void run()
{
while (true)
{
if (!topic_id_.is_valid())
{
topic_id_ = get_topic_id("rt/chatter", "std_msgs::msg::dds_::String_");
}
else
{
get_fastdds_latency_mean();
get_publication_throughput_mean();
}
std::this_thread::sleep_for(std::chrono::milliseconds(t_interval_*1000));
}
}
//! Get the id of the topic searching by topic_name and data_type
EntityId get_topic_id(
std::string topic_name,
std::string data_type)
{
// Get the list of all topics available
std::vector<EntityId> topics = StatisticsBackend::get_entities(EntityKind::TOPIC);
Info topic_info;
// Iterate over all topic searching for the one with specified topic_name and data_type
for (auto topic_id : topics)
{
topic_info = StatisticsBackend::get_info(topic_id);
if (topic_info["name"] == topic_name && topic_info["data_type"] == data_type)
{
return topic_id;
}
}
return EntityId::invalid();
}
// Get communications latency mean
void get_fastdds_latency_mean()
{
// Vector of Statistics Data to store the latency data
std::vector<StatisticsData> latency_data{};
// Publishers on a specific topic
std::vector<EntityId> publishers = StatisticsBackend::get_entities(
EntityKind::DATAWRITER,
topic_id_);
// Subscriptions on a specific topic
std::vector<EntityId> subscriptions = StatisticsBackend::get_entities(
EntityKind::DATAREADER,
topic_id_);
// Get current time
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
/*
* Get the mean of the FASTDDS_LATENCY of the last time interval
* between the Publishers and Subscriptions publishing in and subscribed to a given topic
*/
latency_data = StatisticsBackend::get_data(
DataKind::FASTDDS_LATENCY, // DataKind
publishers, // Source entities
subscriptions, // Target entities
n_bins_, // Number of bins
now - std::chrono::seconds(t_interval_), // t_from
now, // t_to
StatisticKind::MEAN); // Statistic
// Iterate over the retrieve data
for (auto latency : latency_data)
{
// Check if there are meningful values in retrieved vector
if (std::isnan(latency.second))
{
return;
}
// Print the latency data
std::cout << "ROS 2 Latency in topic " << StatisticsBackend::get_info(topic_id_)["name"] << ": ["
<< timestamp_to_string(latency.first) << ", " << latency.second / 1000 << " μs]" << std::endl;
}
}
//! Get publication thougput mean
void get_publication_throughput_mean()
{
// Vector of Statistics Data to store the publication throughput data
std::vector<StatisticsData> publication_throughput_data{};
// Publishers on a specific topic
std::vector<EntityId> publishers = StatisticsBackend::get_entities(
EntityKind::DATAWRITER,
topic_id_);
// Get current time
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
/*
* Get the mean of the PUBLICATION_THROUGHPUT of the last time interval
* of the Publishers publishing in a given topic
*/
publication_throughput_data = StatisticsBackend::get_data(
DataKind::PUBLICATION_THROUGHPUT, // DataKind
publishers, // Source entities
n_bins_, // Number of bins
now - std::chrono::seconds(t_interval_), // t_from
now, // t_to
StatisticKind::MEAN); // Statistic
// Iterate over the retrieve data
for (auto publication_throughput : publication_throughput_data)
{
// Check if there are meningful values in retrieved vector
if (std::isnan(publication_throughput.second))
{
return;
}
// Print the publication througput data
std::cout << "Publication throughput in topic " << StatisticsBackend::get_info(topic_id_)["name"] << ": ["
<< timestamp_to_string(publication_throughput.first) << ", "
<< publication_throughput.second << " B/s]" << std::endl;
}
}
//! Convert timestamp to string
std::string timestamp_to_string(
const Timestamp timestamp)
{
auto timestamp_t = std::chrono::system_clock::to_time_t(timestamp);
auto msec = std::chrono::duration_cast<std::chrono::milliseconds>(timestamp.time_since_epoch()).count();
msec %= 1000;
std::stringstream ss;
ss << std::put_time(localtime(×tamp_t), "%F %T") << "." << std::setw(3) << std::setfill('0') << msec;
return ss.str();
}
protected:
class Listener : public eprosima::statistics_backend::PhysicalListener
{
public:
Listener()
{
}
~Listener() override
{
}
void on_host_discovery(
EntityId host_id,
const DomainListener::Status& status) override
{
Info host_info = StatisticsBackend::get_info(host_id);
if (status.current_count_change == 1)
{
std::cout << "Host " << std::string(host_info["name"]) << " discovered." << std::endl;
}
else
{
std::cout << "Host " << std::string(host_info["name"]) << " updated info." << std::endl;
}
}
void on_user_discovery(
EntityId user_id,
const DomainListener::Status& status) override
{
Info user_info = StatisticsBackend::get_info(user_id);
if (status.current_count_change == 1)
{
std::cout << "User " << std::string(user_info["name"]) << " discovered." << std::endl;
}
else
{
std::cout << "User " << std::string(user_info["name"]) << " updated info." << std::endl;
}
}
void on_process_discovery(
EntityId process_id,
const DomainListener::Status& status) override
{
Info process_info = StatisticsBackend::get_info(process_id);
if (status.current_count_change == 1)
{
std::cout << "Process " << std::string(process_info["name"]) << " discovered." << std::endl;
}
else
{
std::cout << "Process " << std::string(process_info["name"]) << " updated info." << std::endl;
}
}
void on_topic_discovery(
EntityId domain_id,
EntityId topic_id,
const DomainListener::Status& status) override
{
Info topic_info = StatisticsBackend::get_info(topic_id);
Info domain_info = StatisticsBackend::get_info(domain_id);
if (status.current_count_change == 1)
{
std::cout << "Topic " << std::string(topic_info["name"])
<< " [" << std::string(topic_info["data_type"])
<< "] discovered." << std::endl;
}
else
{
std::cout << "Topic " << std::string(topic_info["name"])
<< " [" << std::string(topic_info["data_type"])
<< "] updated info." << std::endl;
}
}
void on_participant_discovery(
EntityId domain_id,
EntityId participant_id,
const DomainListener::Status& status) override
{
static_cast<void>(domain_id);
Info participant_info = StatisticsBackend::get_info(participant_id);
if (status.current_count_change == 1)
{
std::cout << "Participant with GUID " << std::string(participant_info["guid"]) << " discovered." << std::endl;
}
else
{
std::cout << "Participant with GUID " << std::string(participant_info["guid"]) << " updated info." << std::endl;
}
}
void on_datareader_discovery(
EntityId domain_id,
EntityId datareader_id,
const DomainListener::Status& status) override
{
static_cast<void>(domain_id);
Info datareader_info = StatisticsBackend::get_info(datareader_id);
if (status.current_count_change == 1)
{
std::cout << "DataReader with GUID " << std::string(datareader_info["guid"]) << " discovered." << std::endl;
}
else
{
std::cout << "DataReader with GUID " << std::string(datareader_info["guid"]) << " updated info." << std::endl;
}
}
void on_datawriter_discovery(
EntityId domain_id,
EntityId datawriter_id,
const DomainListener::Status& status) override
{
static_cast<void>(domain_id);
Info datawriter_info = StatisticsBackend::get_info(datawriter_id);
if (status.current_count_change == 1)
{
std::cout << "DataWriter with GUID " << std::string(datawriter_info["guid"]) << " discovered." << std::endl;
}
else
{
std::cout << "DataWriter with GUID " << std::string(datawriter_info["guid"]) << " updated info." << std::endl;
}
}
} physical_listener_;
DomainId domain_;
uint32_t n_bins_;
uint32_t t_interval_;
EntityId monitor_id_;
EntityId topic_id_;
};
int main()
{
int domain = 0;
int n_bins = 1;
int t_interval = 5;
Monitor monitor(domain, n_bins, t_interval);
if (monitor.init())
{
monitor.run();
}
return 0;
}
2.2.3.3. Examining the code¶
At the beginning of the file, the Doxygen style comment block with the @file
field states the name of the file.
/**
* @file monitor.cpp
*
*/
Below are the includes of the C++ headers that allow the use of Fast DDS and Fast DDS Statistics Backend API.
#include <fastdds_statistics_backend/listener/DomainListener.hpp>
#include <fastdds_statistics_backend/StatisticsBackend.hpp>
#include <fastdds_statistics_backend/types/EntityId.hpp>
#include <fastdds_statistics_backend/types/types.hpp>
Next, we define the namespace that contains the Fast DDS Statistics Backend classes and functions that we are going to use in our application.
using namespace eprosima::statistics_backend;
The next line creates the Monitor
class that implements the monitor.
class Monitor
The public constructor and destructor of the Monitor
class are defined below.
The constructor initializes the protected data members of the class to the values passed as arguments.
The class destructor stops the monitor.
Monitor(
uint32_t domain,
uint32_t n_bins,
uint32_t t_interval)
: domain_(domain)
, n_bins_(n_bins)
, t_interval_(t_interval)
, monitor_id_(EntityId::invalid())
, topic_id_(EntityId::invalid())
{
}
~Monitor()
{
StatisticsBackend::stop_monitor(monitor_id_);
}
Continuing with the public member functions of the Monitor
class, the next snippet of code defines the public initialization member function.
This function performs several actions:
Initialize the monitor.
Check the monitor has a valid id in the Statistics Backend database.
Assign the physical listener to the Statistics Backend. This listener will capture any update in the discovery of DDS entities.
bool init()
{
monitor_id_ = StatisticsBackend::init_monitor(domain_);
if (!monitor_id_.is_valid())
{
std::cout << "Error creating monitor" << std::endl;
return 1;
}
StatisticsBackend::set_physical_listener(&physical_listener_);
return true;
}
To run the monitor, the public member function run()
is implemented.
This function search for the rt/chatter
topic in the Statistics Backend database by calling the get_topic_id()
function.
If this is found, the we can proceed to compute the actual statistics data.
In order to do so, it calls get_fastdds_latency_mean()
and get_publication_throughput_mean()
public member functions explained below.
void run()
{
while (true)
{
if (!topic_id_.is_valid())
{
topic_id_ = get_topic_id("rt/chatter", "std_msgs::msg::dds_::String_");
}
else
{
get_fastdds_latency_mean();
get_publication_throughput_mean();
}
std::this_thread::sleep_for(std::chrono::milliseconds(t_interval_*1000));
}
}
As introduced before, the get_topic_id()
public member function get the id of the topic searching by topic name and data type name.
//! Get the id of the topic searching by topic_name and data_type
EntityId get_topic_id(
std::string topic_name,
std::string data_type)
{
// Get the list of all topics available
std::vector<EntityId> topics = StatisticsBackend::get_entities(EntityKind::TOPIC);
Info topic_info;
// Iterate over all topic searching for the one with specified topic_name and data_type
for (auto topic_id : topics)
{
topic_info = StatisticsBackend::get_info(topic_id);
if (topic_info["name"] == topic_name && topic_info["data_type"] == data_type)
{
return topic_id;
}
}
return EntityId::invalid();
}
The public member function get_fastdds_latency_mean()
gets the Fast DDS latency mean of the last t_interval
seconds between the talker
and the listener
.
To achieve this the function performs several actions:
Get the Publishers and Subscriptions in a Topic.
Get the current time.
Get the mean of the
FASTDDS_LATENCY
of the last time interval between the Publishers and Subscriptions publishing under and subscribed to the given topic,rt/chatter
in this case.
// Get communications latency mean
void get_fastdds_latency_mean()
{
// Vector of Statistics Data to store the latency data
std::vector<StatisticsData> latency_data{};
// Publishers on a specific topic
std::vector<EntityId> publishers = StatisticsBackend::get_entities(
EntityKind::DATAWRITER,
topic_id_);
// Subscriptions on a specific topic
std::vector<EntityId> subscriptions = StatisticsBackend::get_entities(
EntityKind::DATAREADER,
topic_id_);
// Get current time
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
/*
* Get the mean of the FASTDDS_LATENCY of the last time interval
* between the Publishers and Subscriptions publishing in and subscribed to a given topic
*/
latency_data = StatisticsBackend::get_data(
DataKind::FASTDDS_LATENCY, // DataKind
publishers, // Source entities
subscriptions, // Target entities
n_bins_, // Number of bins
now - std::chrono::seconds(t_interval_), // t_from
now, // t_to
StatisticKind::MEAN); // Statistic
// Iterate over the retrieve data
for (auto latency : latency_data)
{
// Check if there are meningful values in retrieved vector
if (std::isnan(latency.second))
{
return;
}
// Print the latency data
std::cout << "ROS 2 Latency in topic " << StatisticsBackend::get_info(topic_id_)["name"] << ": ["
<< timestamp_to_string(latency.first) << ", " << latency.second / 1000 << " μs]" << std::endl;
}
}
Finally, the public member function get_publication_throughput_mean()
gets the publication throughput mean of the last t_interval
seconds of the talker
.
The function has a similar execution procedure than the previous one but in this case it query the mean of the PUBLICATION_THROUGHPUT
instead of the FASTDDS_LATENCY
.
//! Get publication thougput mean
void get_publication_throughput_mean()
{
// Vector of Statistics Data to store the publication throughput data
std::vector<StatisticsData> publication_throughput_data{};
// Publishers on a specific topic
std::vector<EntityId> publishers = StatisticsBackend::get_entities(
EntityKind::DATAWRITER,
topic_id_);
// Get current time
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
/*
* Get the mean of the PUBLICATION_THROUGHPUT of the last time interval
* of the Publishers publishing in a given topic
*/
publication_throughput_data = StatisticsBackend::get_data(
DataKind::PUBLICATION_THROUGHPUT, // DataKind
publishers, // Source entities
n_bins_, // Number of bins
now - std::chrono::seconds(t_interval_), // t_from
now, // t_to
StatisticKind::MEAN); // Statistic
// Iterate over the retrieve data
for (auto publication_throughput : publication_throughput_data)
{
// Check if there are meningful values in retrieved vector
if (std::isnan(publication_throughput.second))
{
return;
}
// Print the publication througput data
std::cout << "Publication throughput in topic " << StatisticsBackend::get_info(topic_id_)["name"] << ": ["
<< timestamp_to_string(publication_throughput.first) << ", "
<< publication_throughput.second << " B/s]" << std::endl;
}
}
Then, the protected Listener
class is defined by inheriting from the PhysicalListener class.
This class overrides the default PhysicalListener callbacks, which allows the execution of routines in case of an event.
class Listener : public eprosima::statistics_backend::PhysicalListener
{
public:
Listener()
{
}
~Listener() override
{
}
Within the PhysicalListener class, we can override several callback to adapt how the monitor application reacts to some events. These overridden callbacks are:
on_host_discovery()
allows the definition of a series of actions when a new host is detected.void on_host_discovery( EntityId host_id, const DomainListener::Status& status) override { Info host_info = StatisticsBackend::get_info(host_id); if (status.current_count_change == 1) { std::cout << "Host " << std::string(host_info["name"]) << " discovered." << std::endl; } else { std::cout << "Host " << std::string(host_info["name"]) << " updated info." << std::endl; } }
on_user_discovery()
detects when a new user is discovered.void on_user_discovery( EntityId user_id, const DomainListener::Status& status) override { Info user_info = StatisticsBackend::get_info(user_id); if (status.current_count_change == 1) { std::cout << "User " << std::string(user_info["name"]) << " discovered." << std::endl; } else { std::cout << "User " << std::string(user_info["name"]) << " updated info." << std::endl; } }
on_process_discovery()
involves when a new process is discovered.void on_process_discovery( EntityId process_id, const DomainListener::Status& status) override { Info process_info = StatisticsBackend::get_info(process_id); if (status.current_count_change == 1) { std::cout << "Process " << std::string(process_info["name"]) << " discovered." << std::endl; } else { std::cout << "Process " << std::string(process_info["name"]) << " updated info." << std::endl; } }
on_topic_discovery()
is called when a new Topic is discovered.void on_topic_discovery( EntityId domain_id, EntityId topic_id, const DomainListener::Status& status) override { Info topic_info = StatisticsBackend::get_info(topic_id); Info domain_info = StatisticsBackend::get_info(domain_id); if (status.current_count_change == 1) { std::cout << "Topic " << std::string(topic_info["name"]) << " [" << std::string(topic_info["data_type"]) << "] discovered." << std::endl; } else { std::cout << "Topic " << std::string(topic_info["name"]) << " [" << std::string(topic_info["data_type"]) << "] updated info." << std::endl; } }
on_participant_discovery()
is called when a new participant is discovered.void on_participant_discovery( EntityId domain_id, EntityId participant_id, const DomainListener::Status& status) override { static_cast<void>(domain_id); Info participant_info = StatisticsBackend::get_info(participant_id); if (status.current_count_change == 1) { std::cout << "Participant with GUID " << std::string(participant_info["guid"]) << " discovered." << std::endl; } else { std::cout << "Participant with GUID " << std::string(participant_info["guid"]) << " updated info." << std::endl; } }
on_datareader_discovery()
andon_datawriter_discovery()
involves when a new DataReader or DataWriter respectively are discovered.void on_datareader_discovery( EntityId domain_id, EntityId datareader_id, const DomainListener::Status& status) override { static_cast<void>(domain_id); Info datareader_info = StatisticsBackend::get_info(datareader_id); if (status.current_count_change == 1) { std::cout << "DataReader with GUID " << std::string(datareader_info["guid"]) << " discovered." << std::endl; } else { std::cout << "DataReader with GUID " << std::string(datareader_info["guid"]) << " updated info." << std::endl; } } void on_datawriter_discovery( EntityId domain_id, EntityId datawriter_id, const DomainListener::Status& status) override { static_cast<void>(domain_id); Info datawriter_info = StatisticsBackend::get_info(datawriter_id); if (status.current_count_change == 1) { std::cout << "DataWriter with GUID " << std::string(datawriter_info["guid"]) << " discovered." << std::endl; } else { std::cout << "DataWriter with GUID " << std::string(datawriter_info["guid"]) << " updated info." << std::endl; } }
Finally, the monitor application is initialized and run in main
function.
int main()
{
int domain = 0;
int n_bins = 1;
int t_interval = 5;
Monitor monitor(domain, n_bins, t_interval);
if (monitor.init())
{
monitor.run();
}
return 0;
}
2.2.3.4. CMakeLists.txt¶
Include at the end of the CMakeList.txt file you created earlier the following code snippet. This adds all the source files needed to build the executable, and links the executable and the library together.
# Compile executable and link with dependencies
add_executable(${PROJECT_NAME} src/monitor.cpp)
# Do not use ament as dependencies are not linked with ament
target_link_libraries(${PROJECT_NAME} fastrtps fastcdr fastdds_statistics_backend)
# Install executable
install(
TARGETS ${PROJECT_NAME}
DESTINATION lib/${PROJECT_NAME}
)
This file can also be downloaded with this command in ros_ws/src/monitor_tutorial directory:
wget -O CMakeList.txt \
https://raw.githubusercontent.com/eProsima/vulcanexus/main/code/monitor_tutorial/CMakeLists.txt
2.2.4. Running the application¶
At this point the project is ready for building, compiling and running the application. From the base workspace directory (ros_ws), run the following commands.
colcon build
source install/setup.bash
ros2 run monitor_tutorial monitor_tutorial
Then open two more terminals and load the Vulcanexus environment.
Then, in one of them run a talker
and in the other one a listener
of the demo_nodes_cpp
ROS 2 package, available in the Vulcanexus Desktop distribution.
Terminal 1:
source /opt/vulcanexus/jazzy/setup.bash export FASTDDS_STATISTICS="HISTORY_LATENCY_TOPIC;PUBLICATION_THROUGHPUT_TOPIC;PHYSICAL_DATA_TOPIC" ros2 run demo_nodes_cpp talker
Terminal 2:
source /opt/vulcanexus/jazzy/setup.bash export FASTDDS_STATISTICS="HISTORY_LATENCY_TOPIC;PUBLICATION_THROUGHPUT_TOPIC;PHYSICAL_DATA_TOPIC" ros2 run demo_nodes_cpp listener
Note
In order to monitor other statistics topics, add them to environment variable FASTDDS_STATISTICS
.
Check the statistics topics available in the Fast DDS Documentation.
You should be able to see something similar to the following image.
2.2.5. Next steps¶
Now you can develop more functionalities in your application, such as collecting more performance data or monitoring other topics. You can check also this tutorial explaining how to connect an application developed with the Fast DDS Statistics Backend to a visualization tool like Grafana.
For more information about Fast DDS Statistics Backend features please refer to the project’s documentation.