Flux in the industry: 3 real world IIoT examples

Frederik Van Leeckwyck on , updated

We have been using InfluxDB in an industrial setting since 2016. This started with the development of an open-source OPC-UA collector, which is used across the globe. The OPC-UA protocol is important in the industrial setting since it can be used to gather sensor data, a form of time series data. Sensor data is collected from manufacturing equipment via programmable logic controllers (PLCs) and the supervisory control and data acquisition (SCADA) systems and then stored into InfluxDB. The sensor data includes measurements collected from equipment that help to ensure that the manufacturing process is running effectively.

Once in InfluxDB, the data is set to be visualized. This is the most common use case for manufacturers — the ability to see all sensor data, centrally in one location.

One of the first meetings I attended after joining Factry was with a food producer in Belgium who said to me, “If you can’t visualize it, you can’t understand it.” And while I do not necessarily agree, it touches upon the fact that if we can properly visualize an event, it becomes much easier to understand and make sense of the said event. When we analyze time series data, what we seek are new insights from the data, not just the data itself.

So in this post we describe 3 real-world cases where Flux brings new and meaningful insights into time series data collected from industrial equipment, ultimately supporting improved decision making.

First some context

We’ve thus received positive responses from engineers and operators regarding the functionality of InfluxQL, InfluxDB’s query language, in an industrial context. With the help of Grafana’s excellent query editor, in their dashboarding product, it’s easy to use, easy to learn and very readable. Despite this, advanced functionality was often lacking because the context was missing.

What do we mean by context? Imagine you’re sitting in a history class and you’re learning about event X in the year 1721, event Y in 1857 and so on. This history class is boring because all you’re being taught are dates and times of events, in order. This is comparable to time series data — data presented by date and in sequential order.

Imagine now that instead of learning just the dates and times, you receive information about the people that lived at that time, how they thought, what their days looked like, what their surroundings looked like, etc. This is context. This information provides you the context behind the history and behind the events, so now you can start understanding why people made the decisions they made. While the dates and times are important, an understanding of the people, the time periods and the events teach you so much more. Similarly, time series data is crucial but it is far more informative when coupled with contextual data and vice versa.

In an industrial setting, there are two levels of context seen from a technical perspective. The first level derives context from other data that is available in the time series data database. Examples of this include knowing that the value of a sensor only matters when a machine is in production and not in maintenance. Another example consists of calculating the difference between two pressure sensors at the beginning and end of a process. In short, this boils down to the functionality that was missing in the versions of InfluxDB prior to the introduction of their new query language, Flux. Flux comes with the ability to perform math across measurements (measurements are the container of the time series collected).

The second level of functionality comes from integration with other IT systems in a manufacturing context, typically described as the manufacturing execution system, or the MES-layer. Knowing the pressure or temperature evolution of a certain sensor in a machine tells you something, but it will tell you a lot more if you know what product was being produced, which operator was responsible for the line, what shift it was, what the machine’s setpoints were set to, etc. This information is typically present in other software and databases that support the manufacturing process. As this is often relational data, this currently falls out of scope with what we have currently achieved with Flux. But Flux will bring us the ability to bring multiple data sources together, so we anticipate interesting things to come.

Examples of Flux and math across measurements in industry

With InfluxDB and Flux, we’ve been able to extract more insights and value out of the time series data we collect. We will review a few use cases mentioned above, in more detail.

Use case 1: Calculating pressure drop across a filter

A company with a water purification system needs to determine when they should replace filters used in the purification process. They can physically walk up to the filter (if accessible) and inspect the filter to determine if it needs to be replaced or they can rely on data. In the graph below, the top panel shows the water pressure measurement collected at the input and the panel below that shows the water pressure measurement collected at the output. If you take the difference between the two measurements, you can see the pressure difference increase over time. This is depicted below. With this data, you can see that when the pressure drop exceeds a certain threshold, the filter needs to be replaced.

Raw pressure sensor data from PT01 (Pressure Transmitter 01) on the input side before the filter is plotted above. PT02 on the output side. The calculated pressure difference is plotted in the bottom graph.

