Code operator examples

The Code operator can be of type Source, Processing and Analytics, or Target.

Important

  • The code must be created with the same version of packages that are listed here.
  • Regardless of the operator type, you must declare all output attributes in the Edit Schema window.

Restriction

Passed datetime objects must not be timezone-aware.

When you return or submit event tuples with datetime objects, make sure that they are “naive”, that is “timezone-unaware”. For more information about “timezone aware” and “naive” in Python coding, see datetime — Basic date and time types.

Passing datetime objects that are “timezone-aware” results in runtime errors, and such events are ignored.

Tip

In Python, you can convert a timezone-aware datetime object to a “naive”, UTC-based datetime object. The following code snippet is an example.

from dateutil.tz import tzutc

event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)

See Example 2 for a full example.
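
The same conversion can also be wrapped in a small helper. The following is a minimal sketch that uses only the standard library datetime module instead of dateutil. The helper name to_naive_utc and the sample timestamp are assumptions for illustration, and a value that is already naive is returned unchanged.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(dt):
    """Return dt as a naive datetime that holds the UTC wall-clock time."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        # Already naive: nothing to strip (assumed to be UTC already)
        return dt
    return dt.astimezone(timezone.utc).replace(tzinfo=None)


# A timezone-aware timestamp at UTC+05:30
ist = timezone(timedelta(hours=5, minutes=30))
aware = datetime(2024, 1, 15, 12, 0, tzinfo=ist)

naive = to_naive_utc(aware)
print(naive, naive.tzinfo)  # 2024-01-15 06:30:00 None
```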

 

Code as a Source operator

You might use Code as a Source operator when you want to generate some test data without having to set up an Event Streams instance.

Another use is when you want to bring in data from a web socket or from your proprietary data source.

A. Asynchronous

import logging
import time

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self, submit):
        counter = 0
        while(counter <= 10):
            # Submit a tuple in each iteration:
            submit({"number": counter, "square": counter * counter})
            counter += 1
            time.sleep(0.5) # Simulates a delay of 0.5 seconds between emitted events

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass

When the streams flow is running, the Flow of Events shows the generated test data.

Flow of Events for code as source
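
To try the asynchronous pattern outside a running flow, the class can be called with any callable in place of submit. The following is a sketch under that assumption: the list's append method is a hypothetical stand-in for the submit function that the flow provides, and the delay is omitted so that the check finishes immediately.

```python
class Code(object):
    def __call__(self, submit):
        # Same loop as the asynchronous example, without the delay
        for counter in range(11):
            submit({"number": counter, "square": counter * counter})


collected = []            # Stand-in for the downstream operator
Code()(collected.append)  # list.append plays the role of submit

print(len(collected))     # 11
print(collected[3])       # {'number': 3, 'square': 9}
```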

B. Synchronous

The following code shows the first approach, in which __call__ is a generator function that yields each event tuple:

import logging
import time

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self):
        counter = 0
        while counter <= 10:
            # Yield a tuple in each iteration:
            yield {"number": counter, "square": counter * counter}
            counter += 1
            time.sleep(0.5) # Simulates a delay of 0.5 seconds between emitted events

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass

The following code shows the second approach, in which __call__ returns an iterator object and __next__ returns each event tuple:

import time


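class Code(object):
    def __init__(self):
        # Initialize the counter before the first call to __next__
        self.counter = 0

    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self):
        return self

    def __iter__(self):
        # Required so that the returned object can be iterated over
        return self

    def __next__(self):
        time.sleep(0.5) # Simulates a delay of 0.5 seconds between emitted events
        self.counter += 1
        # Return a tuple in each iteration:
        return {"number": self.counter, "square": self.counter * self.counter}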

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass
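
In both synchronous approaches, the caller iterates over whatever __call__ returns. The following is a minimal sketch of that interaction, assuming the runtime simply iterates over the returned object. Here itertools.islice is a hypothetical stand-in for the flow, and the class is a shortened iterator variant without the delay.

```python
import itertools


class Code(object):
    def __init__(self):
        self.counter = 0

    def __call__(self):
        return self  # The object itself is the iterator

    def __iter__(self):
        return self

    def __next__(self):
        self.counter += 1
        return {"number": self.counter, "square": self.counter * self.counter}


source = Code()
# Take only the first three events from the endless stream
first_three = list(itertools.islice(source(), 3))
print(first_three)
```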


Code as a Processing and Analytics operator

When Code is used as a Processing and Analytics operator, it defines a process function that takes the input event tuple as a parameter and returns the output event tuple.

Example 1

Goal: You need to return two new attributes that are not present in the input schema.

The following code snippet shows an example for two attributes, “friendly_greeting” and “formal_greeting”, in the output schema of the Code operator.

def process(event):
  # Use the name from the event, or a default when it is missing
  if 'name' in event:
    name = event['name']
  else:
    name = 'stranger'
  friendly_greeting = 'Hey ' + name + '!'
  formal_greeting = 'Dear ' + name + ','
  # The dictionary must start on the same line as the return statement
  return {'friendly_greeting': friendly_greeting,
          'formal_greeting': formal_greeting}

The returned attributes ‘friendly_greeting’ and ‘formal_greeting’ must be included in the output schema of the Code operator. To check that they are present, click Edit Output Schema in the Code Properties pane, and then add the attributes and their type, when needed.
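
A more compact variant of the same logic, shown as a sketch rather than a replacement: dict.get supplies the default name, and the two calls at the end illustrate the returned dictionaries for an event with and without a name attribute. The sample event values are made up for illustration.

```python
def process(event):
    # Fall back to 'stranger' when the event has no 'name' attribute
    name = event.get('name', 'stranger')
    return {
        'friendly_greeting': 'Hey ' + name + '!',
        'formal_greeting': 'Dear ' + name + ',',
    }


with_name = process({'name': 'Ada'})
without_name = process({'id': 42})
print(with_name)     # {'friendly_greeting': 'Hey Ada!', 'formal_greeting': 'Dear Ada,'}
print(without_name)  # {'friendly_greeting': 'Hey stranger!', 'formal_greeting': 'Dear stranger,'}
```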

Example 2

Goal: You need to convert the time of an event from the IST time zone to UTC and return it in the output schema.

from dateutil.parser import parse
from dateutil import tz
def process(event):
  # dateutil's parse() doesn't understand "IST" as a time zone indicator, so swap it for +05:30
  dt = parse(event['event_time'].replace('IST','+05:30'))

  # Convert to UTC, then drop the time zone information so that the result is "naive"
  event['dt_utc'] = dt.astimezone(tz.gettz('UTC')).replace(tzinfo=None)
  return event

For more information about date formats, see Date formats.

Example 3

Goal: You need to pass only every other tuple to the output.

counter = 0

def process(event):
    global counter
    counter += 1
    event['counter'] = counter
    if counter % 2 == 0:
        # Drop every second tuple
        return None
    return event
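
The effect of the counter can be checked locally by calling process on a few events in a row. This sketch repeats the filter with the comparison written as ==, and the sample events with an id attribute are hypothetical. Returning None is what drops a tuple.

```python
counter = 0


def process(event):
    global counter
    counter += 1
    event['counter'] = counter
    if counter % 2 == 0:
        return None  # Even-numbered tuples are dropped
    return event


results = [process({'id': i}) for i in range(4)]
kept = [r for r in results if r is not None]
print(kept)  # [{'id': 0, 'counter': 1}, {'id': 2, 'counter': 3}]
```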

Example 4

Goal: You want to report the geographic movement of mobile GPS-enabled devices (phones, tablets, cars). This scenario is an example of a use case that requires state.

def init(state):
    # Nothing to initialize, in this example
    pass


def process(event, state):
    deviceId = event['deviceId']  # Extract the device ID from the event tuple
    if deviceId not in state:
        # No previous record for this device ID, meaning it's the first event for it
        message = "Detected initial location"
    else:
        record = state[deviceId]  # Extract the device record of last (previous) location
        directions = []  # Array for gathering directions: north or south, east or west

        if event['lat'] > record['last_lat']:
            directions.append("north")
        elif event['lat'] < record['last_lat']:
            directions.append("south")

        if event['long'] > record['last_long']:
            directions.append("east")
        elif event['long'] < record['last_long']:
            directions.append("west")

        message = "Moved " + "-".join(directions)

    output = {
        'deviceId': deviceId,
        'message': message
    }
    rememberDeviceLocation(event, state)  # For comparing with future locations
    return output
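
The process function calls rememberDeviceLocation, which the snippet does not define. The following is a minimal sketch of such a helper, assuming it only needs to store the latest coordinates under the keys that process reads back (last_lat and last_long). The body is an assumption inferred from that usage.

```python
def rememberDeviceLocation(event, state):
    # Keep the most recent coordinates for this device (assumed behavior)
    state[event['deviceId']] = {
        'last_lat': event['lat'],
        'last_long': event['long'],
    }


state = {}
rememberDeviceLocation({'deviceId': 'p008', 'lat': 32.678611, 'long': 35.576944}, state)
rememberDeviceLocation({'deviceId': 'p008', 'lat': 32.67862, 'long': 35.576944}, state)
print(state)  # {'p008': {'last_lat': 32.67862, 'last_long': 35.576944}}
```

If the coordinates arrive as strings, converting them with float() before they are compared avoids a lexicographic comparison.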

Example output

Input event 1: {'deviceId': 'p008', 'lat': '32.678611', 'long': '35.576944'}

Resulting output: {'deviceId': 'p008', 'message': 'Detected initial location'}

Input event 2: {'deviceId': 'p008', 'lat': '32.67862', 'long': '35.576944'}

Resulting output: {'deviceId': 'p008', 'message': 'Moved north'}

Input event 3: {'deviceId': 'p008', 'lat': '32.6786', 'long': '35.57694'}

Resulting output: {'deviceId': 'p008', 'message': 'Moved south-west'}

Input event 4: {'deviceId': 'x123', 'lat': '32.6786', 'long': '35.57694'}

Resulting output: {'deviceId': 'x123', 'message': 'Detected initial location'}

Example 5

Goal: The Watson IoT operator ingests electricity usage readings from smart meters to bill users. You need to take that data and implement time-based billing. You also want to send an email alert to customers whose usage for the current month is high.

Watch the video Use Python code in a streams flow.

Code as a Target operator

The Code as Target operator allows you to write Python code in a target node.

The following code shows an example:

# process() function will be invoked on every event tuple
# @event a Python dictionary object representing the input event tuple as defined by the input schema
# @state a Python dictionary object for keeping state over subsequent function calls
def process(event, state):
    # Do something with the event, for example use a complex filtering condition and send the event to a database.
    pass
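
As one way to fill in the body, the following sketch filters events on a condition and keeps a running count in state. The temperature attribute, the threshold, and the use of a log message in place of a database write are all assumptions for illustration.

```python
import logging

logger = logging.getLogger(__name__)

THRESHOLD = 30.0  # Hypothetical limit


def process(event, state):
    # Ignore events that do not meet the filtering condition
    if event.get('temperature', 0.0) <= THRESHOLD:
        return
    # Count the matching events across calls
    state['alerts'] = state.get('alerts', 0) + 1
    # Stand-in for sending the event to a database
    logger.warning("High temperature event: %s", event)


state = {}
for reading in (21.5, 35.0, 31.2):
    process({'temperature': reading}, state)
print(state)  # {'alerts': 2}
```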
