Managing time series data using InfluxData


InfluxData is used when IoT deployments require the support of data from a large number, say thousands, of sensors. This data can be collected, stored and used to issue alerts. It is a good choice when it comes to building custom monitoring solutions.

In a typical IoT application, data is collected at servers and middleware solutions from many devices with multiple sensors, and each device cluster generates a high volume of data at the rate of thousands to millions of records per second. These records are mapped to a time scale with nanosecond precision. Even though traditional databases are highly efficient for typical rates of insertions and deletions, they may not be able to handle high precision data optimally. Moreover, it is very rare to delete individual records in IoT or DevOps scenarios, and updates are not allowed most of the time. Instead, data for a particular duration may be dropped periodically, after aggregates or anomalies have been extracted. So, specialised storage engines optimised for this purpose are required.

A time series database is an optimised solution for the above challenges. The list below gives open source databases designed for time series data management; some are exclusively designed for time series needs, and a few* come with additional support like custom storage engines and schema considerations:

  • InfluxDb, a part of InfluxData TICK stack, with independent storage engine
  • OpenTSDB, on top of Apache HBase/Cassandra or Google’s BigTable
  • KairosDB, on top of Apache Cassandra
  • Riak TS from Basho – NoSQL Time Series
  • Whisper Database, part of Graphite tool
  • Round Robin Database (RRD) tool
  • Prometheus.io – designed for monitoring metrics
  • MongoDB*
  • PostgreSQL*

In this article, let’s take a tour of the InfluxData TICK stack and two of its components: Telegraf for data collection and InfluxDb for data storage. InfluxDb has an independent storage engine, whereas most other time series databases depend on other storage back-ends. Recent versions of InfluxDb use a purpose-built Time Structured Merge (TSM) tree storage engine, which replaced the LSM based LevelDB and, later, BoltDB engines used up to the v0.9.x releases.

The TICK stack comes with the following four components, with its unique ecosystem for complete IoT data management and integration with other solutions:

  • Telegraf: data collection
  • InfluxDb: data storage
  • Chronograf: data visualisation
  • Kapacitor: data processing, alerting and ETL jobs

This article is based on v1.2.0 of the above components, which is the latest at present (even though Telegraf v1.2.1 is available). All these components are available as rpm and deb packages for 64-bit Linux platforms; binaries are also available for 64-bit Windows, 32-bit Linux and ARM platforms. Official Docker images are available as well, and they can be started and linked together using Docker Compose with the hints given at https://github.com/influxdata/TICK-docker. One can build a custom Docker image using the Docker build scripts available at https://github.com/influxdata/influxdata-docker, or from the available rpm and deb packages on a base OS like Debian, Ubuntu or CentOS. You can also build the desired versions from source using the Go build system, which generates single binaries without external dependencies, along with templates for configuration files and the necessary scripts.
Let’s now have a walkthrough of InfluxDb.

InfluxDb
Once InfluxDb is installed, you can launch the server using the influxd binary directly, or through a service manager such as init.d scripts or systemd (systemctl). The default configuration file is available at /etc/influxdb/influxdb.conf. To enable the Web admin interface, modify the configuration file as follows and restart the server daemon:

[admin]
enabled = true
bind-address = ":8083"

Note: Please refer to Romin Irani’s article on InfluxDb published in the November 2016 edition of OSFY for initial pointers like database creation, and writing data and basic queries using the Influx CLI tool, Web console or HTTP REST APIs.

Additionally, InfluxData officially provides client libraries for different languages like Go, Java, Python, Node.js, PHP, Ruby, etc. Third-party libraries are also available for, among others, Haskell, Lisp, .NET, Perl, Rust and Scala. A complete listing is available under API Client Libraries in the InfluxDb documentation.

Input protocols and service plugins
By default, InfluxDb supports HTTP transport and data formats as per the line protocol, with the syntax shown in Figure 1.

Figure 1: Line protocol syntax

Here, tagset and fieldset each consist of one or more key-value pairs, separated by commas. The timestamp is a Unix epoch value, with nanosecond precision by default. The measurement name and at least one field key-value pair are mandatory, while the tagset and timestamp are optional.
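
The general shape of a line is measurement[,tag_key=tag_value...] field_key=field_value[,field_key=field_value...] [timestamp]. As a rough illustration, the following Python sketch assembles such a line from its parts. The helper names (escape_key, format_field_value, to_line) are made up for this example, and the escaping is deliberately simplified, so treat it as a sketch rather than a complete encoder.

```python
def escape_key(text):
    """Escape commas, equals signs and spaces in tag keys, tag values and field keys."""
    return text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")


def format_field_value(value):
    """Render a Python value as a line protocol field value."""
    if isinstance(value, bool):   # test bool first, since bool is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return f"{value}i"        # integer fields carry an 'i' suffix
    if isinstance(value, float):
        return repr(value)
    # Anything else is written as a double-quoted string field
    return '"' + str(value).replace('"', '\\"') + '"'


def to_line(measurement, fields, tags=None, timestamp_ns=None):
    """Build one line: measurement[,tags] fields [timestamp]."""
    if not fields:
        raise ValueError("at least one field is required")
    # Measurement names only need commas and spaces escaped
    line = measurement.replace(",", "\\,").replace(" ", "\\ ")
    for key, value in sorted((tags or {}).items()):
        line += f",{escape_key(key)}={escape_key(value)}"
    line += " " + ",".join(
        f"{escape_key(key)}={format_field_value(value)}" for key, value in fields.items()
    )
    if timestamp_ns is not None:
        line += f" {timestamp_ns}"   # integer nanoseconds since the Unix epoch
    return line


print(to_line("temperature", {"value": 35.0},
              {"city": "delhi", "metric": "celsius"}, 1488731314272947906))
```

Tags are sorted by key here only to keep the output stable; the timestamp is left out when the server should assign one.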

SQL analogy
In comparison with SQL, measurements are like tables and points are like records. Fields are equivalent to unindexed columns, typically holding numeric data on which aggregations can be applied, whereas tags are like indexed columns whose values are stored as strings. A combination of a measurement with a tagset is known as a series. InfluxDb does not require a pre-defined schema, i.e., in the following examples, measurements like temperature, humidity and pressure are created on-the-fly with suitable tag pairs.

Cross measurement joins are not allowed in InfluxQL; so, if you have any such plan, consider a single measurement with different tags. But, preferably, keep values with different meanings under separate measurements. Also, instead of encoding measurements or tag names with multiple pieces of information, use separate tags for each piece.

Let’s assume a few points have been written into the cityweather database, as follows:

temperature,city=delhi,metric=celsius value=35 1488731314272947906
humidity,city=pune,sensor=dht internal=24,external=32
pressure,city=mumbai,sensor=bmp180 value=72
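
As a minimal sketch of how such points could be sent over HTTP, the Python snippet below prepares a POST request for the /write endpoint using only the standard library. The host and port (localhost:8086) and the helper name build_write_request are assumptions made for illustration, and the request is only built, not sent.

```python
import urllib.parse
import urllib.request


def build_write_request(base_url, database, lines, precision="ns"):
    """Build (but do not send) a POST request carrying line protocol points."""
    query = urllib.parse.urlencode({"db": database, "precision": precision})
    body = "\n".join(lines).encode("utf-8")   # one point per line
    return urllib.request.Request(f"{base_url}/write?{query}", data=body, method="POST")


points = [
    "temperature,city=delhi,metric=celsius value=35 1488731314272947906",
    "humidity,city=pune,sensor=dht internal=24,external=32",
    "pressure,city=mumbai,sensor=bmp180 value=72",
]

# Assumed server address; adjust to your own deployment
request = build_write_request("http://localhost:8086", "cityweather", points)
print(request.get_method(), request.full_url)

# Sending needs a running server, so it is left commented out:
# urllib.request.urlopen(request)
```

Sending several points in one request body keeps the number of HTTP transactions low.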

Data can also be collected through other channels like UDP, and in other data formats such as the protocols used by Graphite and OpenTSDB. This means InfluxDb can act as a drop-in replacement for OpenTSDB or Graphite, using the respective input plugins. It can also store the data sent by CollectD, such as performance and monitoring related metrics in a DevOps scenario.

Influx Query Language (InfluxQL)
InfluxQL supports SQL like query operations with typical clauses like FROM, WHERE, GROUP BY, ORDER BY, LIMIT, INTO, etc.

Let’s see some examples of time series specific queries:

SELECT min(value), max(value), mean(value) FROM temperature WHERE city = 'pune' AND time > now() - 6h GROUP BY time(10m)
SELECT min(value), max(value), mean(value) FROM temperature WHERE time > now() - 6h GROUP BY city, time(10m) fill(none)
SELECT min(value), max(value), mean(value) FROM temperature WHERE time > now() - 6h GROUP BY time(10m) fill(0)
SELECT * FROM temperature WHERE time > now() - 15h ORDER BY time DESC

In the first query, data is queried for the past six hours with a filter of Pune as the city and grouping the aggregates every 10 minutes. In the second query, instead of filtering by city we are grouping by city, and skipping the results with empty values. In the third query, empty values are replaced by a default value like zero. The GROUP BY time clause requires the WHERE time clause, and it’s obvious that the GROUP BY interval exceeding the time slot in the WHERE clause is not meaningful. The WHERE time clause can be based on absolute timestamps also. The last query provides results in the descending order of timestamps, which fall over the last 15 hours.
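
To make the effect of GROUP BY time() and fill() more concrete, here is a small Python sketch that mimics the windowing on an in-memory list of points. It is only an approximation of the behaviour described above, not how InfluxDb implements it; the function name group_by_time and the sample values are invented for the example.

```python
from collections import defaultdict


def group_by_time(points, interval_s, start_s, end_s, fill=None):
    """Aggregate (timestamp_s, value) pairs into fixed, epoch-aligned windows.

    fill=None skips empty windows, like fill(none); a number is reported
    for every aggregate of an empty window, like fill(0).
    """
    buckets = defaultdict(list)
    for ts, value in points:
        if start_s <= ts < end_s:                    # the WHERE time filter
            buckets[ts - ts % interval_s].append(value)

    rows = []
    first_window = start_s - start_s % interval_s
    for window in range(first_window, end_s, interval_s):
        values = buckets.get(window)
        if values:
            rows.append((window, min(values), max(values), sum(values) / len(values)))
        elif fill is not None:
            rows.append((window, fill, fill, fill))
    return rows


# Three readings over 30 minutes, with nothing in the middle 10-minute window
readings = [(0, 30.0), (120, 34.0), (1300, 28.0)]
print(group_by_time(readings, 600, 0, 1800))          # empty window skipped
print(group_by_time(readings, 600, 0, 1800, fill=0))  # empty window reported as 0
```

Each row holds the window start followed by the minimum, maximum and mean of the values falling in that window.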

Transforming data into other series
One can store the results of a query into other measurements or series. For example:

SELECT mean(value) INTO avg_temperature FROM temperature WHERE time > now() - 6h 
SELECT mean(value) INTO city_wise_temperature FROM temperature GROUP BY city

Downsampling of data
Since data volume is high in IoT and DevOps scenarios, it’s not feasible to retain all the data for a long time. So one can periodically extract aggregations and anomalies from the original series, store them in long-term series or other forms, and flush the original series. InfluxDb supports downsampling of high precision data using the following two features.

Note: Here, precision refers to data volume and time frequency, and not the accuracy of floating values.

Retention policies: Each database comes with retention policies, which decide the persistence of data in measurements stored under it. By default, ‘autogen’ is the retention policy with infinite duration, i.e., data is never flushed out from autogen measurements. We can create custom retention policies for the desired duration, and select one of the policies as default per database.

Here is the syntax for creating a retention policy:

CREATE RETENTION POLICY <rp-name> ON <db-name> DURATION <duration> REPLICATION <n> [SHARD DURATION <duration>] [DEFAULT]

Please refer to Storage Engine and Glossary of Terms sections of the documentation for more details on storage engine, shard groups, shard duration, etc.

Let’s create a few retention policies for the durations of one year, three days, two hours and call them long-term, mid-term and short-term policies.

CREATE RETENTION POLICY rp_lterm ON cityweather DURATION 365d REPLICATION 1

CREATE RETENTION POLICY rp_sterm ON cityweather DURATION 2h REPLICATION 1

CREATE RETENTION POLICY rp_mterm ON cityweather DURATION 72h REPLICATION 1 SHARD DURATION 3h
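
When several policies have to be created from a script, the statements can be generated from parameters. The Python sketch below follows the syntax given above; the function name create_retention_policy is hypothetical, and identifiers are inserted without quoting, which assumes simple names like the ones used in this article.

```python
def create_retention_policy(name, database, duration, replication=1,
                            shard_duration=None, default=False):
    """Return a CREATE RETENTION POLICY statement as a string."""
    parts = [
        f"CREATE RETENTION POLICY {name} ON {database}",
        f"DURATION {duration}",
        f"REPLICATION {replication}",
    ]
    if shard_duration:                 # optional clause
        parts.append(f"SHARD DURATION {shard_duration}")
    if default:                        # optional clause
        parts.append("DEFAULT")
    return " ".join(parts)


statements = [
    create_retention_policy("rp_lterm", "cityweather", "365d"),
    create_retention_policy("rp_sterm", "cityweather", "2h"),
    create_retention_policy("rp_mterm", "cityweather", "72h", shard_duration="3h"),
]
for statement in statements:
    print(statement)
```

The generated strings can then be passed to the Influx CLI or the HTTP API.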

Typically, unqualified measurements are considered under a default retention policy. While writing data or querying, measurements can be qualified with non-default policies such as <policy-name>.<measurement> — for example, rp_lterm.temperature, rp_sterm.humidity, autogen.pressure, etc.

Transformed data can be stored in other databases also, using fully qualified names in the form of <db-name>.<rp-name>.<measurement>; the policy name can be left empty to use the default policy of the target database. Examples are weatherdb.autogen.aggr_temperature and weatherdb..aggr_temperature.

To list out all policies or drop a particular policy, you can use the following syntax:

SHOW RETENTION POLICIES on <db-name>
DROP RETENTION POLICY <rp-name> on <db-name>

Continuous queries: Before flushing the data, we need to take out aggregations and data beyond thresholds, and store the resultant points in measurements under a long-term retention policy or autogen. For this, one can schedule a query for periodical execution. This is possible through continuous queries. Let’s consider a few examples:

CREATE CONTINUOUS QUERY "cq_temp" ON "cityweather"
BEGIN
SELECT MEAN("value"), MIN("value"), MAX("value") INTO autogen.aggr_temperature FROM rp_sterm.temperature WHERE city = 'pune' GROUP BY time(1m)
END

CREATE CONTINUOUS QUERY "cq_humidity" ON "cityweather"
BEGIN
SELECT * INTO rp_lterm.beyond_humidity FROM rp_sterm.humidity WHERE time > now() - 1h AND external > 90
END

You can observe that measurements specified in the INTO clause come under the autogen or long-term policy to keep low precision data forever or for a longer period, and the FROM clause is applied on measurements with a policy whose duration is smaller than that of the INTO clause.
With the help of the above two features, a high volume of data can be efficiently reduced to low volumes periodically.
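
The combined effect of a continuous query and a short retention policy can be pictured with a small in-memory simulation. The Python sketch below is purely illustrative: the function name downsample, the window size and the sample data are assumptions, and InfluxDb itself expires data in whole shard groups rather than point by point.

```python
def downsample(raw_points, now_s, window_s=60, retention_s=2 * 3600):
    """Return (aggregated, retained_raw) for a list of (timestamp_s, value).

    aggregated   -> per-window means, as a continuous query would store
                    under a long-term policy
    retained_raw -> raw points still inside the short retention period
    """
    sums = {}
    for ts, value in raw_points:
        window = ts - ts % window_s
        total, count = sums.get(window, (0.0, 0))
        sums[window] = (total + value, count + 1)

    aggregated = [(window, total / count)
                  for window, (total, count) in sorted(sums.items())]
    retained_raw = [(ts, value) for ts, value in raw_points
                    if ts > now_s - retention_s]
    return aggregated, retained_raw


raw = [(0, 20.0), (30, 22.0), (60, 25.0), (7300, 30.0)]
aggregated, retained = downsample(raw, now_s=7320)
print(aggregated)   # one mean per 1-minute window
print(retained)     # only points from the last two hours
```

Four raw points shrink to three aggregates here; with thousands of points per second, the reduction is far larger.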

InfluxDb is the right choice for storing high precision data like hundreds of thousands of points per second, as it has unique downsampling support. Therefore, many IoT and DevOps frameworks and solutions are adopting Influx in place of traditional alternatives. Here is a small listing of use cases:

  • Plugin for monitoring of internal metrics of HiveMQ
  • For storing tenant data from various devices in SiteWhere
  • Apache Spark, Kafka integrations for feeding data or storing the results
  • OpenHAB add-ons
  • Resource usage monitoring by Kubernetes

Let’s now take a quick look at Telegraf and its relation to InfluxDB.

Telegraf
Telegraf is a data gathering and feeding agent for InfluxDb, with a rich set of plugins for collecting and reporting metrics for different use cases like system performance and monitoring systems. Once Telegraf is launched directly through its binary or by using service managers, it activates input and output plugins as specified in the supplied configuration file. The default configuration file is stored in /etc/telegraf/telegraf.conf; you can modify it or provide a custom config file to the binary.

By default, input plugins related to system resources collect statistics like CPU, memory, disk I/O, kernel, swapping and process related parameters. These are stored in respective measurements under the database named telegraf using the InfluxDb output plugin. To check this, open the Influx shell in CLI or Web mode, select telegraf as the database, and list all measurements and series using SHOW MEASUREMENTS and SHOW SERIES, respectively.
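
The same check can be done over the HTTP API. As a minimal sketch, the Python snippet below only builds the URLs for the /query endpoint; the server address localhost:8086 and the helper name build_query_url are assumptions for illustration.

```python
import urllib.parse


def build_query_url(base_url, database, statement):
    """Return the URL of a GET request that runs one InfluxQL statement."""
    params = urllib.parse.urlencode({"db": database, "q": statement})
    return f"{base_url}/query?{params}"


for statement in ("SHOW MEASUREMENTS", "SHOW SERIES"):
    # Assumed server address; adjust to your own deployment
    print(build_query_url("http://localhost:8086", "telegraf", statement))
```

Opening such a URL with any HTTP client against a running server returns the result as JSON.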

Let’s consider the usage of Telegraf in an IoT application. If end devices or gateways need to store values in a database on a remote server, calling HTTP APIs directly can cause constant overhead due to frequent updates. To minimise this, Telegraf comes with the MQTT service plugin, where devices publish data to the MQTT broker in a lightweight manner. Here, Telegraf acts as a subscriber receiving all the matching data, and stores the collected points in InfluxDb. For this, publishing devices should ensure that the MQTT payload is in InfluxDb line protocol format. To enable this integration of MQTT and InfluxDb, locate inputs.mqtt_consumer in telegraf.conf and uncomment the lines as follows:

[[inputs.mqtt_consumer]]
qos=0
topics = [
"telegraf/weather/temperature",
"telegraf/weather/humidity",
"telegraf/smarthome/#",
]
data_format = "influx"

You may fill a few more fields under this section optionally. Now, Telegraf collects payloads published with specified topics and stores them in the telegraf database by default. This has an added advantage of batch inputs, to minimise the usage of HTTP requests by merging multiple points falling under a particular interval into a single HTTP transaction.
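
To see which published topics the subscription list above would pick up, the following Python sketch applies the standard MQTT wildcard rules ('+' for one level, '#' for all remaining levels). The function name topic_matches and the sample topics are made up for this illustration, and special cases such as topics beginning with '$' are ignored.

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic matches a subscription filter."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":                 # multi-level wildcard matches the rest
            return True
        if index >= len(topic_levels):   # filter is longer than the topic
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(filter_levels) == len(topic_levels)


subscriptions = [
    "telegraf/weather/temperature",
    "telegraf/weather/humidity",
    "telegraf/smarthome/#",
]

# Hypothetical messages: (topic, payload in line protocol)
messages = [
    ("telegraf/weather/temperature", "temperature,city=pune value=31.5"),
    ("telegraf/smarthome/kitchen/gas", "gas,room=kitchen value=0.02"),
    ("telegraf/weather/pressure", "pressure,city=mumbai value=72"),
]

for topic, payload in messages:
    accepted = any(topic_matches(f, topic) for f in subscriptions)
    print(topic, "->", "stored" if accepted else "ignored")
```

The third message is ignored because no filter in the list covers telegraf/weather/pressure.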

So one can consider MQTT broker, Telegraf and InfluxDb on the same server node or in a cluster of different nodes with good network support; devices behind constrained networks publish data using the lightweight MQTT protocol.

Telegraf also comes with the MQTT output plugin to publish a few metrics to a broker residing on a remote machine. These can be used on Linux compatible targets to collect the following types of data:

  • On board system metrics
  • Sensor data collected through input plugin ‘sensors’
  • Data from nearby devices collected through TCP, UDP service plugins

Please check the Telegraf documentation for a complete list of plugins and their usage. Once data is collected and stored, a dashboard is needed to visualise it. Chronograf is the visualisation tool from the TICK stack, though it is still immature; Grafana is a better choice, with good integration for InfluxDb and many other databases. Apart from visualisation, if you wish to process the data, find anomalies, schedule alerts and do basic ETL jobs, Kapacitor is the right tool.

 
