Aggregation operator examples

Example 1: Aggregating Clickstream data

Goal: Use sample Clickstream data to answer the following questions about online sales activity.

  1. For every minute, how many unique customers are there for every event type?

  2. For every minute, what is the total price of items in a customer’s shopping cart for every event type?

Clickstream data is data that is generated by a user’s interaction with an online website. For example, the user might browse for an item, add the item to a shopping cart, remove an item from the cart, log out without any purchase, or complete a purchase. Every click generates an event that can be captured in a streams flow.
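To make the shape of such an event concrete, here is a sketch of one clickstream tuple as a Python dict. The field names (`customer_id`, `click_event_type`, `product_price`, `time_stamp`) are illustrative assumptions; the actual sample schema may differ.

```python
# A hypothetical clickstream event as it might arrive in a streams flow.
# Field names are assumptions for illustration, not the exact sample schema.
event = {
    "customer_id": 4712,
    "click_event_type": "add_to_cart",  # e.g. login, browse, add_to_cart, checkout
    "product_price": 19.99,
    "time_stamp": "2024-01-01T12:00:00Z",
}

def describe(e):
    """Return a one-line summary of a clickstream event."""
    return f"customer {e['customer_id']}: {e['click_event_type']} (${e['product_price']:.2f})"

print(describe(event))  # customer 4712: add_to_cart ($19.99)
```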

Let’s design a simple streams flow in the canvas with three operators.

  1. Sample Data – This operator provides sample Clickstream data.

  2. Aggregate – This operator aggregates the data by customer ID and then applies specified functions on the data every minute.

  3. Cloud Object Storage (COS) – This operator is the target for storage.

    Aggregation streams flow

Now let’s define each operator in its Properties pane.

Define Sample Data operator

Select Clickstream. Click Edit Source Schema to look at the schema that’s coming into the streams flow.

Define Aggregation operator

We define the parameters of this operator to give the metrics that we need to answer our two questions. Set the following properties.

Aggregator properties

Aggregation Window Type is set to tumbling because we want the aggregation functions applied at a set time interval, regardless of how often or how many tuples arrive. For more information, see the topics on tumbling and sliding windows.

Aggregation Window size is determined by Time Unit and Number of Time Units. In our case, we set Time Unit to minute and Number of Time Units to 1 because we want metrics every minute. Every minute, all tuples ‘tumble out’ of the window and an Aggregation function is applied to each field.

The window is partitioned by customer_id into subwindows. This partition gives us metrics by customer.
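The tumbling-window-plus-partition behavior can be sketched in plain Python. This is only a model of what the operator does, assuming an epoch-seconds `time_stamp` field; the real operator is configured in the canvas, not coded.

```python
from collections import defaultdict

def tumble(events, window_size=60):
    """Group events into tumbling windows of `window_size` seconds, then
    partition each window into subwindows keyed by customer_id.
    Each event is a dict with 'time_stamp' (epoch seconds) and 'customer_id'."""
    windows = defaultdict(lambda: defaultdict(list))
    for e in events:
        window_start = (e["time_stamp"] // window_size) * window_size
        windows[window_start][e["customer_id"]].append(e)
    return windows

events = [
    {"time_stamp": 5,  "customer_id": 1},
    {"time_stamp": 42, "customer_id": 1},
    {"time_stamp": 42, "customer_id": 2},
    {"time_stamp": 65, "customer_id": 1},  # falls into the next window
]
w = tumble(events)
print(sorted(w.keys()))            # [0, 60] -- two one-minute windows
print(len(w[0][1]), len(w[0][2]))  # 2 1 -- per-customer subwindows
```

Each inner dict is one subwindow: the tuples for one customer within one minute, which is exactly the unit the aggregation functions run over.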

Aggregation functions

We set up three Aggregation functions to define which metrics are computed on the tuples in every subwindow (that is, for every customer). The following Aggregation functions are applied to all subwindows every minute.

  • click_event_type – The function PassThrough passes the type of event that occurred to a field called “click_event_type”, without doing any computation or change. This field tells us what action the user took, such as logging in, browsing, or adding an item to the cart.

  • total_carts – The function Average computes the average price for a customer’s shopping cart and stores the value in a field called “total_carts”.

  • unique_users – The function CountDistinct gives the number of unique users that are logged in, and stores the value in a field called “unique_users”. We do not use the function Count because it returns the total number of rows, counting users who appear more than once.
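The three functions above can be sketched as one Python step applied to a subwindow (one customer’s tuples for one minute). Field names mirror the flow, but the code itself is only a model, not the operator’s implementation; in particular, which event type PassThrough forwards when a subwindow holds several tuples is an assumption (here, the last one seen).

```python
def aggregate_subwindow(tuples):
    """Apply the three aggregation functions to one subwindow (one customer)."""
    prices = [t["product_price"] for t in tuples]
    return {
        # PassThrough: forward the attribute unchanged (here: last event type seen)
        "click_event_type": tuples[-1]["click_event_type"],
        # Average: mean cart price; None (Null) if the subwindow is empty
        "total_carts": sum(prices) / len(prices) if prices else None,
        # CountDistinct: number of unique users, not the total row count
        "unique_users": len({t["customer_id"] for t in tuples}),
    }

subwindow = [
    {"customer_id": 7, "click_event_type": "browse",      "product_price": 10.0},
    {"customer_id": 7, "click_event_type": "add_to_cart", "product_price": 30.0},
]
out = aggregate_subwindow(subwindow)
print(out)  # {'click_event_type': 'add_to_cart', 'total_carts': 20.0, 'unique_users': 1}
```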

Aggregation output fields

Here’s how our streams flow looks now.

Streams flow

Note the change in the schema before and after the Aggregation operator. Only three attributes are stored in COS. The timestamp attribute is not added to the output schema.

The following functions can be applied to specific schema attributes.

| Aggregate function | Argument type | Return type | Return value |
| --- | --- | --- | --- |
| Average | Integer, float | Float | Average of all input values. Null if no rows are selected. |
| Count | Integer | Same as argument | Number of all non-null input rows. |
| CountDistinct | Integer | Same as argument | Number of distinct expression values that are computed for the tuples in the group. |
| Maximum | Integer, float | Same as argument | Maximum element across all input values. Null if no rows are selected. |
| Minimum | Integer, float | Same as argument | Minimum element across all input values. Null if no rows are selected. |
| PassThrough | Integer, float, text | Same as argument | Passes the schema attribute to the output as is. |
| Sum | Integer, float | Same as argument | Sum of all input values. Null if no rows are selected. |
| Standard Deviation | Integer, float | Same as argument | Standard deviation of all input values. Null if no rows are selected. |
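The semantics in the table, including the “Null if no rows are selected” behavior, can be modeled in Python with `None` standing in for Null. This is a sketch of the documented behavior, not the operator’s code; note that a sample standard deviation needs at least two values, so the single-value case is also mapped to `None` here as an assumption.

```python
import statistics

# Sketch of the aggregate functions from the table above; None models Null.
AGGREGATES = {
    "Average":            lambda xs: sum(xs) / len(xs) if xs else None,
    "Count":              lambda xs: sum(1 for x in xs if x is not None),
    "CountDistinct":      lambda xs: len(set(xs)),
    "Maximum":            lambda xs: max(xs) if xs else None,
    "Minimum":            lambda xs: min(xs) if xs else None,
    "Sum":                lambda xs: sum(xs) if xs else None,
    "Standard Deviation": lambda xs: statistics.stdev(xs) if len(xs) > 1 else None,
}

print(AGGREGATES["Average"]([2, 4]))           # 3.0
print(AGGREGATES["CountDistinct"]([1, 1, 2]))  # 2
print(AGGREGATES["Sum"]([]))                   # None
```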

Define COS operator

We select our COS instance connection, and then define the File path for the CSV files.

COS connection

Select the COS service instance that you want to send the data to. In the File path field, click the Settings icon to get a list of existing buckets in that COS instance, and then select a bucket. In this example, we name the file %TIME.csv so that we get a new file every minute. Otherwise, each new file would overwrite the existing one.
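The effect of the %TIME placeholder can be sketched in Python: substitute a per-minute timestamp into the file name so every tumbling window writes a new object. The exact substitution format the COS operator uses is an assumption here.

```python
from datetime import datetime, timezone

def timestamped_name(template="%TIME.csv"):
    """Expand a %TIME placeholder into a per-minute file name so each
    window writes a new object instead of overwriting the previous one.
    The timestamp format is an illustrative assumption."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M")
    return template.replace("%TIME", stamp)

name = timestamped_name()
print(name)  # e.g. 20240101-1200.csv
```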

COS connection

In the Format field, we select CSV because we have only three columns and queries will most likely read row data rather than column data.

In the File Create Policy field, we select Time because our windows are tumbling every minute. We set this parameter to create a new file with every ‘tumble’.

Run the streams flow

All operators are now defined. We save the streams flow, and then click the Metrics icon to run the flow.

When the streams flow runs, the Ingestion Rate for the Clickstream data and the Throughput rate for the input and output of the Aggregation operator are shown.

The Flow of Events table shows us the events that are streaming to our COS instance.

The data is stored in CSV files in your COS bucket. You can download those files from IBM Cloud for further analysis.

Metrics page

Example 2: Parallel Aggregation

Let’s say that our online website has irregular peaks of data according to the hour of the day and the season.

In this case, use parallel Aggregate operators. Both Aggregate operators are set up identically, except that each one uses different values for Time Unit and Number of Time Units to evict tuples.

In our Clickstream example, the first Aggregate operator has a tumbling window every minute. Let’s set the second Aggregate operator to have a tumbling window every 10 minutes.
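Running two window sizes over the same stream can be sketched as follows, counting events per window at both granularities. This is an illustration of the parallel setup, not the operators themselves.

```python
from collections import defaultdict

def window_counts(timestamps, size):
    """Count events per tumbling window of `size` seconds."""
    counts = defaultdict(int)
    for t in timestamps:
        counts[(t // size) * size] += 1
    return dict(counts)

# The same event stream feeds both "operators" in parallel.
timestamps = [30, 90, 150, 400, 650]        # seconds since some origin
per_minute = window_counts(timestamps, 60)   # first operator: 1-minute windows
per_10_min = window_counts(timestamps, 600)  # second operator: 10-minute windows
print(per_minute)  # {0: 1, 60: 1, 120: 1, 360: 1, 600: 1}
print(per_10_min)  # {0: 4, 600: 1}
```

The fine-grained view catches short peaks, while the coarse view smooths them out over the same data.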

More examples

For more examples of the Aggregate operator, see the article Calculate moving averages on real-time data in a streams flow.

Learn more