Data historian example streams flow
Data historian - What’s it all about?
A data historian is an efficient way to collect and store time series data. The data might come from production lines, transportation routes, network devices, satellites, and other devices. The data is stored with a time stamp and other identifying information such as device ID and location. You can build a dashboard to view the information in real time, or you can store the data for offline analysis.
Sample data historian data
The sample data that is used in the Data Historian streams flow contains formatted weather data that streams from five personal weather stations. The data includes station ID, time zone, date in Coordinated Universal Time (UTC) format, latitude, longitude, temperature, barometric pressure, humidity, indoor temperature, and rainfall.
Our goal is to collect data from the weather stations and compute average temperature, barometric pressure, humidity, indoor temperature, and rainfall for each weather station every minute.
Description of operators
The following screen capture shows how the data historian example streams flow looks in the canvas.
Let’s look more closely at these operators.
Sample data operator
Sample data is the source of weather data for the streams flow. The following screen captures show the sample weather data properties and some of its schema attributes.
The streams flow ingests the sample data. The schema attributes include weather station ID, time zone, date in UTC format, time stamp, the longitude of the weather station, and so on.
First Aggregation operator
Next, we want to calculate the average barometric pressure, humidity, indoor temperature, and rainfall for each weather station every minute. We can make those calculations with the first Aggregation operator.
Aggregation is done on “windows” of data. Which type of window you use depends on when and how often you want to calculate the results.
A sliding window calculates the result of the aggregation whenever a new tuple arrives. A tuple stays in the window until its age exceeds the window size. For example, if the window size is 30 days, then a tuple is removed when it is more than 30 days old.
This type of windowing is used for dynamic, up-to-the-moment calculations such as rainfall information that is being sent to a mobile app.
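To make the sliding behavior concrete, here is a minimal Python sketch. It is not how the Aggregation operator is implemented; the class name SlidingAverage, the 60-second window, and the sample values are assumptions chosen for illustration. It recomputes the average on every arrival and evicts tuples whose age exceeds the window size.

```python
from collections import deque


class SlidingAverage:
    """Time-based sliding window that reports the average on every arrival.

    Hypothetical helper for illustration; timestamps are in seconds and are
    assumed to arrive in non-decreasing order.
    """

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.items = deque()  # (timestamp, value) pairs, oldest first

    def add(self, timestamp, value):
        self.items.append((timestamp, value))
        # Evict tuples whose age now exceeds the window size.
        while timestamp - self.items[0][0] > self.window_seconds:
            self.items.popleft()
        # A result is produced for every new tuple.
        return sum(v for _, v in self.items) / len(self.items)


window = SlidingAverage(window_seconds=60)
results = [window.add(t, v) for t, v in [(0, 2.0), (30, 4.0), (61, 6.0)]]
print(results)  # [2.0, 3.0, 5.0]
```

The third result excludes the first reading, because that reading is 61 seconds old when the third one arrives.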
A tumbling window calculates the result of the aggregation once at the end of each designated period, regardless of how often tuples arrive. All tuples (not just the oldest) then “tumble out” of the window.
This type of windowing is best for situations where updates at specific intervals are required, for example, to report hourly temperature, pressure, and humidity.
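For comparison, a tumbling window can be sketched as follows. Again, this is only an illustration under stated assumptions: the function name tumbling_averages is hypothetical, windows are aligned to multiples of the window size, and a window is closed when the first tuple of a later window arrives rather than by a timer.

```python
def tumbling_averages(events, window_seconds):
    """Yield (window_start, average) once per window that contains data.

    events: iterable of (timestamp, value) pairs in timestamp order.
    """
    current_start = None
    values = []
    for timestamp, value in events:
        start = (timestamp // window_seconds) * window_seconds
        if current_start is not None and start != current_start:
            yield current_start, sum(values) / len(values)
            values = []  # every tuple tumbles out, not just the oldest
        current_start = start
        values.append(value)
    if values:
        # Flush the last window when the input ends.
        yield current_start, sum(values) / len(values)


events = [(0, 10.0), (20, 12.0), (59, 14.0), (60, 20.0), (100, 22.0)]
per_minute = list(tumbling_averages(events, window_seconds=60))
print(per_minute)  # [(0, 12.0), (60, 21.0)]
```

Five input tuples produce only two results, one per 60-second period, no matter how many tuples arrive within each period.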
We’re going to use a tumbling window because we want to apply a function on the tuples every minute.
Our goal is to compute averages for each weather station. To do these calculations, we partition by “id”. The sample data is from five weather stations. Each weather station is a separate partition, for a total of five partitions.
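Partitioning can be pictured as grouping tuples by the value of one attribute. The following Python sketch assumes dictionary-shaped tuples; the helper name partition_by, the station IDs, and the humidity values are made up for illustration.

```python
from collections import defaultdict


def partition_by(tuples, key):
    """Group tuples into one list per distinct value of `key`."""
    partitions = defaultdict(list)
    for item in tuples:
        partitions[item[key]].append(item)
    return dict(partitions)


# Hypothetical readings: station IDs and values are invented for this sketch.
readings = [
    {"id": "ws1", "humidity": 41.0},
    {"id": "ws2", "humidity": 55.0},
    {"id": "ws3", "humidity": 62.0},
    {"id": "ws4", "humidity": 48.0},
    {"id": "ws5", "humidity": 70.0},
    {"id": "ws1", "humidity": 43.0},
]

partitions = partition_by(readings, "id")
print(len(partitions))         # 5
print(len(partitions["ws1"]))  # 2
```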
What happens during tumbling?
Because we are using a 60-second window, the designated function for each attribute is applied every 60 seconds, and then all tuples “tumble out” of the window. Aggregation is done separately for each partition (weather station) in the window.
For example, the function “PassThrough” is applied to the “id”, “tz”, and “dateutc” attributes of each partition. “PassThrough” permits those attributes to pass through without any filtering or change. In contrast, the function “Average” is applied to the “rainfall” and “humidity” attributes to calculate their average amounts for each partition (weather station).
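The per-attribute functions can be sketched in Python as a mapping from attribute name to function, applied to each partition when the window closes. This is an illustration only: the attribute names come from the text above, but the sample values and time zone strings are invented, and treating “PassThrough” as "forward the most recent value" is an assumption of this sketch.

```python
from statistics import mean


def pass_through(values):
    # Assumption: forward the value of the most recent tuple unchanged.
    return values[-1]


# Function applied to each attribute when the window closes.
FUNCTIONS = {
    "id": pass_through,
    "tz": pass_through,
    "dateutc": pass_through,
    "rainfall": mean,
    "humidity": mean,
}


def aggregate_window(tuples):
    """Return one output tuple per partition (weather station)."""
    partitions = {}
    for item in tuples:
        partitions.setdefault(item["id"], []).append(item)
    return [
        {attr: fn([item[attr] for item in group]) for attr, fn in FUNCTIONS.items()}
        for group in partitions.values()
    ]


# Hypothetical contents of one 60-second window.
window = [
    {"id": "ws1", "tz": "America/New_York", "dateutc": "2024-01-01 00:00:10",
     "rainfall": 0.0, "humidity": 40.0},
    {"id": "ws2", "tz": "Europe/London", "dateutc": "2024-01-01 00:00:20",
     "rainfall": 1.0, "humidity": 60.0},
    {"id": "ws1", "tz": "America/New_York", "dateutc": "2024-01-01 00:00:40",
     "rainfall": 0.5, "humidity": 50.0},
]

output = aggregate_window(window)
print(output[0]["rainfall"], output[0]["humidity"])  # 0.25 45.0
```

Three input tuples become two output tuples, one for each station that reported during the window.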
Second Aggregation operator
The second Aggregation operator ingests the output from the first Aggregation operator. The second operator is defined like the first, except that the tuples “tumble out” every 180 seconds. The second Aggregation operator therefore produces results one third as often as the first. As a result, less data is sent for storage in Cloud Object Storage.
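The reduction in data volume can be estimated with simple arithmetic. The Python sketch below assumes that every one of the five stations reports at least once in every window, so that each operator emits exactly one tuple per station per window; the function name tuples_per_hour is hypothetical.

```python
STATIONS = 5  # number of partitions in the sample data


def tuples_per_hour(window_seconds, partitions=STATIONS):
    """Output tuples per hour for a tumbling window, one tuple per partition."""
    return partitions * (3600 // window_seconds)


first_stage = tuples_per_hour(60)    # first Aggregation operator
second_stage = tuples_per_hour(180)  # second Aggregation operator
print(first_stage, second_stage)     # 300 100
```

Under that assumption, the second operator sends one third as many tuples to storage as the first operator produces.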
Cloud Object Storage
Cloud Object Storage provides cloud storage for massive amounts of unstructured data. A Cloud Object Storage instance must be provisioned in IBM Cloud, and then associated with the project.
In our example, we define the file path where the data is stored in Cloud Object Storage (COS). First, we select the existing bucket called datahistorian/weather_storage. Then, we make a new file called HD_%TIME.csv. We create a new file every 1800 seconds (30 minutes). By including “%TIME” in the file name, we create a unique name every time a file is created. Otherwise, the new file overwrites the existing one.
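The effect of the “%TIME” placeholder can be illustrated with a short Python sketch. The exact timestamp format that the service substitutes is not stated here, so the layout used below is an assumption, as are the function name expand_file_name and the sample creation times.

```python
from datetime import datetime, timedelta, timezone


def expand_file_name(pattern, created_at):
    """Replace the %TIME placeholder with a timestamp string.

    The timestamp layout is an assumption made for this sketch.
    """
    return pattern.replace("%TIME", created_at.strftime("%Y%m%d_%H%M%S"))


first_file = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
next_file = first_file + timedelta(seconds=1800)  # next file, 1800 seconds later

name_a = expand_file_name("HD_%TIME.csv", first_file)
name_b = expand_file_name("HD_%TIME.csv", next_file)
print(name_a)  # HD_20240101_000000.csv
print(name_b)  # HD_20240101_003000.csv

# Without the placeholder, both files get the same name,
# so the second would overwrite the first.
fixed_a = expand_file_name("HD.csv", first_file)
fixed_b = expand_file_name("HD.csv", next_file)
print(fixed_a == fixed_b)  # True
```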
Running the Data Historian example streams flow
When you select Data Historian Example in the
The Metrics page has several panes:
1. Streams Flow pane - Shows streaming data as it flows among operators in your streams flow.
Hover your mouse pointer over the data flow coming out of the first Aggregation operator to see the number of events per second. Now, do the same over the data flow from the second Aggregation operator. Note that less data is flowing out of the second operator than out of the first.
To see the actual data being used in the averages of the first Aggregation operator, click the data flow coming out of the first operator. The Flow of Events table opens.
2. Ingest Rate - For each streams flow source, shows the number of sample events that are submitted to the streams flow per second.
3. Throughput - For each operator, shows the throughput of input and output flows, if they exist. It also shows events that have errors. Click each operator to see its throughput.
4. Flow of Events table - Shows the events that flow between operators in both table and JSON formats. This table only opens when you click the data flow between operators.