2.3. Fast DDS Statistics Backend Monitoring to SQL

2.3.1. Background

Vulcanexus integrates the Fast DDS Statistics Backend, a useful tool for monitoring and studying a ROS 2 network, since ROS 2 relies on the DDS specification to communicate between nodes. The interaction with this tool is through a C++ API, which can be leveraged to create powerful user-developed monitoring tools. This tutorial shows how to create an application that connects the Fast DDS Statistics Backend to an in-disk SQL database.

../../../../_images/summary.png

Note

This tutorial assumes the reader has already reviewed the previous tutorial and understands how the Fast DDS Statistics Backend works and how to interact with it.

Within this tutorial we explain how to create an application that uses the Fast DDS Statistics Backend to store instrumentation data. The resulting application stores latency and throughput data of the chatter topic in an in-disk relational SQL database.

2.3.2. Prerequisites

It is required to have previously installed Vulcanexus.

Ensure that the Vulcanexus installation includes Vulcanexus Tools (either vulcanexus-iron-desktop, vulcanexus-iron-tools, or vulcanexus-iron-base).

Note

This tutorial uses SQLite3 as the SQL library to connect with an in-disk database. SQLite3 is already installed in the Vulcanexus environment.

2.3.3. Resultant Database

The tutorial executable monitor_sql_tutorial produces a database named vulcanexus_monitor.db in the workspace from which it is launched. This database contains one table called data with 3 columns:

  • timestamp [key] Time since the Unix epoch, in milliseconds

  • latency_median Median of the latency in the interval timestamp - 5000 : timestamp, in nanoseconds.

  • throughput_mean Mean of the throughput in the interval timestamp - 5000 : timestamp, in MB/second.

Every 5000 ms the program calls the Statistics Backend API and stores the latency median and throughput mean for all nodes using the chatter topic. The timestamp column is the key of the table, as it cannot be repeated. It is stored as a number rather than as a string or timestamp type to simplify the tutorial.

There is a useful browser application to visualize the data inside a database file: http://inloop.github.io/sqlite-viewer/. The resulting database should look similar to the following one:

../../../../_images/database.png

Warning

It is possible that some data is not available because the entities are not publishing it. In these cases the Statistics Backend returns NaN, which is stored as 0 when inserted in the database to avoid format issues.

2.3.4. Creating the monitor package and application

This section explains the source code required to implement this tutorial. However, some code is reused from the previous tutorial and will not be repeated here.

2.3.4.1. Creating the application workspace

The application workspace will have the following structure at the end of the tutorial.

ros2_ws
└── src
    └── monitor_sql_tutorial
        ├── src
        |   └── sql_monitor.cpp
        ├── CMakeLists.txt
        └── package.xml

Let’s create the ROS 2 workspace and package by running the following commands:

mkdir -p ros2_ws/src
cd ros2_ws/src
ros2 pkg create --build-type ament_cmake monitor_sql_tutorial --dependencies fastcdr fastrtps fastdds_statistics_backend sqlite3_vendor

You should now see a new folder within your workspace src directory called monitor_sql_tutorial. This command also creates an include folder that is not needed for this tutorial.

2.3.4.2. Writing the monitor application

From the ros2_ws/src/monitor_sql_tutorial/src directory in the workspace, run the following command to download the sql_monitor.cpp file.

wget -O sql_monitor.cpp \
    https://raw.githubusercontent.com/eProsima/vulcanexus/main/code/monitor_sql_tutorial/src/sql_monitor.cpp

This is the C++ source code for the application. This source code can also be found here.

// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file sql_monitor.cpp
*
*/

#include <chrono>
#include <cmath>    // std::isnan
#include <cstdio>   // snprintf
#include <iostream>
#include <iomanip>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <sqlite3.h>

#include <fastdds_statistics_backend/listener/DomainListener.hpp>
#include <fastdds_statistics_backend/StatisticsBackend.hpp>
#include <fastdds_statistics_backend/types/EntityId.hpp>
#include <fastdds_statistics_backend/types/types.hpp>
#include <fastdds_statistics_backend/exception/Exception.hpp>

using namespace eprosima::statistics_backend;

constexpr const char* DB_FILENAME = "vulcanexus_monitor.db";

constexpr uint32_t DOMAIN = 0;
constexpr uint32_t INTERVAL_MS = 5000;
constexpr const char* TOPIC_NAME = "rt/chatter";
constexpr const char* DATA_TYPE_NAME = "std_msgs::msg::dds_::String_";

