Data Virtualization
Implement a data virtualization layer using APIs.
If you are already familiar with data virtualization, feel free to peek at some of the examples below that define endpoints in RAW that serve virtualized data:
Machine data processing
- Overview
- Code
This example shows how to integrate IoT and log data from multiple data sources.
The scenario
Suppose we are a site reliability engineer managing control software for industrial machines.
Recently we have been experiencing suspicious software crashes, and we want to create a data service to collect diagnostic data to help us identify the cause(s).
The datasets
We have the following data sources:
- Machine status information logged periodically and stored in a PostgreSQL database.
- Software crashes from Docker; these come in JSON format, as output by the familiar docker inspect command.
- Sensor data from the machines, exported in CSV.
- Error logs, stored in an S3 bucket in familiar log file formats that require parsing.
Therefore, we need to combine data stored:
- in a database;
- and in an S3 bucket with several formats:
- log files
- CSV files
- JSON files.
Machine Status
Machines are being monitored and their status and location are being stored in a PostgreSQL database. This table looks something like this:
id | model | age | status | latitude | longitude |
---|---|---|---|---|---|
1 | model3 | 18 | OK | 46.515471 | 6.644706 |
2 | model4 | 7 | OK | 46.564782 | 6.551355 |
3 | model3 | 8 | OK | 46.537984 | 6.629472 |
4 | model3 | 7 | OK | 46.570500 | 6.591574 |
To read it we can use PostgreSQL.InferAndRead. Here we create a function that takes a machine ID and returns the corresponding record from the table.
machine(id: int) =
let
data = PostgreSQL.InferAndRead(
"raw",
"example",
"machines",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "..."
)
in
Collection.First(Collection.Filter(data, (x) -> x.id == id))
We are reading data from the database "raw", schema "example" and table "machines". The output for id=1 is:
{
"id": 1,
"model": "model3",
"age": 18,
"status": "OK",
"latitude": 46.515471,
"longitude": 6.644706
}
Software crashes from Docker
Each machine has a specific service controlling it.
These services are deployed using Docker. The status of this software can be extracted from the output of the docker inspect command.
The output of docker inspect is a (long) JSON document, e.g.:
{
"Id": "806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3",
"Name": "machine 98",
"Created": "2015-11-26T06:00:00.000",
"Path": "bash",
"Image": "sha256:9873176a8ff5ac192ce4d7df8a403787558b9f3981a4c4d74afb3edceeda451c",
"Driver": "overlay2",
"Platform": "linux",
"Args": [
"arg1"
],
"State": {
"Status": "running",
"Running": false,
"Paused": false,
"Restarting": false,
"OOMKilled": false,
"Dead": true,
"Pid": 86629,
"ExitCode": 3,
"Error": "comp3",
"StartedAt": "2015-11-26T06:00:00.000",
"FinishedAt": "2015-11-26T06:00:00.000"
},
"ResolvConfPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/resolv.conf",
"HostnamePath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hostname",
"HostsPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hosts",
"LogPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3-json.log",
"RestartCount": 0,
We can get the corresponding machine from the Name field. The State field has an exit code, which tells us whether the software finished successfully or not.
The following function extracts the relevant information in an easier-to-consume tabular form.
failures(id: int) =
let
dockerInspect = Json.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
),
summary = Collection.Transform(
dockerInspect,
(x) ->
{
MachineId: Int.From(String.SubString(x.Name, 9, 1)),
ExitCode: x.State.ExitCode,
Error: x.State.Error,
FinishedAt: x.State.FinishedAt
}
)
in
Collection.Filter(
summary,
(x) -> x.ExitCode > 0 and x.MachineId == id
)
The output for id=1 is:
[
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-01-05T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 1,
"Error": "comp1",
"FinishedAt": "2015-03-06T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 2,
"Error": "comp2",
"FinishedAt": "2015-04-20T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-06-19T06:00:00.000"
},
Error logs
Errors are collected from logs. These logs are uploaded to a logging service, which in turn collects and saves all records into an S3 bucket.
2015-01-01T05:54:15 WARN vibration close to treshold, check instrumentation panel ASAP.
2015-01-01T05:54:58 INFO calibration at 100%, checking inner sub-systems.
2015-01-01T05:55:41 ERROR voltage not measured for more than 25 seconds, reboot machine.
2015-01-01T05:56:24 INFO cleaning procedure schedulled soon, performing sub task 111.
2015-01-01T05:57:07 INFO task 155 schedulled soon, preparing next task.
2015-01-01T05:57:50 WARN inner temp increasing rapidly, please check internet connection.
2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.
2015-01-01T06:00:00 WARN machine 24 with error=error1
2015-01-01T05:54:15 ERROR inner temp not measured for more than 16 seconds, please call 041 123 456 789.
This file has a lot of data, but right now, we are only interested in lines that report machine errors.
We can use Collection.Filter and a regex to remove all unwanted lines, like this:
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches( x, "(.*) WARN machine (\\d+) with error=(\\w+).*")
)
in
filtered
Output:
[
"2015-01-01T06:00:00 WARN machine 24 with error=error1",
"2015-01-01T06:00:00 WARN machine 73 with error=error4",
"2015-01-01T06:00:00 WARN machine 81 with error=error1",
"2015-01-01T07:00:00 WARN machine 43 with error=error3",
"2015-01-01T08:00:00 WARN machine 14 with error=error4",
"2015-01-01T08:00:00 WARN machine 76 with error=error5"
Now we can use Regex.Groups to extract all the relevant fields. Here is how the final function looks:
errors(id: int) =
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
),
parsed = Collection.Transform(
filtered,
(x) ->
let
groups = Regex.Groups(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
in
{
machineId: Int.From(List.Get(groups, 1)),
timestamp: Timestamp.Parse(
List.Get(groups, 0),
"y-M-d'T'H:m:s"
),
error: List.Get(groups, 2)
}
)
in
Collection.Filter(parsed, (x) -> x.machineId == id)
errors(1)
Output:
[
{
"machineId": 1,
"timestamp": "2015-01-03T07:00:00.000",
"error": "error1"
},
{
"machineId": 1,
"timestamp": "2015-01-03T20:00:00.000",
"error": "error3"
},
{
"machineId": 1,
"timestamp": "2015-01-04T06:00:00.000",
"error": "error5"
},
Sensor data
Sensor data is collected and stored in CSV files. We can read it using the following function:
telemetry(id: int) =
Collection.Filter(
Csv.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
),
(x) -> x.machineID == id
)
Output:
[
{
"datetime": "1/1/2015 6:00:00 AM",
"machineID": 1,
"volt": 176.217853015625,
"rotate": 418.504078221616,
"pressure": 113.077935462083,
"vibration": 45.0876857639276
},
{
"datetime": "1/1/2015 7:00:00 AM",
"machineID": 1,
"volt": 162.87922289706,
"rotate": 402.747489565395,
"pressure": 95.4605253823187,
"vibration": 43.4139726834815
},
Collecting our information for interrogation
Now that we have all the sources defined, we can start digging into the data to get answers. For a given machine (id), we would like to collect some information about the last failure. We are interested in:
- Basic information such as the error, timestamp, machine age, model etc. from ‘failures’ (docker json file) and ‘machines’ (database table).
- Sensor data of the 6 hours before the crash (‘telemetry’ from our sensor csv file).
- Errors of the 6 hours before the crash ('errors' from log files).
Let's create a function lastFailureData which aggregates all the necessary data from each of the functions created before.
lastFailureData(machineId: int) =
let
machineData = machine(machineId),
failureData = failures(machineId),
lastFailure = Collection.Max(failureData.FinishedAt),
startMeasure = Timestamp.SubtractInterval(
lastFailure,
Interval.Build(hours = 6)
),
lastFailureRecord = Collection.First(
Collection.Filter(
failureData,
(x) -> x.FinishedAt == lastFailure
)
),
lastTelemetry = Collection.Filter(
telemetry(machineId),
(x) ->
x.datetime < lastFailure and x.datetime > startMeasure
),
lastErrors = Collection.Filter(
errors(machineId),
(x) ->
x.timestamp < lastFailure and x.timestamp > startMeasure
)
in
{
lastFailure: lastFailureRecord,
machineData: machineData,
lastTelemetry: lastTelemetry,
lastErrors: lastErrors
}
lastFailureData(1)
Output:
{
"machineId": 1,
"age": 18,
"model": "model3",
"lastFailure": {
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-12-31T06:00:00.000"
},
"lastTelemetry": [
{
"datetime": "2015-12-31T01:00:00.000",
"machineID": 1,
"volt": 147.720615260015,
"rotate": 493.074645851158,
"pressure": 104.81366016439,
"vibration": 41.2714171061972
},
{
"datetime": "2015-12-31T02:00:00.000",
"machineID": 1,
"volt": 153.93048096902,
"rotate": 353.466012177296,
"pressure": 99.6570720990314,
"vibration": 42.806176552987
},
{
"datetime": "2015-12-31T03:00:00.000",
"machineID": 1,
"volt": 175.481807900786,
"rotate": 475.951631160907,
"pressure": 88.7452579535092,
"vibration": 39.9863347521755
},
{
"datetime": "2015-12-31T04:00:00.000",
"machineID": 1,
"volt": 179.860806868559,
"rotate": 461.478368479999,
"pressure": 120.299989462607,
"vibration": 35.8235042398746
},
{
"datetime": "2015-12-31T05:00:00.000",
"machineID": 1,
"volt": 172.645716803532,
"rotate": 386.985814610685,
"pressure": 96.0729702714405,
"vibration": 35.7556427077587
}
],
"lastErrors": []
}
// Machine information taken from a postgresql database
machine(id: int) =
let
data = PostgreSQL.InferAndRead(
"raw",
"example",
"machines",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in
Collection.First(Collection.Filter(data, (x) -> x.id == id))
// Software crashes taken from the docker inspect file
failures(id: int) =
let
dockerInspect = Json.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
),
summary = Collection.Transform(
dockerInspect,
(x) ->
{
MachineId: Int.From(String.SubString(x.Name, 9, 1)),
ExitCode: x.State.ExitCode,
Error: x.State.Error,
FinishedAt: x.State.FinishedAt
}
)
in
Collection.Filter(
summary,
(x) -> x.ExitCode > 0 and x.MachineId == id
)
// Errors taken from log files
errors(id: int) =
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
),
parsed = Collection.Transform(
filtered,
(x) ->
let
groups = Regex.Groups(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
in
{
machineId: Int.From(List.Get(groups, 1)),
timestamp: Timestamp.Parse(
List.Get(groups, 0),
"y-M-d'T'H:m:s"
),
error: List.Get(groups, 2)
}
)
in
Collection.Filter(parsed, (x) -> x.machineId == id)
// Sensor data taken from a CSV file
telemetry(id: int) =
Collection.Filter(
Csv.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
),
(x) -> x.machineID == id
)
// Function getting the last failure of a machine and all relevant data
// Collects sensor data and errors for the 6 hours before the crash
main(machineId: int) =
let
machineData = machine(machineId),
failureData = failures(machineId),
lastFailure = Collection.Max(failureData.FinishedAt),
startMeasure = Timestamp.SubtractInterval(
lastFailure,
Interval.Build(hours = 6)
),
lastFailureRecord = Collection.First(
Collection.Filter(
failureData,
(x) -> x.FinishedAt == lastFailure
)
),
lastTelemetry = Collection.Filter(
telemetry(machineId),
(x) ->
x.datetime < lastFailure and x.datetime > startMeasure
),
lastErrors = Collection.Filter(
errors(machineId),
(x) ->
x.timestamp < lastFailure and x.timestamp > startMeasure
)
in
{
lastFailure: lastFailureRecord,
machineData: machineData,
lastTelemetry: lastTelemetry,
lastErrors: lastErrors
}
// The following test will run if you press the [Run Code] button directly.
main(1)
Analyzing the news
- Overview
- Code
This example shows how to create an endpoint that analyzes the news by combining data from multiple external web APIs.
How to analyze the news?
There are three steps involved:
- obtain a machine-readable list of news articles;
- process the news articles to obtain additional metadata;
- use a natural language API to detect well identified entities.
To start, we use an RSS feed, a well-known XML standard for publishing website updates in a computer-readable format. For our example, we use the CNN US channel, which offers a list of news feeds on different subjects.
Then, for each news article link in the feed, we issue a call to OpenGraph.io, which exposes an API that extracts OpenGraph metadata from the content of a URL.
Finally, text summaries are sent to Google's Natural Language API, which returns a number of well-identified entities (people, companies, locations) it recognized in them.
Now that we have associated each article with its formal entities, we perform an aggregation to shape this information to our needs.
Step 1: Read an RSS feed
The RSS format is based on XML.
Snapi reads XML natively with Xml.InferAndRead. Here's how to read the CNN US news feed:
let
feed = Xml.InferAndRead("http://rss.cnn.com/rss/edition_us.rss")
in
// links, titles and other metadata which reside inside the `item` node, within `channel`.
feed.channel.item.title
The results look like:
[
"Suspect in Dallas Zoo animal thefts allegedly admitted to the crime and says he would do it again, affidavits claim",
"School and food vendor apologize for insensitive lunch served on first day of Black History Mont",
"An off-duty New York police officer who was shot while trying to buy an SUV has died",
"Labor Secretary Marty Walsh expected to leave Biden administration | CNN Politics",
...
"HS football players gain perspective helping vets",
"Milwaukee Dancing Grannies planning return",
"Fire crews respond to fire at boarded up building",
...
]
Step 2: Extracting OpenGraph metadata
RSS data contains some metadata about each article it refers to (e.g. its title), but more metadata can be found in the articles themselves. We have to follow the links and process the articles.
OpenGraph specifies a set of <meta/> HTML tags that make it possible to include generic web pages in Facebook's social graph.
For news articles, these tags include the title of the article, its type (e.g. article, opinion), links to illustrations and a description that contains a summary of the article.
Here are the tags found in one of the CNN articles.
<meta property="og:title" content="Suspect in Dallas Zoo animal thefts allegedly admitted to the crime and says he would do it again, affidavits claim">
<meta property="og:site_name" content="CNN">
<meta property="og:type" content="article">
<meta property="og:url" content="https://www.cnn.com/2023/02/08/us/dallas-zoo-suspect-arrest-affidavits/index.html">
<meta property="og:image" content="https://cdn.cnn.com/cnnnext/dam/assets/230207231552-01-dallas-zoo-020323-file-super-tease.jpg">
<meta property="og:description" content="The man who faces charges stemming from a string of suspicious activities at the Dallas Zoo allegedly admitted to stealing two tamarin monkeys and trying to steal the clouded snow leopard last month, according to arrest warrant affidavits.">
The OpenGraph.io website exposes an API that extracts OpenGraph metadata from the content of a URL, including the description field. We'd like to isolate that description field in order to perform textual analysis later. As we process the collection of links found in the RSS file, the content of each article's description tag can be obtained by passing its link to OpenGraph.io's API.
Let's define a function that performs the HTTP call to OpenGraph.io.
description(url: string) =
let
encoded = Http.UrlEncode(url),
key = "####",
opengraphReq = Http.Get(
"https://opengraph.io/api/1.1/site/" + encoded,
args = [{"app_id", key}]
),
metadata = Json.Read(
opengraphReq,
type record(
hybridGraph: record(title: string, description: string)
)
)
in
metadata.hybridGraph.description
Here's what is obtained with the article used as an example:
{
"title": "Suspect in Dallas Zoo animal thefts allegedly admitted to the crime and says he would do it again, affidavits claim | CNN",
"description": "The man who faces charges stemming from a string of suspicious activities at the Dallas Zoo allegedly admitted to stealing two tamarin monkeys and trying to steal the clouded snow leopard last month, according to arrest warrant affidavits.",
"type": "article",
"image": {
"url": "https://media.cnn.com/api/v1/images/stellar/prod/230207231552-01-dallas-zoo-020323-file.jpg?c=16x9&q=w_800,c_fill"
},
"url": "https://www.cnn.com/2023/02/08/us/dallas-zoo-suspect-arrest-affidavits/index.html",
"site_name": "CNN",
"articlePublishedTime": "2023-02-08T07:33:17Z",
"articleModifiedTime": "2023-02-08T08:14:39Z"
}
Step 3: Perform the textual analysis
A second function called analyze is defined (its code isn't shown here) that sends the content of the description field to Google's Natural Language API, also over HTTP. The function returns the set of entities identified by the service. Here's the entity matching Joe Biden.
{
"name": "Joe Biden",
"type": "PERSON",
"metadata": {
"mid": "/m/012gx2",
"wikipedia_url": "https://en.wikipedia.org/wiki/Joe_Biden"
},
"salience": 0.2149425,
"mentions": [
{ "text": { "content": "Biden", "beginOffset": 54 }, "type": "PROPER" },
{ "text": { "content": "Joe Biden", "beginOffset": 190 }, "type": "PROPER" },
{ "text": { "content": "President", "beginOffset": 180 }, "type": "COMMON" }
]
}
Step 4: Our data product
Both functions are cascaded in order to augment the initial RSS data with textual analysis:
let
feed = Xml.InferAndRead("http://rss.cnn.com/rss/edition_us.rss"),
items = feed.channel.item,
withMetadata = Collection.Transform(
items,
(i) ->
{title: i.title, link: i.link, description: description(i.link)}
),
withAnalysis = Collection.Transform(
withMetadata,
(r) -> Record.AddField(r, analysis = analyze(r.description))
),
....
Here is an example of a row that has been augmented with both the description and its entities:
{
"title": "Labor Secretary Marty Walsh expected to leave Biden administration | CNN Politics",
"link": "https://www.cnn.com/2023/02/07/politics/marty-walsh-leaving/index.html",
"description": "Labor Secretary Marty Walsh is expected to depart the Biden administration soon, according to two people familiar with the matter, marking the first Cabinet secretary departure of President Joe Biden's presidency.",
"analysis": {
"entities": [
{
"name": "Marty Walsh",
"type": "PERSON",
"metadata": {
"wikipedia_url": "https://en.wikipedia.org/wiki/Marty_Walsh",
"mid": "/m/0swn343"
},
"salience": 0.50773776,
"mentions": [
{ "text": { "content": "Marty Walsh", "beginOffset": 16 }, "type": "PROPER" },
{ "text": { "content": "Labor Secretary", "beginOffset": 0 }, "type": "COMMON" }
]
},
{
"name": "Joe Biden",
"type": "PERSON",
"metadata": {
"mid": "/m/012gx2",
"wikipedia_url": "https://en.wikipedia.org/wiki/Joe_Biden"
},
"salience": 0.2149425,
"mentions": [
{ "text": { "content": "Biden", "beginOffset": 54 }, "type": "PROPER" },
{ "text": { "content": "Joe Biden", "beginOffset": 190 }, "type": "PROPER" },
{ "text": { "content": "President", "beginOffset": 180 }, "type": "COMMON" }
]
},
...
{
"name": "secretary departure",
"type": "EVENT",
"metadata": {},
"salience": 0.045937307,
"mentions": [
{ "text": { "content": "secretary departure", "beginOffset": 157 }, "type": "COMMON" }
]
},
...
],
"language": "en"
}
}
Present aggregated results
The results now combine the output of two external APIs with our input RSS feed items.
Depending on what question we are asking, the final query could return different structures.
We show here a query that returns aggregated Entity and Type information across all the pages in the RSS feed, in descending order of "hits", to see what’s "most reported".
let //
// ...
//
explodeEntities = Collection.Explode(
withAnalysis,
(row) -> row.analysis.entities
),
interestingEntities = Collection.Filter(
explodeEntities,
(row) ->
List.Contains(
[
"PERSON",
"LOCATION",
"ORGANIZATION",
"EVENT",
"WORK_OF_ART",
"CONSUMER_GOOD"
],
row.`type`
)
),
grouped = Collection.GroupBy(
interestingEntities,
(row) ->
{name: row.name, `type`: row.`type`, metadata: row.metadata}
),
report = Collection.Transform(grouped,
g -> {
g.key,
total_salience: Collection.Sum(g.group.salience),
story_count: Collection.Count(g.group),
stories: Collection.Distinct(g.group.link),
mention_count: Collection.Count(Collection.Explode(g.group, g -> g.mentions))
})
in
Collection.OrderBy(report, row -> row.story_count, "DESC")
The results are:
[
{
"key": {
"name": "police",
"type": "PERSON",
"metadata": {
"value": null,
"wikipedia_url": null,
"mid": null,
"currency": null,
"year": null
}
},
"total_salience": 0.56725444,
"story_count": 3,
"stories": [
"https://abc7ny.com/police-involved-shooting-grand-concourse-section-suspect-shot-in-head-and-leg-bronx/12524318",
"https://www.atlantanewsfirst.com/2022/12/04/police-2-ford-mustangs-totaling-nearly-200k-stolen-upson-county-dealership/",
"https://www.cbs58.com/news/horizon-west-condo-owners-in-waukesha-remember-building-fire-one-year-later"
],
"mention_count": 3
},
{
"key": {
"name": "students",
"type": "PERSON",
"metadata": {
"value": null,
"wikipedia_url": null,
"mid": null,
"currency": null,
"year": null
}
},
"total_salience": 0.42099381599999997,
"story_count": 2,
"stories": [
"https://www.cnn.com/2023/02/06/us/aramark-black-history-month-menu-school-reaj/index.html",
"https://www.wptv.com/news/education/200-000-worth-of-supplies-distributed-for-palm-beach-county-schools-during-giveaway-event"
],
"mention_count": 2
},
{
"key": {
"name": "Amazon",
"type": "ORGANIZATION",
"metadata": {
"value": null,
"wikipedia_url": "https://en.wikipedia.org/wiki/Amazon_(company)",
"mid": "/m/0mgkg",
"currency": null,
"year": null
}
},
"total_salience": 0.006539275,
"story_count": 1,
"stories": [
"https://www.tmj4.com/news/local-news/10-year-old-upset-over-vr-headset-fatally-shoots-mother-charged-as-an-adult"
],
"mention_count": 1
},
...
...
...
description(url: string) =
let
encoded = Http.UrlEncode(url),
key = Environment.Secret("opengraphKey"),
opengraphReq = Http.Get(
"https://opengraph.io/api/1.1/site/" + encoded,
args = [{_1: "app_id", _2: key}]
),
metadata = Json.Read(
opengraphReq,
type record(
hybridGraph: record(title: string, description: string)
)
)
in
metadata.hybridGraph.description
analyze(text: string) =
let
outputType = type record(
entities: collection(
record(
name: string,
`type`: string,
metadata: record(),
salience: int,
mentions: collection(
record(
text: record(content: string, beginOffset: int),
`type`: string
)
)
)
),
language: string
),
query = Json.Print(
{
document: {`type`: "PLAIN_TEXT", content: text},
encodingType: "UTF8"
}
),
key = Environment.Secret("language@google"),
httpPost = Http.Post(
"https://language.googleapis.com/v1/documents:analyzeEntities",
args = [{_1: "key", _2: key}],
headers = [
{_1: "x-raw-output-format", _2: "json"},
{_1: "Content-Type", _2: "application/json; charset=utf-8"}
],
bodyString = query
),
r = Json.Read(httpPost, outputType)
in
r
let
feed = Xml.InferAndRead("http://rss.cnn.com/rss/edition_us.rss"),
items = Collection.Take(feed.channel.item, 10),
withMetadata = Collection.Transform(
items,
(i) ->
{title: i.title, link: i.link, description: description(i.link)}
),
withAnalysis = Collection.Transform(
withMetadata,
(r) -> Record.AddField(r, analysis = analyze(r.description))
),
explodeEntities = Collection.Explode(
withAnalysis,
(row) -> row.analysis.entities
),
interestingEntities = Collection.Filter(
explodeEntities,
(row) ->
List.Contains(
[
"PERSON",
"LOCATION",
"ORGANIZATION",
"EVENT",
"WORK_OF_ART",
"CONSUMER_GOOD"
],
row.`type`
)
),
grouped = Collection.GroupBy(
interestingEntities,
(row) ->
{name: row.name, `type`: row.`type`, metadata: row.metadata}
),
report = Collection.Transform(grouped,
g -> {
g.key,
total_salience: Collection.Sum(g.group.salience),
story_count: Collection.Count(g.group),
stories: Collection.Distinct(g.group.link),
mention_count: Collection.Count(Collection.Explode(g.group, g -> g.mentions))
})
in
Collection.OrderBy(report, row -> row.story_count, "DESC")
Where is the ISS?
- Overview
- Code
This example shows how to create a REST API that combines data from multiple web services.
Where is the ISS?
The goal is to deploy an API showing the nearest location on Earth to the International Space Station. For this, we combine two existing APIs:
- https://api.open-notify.org/, which gives us the ISS coordinates in real-time;
- https://opencagedata.com/, which implements a reverse geocoding service to obtain the city nearest to a given set of coordinates.
The resulting REST API returns a list containing the place the ISS is currently flying over.
For instance, at the time of development,
/issLocation
returned:
[
"Adrar, Mauritania"
]
Getting the ISS position
Here's the API call to get the ISS location: http://api.open-notify.org/iss-now.json
Here's an example response this service returns:
{
"message": "success",
"timestamp": 1675083740,
"iss_position": {
"longitude": "106.5087",
"latitude": "38.2938"
}
}
The message only includes GPS coordinates and does not explicitly state where the ISS is in the world. To find a named location, we need reverse geocoding of the ISS latitude and longitude.
Reverse Geocoding
Many geocoding APIs are available. We use the OpenCage Geocoding API. This service requires an API key that is generated when creating an account. Both the API key and the coordinates have to be provided as query parameters, e.g.:
https://api.opencagedata.com/geocode/v1/json?key=<API_KEY>&q=<LATITUDE>,<LONGITUDE>
The latitude and longitude of the ISS are extracted from the JSON record above after it is downloaded and decoded with Json.InferAndRead.
let iss = Json.InferAndRead("http://api.open-notify.org/iss-now.json"),
latitude = iss.iss_position.latitude,
longitude = iss.iss_position.longitude,
Here's how to prepare the HTTP query to OpenCage and retrieve the result.
let //
// ...
//
geoQuery = latitude + "+" + longitude, // a URL parameter required by the API
key = Environment.Secret("opencagedata"), // the API key is stored as a secret
httpQuery = Http.Get(
"https://api.opencagedata.com/geocode/v1/json",
args = [
{"q", geoQuery},
{"key", key},
{"no_annotations", "1"} // no_annotations returns simpler data
]
),
// etc.
issGeoLoc = Json.InferAndRead(httpQuery, preferNulls = true)
The call returns a JSON record with lots of information. We need the location's name, which is found under the field named formatted:
{
"documentation": "https://opencagedata.com/api",
...
"results": [
{
...
"formatted": "Adrar, Mauritania"
...
}
],
"status": {
"code": 200,
"message": "OK"
},
...
}
Once issGeoLoc is populated with the geolocation data, the city name is easily retrieved from issGeoLoc.results.formatted.
Here's the full code of our service.
let
iss = Json.InferAndRead("http://api.open-notify.org/iss-now.json"),
latitude = iss.iss_position.latitude,
longitude = iss.iss_position.longitude,
geoQuery = latitude + "+" + longitude,
key = Environment.Secret("opencagedata"),
httpQuery = Http.Get(
"https://api.opencagedata.com/geocode/v1/json",
args = [
{"q", geoQuery},
{"key", key},
{"no_annotations", "1"}
]
),
issGeoLoc = Json.InferAndRead(httpQuery, preferNulls = true)
in
issGeoLoc.results.formatted
Data virtualization involves creating a virtual layer that sits on top of existing data sources and presents them as a single, unified view to users. This virtual layer is created using metadata and a set of rules that specify how the underlying data sources are to be integrated. Data virtualization enables users to access and query data from multiple sources without the need for physically moving or consolidating the data.
This applies to semi-structured or unstructured data, stored in-house or available externally, since all of these can be transformed into well-structured data and served as an API in RAW.
To build a data virtualization layer in RAW, you create APIs that:
- Access data at source from databases, files, data lakes or even services;
- Combine data together;
- Transform data;
- Deliver data.
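As a quick preview, here is a minimal sketch of a single endpoint that touches all four phases. It reuses the Olympic Games CSV files from the merging example further down this page, and assumes the same column names (Country, Year) used there.
// A minimal sketch: access two CSV files, combine them, transform the result
// and deliver it as the API response.
main(country: string = "FRA") =
  let
    // Access: read both files directly at the source, no ingestion
    summer = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/summer_olympics.csv"),
    winter = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/winter_olympics.csv"),
    // Combine: merge the two datasets into a single collection
    olympics = Collection.Union(summer, winter),
    // Transform: keep only the rows for the requested country
    filtered = Collection.Filter(olympics, entry -> entry.Country == country)
  in
    // Deliver: whatever this expression evaluates to is what the endpoint serves
    filtered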
Let's look at each of these phases in turn.
Step 1: Accessing data at source
RAW can read data directly from the source system, whether it is a file, database, data lake or a web service. No ingestion necessary.
Here are examples for accessing data from files:
Build an API over a CSV file
- Overview
- Code
Sample usage:
/airports-csv[?country=<string>&city=<string>&iata=<string>]
For instance, to ask for airports in Portugal and in the city of Lisbon, use:
/airports-csv?country=Portugal&city=Lisbon
main(country: string = null, city: string = null, iata: string = null) =
let
airports = Csv.InferAndRead(
"https://raw-tutorial.s3.eu-west-1.amazonaws.com/airports.csv")
in
Collection.Filter(
airports,
a -> (Nullable.IsNull(country) or a.Country == country)
and (Nullable.IsNull(city) or a.City == city)
and (Nullable.IsNull(iata) or a.IATA_FAA == iata))
// The following test will run if you press the [Run Code] button directly.
main(country = "Portugal", city = "Lisbon")
Build an API over a JSON file
- Overview
- Code
Sample usage:
/patients-json[?country=<string>&minYear=<int>&maxYear=<int>&code=<string>]
The following URL returns the patients born between 1990 and 1995 that were diagnosed with L53.3.
/patients-json?minYear=1990&maxYear=1995&code=L53.3
main(country: string = null, minYear: int = null, maxYear: int = null, code: string = null) =
let
patients = Json.InferAndRead(
"https://raw-tutorial.s3.eu-west-1.amazonaws.com/patients.json")
in
Collection.Filter(
patients,
p -> (Nullable.IsNull(country) or p.country == country)
and (Nullable.IsNull(minYear) or p.year_of_birth >= minYear)
and (Nullable.IsNull(maxYear) or p.year_of_birth <= maxYear)
and (Nullable.IsNull(code) or
Collection.Count(Collection.Filter(p.diagnosis, (d) -> d.code == code)) > 0))
// The following test will run if you press the [Run Code] button directly.
main(minYear = 1990, maxYear = 1995, code = "L53.3")
Build an API over an XML file
- Overview
- Code
Sample usage:
/tutorial/people-xml[?name=<person_name>]
For instance, to get the information about bob use:
/tutorial/people-xml?name=bob
main(name: string = null) =
let people = Xml.InferAndRead("https://raw-tutorial.s3.amazonaws.com/inference/people.xml")
in Collection.Filter(people.person, x -> Nullable.IsNull(name) or x.name == name)
// The following test will run if you press the [Run Code] button directly.
main("bob")
Here are examples that access data from a database:
Build an API over a MySQL database
- Overview
- Code
Sample usage:
/airports-mysql[?code=<iata_code>]
For instance, to get La Guardia's (LGA) information use:
/airports-mysql?code=LGA
main(code: string = null) =
let airports = MySQL.InferAndRead(
"raw", "airports",
host = "example-mysql.raw-labs.com",
username = "mysql_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in Collection.Filter(airports, x -> Nullable.IsNull(code) or x.iata_faa == code)
// The following test will run if you press the [Run Code] button directly.
main("LIS")
Build an API over a PostgreSQL database
- Overview
- Code
Sample usage:
/airports-postgresql[?code=<iata_code>]
For instance, to get La Guardia's (LGA) information use:
/airports-postgresql?code=LGA
main(code: string = null) =
let airports = PostgreSQL.InferAndRead(
"raw", "example", "airports",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in Collection.Filter(airports, x -> Nullable.IsNull(code) or x.iata_faa == code)
// The following test will run if you press the [Run Code] button directly.
main("LIS")
Build an API over an Oracle database
- Overview
- Code
Sample usage:
/airports-oracle[?code=<iata_code>]
For instance, to get La Guardia's (LGA) information use:
/airports-oracle?code=LGA
main(code: string = null) =
let airports = Oracle.InferAndRead(
"orcl", "example", "airports",
host = "example-oracle.raw-labs.com",
username = "oracle_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in Collection.Filter(airports, x -> Nullable.IsNull(code) or x.IATA_FAA == code)
// The following test will run if you press the [Run Code] button directly.
main("LIS")
Build an API over a Microsoft SQL Server database
- Overview
- Code
Sample usage:
/airports-sqlserver[?code=<iata_code>]
For instance, to get La Guardia's (LGA) information use:
/airports-sqlserver?code=LGA
main(code: string = null) =
let airports = SQLServer.InferAndRead(
"raw", "example", "airports",
host = "example-mssql.raw-labs.com",
username = "mssql_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in Collection.Filter(airports, x -> Nullable.IsNull(code) or x.iata_faa == code)
// The following test will run if you press the [Run Code] button directly.
main("LIS")
Here are examples that access data from a web service:
Querying web services
- Overview
- Code
This example shows how to make HTTP requests:
- How to use different HTTP methods like GET, PUT, POST, DELETE, etc.;
- How to pass headers;
- How to pass query parameters.
Using HTTP URLs directly
Functions that read data from external locations can receive a URL. For instance, this example does an HTTP GET request on the source data.
Json.InferAndRead("http://test-data.raw-labs.com/public/authors.json")
It is equivalent to the following example:
Json.InferAndRead(Http.Get("http://test-data.raw-labs.com/public/authors.json"))
That is because HTTP URLs are converted directly to "Http.Get" requests.
However, sometimes you need to configure the HTTP request: use POST instead of GET, or pass headers or parameters. The next section describes how.
Making HTTP requests
To change the HTTP method, specify headers, or pass query parameters along with other options, use the functions in the Http library.
The following example performs a POST request, using the Http.Post function, and passes a string as the HTTP POST body, using the optional argument bodyString:
String.Read(
Http.Post(
"http://somewhere/api/clients"
bodyString = """{"name": "john", "query": "account"}"""
)
)
There are functions in the Http library for each HTTP method: e.g. Http.Get for GET requests, Http.Post for POST requests, Http.Put for PUT requests, etc. Please refer to the documentation for additional details.
These functions accept the same arguments to pass headers, query parameters and other properties of the HTTP request.
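For instance, here is a sketch of a PUT request that combines all three kinds of options; the URL and payload are placeholders, not a real service:
String.Read(
  Http.Put(
    "http://somewhere/api/clients/42", // placeholder URL
    args = [{"dryRun", "true"}], // query parameters (placeholder)
    headers = [{"Content-Type", "application/json"}], // request headers
    bodyString = """{"name": "john"}""" // request body
  )
)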
Sample usage
The following example performs an HTTP request to Wikidata with a SPARQL query listing cat entries.
It passes query parameters (args) containing the query and uses HTTP headers (headers) to set the output format to CSV:
main() =
let query = "SELECT ?item ?birthdate ?itemLabel \n" +
"WHERE {\n" +
" ?item wdt:P31 wd:Q146. # Must be of a cat \n" +
" ?item wdt:P569 ?birthdate . # P569 : birthdate\n" +
" SERVICE wikibase:label { bd:serviceParam wikibase:language \"[AUTO_LANGUAGE],en\". } \n" +
"}",
data = Csv.Read(
Http.Get(
"https://query.wikidata.org/bigdata/namespace/wdq/sparql",
args = [{"query", query}],
headers = [{"Accept", "text/csv"}]
),
type collection(record(item: string, birthdate: string, itemLabel: string)),
skip = 1
)
in data
main()
main() =
let query = "SELECT ?item ?birthdate ?itemLabel \n" +
"WHERE {\n" +
" ?item wdt:P31 wd:Q146. # Must be of a cat \n" +
" ?item wdt:P569 ?birthdate . # P569 : birthdate\n" +
" SERVICE wikibase:label { bd:serviceParam wikibase:language \"[AUTO_LANGUAGE],en\". } \n" +
"}",
data = Csv.Read(
Http.Get(
"https://query.wikidata.org/bigdata/namespace/wdq/sparql",
args = [{"query", query}],
headers = [{"Accept", "text/csv"}]
),
type collection(record(item: string, birthdate: string, itemLabel: string)),
skip = 1
)
in data
// The following test will run if you press the [Run Code] button directly.
main()
Querying a JIRA server
- Overview
- Code
A JIRA server exposes multiple REST APIs that offer programmatic access to its database. Responses are in JSON. We use Json.InferAndRead.
The query below fetches data about entry JRASERVER-9 and extracts a couple of fields.
let issue = Json.InferAndRead(Http.Get("https://jira.atlassian.com/rest/api/latest/issue/JRASERVER-9"))
in { issue.key, issue.fields.summary, status: issue.fields.status.name }
This evaluates to:
{
"key": "JRASERVER-9",
"summary": "User Preference: User Time Zones",
"status": "Closed"
}
Pagination
A call to JIRA's search REST API returns the set of issues matching given search criteria. Its results are paginated.
The whole set of paginated results can be retrieved by looping and calling search with an increasing startAt argument, until the issues array is empty.
Here is an implementation that internally uses a recursive function.
jql(projectURL: string, query: string) =
// recursive function (annotated with rec)
let rec issues(startAt: int = 0): collection(issueType) =
let reports = Json.Read(Http.Get(projectURL + "/rest/api/latest/search", args=[{"jql", query}, {"startAt", String.From(startAt)}]), jqlType)
in
if Collection.Count(reports.issues) == 0
then reports.issues
else Collection.Union(reports.issues, issues(startAt + 50)) // recursive call
in issues(0)
If called with the fixVersion=9.0.0 query, jql returns all JIRA issues fixed in version 9.0.0:
[
{
"key": "JRASERVER-73294",
"summary": "Update product documentation for Zero Downtime Upgrade (ZDU) related steps",
"status": "Closed",
"resolutiondate": "2022-11-22T14:25:58.000+0000"
},
{
"key": "JRASERVER-74200",
"summary": "Improve the Archiving a project and Archiving an issue documentation to account for the need of a re-index to assertively decrease Index size (and disk space)",
"status": "Closed",
"resolutiondate": "2022-11-22T14:18:20.000+0000"
},
{
"key": "JRASERVER-74506",
"summary": "Product document Running Jira applications over SSL or HTTPS has incorrect step for command line installation",
"status": "Closed",
"resolutiondate": "2022-11-21T10:05:10.000+0000"
},
...
...
...
{
"key": "JRASERVER-72995",
"summary": "Jira webhooks stop working after \"I/O reactor terminated abnormally\" error",
"status": "Closed",
"resolutiondate": "2022-03-31T10:35:40.000+0000"
},
{
"key": "JRASERVER-73252",
"summary": "Restarting the database causes cache replication issues",
"status": "Closed",
"resolutiondate": "2022-03-31T10:32:55.000+0000"
}
]
jql(projectURL: string, query: string) =
// Type of the JSON returned by JIRA's search API to describe an issue.
let issueType = type record(
expand: string,
id: string,
self: string,
key: string,
fields: record(
statuscategorychangedate: string,
issuetype: record(
self: string,
id: string,
description: string,
iconUrl: string,
name: string,
subtask: bool,
avatarId: int,
hierarchyLevel: int),
timespent: undefined,
project: record(
self: string,
id: string,
key: string,
name: string,
projectTypeKey: string,
simplified: bool,
avatarUrls: record(
`48x48`: string,
`24x24`: string,
`16x16`: string,
`32x32`: string)),
fixVersions: collection(undefined),
aggregatetimespent: undefined,
resolution: undefined,
customfield_10630: undefined,
customfield_10631: undefined,
customfield_10621: undefined,
customfield_10500: undefined,
resolutiondate: undefined,
customfield_10627: undefined,
customfield_10628: undefined,
customfield_10629: undefined,
workratio: int,
watches: record(
self: string,
watchCount: int,
isWatching: bool),
lastViewed: undefined,
created: string,
priority: record(
self: string,
iconUrl: string,
name: string,
id: string),
customfield_10100: undefined,
labels: collection(string),
customfield_10620: undefined,
customfield_10610: undefined,
customfield_10611: undefined,
customfield_10612: undefined,
customfield_10613: undefined,
timeestimate: undefined,
customfield_10614: undefined,
aggregatetimeoriginalestimate: undefined,
customfield_10615: collection(undefined),
versions: collection(
record(
self: string,
id: string,
name: string,
archived: bool,
released: bool,
description: string)),
customfield_10616: undefined,
customfield_10617: undefined,
customfield_10618: undefined,
customfield_10619: undefined,
issuelinks: collection(undefined),
assignee: undefined,
updated: string,
status: record(
self: string,
description: string,
iconUrl: string,
name: string,
id: string,
statusCategory: record(
self: string,
id: int,
key: string,
colorName: string,
name: string)),
components: collection(
record(
self: string,
id: string,
name: string,
description: string)),
timeoriginalestimate: undefined,
description: string,
customfield_10600: undefined,
security: undefined,
customfield_10601: undefined,
customfield_10602: undefined,
aggregatetimeestimate: undefined,
customfield_10603: collection(undefined),
customfield_10604: undefined,
customfield_10648: undefined,
customfield_10605: undefined,
customfield_10606: undefined,
customfield_10607: undefined,
customfield_10608: undefined,
customfield_10609: undefined,
summary: string,
creator: record(
self: string,
accountId: string,
avatarUrls: record(
`48x48`: string,
`24x24`: string,
`16x16`: string,
`32x32`: string),
displayName: string,
active: bool,
timeZone: string,
accountType: string),
subtasks: collection(undefined),
reporter: record(
self: string,
accountId: string,
avatarUrls: record(
`48x48`: string,
`24x24`: string,
`16x16`: string,
`32x32`: string),
displayName: string,
active: bool,
timeZone: string,
accountType: string),
aggregateprogress: record(progress: int, total: int),
customfield_10000: string,
customfield_10001: undefined,
customfield_10002: string,
customfield_10200: record(
hasEpicLinkFieldDependency: bool,
showField: bool,
nonEditableReason: record(
reason: string,
message: string)),
customfield_10003: undefined,
customfield_10400: undefined,
customfield_10004: undefined,
environment: undefined,
duedate: undefined,
progress: record(progress: int, total: int),
votes: record(self: string, votes: int, hasVoted: bool))),
// Type of the JSON returned by JIRA's search API (a page of results).
jqlType = type record(
expand: string,
startAt: int,
maxResults: int,
total: int,
issues: collection(issueType)
),
rec issues(startAt: int = 0): collection(issueType) =
let reports = Json.Read(Http.Get(projectURL + "/rest/api/latest/search", args=[{"jql", query}, {"startAt", String.From(startAt)}]), jqlType)
in if Collection.Count(reports.issues) == 0
then reports.issues
else Collection.Union(reports.issues, issues(startAt + 50))
in issues(0)
main() =
let v900issues = jql("https://jira.atlassian.com", "fixVersion=9.0.0"),
simplified = Collection.Transform(v900issues, i -> { i.key, i.fields.summary, status: i.fields.status.name, i.fields.resolutiondate })
in Collection.OrderBy(simplified, i -> i.resolutiondate, "DESC")
// The following test will run if you press the [Run Code] button directly.
main()
The next step is to combine data.
Step 2: Combining data
Combining data is often necessary when parts of the dataset are split across multiple systems.
For instance, you may have data stored in files across multiple S3 buckets. Or you may have data stored across two database instances.
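For example, here is a sketch that reads the same airports table from two PostgreSQL instances and concatenates the rows with Collection.Union; the second host and the secret names are placeholders:
main() =
  let
    airportsA = PostgreSQL.InferAndRead(
      "raw", "example", "airports",
      host = "example-psql.raw-labs.com",
      username = "pgsql_guest",
      password = Environment.Secret("psqlPasswordA")), // illustrative secret name
    airportsB = PostgreSQL.InferAndRead(
      "raw", "example", "airports",
      host = "other-psql.example.com", // placeholder second instance
      username = "pgsql_guest",
      password = Environment.Secret("psqlPasswordB")) // illustrative secret name
  in
    Collection.Union(airportsA, airportsB)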
Here are examples that combine data from different systems:
Serve data from S3 buckets across separate AWS accounts
- Overview
- Code
This example illustrates how to create a REST API that queries data live from two separate S3 buckets, each under a separate AWS account.
We have been given two sets of credentials for two different S3 buckets, each of which contains JSON files of a specific format.
Here's, for example, a file found in the bucket s3://log-server-a.
{
"creation_date": "2022-04-01",
"entries": [{"hostname": "host01"}, {"hostname": "host02"}]
}
Here's a file found in the second bucket, s3://log-server-b.
{
"creation_date": "2022-04-03",
"entries": [{"hostname": "host95"}, {"hostname": "host96"}, {"hostname": "host97"}]
}
We're interested in the content of the entries field of these JSON files. Our goal is to read every JSON file across both buckets and merge their entries lists into a single one.
["host01","host02", ...., "host95","host96","host97"]
The code executed by the REST API works as follows.
The read_logs function computes the list of all hostnames found in a given bucket.
read_logs(path: string, aws_config: record(region: string, accessKey: string, secret: string)) =
let
// list all files of the bucket path
bucket = S3.Build(path, region=aws_config.region, accessKey=aws_config.accessKey, secretKey=aws_config.secret),
files = Location.Ls(bucket),
// open each file as JSON
contents = List.Transform(files, f -> Json.Read(f, json_type)),
// `Explode` the entries field
entries = List.Explode(contents, c -> c.entries)
in
// project only the 'hostname' column to obtain the expected list of strings
entries.hostname
read_logs is called on both s3://log-server-a and s3://log-server-b with the corresponding set of credentials.
let
awsAccountA = {region: "eu-west-1", accessKey: "<access-key-for-a>", secret: "<secret-for-a>"},
awsAccountB = {region: "eu-west-1", accessKey: "<access-key-for-b>", secret: "<secret-for-b>"}
// Union the lists returned by `read_logs` for both buckets/accounts.
in List.Union(
read_logs("s3://log-server-a/*.json", awsAccountA),
read_logs("s3://log-server-b/*.json", awsAccountB)
)
Sample usage
The service is triggered by accessing the URL:
/hostnames-s3-multiple-buckets
The expected list is returned:
[
"host01",
"host02",
"host03",
"host04",
"host05",
"host06",
"host07",
"host91",
"host92",
"host93",
"host94",
"host95",
"host96",
"host97"
]
⚠️ Never store sensitive information as clear text in the code. Instead use secrets, which are key/value pairs that are stored securely outside of the source code. Secrets can be accessed using the built-in function Environment.Secret.
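For instance, the credentials in the snippet above could be read from secrets instead of being passed as literals; the secret names (awsAccessKeyA, awsSecretA, etc.) are illustrative and must be created in your account first:
let
  awsAccountA = {region: "eu-west-1", accessKey: Environment.Secret("awsAccessKeyA"), secret: Environment.Secret("awsSecretA")},
  awsAccountB = {region: "eu-west-1", accessKey: Environment.Secret("awsAccessKeyB"), secret: Environment.Secret("awsSecretB")}
// Union the lists returned by `read_logs` for both buckets/accounts.
in List.Union(
  read_logs("s3://log-server-a/*.json", awsAccountA),
  read_logs("s3://log-server-b/*.json", awsAccountB)
)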
main() =
let
// The type of each JSON file
json_type = type record(creation_date: string, entries: list(record(hostname: string))),
// Function returning the concatenated list of hostnames for a specific path & aws_config
read_logs(path: string, aws_config: record(region: string, accessKey: string, secret: string)) =
let
// list all files of the bucket path
bucket = S3.Build(path, region=aws_config.region, accessKey=aws_config.accessKey, secretKey=aws_config.secret),
files = Location.Ls(bucket),
// open each file as JSON
contents = List.Transform(files, f -> Json.Read(f, json_type)),
// `Explode` the entries field
entries = List.Explode(contents, c -> c.entries)
in
// project only the 'hostname' column to obtain the expected list of strings
entries.hostname
in let
awsAccountA = {region: "eu-west-1", accessKey: "AKIAZ6SK5NCTDAAESLXU", secret: "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"},
awsAccountB = {region: "eu-west-1", accessKey: "AKIAZ6SK5NCTLPK7QE4N", secret: "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"}
// Union the lists returned by `read_logs` for both buckets/accounts.
in List.Union(
read_logs("s3://log-server-a/*.json", awsAccountA),
read_logs("s3://log-server-b/*.json", awsAccountB)
)
// The following test will run if you press the [Run Code] button directly.
main()
Merging rows of a collection
- Overview
- Code
Collections can be merged together into one using Collection.Union.
Collection.Union takes two or more collections as parameters and concatenates them into one.
In the example, two datasets, one with summer Olympic Games data and another with winter Olympic Games data, are merged and then queried as if they were a single one.
let summer = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/summer_olympics.csv"),
winter = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/winter_olympics.csv"),
olympics = Collection.Union(summer, winter)
in ...
Sample usage
This example deploys a service which merges data on summer and winter Olympic Games and allows queries on medals obtained by country and year. For instance:
/tutorial/olympic-medals?country=FRA&year=1992
Both datasets are searched for matching entries and the matching records are returned for both summer and winter Olympics.
main(country: string, year: int) =
let summer = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/summer_olympics.csv"),
winter = Csv.InferAndRead("https://raw-tutorial.s3.eu-west-1.amazonaws.com/winter_olympics.csv"),
olympics = Collection.Union(summer, winter)
in Collection.Filter(olympics, entry -> entry.Country == country and entry.Year == year)
// The following test will run if you press the [Run Code] button directly.
main("FRA", 1992)
RAW includes powerful support for processing complex data types, such as hierarchical data. Refer to this guide for examples.
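As a small taste, here is a sketch that flattens the nested diagnosis field of the patients JSON file used earlier with Collection.Explode, assuming the same file layout as in that example:
let
  patients = Json.InferAndRead(
    "https://raw-tutorial.s3.eu-west-1.amazonaws.com/patients.json"),
  // one output row per (patient, diagnosis) pair
  flattened = Collection.Explode(patients, p -> p.diagnosis)
in
  flattened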
Step 3: Transforming data
Once the dataset is accessible, the data must often be combined with other data so as to produce the desired result.
For instance, you may want to join multiple tables that exist logically in different systems, or augment, clean or otherwise improve a dataset by combining fields from multiple sources of information into one.
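As a small illustration of augmenting a dataset with fields computed from another source, the sketch below reuses the machine and failures functions from the machine data processing example at the top of this page and adds a crash count to each machine record:
// A sketch only: machine(id) and failures(id) are the functions defined in the
// machine data processing example above.
machineWithCrashCount(id: int) =
  let
    machineData = machine(id),
    crashCount = Collection.Count(failures(id))
  in
    Record.AddField(machineData, crashes = crashCount)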
Here are examples that apply data transformations:
Machine data processing
- Overview
- Code
This example shows how to integrate IoT and log data from multiple data sources.
The scenario
Suppose we are a site reliability engineer managing control software for industrial machines.
Recently we are having suspicious software crashes, and we want to create a data service to collect diagnostics/data to help us identify the cause(s).
The datasets
We have the following data sources:
- Machine status information logged periodically and stored in a PostgreSQL database.
- Software crashes from Docker, these come in a JSON format and output by familiar docker inspect commands.
- Sensor data from the machines, exported in CSV.
- Error Logs, stored in an S3 bucket, in familiar log file formats that require parsing.
Therefore, we need to combine data stored:
- in a database;
- and in a S3 bucket with several formats:
- log files
- CSV files
- JSON files.
Machine Status
Machines are being monitored and their status and location are being stored in a PostgreSQL database. This table looks something like this:
id | model | age | status | latitude | longitude |
---|---|---|---|---|---|
1 | model3 | 18 | OK | 46.515471 | 6.644706 |
2 | model4 | 7 | OK | 46.564782 | 6.551355 |
3 | model3 | 8 | OK | 46.537984 | 6.629472 |
4 | model3 | 7 | OK | 46.570500 | 6.591574 |
To read it we can use PostgreSQL.InferAndRead
. Here we created a function where you pass a machine ID
and returns the corresponding record from the table.
machine(id: int) =
let
data = PostgreSQL.InferAndRead(
"raw",
"example",
"machines",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "..."
)
in
Collection.First(Collection.Filter(data, (x) -> x.id == id))
We are reading data from the database "raw", schema "example" and table "machines". The output for id=1 is:
{
"id": 1,
"model": "model3",
"age": 18,
"status": "OK",
"latitude": 46.515471,
"longitude": 6.644706
}
Software crashes from Docker
Each machine has a specific service controlling it.
These services are deployed using docker. The status of this software can be extracted from the output of the docker-inspect command.
The output of docker-inspect is a (long) JSON document, e.g.:
{
"Id": "806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3",
"Name": "machine 98",
"Created": "2015-11-26T06:00:00.000",
"Path": "bash",
"Image": "sha256:9873176a8ff5ac192ce4d7df8a403787558b9f3981a4c4d74afb3edceeda451c",
"Driver": "overlay2",
"Platform": "linux",
"Args": [
"arg1"
],
"State": {
"Status": "running",
"Running": false,
"Paused": false,
"Restarting": false,
"OOMKilled": false,
"Dead": true,
"Pid": 86629,
"ExitCode": 3,
"Error": "comp3",
"StartedAt": "2015-11-26T06:00:00.000",
"FinishedAt": "2015-11-26T06:00:00.000"
},
"ResolvConfPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/resolv.conf",
"HostnamePath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hostname",
"HostsPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/hosts",
"LogPath": "/var/lib/docker/containers/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3/806f4355da135542086c3fdb1365ed7ec4df45223c056d77635b33befee296c3-json.log",
"RestartCount": 0,
We can get the corresponding machine from the Name
field. The field State
has an exit code,
which tells us if the software finished successfully or not.
The following function extracts the relevant information in an easier-to-consume tabular form.
failures(id: int) =
let
dockerInspect = Json.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
),
summary = Collection.Transform(
dockerInspect,
(x) ->
{
MachineId: Int.From(String.SubString(x.Name, 9, 1)),
ExitCode: x.State.ExitCode,
Error: x.State.Error,
FinishedAt: x.State.FinishedAt
}
)
in
Collection.Filter(
summary,
(x) -> x.ExitCode > 0 and x.MachineId == id
)
[
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-01-05T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 1,
"Error": "comp1",
"FinishedAt": "2015-03-06T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 2,
"Error": "comp2",
"FinishedAt": "2015-04-20T06:00:00.000"
},
{
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-06-19T06:00:00.000"
},
Error logs
Errors are collected from logs. These logs are uploaded to a logging service, which in turn collects and saves all records into an S3 bucket.
2015-01-01T05:54:15 WARN vibration close to treshold, check instrumentation panel ASAP.
2015-01-01T05:54:58 INFO calibration at 100%, checking inner sub-systems.
2015-01-01T05:55:41 ERROR voltage not measured for more than 25 seconds, reboot machine.
2015-01-01T05:56:24 INFO cleaning procedure schedulled soon, performing sub task 111.
2015-01-01T05:57:07 INFO task 155 schedulled soon, preparing next task.
2015-01-01T05:57:50 WARN inner temp increasing rapidly, please check internet connection.
2015-01-01T05:58:33 INFO cleaning procedure starting, calibrating.
2015-01-01T06:00:00 WARN machine 24 with error=error1
2015-01-01T05:54:15 ERROR inner temp not measured for more than 16 seconds, please call 041 123 456 789.
This file has a lot of data, but right now, we are only interested in lines that report machine errors.
We can use Collection.Filter
and a regex to remove all unwanted lines, like this:
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches( x, "(.*) WARN machine (\\d+) with error=(\\w+).*")
)
in
filtered
Output:
[
"2015-01-01T06:00:00 WARN machine 24 with error=error1",
"2015-01-01T06:00:00 WARN machine 73 with error=error4",
"2015-01-01T06:00:00 WARN machine 81 with error=error1",
"2015-01-01T07:00:00 WARN machine 43 with error=error3",
"2015-01-01T08:00:00 WARN machine 14 with error=error4",
"2015-01-01T08:00:00 WARN machine 76 with error=error5"
Now we can use Regex.Groups
to extract all the relevant fields. This is how the final function looks like:
errors(id: int) =
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
),
parsed = Collection.Transform(
filtered,
(x) ->
let
groups = Regex.Groups(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
in
{
machineId: Int.From(List.Get(groups, 1)),
timestamp: Timestamp.Parse(
List.Get(groups, 0),
"y-M-d'T'H:m:s"
),
error: List.Get(groups, 2)
}
)
in
Collection.Filter(parsed, (x) -> x.machineId == id)
errors(1)
Output:
[
{
"machineId": 1,
"timestamp": "2015-01-03T07:00:00.000",
"error": "error1"
},
{
"machineId": 1,
"timestamp": "2015-01-03T20:00:00.000",
"error": "error3"
},
{
"machineId": 1,
"timestamp": "2015-01-04T06:00:00.000",
"error": "error5"
},
Sensor data
Sensor data is collected and stored in CSV files. We can read it using the following function:
telemetry(id: int) =
Collection.Filter(
Csv.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
),
(x) -> x.machineID == id
)
Output:
[
{
"datetime": "1/1/2015 6:00:00 AM",
"machineID": 1,
"volt": 176.217853015625,
"rotate": 418.504078221616,
"pressure": 113.077935462083,
"vibration": 45.0876857639276
},
{
"datetime": "1/1/2015 7:00:00 AM",
"machineID": 1,
"volt": 162.87922289706,
"rotate": 402.747489565395,
"pressure": 95.4605253823187,
"vibration": 43.4139726834815
},
Collecting our information for interrogation
Now that we have all the sources defined, we can start digging into the data to get answers. For a given machine (id), we would like to collect some information about its last failure. We are interested in:
- Basic information such as the error, timestamp, machine age, model, etc., from 'failures' (Docker JSON file) and 'machines' (database table).
- Sensor data from the 6 hours before the crash ('telemetry', from our sensor CSV file); the 6-hour window computation is sketched right after this list.
- Errors from the 6 hours before the crash ('errors', from the log files).
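Here is a minimal sketch of that 6-hour window logic, reusing only the failures function defined above; the windowStart and windowEnd field names are illustrative.
let
  failureData = failures(1),
  lastFailure = Collection.Max(failureData.FinishedAt), // timestamp of the most recent crash
  startMeasure = Timestamp.SubtractInterval(
    lastFailure,
    Interval.Build(hours = 6)
  ) // start of the 6-hour window before that crash
in
  {windowStart: startMeasure, windowEnd: lastFailure}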
Let's now create a function lastFailureData which aggregates all the necessary data from each of the functions created before.
lastFailureData(machineId: int) =
let
machineData = machine(machineId),
failureData = failures(machineId),
lastFailure = Collection.Max(failureData.FinishedAt),
startMeasure = Timestamp.SubtractInterval(
lastFailure,
Interval.Build(hours = 6)
),
lastFailureRecord = Collection.First(
Collection.Filter(
failureData,
(x) -> x.FinishedAt == lastFailure
)
),
lastTelemetry = Collection.Filter(
telemetry(machineId),
(x) ->
x.datetime < lastFailure and x.datetime > startMeasure
),
lastErrors = Collection.Filter(
errors(machineId),
(x) ->
x.timestamp < lastFailure and x.timestamp > startMeasure
)
in
{
lastFailure: lastFailureRecord,
machineData: machineData,
lastTelemetry: lastTelemetry,
lastErrors: lastErrors
}
lastFailureData(1)
Output:
{
"machineId": 1,
"age": 18,
"model": "model3",
"lastFailure": {
"MachineId": 1,
"ExitCode": 4,
"Error": "comp4",
"FinishedAt": "2015-12-31T06:00:00.000"
},
"lastTelemetry": [
{
"datetime": "2015-12-31T01:00:00.000",
"machineID": 1,
"volt": 147.720615260015,
"rotate": 493.074645851158,
"pressure": 104.81366016439,
"vibration": 41.2714171061972
},
{
"datetime": "2015-12-31T02:00:00.000",
"machineID": 1,
"volt": 153.93048096902,
"rotate": 353.466012177296,
"pressure": 99.6570720990314,
"vibration": 42.806176552987
},
{
"datetime": "2015-12-31T03:00:00.000",
"machineID": 1,
"volt": 175.481807900786,
"rotate": 475.951631160907,
"pressure": 88.7452579535092,
"vibration": 39.9863347521755
},
{
"datetime": "2015-12-31T04:00:00.000",
"machineID": 1,
"volt": 179.860806868559,
"rotate": 461.478368479999,
"pressure": 120.299989462607,
"vibration": 35.8235042398746
},
{
"datetime": "2015-12-31T05:00:00.000",
"machineID": 1,
"volt": 172.645716803532,
"rotate": 386.985814610685,
"pressure": 96.0729702714405,
"vibration": 35.7556427077587
}
],
"lastErrors": []
}
Here is the complete code of the example, combining all the functions defined above:
// Machine information taken from a PostgreSQL database
machine(id: int) =
let
data = PostgreSQL.InferAndRead(
"raw",
"example",
"machines",
host = "example-psql.raw-labs.com",
username = "pgsql_guest",
password = "BTSWkufumcv5oSq1vcbVF9f0"
)
in
Collection.First(Collection.Filter(data, (x) -> x.id == id))
// Software crashes taken from the docker inspect output
failures(id: int) =
let
dockerInspect = Json.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/docker-inspect-output.json"
),
summary = Collection.Transform(
dockerInspect,
(x) ->
{
MachineId: Int.From(String.SubString(x.Name, 9, 1)),
ExitCode: x.State.ExitCode,
Error: x.State.Error,
FinishedAt: x.State.FinishedAt
}
)
in
Collection.Filter(
summary,
(x) -> x.ExitCode > 0 and x.MachineId == id
)
// Errors taken from log files
errors(id: int) =
let
data = String.ReadLines(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/machine_logs.log"
),
filtered = Collection.Filter(
data,
(x) ->
Regex.Matches(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
),
parsed = Collection.Transform(
filtered,
(x) ->
let
groups = Regex.Groups(
x,
"(.*) WARN machine (\\d+) with error=(\\w+).*"
)
in
{
machineId: Int.From(List.Get(groups, 1)),
timestamp: Timestamp.Parse(
List.Get(groups, 0),
"y-M-d'T'H:m:s"
),
error: List.Get(groups, 2)
}
)
in
Collection.Filter(parsed, (x) -> x.machineId == id)
// Sensor data taken from a CSV file
telemetry(id: int) =
Collection.Filter(
Csv.InferAndRead(
"s3://raw-tutorial/ipython-demos/predictive-maintenance/telemetry-iso-time.csv"
),
(x) -> x.machineID == id
)
// Function getting the last failure of a machine and all relevant data
// Collects sensor data and errors for the 6 hours before the crash
main(machineId: int) =
let
machineData = machine(machineId),
failureData = failures(machineId),
lastFailure = Collection.Max(failureData.FinishedAt),
startMeasure = Timestamp.SubtractInterval(
lastFailure,
Interval.Build(hours = 6)
),
lastFailureRecord = Collection.First(
Collection.Filter(
failureData,
(x) -> x.FinishedAt == lastFailure
)
),
lastTelemetry = Collection.Filter(
telemetry(machineId),
(x) ->
x.datetime < lastFailure and x.datetime > startMeasure
),
lastErrors = Collection.Filter(
errors(machineId),
(x) ->
x.timestamp < lastFailure and x.timestamp > startMeasure
)
in
{
lastFailure: lastFailureRecord,
machineData: machineData,
lastTelemetry: lastTelemetry,
lastErrors: lastErrors
}
// The following test will run if you press the [Run Code] button directly.
main(1)
Where is the ISS?
- Overview
- Code
This example shows how to create a REST API that combines data from multiple web services.
Where is the ISS?
The goal is to deploy an API showing the nearest location on Earth to the International Space Station. For this, we combine two existing APIs:
- https://api.open-notify.org/, which gives us the ISS coordinates in real-time;
- https://opencagedata.com/, which implements a reverse geocoding service to obtain the city nearest to a given set of coordinates.
The resulting REST API returns a list with the place(s) the ISS is currently flying over.
For instance, at the time of development,
/issLocation
returned:
[
"Adrar, Mauritania"
]
Getting the ISS position
Here's the API call to get the ISS location: http://api.open-notify.org/iss-now.json. Here's an example response this service returns:
{
"message": "success",
"timestamp": 1675083740,
"iss_position": {
"longitude": "106.5087",
"latitude": "38.2938"
}
}
The message only includes GPS coordinates and does not explicitly state where the ISS is in the world. To find a named location, we need to reverse-geocode the ISS latitude and longitude.
Reverse Geocoding
Many geocoding APIs are available. We use the OpenCage Geocoding API. This service requires an API key that is generated when creating an account. Both the API key and the coordinates have to be provided as query parameters, e.g.:
https://api.opencagedata.com/geocode/v1/json?key=<API_KEY>&q=<LATITUDE>,<LONGITUDE>
The latitude and longitude of the ISS are extracted from the JSON record above, after it is downloaded and decoded with Json.InferAndRead.
let iss = Json.InferAndRead("http://api.open-notify.org/iss-now.json"),
latitude = iss.iss_position.latitude,
longitude = iss.iss_position.longitude,
Here's how to prepare the HTTP query to OpenCage and retrieve the result.
let //
// ...
//
geoQuery = latitude + "+" + longitude, // a URL parameter required by the API
key = Environment.Secret("opencagedata"), // the API key is stored as a secret
httpQuery = Http.Get(
"https://api.opencagedata.com/geocode/v1/json",
args = [
{"q", geoQuery},
{"key", key},
{"no_annotations", "1"} // no_annotations returns simpler data
]
),
// etc.
issGeoLoc = Json.InferAndRead(httpQuery, preferNulls = true)
The call returns a JSON record with lots of information. We only need the location's name, which is found under the field formatted:
{
"documentation": "https://opencagedata.com/api",
...
"results": [
{
...
"formatted": "Adrar, Mauritania"
...
}
],
"status": {
"code": 200,
"message": "OK"
},
...
}
Once issGeoLoc is populated with the geolocation data, the city name is easily retrieved from issGeoLoc.results.formatted. Since results is a list, this expression evaluates to a list of names, which is why the endpoint returns a list as shown above. Here's the full code of our service:
let
iss = Json.InferAndRead("http://api.open-notify.org/iss-now.json"),
latitude = iss.iss_position.latitude,
longitude = iss.iss_position.longitude,
geoQuery = latitude + "+" + longitude,
key = Environment.Secret("opencagedata"),
httpQuery = Http.Get(
"https://api.opencagedata.com/geocode/v1/json",
args = [
{"q", geoQuery},
{"key", key},
{"no_annotations", "1"}
]
),
issGeoLoc = Json.InferAndRead(httpQuery, preferNulls = true)
in
issGeoLoc.results.formatted
Step 4: Delivering Data
Once the API is built, you may want to improve its delivery. Here are some suggestions:
- Learn how to cache API results to improve performance or protect backend systems;
- Learn how to secure your API.