Python Machine Learning in a streams flow


The Python Machine Learning operator provides a simple way for you to run Python models to do real-time predictions and scoring.

How it works

File objects are the external file resources that are required by your code at execution time. Your Machine Learning process() function can expect these resources to be available on the runtime-local file system before it gets invoked for the first time.

You can specify more than one file object, such as when you want to also use a tokenizer or a dictionary for text analysis. For each file object, you specify its location path in Cloud Object Storage (COS) and a typically short reference name that are used in its callback function. Clicking Generate Callbacks appends a callback function stub to your code, for each file object.

When the flow starts running, we download each specified file object from COS and place it at a unique location on the runtime-local file system. At that point, we call your callback function with the runtime-local file path as an argument. Your callback function then instantiates and keeps the respective object for usage in subsequent processing.

All specified file objects must be available on COS before we call your process() function for the first time. Until then, any incoming events are held back.

We continually scan COS and check if the file object was updated. If so, we reload the file to the runtime-local file system. Then, we again call its callback function, which redeserializes the respective object and updates the state with the new model object, without restarting the flow.


The Python objects that you load into COS must be created with the same version of packages that are used in the streams flow. To see the list of preinstalled and user-installed packages, go to the canvas, click Settings icon, and then click Environment.


Goal: Run predictive analysis by using a tokenizer and a model that were uploaded to COS. After you define the file objects and click Generate Callbacks, we generate stubs for your load_model() and load_tokenizer() callback functions.

import requests
import sys
import os
import pickle

def init(state):

def process(event,state):  
   text = event[‘tweet’]
   text_t = state[‘vectorizer’].transform([text])
   y_pred = state[‘classifier’].predict(text_t) # predicts class as number

   labels = [‘irrelevant’, ‘negative’, ‘neutral’, ‘positive’]
   event[‘sentiment’] = labels[y_pred]

   return event

def load_classifier(state, path_classifier):
    state[‘classifier’] = pickle.load(open(path_classifier, “rb” ))

def load_vectorizer(state, path_vectorizer):
    state[‘vectorizer’] = pickle.load(open(path_vectorizer, “rb” ))