class Monitor
{
public:

    Monitor()
        : monitor_id_(EntityId::invalid())
        , topic_id_(EntityId::invalid())
        , database_(nullptr)
    {
        // Initialize sqlite database
        open_or_create_database();
        if (!database_)
        {
            throw Error("Error initializing database.");
        }

        // Initialize Monitor
        monitor_id_ = StatisticsBackend::init_monitor(DOMAIN);
        if (!monitor_id_.is_valid())
        {
            throw Error("Error initializing monitor.");
        }

        std::cout << "Backend to SQL running." << std::endl;
    }

    ~Monitor()
    {
        StatisticsBackend::stop_monitor(monitor_id_);
        close_database();
    }

    void run()
    {
        while (true)
        {
            if (!topic_id_.is_valid())
            {
                // If the topic does not exist yet, try to find it
                topic_id_ = get_topic_id(TOPIC_NAME, DATA_TYPE_NAME);
                if (!topic_id_.is_valid())
                {
                    std::cout << "Topic " << TOPIC_NAME << " does not exist yet." << std::endl;
                }
            }
            else
            {
                // If it exists, store data in database
                store_to_db();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(INTERVAL_MS));
        }
    }

    //! Get the id of the topic searching by topic_name and data_type
    EntityId get_topic_id(
            std::string topic_name,
            std::string data_type)
    {
        // Get the list of all topics available
        std::vector<EntityId> topics = StatisticsBackend::get_entities(EntityKind::TOPIC);
        Info topic_info;
        // Iterate over all topics searching for the one with the specified topic_name and data_type
        for (auto topic_id : topics)
        {
            topic_info = StatisticsBackend::get_info(topic_id);
            if (topic_info["name"] == topic_name && topic_info["data_type"] == data_type)
            {
                return topic_id;
            }
        }
        return EntityId::invalid();
    }

    // Get communications latency median
    StatisticsData get_latency_data()
    {
        // Publishers on a specific topic
        std::vector<EntityId> publishers = StatisticsBackend::get_entities(
            EntityKind::DATAWRITER,
            topic_id_);

        // Subscriptions on a specific topic
        std::vector<EntityId> subscriptions = StatisticsBackend::get_entities(
            EntityKind::DATAREADER,
            topic_id_);

        // Get current time
        std::chrono::system_clock::time_point now = std::chrono::system_clock::now();

        /*
        * Get the median of the FASTDDS_LATENCY of the last time interval
        * between the Publishers and Subscriptions publishing in and subscribed to a given topic
        */
        auto data = StatisticsBackend::get_data(
            DataKind::FASTDDS_LATENCY,                      // DataKind
            publishers,                                     // Source entities
            subscriptions,                                  // Target entities
            1,                                              // Number of bins
            now - std::chrono::milliseconds(INTERVAL_MS),   // t_from
            now,                                            // t_to
            StatisticKind::MEDIAN);                         // Statistic

        // There is only one bin, so there is only one value; NaN is handled when storing
        return data[0];
    }

    //! Get publication throughput mean
    StatisticsData get_publication_throughput_mean()
    {
        // Publishers on a specific topic
        std::vector<EntityId> publishers = StatisticsBackend::get_entities(
            EntityKind::DATAWRITER,
            topic_id_);

        // Get current time
        std::chrono::system_clock::time_point now = std::chrono::system_clock::now();

        /*
        * Get the mean of the PUBLICATION_THROUGHPUT of the last time interval
        * of the Publishers publishing in a given topic
        */
        auto data = StatisticsBackend::get_data(
            DataKind::PUBLICATION_THROUGHPUT,               // DataKind
            publishers,                                     // Source entities
            1,                                              // Number of bins
            now - std::chrono::milliseconds(INTERVAL_MS),   // t_from
            now,                                            // t_to
            StatisticKind::MEAN);                           // Statistic

        // There is only one bin, so there is only one value; NaN is handled when storing
        return data[0];
    }

    void open_or_create_database()
    {
        // Open database
        int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_SHAREDCACHE;
        if (sqlite3_open_v2(DB_FILENAME, &database_, flags, 0) != SQLITE_OK)
        {
            std::cerr << "Error opening or creating database." << std::endl;
            sqlite3_close(database_);
            database_ = nullptr;
            return;
        }

        // Create table if it does not exist
        std::cout << "Creating table with query: \"" << create_table_statement_ << "\"." << std::endl;
        if (sqlite3_exec(database_, create_table_statement_, 0, 0, 0) != SQLITE_OK)
        {
            std::cerr << "Error creating table." << std::endl;
            sqlite3_close(database_);
            database_ = nullptr;
            return;
        }
    }

