Skip to content

Commit

Permalink
add a parser for the Line Protocol (InfluxDB)
Browse files Browse the repository at this point in the history
  • Loading branch information
facontidavide committed Apr 9, 2024
1 parent 15dce41 commit b6839d0
Show file tree
Hide file tree
Showing 5 changed files with 331 additions and 0 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ add_subdirectory( plotjuggler_plugins/ToolboxLuaEditor )
add_subdirectory( plotjuggler_plugins/ParserProtobuf )
add_subdirectory( plotjuggler_plugins/ParserROS )
add_subdirectory( plotjuggler_plugins/ParserDataTamer )
add_subdirectory( plotjuggler_plugins/ParserLineInflux )

add_subdirectory( plotjuggler_plugins/PluginsZcm )

Expand Down
16 changes: 16 additions & 0 deletions plotjuggler_plugins/ParserLineInflux/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

add_definitions(${QT_DEFINITIONS})
add_definitions(-DQT_PLUGIN)


add_library(ParserLineInflux SHARED
line_parser.cpp
line_parser.h )

target_link_libraries(ParserLineInflux
${Qt5Widgets_LIBRARIES}
plotjuggler_base)


install(TARGETS ParserLineInflux
DESTINATION ${PJ_PLUGIN_INSTALL_DIRECTORY} )
111 changes: 111 additions & 0 deletions plotjuggler_plugins/ParserLineInflux/line_parser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@

#include "line_parser.h"

using namespace PJ;

class MsgParserImpl : public MessageParser
{
public:
MsgParserImpl(const std::string& topic_name,
const std::string& type_name,
const std::string&,
PJ::PlotDataMapRef& data)
: MessageParser(topic_name, data), topic_name_(topic_name)
{
}

bool parseMessage(const MessageRef msg, double& timestamp) override
{
auto ToChar = [](const auto* ptr) { return reinterpret_cast<const char*>(ptr); };

const auto str = QString::fromLocal8Bit(ToChar(msg.data()), msg.size());

std::string key;
std::string prefix;
// Obtain the key name from measurement name and tags
for(auto line: str.splitRef('\n', Qt::SkipEmptyParts))
{
auto parts = line.split(' ', Qt::SkipEmptyParts);
if(parts.size() != 2 && parts.size() != 3) {
continue;
}
const auto tags = parts[0].split(',', Qt::SkipEmptyParts);
const auto fields = parts[1].split(',', Qt::SkipEmptyParts);
if(tags.size() < 1 || fields.size() < 1) {
continue;
}
uint64_t timestamp = 0;
if(parts.size() == 3)
{
timestamp = parts[2].toULongLong();
}
else {
using namespace std::chrono;
auto now = steady_clock::now();
timestamp = duration_cast<nanoseconds>(now.time_since_epoch()).count();
}
const double ts_sec = double(timestamp) * 1e-9;

prefix = topic_name_;
for(auto tag: tags)
{
prefix += '/';
auto tag_str = tag.toLocal8Bit();
prefix.append(tag_str.data(), tag_str.size());
}
for(auto field: fields)
{
const auto field_parts = field.split('=');
const auto name = field_parts[0].toLocal8Bit();
auto value = field_parts[1];

key = prefix;
key += '/';
key.append(name.data(), name.size());

if(value.startsWith('"') && value.endsWith('"'))
{
auto& data = _plot_data.getOrCreateStringSeries(key);
data.pushBack({ts_sec, std::string(ToChar(value.data()+1), value.size()-2)});
}
else if(value == "t" || value == "T" || value == "true" || value == "True" || value == "TRUE")
{
auto& data = _plot_data.getOrCreateNumeric(key);
data.pushBack({ts_sec, 1.0});
}
else if(value == "f" || value == "F" || value == "false" || value == "False" || value == "FALSE")
{
auto& data = _plot_data.getOrCreateNumeric(key);
data.pushBack({ts_sec, 0.0});
}
else {
bool ok = false;
// remove last character if there is an integer suffix
if(value.endsWith('i') || value.endsWith('u'))
{
value.chop(1);
}
double num = value.toDouble(&ok);
if(ok)
{
auto& data = _plot_data.getOrCreateNumeric(key);
data.pushBack({ts_sec, num});
}
}
}
}
return true;
}

private:

std::string topic_name_;
};

MessageParserPtr ParserLine::createParser(const std::string& topic_name,
const std::string& type_name,
const std::string& schema,
PJ::PlotDataMapRef& data)
{
return std::make_shared<MsgParserImpl>(topic_name, type_name, schema, data);
}
31 changes: 31 additions & 0 deletions plotjuggler_plugins/ParserLineInflux/line_parser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include "PlotJuggler/messageparser_base.h"