The Flux query to achieve this:

// Create a generic function that can be reused to compute the 3m average
avg3m = (measurement) =>
from(bucket: "historian")
|> range(start:2018-05-01T23:30:00Z, stop: 2018-05-23T00:00:00Z)
|> filter(fn: (r) => r._measurement == measurement)
|> aggregateWindow(every: 3m, fn: mean)

// Use the function we just defined to get the data for the first
// and second pressure transmitters
pt01 = avg3m(measurement: "PT01")
pt02 = avg3m(measurement: "PT02")

// Join data and calculate the pressure drop
join(tables: {pt01:pt01, pt02:pt01}, on: ["_time"])
|> map(fn: (r) => ({
_time: r._time,
_pressureDrop: r._value_pt01 - r._value_pt02
}))
|> aggregateWindow(columns: ["_pressureDrop"], every: 1h, fn: mean) // to smoothen the graph a bit

Use case 2: Sensor values in the context of a machine state

In some cases, you might have a machine that runs 24/7 but is only doing meaningful work some of the time. We often encounter this with discrete workstations, for example, Computer Numerical Control (CNC) machines. In these cases, you might only be interested in the values of certain sensors given that the machine is in a state of “production” and not in the state of “waiting for operator input” or “maintenance.” In this case, we’re adding context (i.e. the machine state). Let’s construct this graph in four steps.

Step 1: First, we retrieve the raw machine status. This is different for every machine, as is the way each machine is programmed by the supplier or automation partner. In this case, we have over 200 status codes. As such, the resulting graph is pretty meaningless by itself.

Raw Status Codes from the Machine

The Flux query to achieve this:

// Retrieve status data. We’re not downsampling here because status codes are only written to the database on change, which means this is a very small dataset.
from(bucket: "historian")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "Status")

Step 2: Each status code represents a certain machine state. In our case, we’re only interested in status codes 5 and 10, because they represent the machine being in production. All other codes are irrelevant for our use case. So, let’s map these raw status codes to 1 when the machine is in production and 0 in all other cases.

Mapped Status Codes

The Flux query to achieve this:

Status = from(bucket: "historian")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "Status")
  |> map(fn: (r) => ({_time: r._time, _status: float(v: contains(value: int(v: r._value), set: [5,10])}), mergeKey: false) // map to 0 or 1
// The statements below could probably be optimized. They work for our use case but should be improved.
// Because the raw status code is only written to the database on change but we want to join on time later, we need to create time windows and fill these with the last known 0 or 1. So, we first create this windows with averages between 0 and 1.
  |> aggregateWindow(every: 30s, fn: mean, columns: ["_status"], createEmpty: true)
// Then we fill the empty windows with the previous values.
  |> fill(column: "_status", usePrevious: true)
// We don’t have a previous value at the beginning of our time window, so we cheat by filling it with 0. Ideally, we would fill with the last status value that falls just outside the start: dashboardTime range.
  |> fill(column: "_status", value: 0.0)
// Every window that contains a status > 0 should be categorized as "in production".
  |> map(fn: (r) => ({_time: r._time, _status: math.ceil(x: r._status)}), mergeKey: false)

Step 3: Retrieve the raw sensor data. This is standard Flux functionality.

We notice noise on the sensor between 17:00 and 18:30. This is because the machine is switched on but not in production. As such, these values are meaningless to us.

The Flux query to achieve this:

NTU = from(bucket: "historian")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "S01")
  |> aggregateWindow(every: 30s, fn: mean)
  |> keep(columns: ["_value", "_time"])
// Like in Step 2, we first fill with previous and finally 0 at the beginning of our time window
  |> fill(column: "_value", usePrevious: true)

Step 4: Finally, we’re ready to put all of the pieces together. From Step 2, we have 0/1 data telling us when the machine is in production and when it is not. From Step 3, we have the raw sensor data. Now, let’s bring these data sets together by joining them. The resulting graph shows us the sensor values when the machine is in production status.