    void close_database()
    {
        sqlite3_close(database_);
    }

    void store_to_db()
    {
        // Get data
        auto latency_data = get_latency_data();
        auto throughput_data = get_publication_throughput_mean();

        // Fill the query template with the new values. Use only the latency timestamp, as both are almost the same
        std::snprintf(
            query_,
            sizeof(query_),
            insert_data_query_,
            std::chrono::duration_cast<std::chrono::milliseconds>(latency_data.first.time_since_epoch()).count(),
            (std::isnan(latency_data.second) ? 0 : latency_data.second),
            (std::isnan(throughput_data.second) ? 0 : throughput_data.second));

        std::cout << "Storing data in SQLite DataBase with query: \"" << query_ << "\"." << std::endl;

        // Insert in database (if it fails, do nothing)
        if (sqlite3_exec(database_, query_, 0, 0, 0) != SQLITE_OK)
        {
            std::cerr << "An error occurred inserting new data: " << sqlite3_errmsg(database_) << std::endl;
        }
    }

protected:

    static constexpr const char* create_table_statement_ =
        "CREATE TABLE IF NOT EXISTS data("
        "timestamp BIGINT,"
        "latency_median FLOAT,"
        "throughput_mean FLOAT,"
        "PRIMARY KEY(timestamp)"
        ") WITHOUT ROWID;";

    static constexpr const char* insert_data_query_ =
        "INSERT INTO data (timestamp,latency_median,throughput_mean) VALUES(%lu,%f,%f);";

    char query_[200];

    DomainId domain_;
    uint32_t t_interval_;

    EntityId monitor_id_;
    EntityId topic_id_;

    sqlite3* database_;
};

int main()
{
    // Create Monitor instance
    Monitor monitor;

    // Start Monitor
    monitor.run();

    return 0;
}

2.3.4.3. Examining the code

Before declaring the main class, there are some definitions that can be changed before compiling.

constexpr const char* DB_FILENAME = "vulcanexus_monitor.db";

constexpr uint32_t DOMAIN = 0;
constexpr uint32_t INTERVAL_MS = 5000;
constexpr const char* TOPIC_NAME = "rt/chatter";
constexpr const char* DATA_TYPE_NAME = "std_msgs::msg::dds_::String_";

The main class, called Monitor, creates the database and initializes the monitor on construction.

    Monitor()
        : monitor_id_(EntityId::invalid())
        , topic_id_(EntityId::invalid())
        , database_(nullptr)
    {
        // Initialize sqlite database
        open_or_create_database();
        if (!database_)
        {
            throw Error("Error initializing database.");
        }

        // Initialize Monitor
        monitor_id_ = StatisticsBackend::init_monitor(DOMAIN);
        if (!monitor_id_.is_valid())
        {
            throw Error("Error initializing monitor.");
        }

        std::cout << "Backend to SQL running." << std::endl;
    }

And it closes them on destruction.

    ~Monitor()
    {
        StatisticsBackend::stop_monitor(monitor_id_);
        close_database();
    }

The routine of the instance is an infinite loop in which, once the topic has been discovered, data is stored in the database. This is similar to the previous tutorial.

    void run()
    {
        while (true)
        {
            if (!topic_id_.is_valid())
            {
                // If the topic does not exist yet, try to find it
                topic_id_ = get_topic_id(TOPIC_NAME, DATA_TYPE_NAME);
                if (!topic_id_.is_valid())
                {
                    std::cout << "Topic " << TOPIC_NAME << " does not exist yet." << std::endl;
                }
            }
            else
            {
                // If it exists, store data in database
                store_to_db();
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(INTERVAL_MS));
        }
    }

The database is opened if it exists, or created otherwise, and initialized with the data table. If opening the database or creating the table fails, the execution finishes. On exit, the database is closed.

    void open_or_create_database()
    {
        // Open database
        int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_FULLMUTEX | SQLITE_OPEN_SHAREDCACHE;
        if (sqlite3_open_v2(DB_FILENAME, &database_, flags, 0) != SQLITE_OK)
        {
            std::cerr << "Error opening or creating database." << std::endl;
            sqlite3_close(database_);
            database_ = nullptr;
            return;
        }

        // Create table if it does not exist
        std::cout << "Creating table with query: \"" << create_table_statement_ << "\"." << std::endl;
        if (sqlite3_exec(database_, create_table_statement_, 0, 0, 0) != SQLITE_OK)
        {
            std::cerr << "Error creating table." << std::endl;
            sqlite3_close(database_);
            database_ = nullptr;
            return;
        }
    }

