Code operator examples
The Code operator can be of type Source, Processing and Analytics, or Target.
Important
- The code must be created with the same version of packages that are listed here.
- Regardless of the operator type, you must declare all output attributes in the Edit Schema window.
Restriction
Passed datetime objects must not be timezone-aware.
When you return or submit event tuples with datetime objects, make sure that they are “naive”, that is, “timezone-unaware”. For more information about “timezone-aware” and “naive” objects in Python, see datetime — Basic date and time types.
Passing “timezone-aware” datetime objects results in runtime errors, and such events are ignored.
Tip
In Python, you can convert a timezone-aware datetime object to a “naive”, UTC-based datetime object. The following code snippet is an example.
from dateutil.tz import tzutc
event['dt_utc'] = dt.astimezone(tzutc()).replace(tzinfo=None)
See Example 2 for a full example.
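The tip above can also be sketched with only the standard library. The helper name to_naive_utc is hypothetical and not part of the Code operator API; it assumes that a value that is already naive is expressed in UTC and returns it unchanged.

```python
from datetime import datetime, timedelta, timezone

def to_naive_utc(dt):
    """Return dt as a naive datetime expressed in UTC.

    Hypothetical helper. Naive inputs are assumed to be UTC already
    and are returned unchanged.
    """
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        return dt
    # Shift to UTC first, then drop the time zone information
    return dt.astimezone(timezone.utc).replace(tzinfo=None)

# Example: 12:00 in a +05:30 zone corresponds to 06:30 UTC
ist = timezone(timedelta(hours=5, minutes=30))
aware = datetime(2020, 1, 1, 12, 0, tzinfo=ist)
naive = to_naive_utc(aware)
```

Calling such a helper on every datetime attribute just before you return or submit an event is one way to avoid the runtime errors described in the restriction.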
Code as a Source operator
You might use Code as a Source operator when you want to generate some test data without having to set up an Event Streams instance.
Another use is when you want to bring in data from a web socket or from your proprietary data source.
A. Asynchronous
import time
import logging

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self, submit):
        counter = 0
        while counter <= 10:
            # Submit a tuple in each iteration:
            submit({"number": counter, "square": counter * counter})
            counter += 1
            time.sleep(0.5)  # Simulates a delay of 0.5 seconds between emitted events

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass
When the streams flow is running, the Flow of Events shows the generated test data.
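To try an asynchronous source outside a streams flow, you can call the class yourself and pass any callable as submit. The following sketch assumes that the runtime invokes __call__ with a single submit callback, as in the example above. The trimmed Code class and the use of list.append as a stand-in callback are illustrative only.

```python
import time

class Code(object):
    # Trimmed copy of the asynchronous source, with a much shorter delay
    def __call__(self, submit):
        for counter in range(11):
            submit({"number": counter, "square": counter * counter})
            time.sleep(0.001)

collected = []
# list.append stands in for the submit callback that the runtime would pass
Code()(collected.append)
```

After the call returns, collected holds the eleven generated events in the order in which they were submitted.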
B. Synchronous
The following code shows the first approach, a generator that yields each event:
import sys
import time
import logging

# User logs include messages at INFO, WARN and ERROR levels
logger = logging.getLogger(__name__)

class Code(object):
    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self):
        counter = 0
        while counter <= 10:
            counter += 1
            time.sleep(0.5)  # Simulates a delay of 0.5 seconds between emitted events
            # Yield a tuple in each iteration:
            yield {"number": counter, "square": counter * counter}

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass
The following code shows the second approach, a class that returns each event from __next__:
import time

class Code(object):
    def __init__(self):
        # The counter must exist before __next__ increments it
        self.counter = 0

    # Called when flow is starting to run
    # def __enter__(self):
    #     pass

    def __call__(self):
        return self

    def __next__(self):
        time.sleep(0.5)  # Simulates a delay of 0.5 seconds between emitted events
        self.counter += 1
        # Return a tuple in each iteration:
        return {"number": self.counter, "square": self.counter * self.counter}

    # Called to handle an exception or shutdown
    # def __exit__(self, exc_type, exc_value, traceback):
    #     pass
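To preview a synchronous source locally, you can pull a few events from the object that __call__ returns. The following minimal sketch uses the generator approach with the delay removed and an unbounded loop. It assumes that the runtime consumes events by calling next() on the returned object, which is not confirmed here.

```python
class Code(object):
    # Trimmed generator-style source without the delay
    def __call__(self):
        counter = 0
        while True:
            counter += 1
            yield {"number": counter, "square": counter * counter}

source = Code()()
# Pull only the first three events; the generator itself never ends
first_three = [next(source) for _ in range(3)]
```

Because the sketch only relies on next(), the same loop also works with a class that implements __next__ and returns self from __call__.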
Code as a Processing and Analytics operator
When you use Code as a Processing and Analytics operator, the code has both an input parameter (the event) and a return value.
Example 1
Goal: You need to return to the output schema two new attributes that are not present in the input schema.
The following code snippet shows an example for two attributes, “friendly_greeting” and “formal_greeting”, in the output schema of the Code operator.
import sys

def process(event):
    if 'name' in event:
        name = event['name']
    else:
        name = 'stranger'
    friendly_greeting = 'Hey ' + name + '!'
    formal_greeting = 'Dear ' + name + ','
    # The dictionary must start on the same line as return; otherwise None is returned
    return {'friendly_greeting': friendly_greeting,
            'formal_greeting': formal_greeting}
The returned attributes ‘friendly_greeting’ and ‘formal_greeting’ must be included in the output schema of the Code operator.
To check that they are present, open the Edit Schema window from the Code Properties pane, and then add the attributes and their types if they are missing.
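Before you run the flow, you can compare a sample return value with the attributes that you declared. The following sketch is a local check only. The helper name undeclared_attributes and the dictionary that represents the declared schema are hypothetical and are not part of the product.

```python
def undeclared_attributes(output_event, declared_schema):
    """Return the attribute names in output_event that the schema does not declare."""
    return sorted(set(output_event) - set(declared_schema))

# Hypothetical copy of the output schema, with 'formal_greeting' forgotten
declared_schema = {"friendly_greeting": str}
output_event = {"friendly_greeting": "Hey Ana!", "formal_greeting": "Dear Ana,"}

undeclared = undeclared_attributes(output_event, declared_schema)
```

An empty result means that every returned attribute has a matching declaration.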
Example 2
Goal: You need to return to the output schema the time of an event that was changed from IST to UTC time zone.
from dateutil.parser import parse
from dateutil import tz

def process(event):
    # dateutil's parse() doesn't understand "IST" as a time zone indicator, so swap for +05:30
    dt = parse(event['event_time'].replace('IST', '+05:30'))
    # Convert to UTC, then drop tzinfo so that the result is a naive datetime
    event['dt_utc'] = dt.astimezone(tz.tzutc()).replace(tzinfo=None)
    return event
For more information about date formats, see Date formats.
Example 3
Goal: You need to return every other tuple to the output schema.
import sys

counter = 0

def process(event):
    global counter
    counter += 1
    event['counter'] = counter
    # Compare values with ==, not with the identity operator "is"
    if counter % 2 == 0:
        return None  # Drop every second tuple
    return event
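The same filter can be sketched without a module-level global by keeping the counter in a state dictionary. This variant assumes that the init(state) and process(event, state) signatures shown in Example 4 are available for this operator. The sample events are made up for the local run.

```python
def init(state):
    # Start counting from zero
    state['counter'] = 0

def process(event, state):
    state['counter'] += 1
    event['counter'] = state['counter']
    if state['counter'] % 2 == 0:
        return None  # Drop every second tuple
    return event

# Local run with five made-up events
state = {}
init(state)
results = [process({'id': i}, state) for i in range(1, 6)]
kept = [r for r in results if r is not None]
```

In this run, the first, third, and fifth events are kept, and the second and fourth are dropped.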
Example 4
Goal: You want to report the geographic movement of mobile GPS-enabled devices (phones, tablets, cars). This scenario is an example of a use case that requires state.
def init(state):
    # Nothing to initialize in this example
    pass

def rememberDeviceLocation(event, state):
    # Helper assumed by process(): stores the latest coordinates under the device ID
    state[event['deviceId']] = {'last_lat': event['lat'], 'last_long': event['long']}

def process(event, state):
    deviceId = event['deviceId']  # Extract the device ID from the event tuple
    if deviceId not in state:
        # No previous record for this device ID, meaning it's the first event for it
        message = "Detected initial location"
    else:
        record = state[deviceId]  # Extract the device record of last (previous) location
        directions = []  # List for gathering directions: north or south, east or west
        if event['lat'] > record['last_lat']:
            directions.append("north")
        elif event['lat'] < record['last_lat']:
            directions.append("south")
        if event['long'] > record['last_long']:
            directions.append("east")
        elif event['long'] < record['last_long']:
            directions.append("west")
        message = "Moved " + "-".join(directions)
    output = {
        'deviceId': deviceId,
        'message': message
    }
    rememberDeviceLocation(event, state)  # For comparing with future locations
    return output
Example output
Input event 1: {'deviceId': 'p008', 'lat': '32.678611', 'long': '35.576944'}
Resulting output: {'deviceId': 'p008', 'message': 'Detected initial location'}
Input event 2: {'deviceId': 'p008', 'lat': '32.67862', 'long': '35.576944'}
Resulting output: {'deviceId': 'p008', 'message': 'Moved north'}
Input event 3: {'deviceId': 'p008', 'lat': '32.6786', 'long': '35.57694'}
Resulting output: {'deviceId': 'p008', 'message': 'Moved south-west'}
Input event 4: {'deviceId': 'x123', 'lat': '32.6786', 'long': '35.57694'}
Resulting output: {'deviceId': 'x123', 'message': 'Detected initial location'}
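The sample events carry lat and long as strings, and Python compares strings character by character, so '9.5' > '10.2' is True. The following sketch shows one way to compare coordinates numerically and to handle a device that did not move. The function name describe_move and the "Did not move" message are hypothetical additions, not part of the example above.

```python
def describe_move(prev_lat, prev_long, lat, long):
    """Describe the movement between two positions given as numbers or numeric strings."""
    prev_lat, prev_long, lat, long = (float(v) for v in (prev_lat, prev_long, lat, long))
    directions = []
    if lat > prev_lat:
        directions.append("north")
    elif lat < prev_lat:
        directions.append("south")
    if long > prev_long:
        directions.append("east")
    elif long < prev_long:
        directions.append("west")
    if not directions:
        return "Did not move"  # Hypothetical message for unchanged coordinates
    return "Moved " + "-".join(directions)

# Events 1 to 3 from the example output, compared pairwise
move_1_to_2 = describe_move('32.678611', '35.576944', '32.67862', '35.576944')
move_2_to_3 = describe_move('32.67862', '35.576944', '32.6786', '35.57694')
```

Converting to float once, at the point of comparison, keeps the stored state unchanged while making the comparison independent of how many digits each reading has.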
Example 5
Goal: The Watson IoT operator ingests electricity usage readings from smart meters to bill users. You need to take that data and implement time-based billing. You also want to send an email alert to customers whose usage for the current month is high.
Watch the video Use Python code in a streams flow.
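As a rough illustration of the billing part of this goal, the following sketch accumulates monthly usage and cost per meter in the state dictionary. It is not the code from the video. The attribute names (meter_id, reading_time, kwh), the rates, the peak hours, and the alert threshold are all assumptions. Sending the email is left out; the sketch only sets a high_usage flag that a later step could act on.

```python
from datetime import datetime

PEAK_RATE = 0.30      # Hypothetical price per kWh from 08:00 to 20:00
OFF_PEAK_RATE = 0.10  # Hypothetical price per kWh at other times
ALERT_KWH = 500.0     # Hypothetical monthly usage that counts as high

def init(state):
    # Nothing to initialize; per-meter totals are created on first use
    pass

def process(event, state):
    ts = datetime.strptime(event['reading_time'], '%Y-%m-%d %H:%M:%S')
    # One running total per meter and calendar month
    key = '{}:{:%Y-%m}'.format(event['meter_id'], ts)
    totals = state.setdefault(key, {'kwh': 0.0, 'cost': 0.0})
    rate = PEAK_RATE if 8 <= ts.hour < 20 else OFF_PEAK_RATE
    totals['kwh'] += event['kwh']
    totals['cost'] += event['kwh'] * rate
    return {
        'meter_id': event['meter_id'],
        'month_kwh': totals['kwh'],
        'month_cost': round(totals['cost'], 2),
        'high_usage': totals['kwh'] > ALERT_KWH,
    }

# Local run with two made-up readings for the same meter and month
state = {}
init(state)
out1 = process({'meter_id': 'm1', 'reading_time': '2020-03-01 09:00:00', 'kwh': 400.0}, state)
out2 = process({'meter_id': 'm1', 'reading_time': '2020-03-01 22:00:00', 'kwh': 200.0}, state)
```

The first reading is billed at the peak rate and the second at the off-peak rate; the second output crosses the assumed threshold, so its high_usage flag is set.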
Code as a Target operator
When you use Code as a Target operator, you write Python code that runs in a target node.
The following code shows an example:
# process() function will be invoked on every event tuple
# @event a Python dictionary object representing the input event tuple as defined by the input schema
# @state a Python dictionary object for keeping state over subsequent function calls
def process(event, state):
    # Do something with the event, for example use a complex filtering condition and send the event to a database.
    pass
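The comment in the example mentions filtering events and sending them to a database. The following sketch illustrates that idea with an in-memory SQLite database from the Python standard library. It assumes that an init(state) function is called once before the first event, as in Example 4, and that the state dictionary can hold a database connection. Neither assumption is confirmed for Target operators. The table, the attribute names, and the filtering condition are made up.

```python
import sqlite3

def init(state):
    # Assumption: called once before the first event
    state['db'] = sqlite3.connect(':memory:')
    state['db'].execute('CREATE TABLE alerts (device_id TEXT, temperature REAL)')

def process(event, state):
    # Store only hot readings from devices whose ID starts with 'p'
    is_hot = event.get('temperature', 0) > 30
    is_tracked = event.get('device_id', '').startswith('p')
    if is_hot and is_tracked:
        state['db'].execute(
            'INSERT INTO alerts VALUES (?, ?)',
            (event['device_id'], event['temperature']),
        )
        state['db'].commit()

# Local run with three made-up events
state = {}
init(state)
for sample in [
    {'device_id': 'p008', 'temperature': 35.5},
    {'device_id': 'p009', 'temperature': 21.0},
    {'device_id': 'x123', 'temperature': 40.0},
]:
    process(sample, state)

rows = state['db'].execute('SELECT device_id, temperature FROM alerts').fetchall()
```

A target has no output schema to fill, so process returns nothing; only the first sample event satisfies both conditions and is written to the table.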