Skip to main content

Example: Integrating machine data as a single API

This example shows how to integrate IoT and log data from multiple data sources.

info

If you are not familiar with RAW we recommend checking out our Getting started guide first. To use RAW, you need an account which you can create and use for free here.

Scenario

Suppose we are a site reliability engineer managing control software for industrial machines.

Recently we are having suspicious software crashes, and we want to create a data service to collect diagnostics/data to help us identify the cause(s).

info

If you want to try this example, you can deploy the following endpoint:

Machine data processing
Interrogating machine generated data for diagnostics.

Usage:

/examples/iot/failures?machineId=<int>

For instance:

/examples/iot/failures?machineId=1

The datasets

We have the following data sources:

  • Machine status information logged periodically and stored in a PostgreSQL database.
  • Software crashes from Docker, these come in a JSON format and output by familiar docker inspect commands.
  • Sensor data from the machines, exported in CSV.
  • Error Logs, stored in an S3 bucket, in familiar log file formats that require parsing.

Therefore, we need to combine data stored:

  • in a database;
  • and in a S3 bucket with several formats:
    • log files
    • CSV files
    • JSON files.

Machine Status

Machines are being monitored and their status and location are being stored in a PostgreSQL database. This table looks something like this:

idmodelagestatuslatitudelongitude
1model318OK46.5154716.644706
2model47OK46.5647826.551355
3model38OK46.5379846.629472
4model37OK46.5705006.591574

To read it we can use PostgreSQL.InferAndRead. Here we created a function where you pass a machine ID and returns the corresponding record from the table.

machine(id: int) =
let
data = PostgreSQL.InferAndRead(
"raw",
"example",
"machines",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "..."
)
in
Collection.First(Collection.Filter(data, (x) -> x.id == id))

We are reading data from the database "raw", schema "example" and table "machines". The output for id=1 is:

{
"id": 1,
"model": "model3",
"age": 18,
"status": "OK",
"latitude": 46.515471,
"longitude": 6.644706
}

Software crashes from Docker

Each machine has a specific service controlling it.

These services are deployed using docker. The status of this software can be extracted from the output of the docker-inspect command.

The output of docker-inspect is a (long) JSON document, e.g.:

 {
"Id": "806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3",
"Name": "machine 98",
"Created": "2015-11-26T06:00:00.000",
"Path": "bash",
"Image": "sha256:9873176a8ff5ac192ce4d7df8a403787558b9f3981a4c4d74afb3edceeda451c",
"Driver": "overlay2",
"Platform": "linux",
"Args": [
"arg1"
],
"State": {
"Status": "running",
"Running": false,
"Paused": false,
"Restarting": false,
"OOMKilled": false,
"Dead": true,
"Pid": 86629,
"ExitCode": 3,
"Error": "comp3",
"StartedAt": "2015-11-26T06:00:00.000",
"FinishedAt": "2015-11-26T06:00:00.000"
},
"ResolvConfPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/resolv.conf",
"HostnamePath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hostname",
"HostsPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hosts",
"LogPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3-json.log",
"RestartCount": 0,

We can get the corresponding machine from the Name field. The field State has an exit code, which tells us if the software finished successfully or not.

The following function extracts the relevant information in an easier-to-consume tabular form.

failures(id: int) =
let
dockerInspect = Json.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
),
summary = Collection.Transform(
dockerInspect,
(x) ->
{
MachineId: Int.From(String.SubString(x.Name, 9, 1)),
ExitCode: x.State.ExitCode,
Error: x.State.Error,
FinishedAt: x.State.FinishedAt
}
)
in
Collection.Filter(
summary,
(x) -> x.ExitCode > 0 and x.MachineId == id
)
[
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-01-05T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 1,
"Error": "comp1",
"FinishedAt": "2015-03-06T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 2,
"Error": "comp2",
"FinishedAt": "2015-04-20T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-06-19T06:00:00.000"
},

Error logs

Errors are collected from logs. These logs are uploaded to a logging service, which in turn collects and saves all records into an S3 bucket.

2015-01-01T05:54:15 WARN vibration close to treshold, check instrumentation panel ASAP.
2015-01-01T05:54:58 INFO calibration at 100%, checking inner sub-systems.
2015-01-01T05:55:41 ERROR voltage not measured for more than 25 seconds, reboot machine.
2015-01-01T05:56:24 INFO cleaning procedure schedulled soon, performing sub task 111.
2015-01-01T05:57:07 INFO task 155 schedulled soon, preparing next task.
2015-01-01T05:57:50 WARN inner temp increasing rapidly, please check internet connection.
2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.
2015-01-01T06:00:00 WARN machine 24 with error=error1
2015-01-01T05:54:15 ERROR inner temp not measured for more than 16 seconds, please call 041 123 456 789.

This file has a lot of data, but right now, we are only interested in lines that report machine errors. We can use Collection.Filter with a regular expression to remove all unwanted lines, like this:

let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches( x, "(.*) WARN machine (\\d+) with error=(\\w+).*")
)
in
filtered

Output:

[
"2015-01-01T06:00:00 WARN machine 24 with error=error1",
"2015-01-01T06:00:00 WARN machine 73 with error=error4",
"2015-01-01T06:00:00 WARN machine 81 with error=error1",
"2015-01-01T07:00:00 WARN machine 43 with error=error3",
"2015-01-01T08:00:00 WARN machine 14 with error=error4",
"2015-01-01T08:00:00 WARN machine 76 with error=error5"

Now we can use Regex.Groups to extract all the relevant fields. This is how the final function looks like:

errors(id: int) =
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
),
parsed = Collection.Transform(
filtered,
(x) ->
let
groups = Regex.Groups(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
in
{
machineId: Int.From(List.Get(groups, 1)),
timestamp: Timestamp.Parse(
List.Get(groups, 0),
"y-M-d'T'H:m:s"
),
error: List.Get(groups, 2)
}
)
in
Collection.Filter(parsed, (x) -> x.machineId == id)

errors(1)

The output is:

[
{
"machineId": 1,
"timestamp": "2015-01-03T07:00:00.000",
"error": "error1"
},
{
"machineId": 1,
"timestamp": "2015-01-03T20:00:00.000",
"error": "error3"
},
{
"machineId": 1,
"timestamp": "2015-01-04T06:00:00.000",
"error": "error5"
},

Sensor data

Sensor data is collected and stored in CSV files. We can read it using the following function:

telemetry(id: int) =
Collection.Filter(
Csv.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
),
(x) -> x.machineID == id
)

Output:

[
{
"datetime": "1/1/2015 6:00:00 AM",
"machineID": 1,
"volt": 176.217853015625,
"rotate": 418.504078221616,
"pressure": 113.077935462083,
"vibration": 45.0876857639276
},
{
"datetime": "1/1/2015 7:00:00 AM",
"machineID": 1,
"volt": 162.87922289706,
"rotate": 402.747489565395,
"pressure": 95.4605253823187,
"vibration": 43.4139726834815
},

Putting it all together

Now we have all the sources defined, we can start to dig into the data to get answers. For a given machine (id), we would like to collect some information about the last failure. We are interested in:

  • Basic information such as the error, timestamp, machine age, model etc. from ‘failures’ (docker json file) and ‘machines’ (database table).
  • Sensor data of the 6 hours before the crash (‘telemetry’ from our sensor csv file).
  • Errors of the 6 hours before the crash ('errors' from log files).

Let's create a function lastFailureData which aggregates all necessary data from each one of functions created before.

lastFailureData(machineId: int) =
let
machineData = machine(machineId),
failureData = failures(machineId),
lastFailure = Collection.Max(failureData.FinishedAt),
startMeasure = Timestamp.SubtractInterval(
lastFailure,
Interval.Build(hours = 6)
),
lastFailureRecord = Collection.First(
Collection.Filter(
failureData,
(x) -> x.FinishedAt == lastFailure
)
),
lastTelemetry = Collection.Filter(
telemetry(machineId),
(x) ->
x.datetime < lastFailure and x.datetime > startMeasure
),
lastErrors = Collection.Filter(
errors(machineId),
(x) ->
x.timestamp < lastFailure and x.timestamp > startMeasure
)
in
{
lastFailure: lastFailureRecord,
machineData: machineData,
lastTelemetry: lastTelemetry,
lastErrors: lastErrors
}

lastFailureData(1)

And finally, our output is:

{
"machineId": 1,
"age": 18,
"model": "model3",
"lastFailure": {
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-12-31T06:00:00.000"
},
"lastTelemetry": [
{
"datetime": "2015-12-31T01:00:00.000",
"machineID": 1,
"volt": 147.720615260015,
"rotate": 493.074645851158,
"pressure": 104.81366016439,
"vibration": 41.2714171061972
},
{
"datetime": "2015-12-31T02:00:00.000",
"machineID": 1,
"volt": 153.93048096902,
"rotate": 353.466012177296,
"pressure": 99.6570720990314,
"vibration": 42.806176552987
},
{
"datetime": "2015-12-31T03:00:00.000",
"machineID": 1,
"volt": 175.481807900786,
"rotate": 475.951631160907,
"pressure": 88.7452579535092,
"vibration": 39.9863347521755
},
{
"datetime": "2015-12-31T04:00:00.000",
"machineID": 1,
"volt": 179.860806868559,
"rotate": 461.478368479999,
"pressure": 120.299989462607,
"vibration": 35.8235042398746
},
{
"datetime": "2015-12-31T05:00:00.000",
"machineID": 1,
"volt": 172.645716803532,
"rotate": 386.985814610685,
"pressure": 96.0729702714405,
"vibration": 35.7556427077587
}
],
"lastErrors": []
}

Ready to try it out?

Register for free and start building today!

Otherwise, if you have questions/comments, join us on Discord!