Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem)
find_package(Git QUIET)
find_package(ApMon MODULE)
find_package(CURL MODULE)
find_package(RdKafka CONFIG)

####################################
Expand Down Expand Up @@ -105,11 +106,12 @@ add_library(Monitoring SHARED
src/Exceptions/MonitoringException.cxx
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>
$<$<BOOL:${CURL_FOUND}>:src/Transports/HTTP.cxx>
)

target_include_directories(Monitoring
PUBLIC
$<INSTALL_INTERFACE:include>
PUBLIC
$<INSTALL_INTERFACE:include>
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src
Expand All @@ -127,6 +129,7 @@ target_link_libraries(Monitoring
pthread
$<$<BOOL:${ApMon_FOUND}>:ApMon::ApMon>
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
$<$<BOOL:${CURL_FOUND}>:CURL::libcurl>
)

# Handle ApMon optional dependency
Expand All @@ -138,6 +141,10 @@ if(RdKafka_FOUND)
message(STATUS " Compiling Kafka transport")
endif()

if(CURL_FOUND)
message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend")
endif()

# Detect operating system
if (UNIX AND NOT APPLE)
message(STATUS "Detected Linux: Process monitor enabled")
Expand All @@ -155,6 +162,7 @@ target_compile_definitions(Monitoring
$<$<BOOL:${LINUX}>:O2_MONITORING_OS_LINUX>
$<$<BOOL:${ApMon_FOUND}>:O2_MONITORING_WITH_APPMON>
$<$<BOOL:${RdKafka_FOUND}>:O2_MONITORING_WITH_KAFKA>
$<$<BOOL:${CURL_FOUND}>:O2_MONITORING_WITH_CURL>
)

# Use C++17
Expand Down Expand Up @@ -217,7 +225,7 @@ foreach (test ${TEST_SRCS})

add_executable(${test_name} ${test})
target_link_libraries(${test_name}
PRIVATE
PRIVATE
Monitoring Boost::unit_test_framework Boost::filesystem
)
add_test(NAME ${test_name} COMMAND ${test_name})
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ See the table below to find `URI`s for supported backends:
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
| InfluxDB | StdOut | `influxdb-stdout` | - | `info` |
| InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` |
| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` |
| ApMon | UDP | `apmon` | - | `info` |
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |

Expand All @@ -62,7 +63,7 @@ A metric consist of 5 parameters:
| Parameter name | Type | Required | Default |
| -------------- |:--------------------------------:|:--------:| -----------------------:|
| name | string | yes | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| values | map&lt;string, int/double/string/uint64_t&gt; | no/1 | - |
| timestamp | time_point&lt;system_clock&gt; | no | current time |
| verbosity | Enum (Debug/Info/Prod) | no | Verbosity::Info |
| tags | map | no | host and process names |
Expand Down
36 changes: 36 additions & 0 deletions src/MonitoringFactory.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
#include "Transports/Kafka.h"
#endif

#ifdef O2_MONITORING_WITH_CURL
#include "Transports/HTTP.h"
#endif

namespace o2
{
/// ALICE O2 Monitoring system
Expand All @@ -56,6 +60,37 @@ std::unique_ptr<Backend> getStdOut(http::url uri)
}
}

/// Extracts token from header add sets it as addition HTTP header
/// http://localhost:9999/?org=YOUR_ORG&bucket=YOUR_BUCKET&token=AUTH_TOKEN
/// ->
/// http://localhost:9999/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET
/// --header "Authorization: Token YOURAUTHTOKEN"
std::unique_ptr<Backend> getInfluxDbv2(http::url uri)
{
#ifdef O2_MONITORING_WITH_CURL
std::string tokenLabel = "token=";
std::string path = "/api/v2/write";
std::string query = uri.search;

auto tokenStart = query.find(tokenLabel);
auto tokenEnd = query.find('&', tokenStart);
if (tokenEnd == std::string::npos) {
tokenEnd = query.length();
}
std::string token = query.substr(tokenStart + tokenLabel.length(), tokenEnd-(tokenStart + tokenLabel.length()));
// make sure ampersand is removed
if (tokenEnd < query.length() && query.at(tokenEnd) == '&') tokenEnd++;
if (tokenStart > 0 && query.at(tokenStart-1) == '&') tokenStart--;
query.erase(tokenStart, tokenEnd - tokenStart);

auto transport = std::make_unique<transports::HTTP>("http://" + uri.host + ':' + std::to_string(uri.port) + path + '?' + query);
transport->addHeader("Authorization: Token " + token);
return std::make_unique<backends::InfluxDB>(std::move(transport));
#else
throw std::runtime_error("HTTP transport is not enabled");
#endif
}

std::unique_ptr<Backend> getInfluxDb(http::url uri)
{
auto const position = uri.protocol.find_last_of('-');
Expand Down Expand Up @@ -129,6 +164,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url)
{"influxdb-unix", getInfluxDb},
{"influxdb-stdout", getInfluxDb},
{"influxdb-kafka", getInfluxDb},
{"influxdbv2", getInfluxDbv2},
{"apmon", getApMon},
{"no-op", getNoop}
};
Expand Down
68 changes: 68 additions & 0 deletions src/Transports/HTTP.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
///
/// \file HTTP.cxx
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#include "HTTP.h"
#include "../MonLogger.h"
#include "../Exceptions/MonitoringException.h"
#include <boost/algorithm/string.hpp>