#include <QCheckBox>
#include <QDebug>
#include <string>

class ParserLine : public PJ::ParserFactoryPlugin
{
Q_OBJECT
Q_PLUGIN_METADATA(IID "facontidavide.PlotJuggler3.ParserFactoryPlugin")
Q_INTERFACES(PJ::ParserFactoryPlugin)

public:
ParserLine() = default;

const char* name() const override
{
return "ParserLine";
}
const char* encoding() const override
{
return "Influx (Line protocol)";
}

PJ::MessageParserPtr createParser(const std::string& topic_name,
const std::string& type_name,
const std::string& schema,
PJ::PlotDataMapRef& data) override;
};
172 changes: 172 additions & 0 deletions plotjuggler_plugins/ParserLineInflux/sample_telegraf.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
# Telegraf Configuration
#
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
#
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
#
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
#
# Environment variables can be used anywhere in this config file, simply surround
# them with ${}. For strings the variable must be within quotes (ie, "${STR_VAR}"),
# for numbers and booleans they should be plain (ie, ${INT_VAR}, ${BOOL_VAR})


# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"

# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "1s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = false

## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000

## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000

## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"

## Collection offset is used to shift the collection by the given amount.
## This can be be used to avoid many plugins querying constraint devices
## at the same time by manually scheduling them in time.
# collection_offset = "0s"

## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "5s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"

## Collected metrics are rounded to the precision specified. Precision is
## specified as an interval with an integer + unit (e.g. 0s, 10ms, 2us, 4s).
## Valid time units are "ns", "us" (or "µs"), "ms", "s".
##
## By default or when set to "0s", precision will be set to the same
## timestamp order as the collection interval, with the maximum being 1s:
## ie, when interval = "10s", precision will be "1s"
## when interval = "250ms", precision will be "1ms"
##
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"

## Log at debug level.
# debug = false
## Log only error level messages.
# quiet = false

## Log target controls the destination for logs and can be one of "file",
## "stderr" or, on Windows, "eventlog". When set to "file", the output file
## is determined by the "logfile" setting.
# logtarget = "file"

## Name of the file to be logged to when using the "file" logtarget. If set to
## the empty string then logs are written to stderr.
# logfile = ""

## The logfile will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed. Logs are rotated only when
## written to, if there is no log activity rotation may be delayed.
# logfile_rotation_interval = "0h"

## The logfile will be rotated when it becomes larger than the specified
## size. When set to 0 no size based rotation is performed.
# logfile_rotation_max_size = "0MB"

## Maximum number of rotated archives to keep, any older logs are deleted.
## If set to -1, no archives are removed.
# logfile_rotation_max_archives = 5

## Pick a timezone to use when logging or type 'local' for local time.
## Example: America/Chicago
# log_with_timezone = ""

## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do no set the "host" tag in the telegraf agent.
omit_hostname = false

## Method of translating SNMP objects. Can be "netsnmp" (deprecated) which
## translates by calling external programs snmptranslate and snmptable,
## or "gosmi" which translates using the built-in gosmi library.
# snmp_translator = "netsnmp"

## Name of the file to load the state of plugins from and store the state to.
## If uncommented and not empty, this file will be used to save the state of
## stateful plugins on termination of Telegraf. If the file exists on start,
## the state in the file will be restored for the plugins.
# statefile = ""

## Flag to skip running processors after aggregators
## By default, processors are run a second time after aggregators. Changing
## this setting to true will skip the second run of processors.
# skip_processors_after_aggregators = false


# Read metrics about cpu usage
[[inputs.cpu]]
## Whether to report per-cpu stats or not
percpu = false
## Whether to report total system cpu stats or not
totalcpu = true
## If true, collect raw CPU time metrics
collect_cpu_time = false
## If true, compute and report the sum of all non-idle CPU states
## NOTE: The resulting 'time_active' field INCLUDES 'iowait'!
report_active = false
## If true and the info is available then add core_id and physical_id tags
core_tags = false


[[outputs.socket_writer]]
address = "udp://127.0.0.1:8094"

# Send telegraf metrics to file(s)
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## The file will be rotated after the time interval specified. When set
## to 0 no time based rotation is performed.
# rotation_interval = "1h"

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "influx"

## Compress output data with the specified algorithm.
## If empty, compression will be disabled and files will be plain text.
## Supported algorithms are "zstd", "gzip" and "zlib".
# compression_algorithm = ""

## Compression level for the algorithm above.
## Please note that different algorithms support different levels:
## zstd -- supports levels 1, 3, 7 and 11.
## gzip -- supports levels 0, 1 and 9.
## zlib -- supports levels 0, 1, and 9.
## By default the default compression level for each algorithm is used.
# compression_level = -1

0 comments on commit b6839d0

Please sign in to comment.