The raw sensor data, in the context of the machine being in production status.

The Flux query to achieve this:

Sensor = from(bucket: "historian")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "S01")
  |> aggregateWindow(every: 30s, fn: mean)
  |> keep(columns: ["_value", "_time"])
  |> fill(column: "_value", usePrevious: true)
  |> fill(column: "_value", value: 0.0)

Status = from(bucket: "historian")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "Status")
  |> map(fn: (r) => ({_time: r._time, _status: float(v: contains(value: int(v: r._value), set: [5,10])}), mergeKey: false)
  |> aggregateWindow(every: 30s, fn: mean, columns: ["_status"], createEmpty: true)
  |> fill(column: "_status", usePrevious: true)
  |> fill(column: "_status", value: 0.0)
  |> map(fn: (r) => ({_time: r._time, _status: math.ceil(x: r._status)}), mergeKey: false)

join(
  tables: {n:Sensor, s:Status},
  on: ["_time"]
)
  |> map(fn: (r) => ({
  _time: r._time,
// All raw sensor values get multiplied by 0 when the machine is not in production
  _filteredValue: r._value * r._status
  }))
  |> yield()

Use case 3: Calculating the error between a forecast and the measured value

A third and final use case involves calculating the mean absolute error (MAE) to score the accuracy of a forecast compared to the measured values. This could be helpful in cases where your service is dependent on weather forecasts, for example. This formula returns a single value.

The MAE is calculated as follows:

The Flux query to achieve this:

WINDSPEED = from(bucket: "meteo")
  |> range(start: dashboardTime, stop: upperDashboardTime) // same window from which we have forecast data
  |> filter(fn: (r) => r._measurement == "WindSpeed")
  |> aggregateWindow(every: 10m, fn: mean)
  |> keep(columns: ["_value", "_time"])
  |> fill(column: "_value", value: 0.0) // fill nulls (in the future) with 0
  |> map(fn: (r) => ({_time: r._time, _real: r._value})) // map value column to new name

WINDSPEED_FORECAST = from(bucket: "meteo_forecast")
  |> range(start: dashboardTime, stop: upperDashboardTime)
  |> filter(fn: (r) => r._measurement == "WindSpeed")
  |> keep(columns: ["_value", "_time"])
  |> aggregateWindow(every: 10m, fn: mean)
  |> fill(column: "_value", usePrevious: true)
  |> fill(column: "_value", value: 0.0)
  |> map(fn: (r) => ({_time: r._time, _forecast: r._value}))

RES = join(tables: {curr: WINDSPEED, forecast: WINDSPEED_FORECAST}, on: ["_time"], method: "inner")
NUMERATOR = RES |> map(fn: (r) => ({_time: r._time, _diff: math.abs(x: r._forecast - r._real)})) |> sum(columns: ["_diff"])
DENOMINATOR = RES |> count(columns: ["_real"])
MAE = join(tables: {num: NUMERATOR, denom: DENOMINATOR}, on: ["_start"], method: "inner")
  |> map(fn: (r) => ({_value: float(v: r._diff) / float(v: r._real)}))
MAE |> yield()

Some remarks

The primary drawback we’ve encountered is the trade-off between functionality and simplicity, at least to the casual, non-technical user. While we see InfluxQL in Grafana as a piece of functionality that is easily taught to non-technical people in an industrial context, this is much less the case with Flux.

Members of the community can take steps in mitigating this drawback by providing examples, such as the ones described in this post, which others can build on. Furthermore, the extensibility of Flux will make it possible to build predefined functions to cover the most common use cases encountered in the manufacturing world.

In conclusion

We’re sure we will encounter more use cases for Flux as functionality becomes more readily available and the language becomes more widely supported.

Want to know more?

Factry is hosting the 3rd Time Series Belgium meetup on November 13th, 2019, with talks about Flux and other uses of time series databases. Sign up for free on the meetup page.

Never miss the golden tip, subscribe to our quarterly newsletter