namespace o2
{
/// ALICE O2 Monitoring system
namespace monitoring
{
/// Monitoring transports
namespace transports
{

HTTP::HTTP(const std::string& url)
{
mHeaders = NULL;
mCurl = curl_easy_init();
curl_easy_setopt(mCurl, CURLOPT_URL, url.c_str());
curl_easy_setopt(mCurl, CURLOPT_SSL_VERIFYPEER, 0);
curl_easy_setopt(mCurl, CURLOPT_CONNECTTIMEOUT, 10);
curl_easy_setopt(mCurl, CURLOPT_TIMEOUT, 10);
curl_easy_setopt(mCurl, CURLOPT_POST, 1);
curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPIDLE, 120L);
curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPINTVL, 60L);
FILE *devnull = fopen("/dev/null", "w+");
curl_easy_setopt(mCurl, CURLOPT_WRITEDATA, devnull);

MonLogger::Get() << "HTTP transport initialized (" << url << ")" << MonLogger::End();
}

HTTP::~HTTP()
{
curl_slist_free_all(mHeaders);
curl_easy_cleanup(mCurl);
curl_global_cleanup();
}

void HTTP::addHeader(const std::string& header)
{
mHeaders = curl_slist_append(mHeaders, header.c_str());
curl_easy_setopt(mCurl, CURLOPT_HTTPHEADER, mHeaders);
}

void HTTP::send(std::string&& post)
{
CURLcode response;
long responseCode;
curl_easy_setopt(mCurl, CURLOPT_POSTFIELDS, post.c_str());
curl_easy_setopt(mCurl, CURLOPT_POSTFIELDSIZE, (long) post.length());
response = curl_easy_perform(mCurl);
curl_easy_getinfo(mCurl, CURLINFO_RESPONSE_CODE, &responseCode);
if (response != CURLE_OK) {
MonLogger::Get() << "HTTP Tranport " << curl_easy_strerror(response) << MonLogger::End();
}
if (responseCode < 200 || responseCode > 206) {
MonLogger::Get() << "HTTP Transport: Response code : " << std::to_string(responseCode) << MonLogger::End();
}
}

} // namespace transports
} // namespace monitoring
} // namespace o2
54 changes: 54 additions & 0 deletions src/Transports/HTTP.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
///
/// \file HTTP.h
/// \author Adam Wegrzynek <adam.wegrzynek@cern.ch>
///

#ifndef ALICEO2_MONITORING_TRANSPORTS_HTTP_H
#define ALICEO2_MONITORING_TRANSPORTS_HTTP_H

#include "TransportInterface.h"

#include <curl/curl.h>
#include <string>

namespace o2
{
/// ALICE O2 Monitoring system
namespace monitoring
{
/// Monitoring transports
namespace transports
{

/// \brief HTTP POST transport
///
/// Allows to push string formatted metrics as HTTP POST requests via cURL
class HTTP : public TransportInterface
{
public:
/// Constructor
/// \param url URL of HTTP server endpoint
HTTP(const std::string& url);

/// Destructor
~HTTP();

/// Sends metric via HTTP POST
/// \param post r-value reference string formatted metric
void send(std::string&& post);

/// Adds custom HTTP header
void addHeader(const std::string& header);
private:
/// CURL pointers
CURL *mCurl;

/// HTTP headers struct
struct curl_slist *mHeaders;
};

} // namespace transports
} // namespace monitoring
} // namespace o2

#endif // ALICEO2_MONITORING_TRANSPORTS_HTTP_H
4 changes: 2 additions & 2 deletions src/UriParser/UriParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
namespace http
{
struct url {
std::string protocol, user, password, host, path, search;
std::string protocol, user, password, host, path, search, url;
int port;
};

Expand Down Expand Up @@ -89,7 +89,7 @@ static inline url ParseHttpUrl(std::string& in)
{
url ret;
ret.port = -1;

ret.url = in;
ret.protocol = ExtractProtocol(in);
ret.search = ExtractSearch(in);
ret.path = ExtractPath(in);
Expand Down
6 changes: 5 additions & 1 deletion test/testInfluxDb.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ namespace monitoring
{
namespace Test
{

BOOST_AUTO_TEST_CASE(simplySendMetric)
{
auto monitoring = MonitoringFactory::Get("influxdb-udp://localhost:1000");
Expand All @@ -32,6 +31,11 @@ BOOST_AUTO_TEST_CASE(simplySendMetric2)
monitoring->send(Metric{10, "myCrazyMetric"});
}

BOOST_AUTO_TEST_CASE(InfluxDbv2)
{
auto monitoring = MonitoringFactory::Get("influxdbv2://localhost:9999?org=cern&bucket=test&token=TOKEN");
}

} // namespace Test
} // namespace monitoring
} // namespace o2