The routine to store new data in the database first calls the Statistics Backend. Then it loads the received data into a previously written query template. Finally, it executes the query to insert the data.

    void store_to_db()
    {
        // Get data
        auto latency_data = get_latency_data();
        auto throughput_data = get_publication_throughput_mean();

        // Fill the query template with the new values. Use only the latency timestamp, as both are almost the same
        std::snprintf(
            query_,
            sizeof(query_),
            insert_data_query_,
            std::chrono::duration_cast<std::chrono::milliseconds>(latency_data.first.time_since_epoch()).count(),
            (std::isnan(latency_data.second) ? 0 : latency_data.second),
            (std::isnan(throughput_data.second) ? 0 : throughput_data.second));

        std::cout << "Storing data in SQLite DataBase with query: \"" << query_ << "\"." << std::endl;

        // Insert in database (if it fails, do nothing)
        if (sqlite3_exec(database_, query_, 0, 0, 0) != SQLITE_OK)
        {
            std::cerr << "An error occurred inserting new data: " << sqlite3_errmsg(database_) << std::endl;
        }
    }

The queries used to interact with the database are defined inside the class as static constexpr variables.

    static constexpr const char* create_table_statement_ =
        "CREATE TABLE IF NOT EXISTS data("
        "timestamp BIGINT,"
        "latency_median FLOAT,"
        "throughput_mean FLOAT,"
        "PRIMARY KEY(timestamp)"
        ") WITHOUT ROWID;";

    static constexpr const char* insert_data_query_ =
        "INSERT INTO data (timestamp,latency_median,throughput_mean) VALUES(%lu,%f,%f);";

Finally, the monitor application is initialized and run in the main function.

int main()
{
    // Create Monitor instance
    Monitor monitor;

    // Start Monitor
    monitor.run();

    return 0;
}

2.3.4.4. CMakeLists.txt

Append the following code snippet at the end of the CMakeLists.txt file created earlier. This adds the source files needed to build the executable, and links the executable with the required libraries.

find_package(SQLite3 REQUIRED)  # provided by sqlite3_vendor
# Compile executable and link with dependencies
add_executable(${PROJECT_NAME} src/sql_monitor.cpp)
# Do not use ament as dependencies are not linked with ament
target_link_libraries(${PROJECT_NAME} fastrtps fastcdr fastdds_statistics_backend sqlite3)

# Install executable
install(
  TARGETS ${PROJECT_NAME}
  DESTINATION lib/${PROJECT_NAME}
)

This file can also be downloaded with the following command from the ros2_ws/src/monitor_sql_tutorial directory:

wget -O CMakeLists.txt \
    https://raw.githubusercontent.com/eProsima/vulcanexus/main/code/monitor_sql_tutorial/CMakeLists.txt

2.3.5. Running the application

At this point the project is ready for building and running the application. From the base workspace directory (ros2_ws), run the following commands.

colcon build
source install/setup.bash
ros2 run monitor_sql_tutorial monitor_sql_tutorial

Now open two more terminals and load the Vulcanexus environment in each. Then run a talker and a listener from the demo_nodes_cpp ROS 2 package, available in the Vulcanexus Desktop distribution, one in each terminal.

  • Terminal 1:

    source /opt/vulcanexus/iron/setup.bash
    export FASTDDS_STATISTICS="HISTORY_LATENCY_TOPIC;PUBLICATION_THROUGHPUT_TOPIC;PHYSICAL_DATA_TOPIC"
    ros2 run demo_nodes_cpp talker
    
  • Terminal 2:

    source /opt/vulcanexus/iron/setup.bash
    export FASTDDS_STATISTICS="HISTORY_LATENCY_TOPIC;PUBLICATION_THROUGHPUT_TOPIC;PHYSICAL_DATA_TOPIC"
    ros2 run demo_nodes_cpp listener
    

Note

In order to monitor other statistics topics, add them to the FASTDDS_STATISTICS environment variable. Check the available statistics topics in the Fast DDS Documentation.

2.3.6. Next steps

Now you can develop more functionality in your application, such as collecting more performance data or monitoring other topics. You may also check the tutorial explaining how to connect an application developed with the Fast DDS Statistics Backend to a visualization tool such as Grafana.

For more information about Fast DDS Statistics Backend features please refer to the project’s documentation.