This notebook shows you how to deploy a Decision Optimization model, create and monitor jobs, and get solutions using the watsonx.ai Python Client.
This notebook runs on Python.
Table of contents:
Before you use the sample code in this notebook, you must:
Import the watsonx.ai client library.
from ibm_watsonx_ai import APIClient
from ibm_watsonx_ai import Credentials
# Instantiate a client using credentials
credentials = Credentials(
api_key = "<API_key>",
url = "<instance_url>"
)
client = APIClient(credentials)
client.version
'1.0.11'
Put the model.py file in a subdirectory and create a tar.gz file. The model consists of two parts: utility code that creates an inputs dictionary from input files and creates files from an outputs dictionary, and the optimization model itself.
Use the %%writefile magic command to write the model to a main.py file.
Use the tar command to create a tar archive.
%mkdir model
%%writefile model/main.py
from docplex.util.environment import get_environment
import pandas
import json
import sys
from six import iteritems
from collections.abc import Mapping
from os.path import join, dirname, basename, splitext, exists
import glob
class _InputDict(dict):
def __init__(self, directory, names):
dict.__init__(self)
self._directory = directory
for k in names:
dict.__setitem__(self, k, None)
file='model_schema.json'
if self._directory is not None:
file = "{0}/".format(self._directory) + file
self.dtype_schemas = self.get_dtype_schemas( file)
def __getitem__(self, key):
if isinstance(key, str):
item = dict.__getitem__(self, key)
if item is None:
file = "{0}.csv".format(key)
if file in self.dtype_schemas:
return self.read_df( key, dtype=self.dtype_schemas[file])
else:
return self.read_df( key)
else:
return item
else:
raise Exception("Accessing input dict via non string index")
def read_df(self, key, **kwargs):
env = get_environment()
file = "{0}.csv".format(key)
if self._directory is not None:
file = "{0}/".format(self._directory) + file
with env.get_input_stream(file) as ist:
params = {'encoding': 'utf8'}
if kwargs:
params.update(kwargs)
df = pandas.read_csv( ist, **params)
dict.__setitem__(self, key, df)
return df
def get_dtype_schemas(self, path):
dtype_schemas = {}
if exists(path):
input_schemas=json.load(open(path))
if 'input' in input_schemas:
for input_schema in input_schemas['input']:
dtype_schema = {}
if 'fields' in input_schema:
for input_schema_field in input_schema['fields']:
if input_schema_field['type']=='string':
dtype_schema[input_schema_field['name']]='str'
if len(dtype_schema) > 0:
dtype_schemas[input_schema['id']]=dtype_schema
print(dtype_schemas)
return dtype_schemas
class _LazyDict(Mapping):
def __init__(self, *args, **kw):
self._raw_dict = _InputDict(*args, **kw)
def __getitem__(self, key):
return self._raw_dict.__getitem__(key)
def __iter__(self):
return iter(self._raw_dict)
def __len__(self):
return len(self._raw_dict)
def read_df(self, key, **kwargs):
return self._raw_dict.read_df(key, **kwargs)
def get_all_inputs(directory=None):
'''Utility method to read a list of files and return a tuple with all
read data frames.
Returns:
a map { datasetname: data frame }
'''
all_csv = "*.csv"
g = join(directory, all_csv) if directory else all_csv
names = [splitext(basename(f))[0] for f in glob.glob(g)]
result = _LazyDict(directory, names)
return result
def write_all_outputs(outputs):
'''Write all dataframes in ``outputs`` as .csv.
Args:
outputs: The map of outputs 'outputname' -> 'output df'
'''
for (name, df) in iteritems(outputs):
csv_file = '%s.csv' % name
print(csv_file)
with get_environment().get_output_stream(csv_file) as fp:
if sys.version_info[0] < 3:
fp.write(df.to_csv(index=False, encoding='utf8'))
else:
fp.write(df.to_csv(index=False).encode(encoding='utf8'))
if len(outputs) == 0:
print("Warning: no outputs written")
Writing model/main.py
%%writefile -a model/main.py
from docplex.mp.progress import SolutionListener
import pandas
import numpy
def build_solution(sol):
solution_df = pandas.DataFrame(columns=['Food', 'value'])
for index, dvar in enumerate(sol.iter_variables()):
solution_df.loc[index,'Food'] = dvar.to_string()
solution_df.loc[index,'value'] = dvar.solution_value
outputs = {}
outputs['solution'] = solution_df
# Generate output files
write_all_outputs(outputs)
class SolutionKeeper(SolutionListener):
''' A specialized implementation of :class:`SolutionListener`, which keeps track
of the latest intermediate solution found.
'''
def __init__(self):
SolutionListener.__init__(self)
self.index = -1
def notify_solution(self, sol):
self.index +=1
build_solution(sol)
# `write_all_outputs()` publishes the tables from the outputs dictionary as solution tables
# (it is called from `build_solution()`).

# Load CSV files into the inputs dictionary
inputs = get_all_inputs()
food = inputs['diet_food']
nutrients = inputs['diet_nutrients']
food_nutrients = inputs['diet_food_nutrients']
food_nutrients.set_index('Food', inplace=True)
from docplex.mp.model import Model
# Model
mdl = Model(name='diet')
# To obtain tables for intermediate solutions, you must add a SolutionListener
# so that you can then create solution tables and publish them.
mdl.add_progress_listener(SolutionKeeper())
# Create decision variables, limited to be >= Food.qmin and <= Food.qmax
qty = food[['name', 'qmin', 'qmax']].copy()
qty['var'] = qty.apply(lambda x: mdl.continuous_var(lb=x['qmin'],
ub=x['qmax'],
name=x['name']),
axis=1)
# make the name the index
qty.set_index('name', inplace=True)
# Limit range of nutrients, and mark them as KPIs
for n in nutrients.itertuples():
amount = mdl.sum(qty.loc[f.name]['var'] * food_nutrients.loc[f.name][n.name]
for f in food.itertuples())
mdl.add_range(n.qmin, amount, n.qmax)
mdl.add_kpi(amount, publish_name='Total %s' % n.name)
# Minimize cost
obj = mdl.sum(qty.loc[f.name]['var'] * f.unit_cost for f in food.itertuples())
mdl.add_kpi(obj, publish_name="Minimal cost");
mdl.minimize(obj)
mdl.print_information()
# solve
ok = mdl.solve()
mdl.print_solution()
build_solution(mdl.solution)
Appending to model/main.py
import tarfile
def reset(tarinfo):
tarinfo.uid = tarinfo.gid = 0
tarinfo.uname = tarinfo.gname = "root"
return tarinfo
tar = tarfile.open("model.tar.gz", "w:gz")
tar.add("model/main.py", arcname="main.py", filter=reset)
tar.close()
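As an optional sanity check, you can list the members of the archive before uploading it; main.py should appear at the archive root. This is a minimal sketch using only the standard library.
# Optional check: list the archive members; main.py must sit at the root of the archive.
import tarfile

with tarfile.open("model.tar.gz", "r:gz") as archive:
    print(archive.getnames())  # expected: ['main.py']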
Store the model in Watson Machine Learning, providing the tar archive and metadata such as the model type and software specification. Then get the model_uid.
# All available meta data properties
client.repository.ModelMetaNames.show()
| META_PROP NAME | TYPE | REQUIRED | SCHEMA |
|---|---|---|---|
| NAME | str | Y | |
| DESCRIPTION | str | N | |
| INPUT_DATA_SCHEMA | list | N | {'id(required)': 'string', 'fields(required)': [{'name(required)': 'string', 'type(required)': 'string', 'nullable(optional)': 'string'}]} |
| TRAINING_DATA_REFERENCES | list | N | [{'name(optional)': 'string', 'type(required)': 'string', 'connection(required)': {'endpoint_url(required)': 'string', 'access_key_id(required)': 'string', 'secret_access_key(required)': 'string'}, 'location(required)': {'bucket': 'string', 'path': 'string'}, 'schema(optional)': {'id(required)': 'string', 'fields(required)': [{'name(required)': 'string', 'type(required)': 'string', 'nullable(optional)': 'string'}]}}] |
| TEST_DATA_REFERENCES | list | N | [{'name(optional)': 'string', 'type(required)': 'string', 'connection(required)': {'endpoint_url(required)': 'string', 'access_key_id(required)': 'string', 'secret_access_key(required)': 'string'}, 'location(required)': {'bucket': 'string', 'path': 'string'}, 'schema(optional)': {'id(required)': 'string', 'fields(required)': [{'name(required)': 'string', 'type(required)': 'string', 'nullable(optional)': 'string'}]}}] |
| OUTPUT_DATA_SCHEMA | dict | N | {'id(required)': 'string', 'fields(required)': [{'name(required)': 'string', 'type(required)': 'string', 'nullable(optional)': 'string'}]} |
| LABEL_FIELD | str | N | |
| TRANSFORMED_LABEL_FIELD | str | N | |
| TAGS | list | N | ['string', 'string'] |
| SIZE | dict | N | {'in_memory(optional)': 'string', 'content(optional)': 'string'} |
| PIPELINE_ID | str | N | |
| RUNTIME_ID | str | N | |
| TYPE | str | Y | |
| CUSTOM | dict | N | |
| DOMAIN | str | N | |
| HYPER_PARAMETERS | dict | N | |
| METRICS | list | N | |
| IMPORT | dict | N | {'name(optional)': 'string', 'type(required)': 'string', 'connection(required)': {'endpoint_url(required)': 'string', 'access_key_id(required)': 'string', 'secret_access_key(required)': 'string'}, 'location(required)': {'bucket': 'string', 'path': 'string'}} |
| TRAINING_LIB_ID | str | N | |
| MODEL_DEFINITION_ID | str | N | |
| SOFTWARE_SPEC_ID | str | N | |
| TF_MODEL_PARAMS | dict | N | |
| FAIRNESS_INFO | dict | N | |
Use the following code to list deployment spaces and delete any that are no longer needed:
client.spaces.list()
client.spaces.delete("")
client.spaces.get_details("")
# Find the space ID
space_name = "<space_name>"
space_id = [x['metadata']['id'] for x in client.spaces.get_details()['resources'] if x['entity']['name'] == space_name][0]
client = APIClient(credentials, space_id = space_id)
mnist_metadata = {
client.repository.ModelMetaNames.NAME: "Diet",
client.repository.ModelMetaNames.DESCRIPTION: "Model for Diet",
client.repository.ModelMetaNames.TYPE: "do-docplex_22.1",
client.repository.ModelMetaNames.SOFTWARE_SPEC_ID: client.software_specifications.get_id_by_name("do_22.1"),
# OPTIONAL, but useful for better type inference of string data if needed
client.repository.ModelMetaNames.INPUT_DATA_SCHEMA: [
{ "id" : "diet_food_nutrients.csv", "fields" : [
{ "name" : "Food", "type" : "string" }, { "name" : "Calories", "type" : "double" }, { "name" : "Calcium", "type" : "double" }, { "name" : "Iron", "type" : "double" },
{ "name" : "Vit_A", "type" : "double" }, { "name" : "Dietary_Fiber", "type" : "double" }, { "name" : "Carbohydrates", "type" : "double" }, { "name" : "Protein", "type" : "double" } ] },
{ "id" : "diet_food.csv", "fields" : [
{ "name" : "name", "type" : "string" }, { "name" : "unit_cost", "type" : "double" }, { "name" : "qmin", "type" : "double" }, { "name" : "qmax", "type" : "double" } ] },
{ "id" : "diet_nutrients.csv", "fields" : [
{ "name" : "name", "type" : "string" }, { "name" : "qmin", "type" : "double" }, { "name" : "qmax", "type" : "double" } ] } ],
# OPTIONAL, but useful for better type inference of string data, mainly with inline data
client.repository.ModelMetaNames.OUTPUT_DATA_SCHEMA: [
{ "id" : "stats.csv", "fields" : [
{ "name" : "Name", "type" : "string" }, { "name" : "Value", "type" : "string" } ] },
{ "id" : "solution.csv", "fields" : [
{ "name" : "name", "type" : "string" }, { "name" : "value", "type" : "double" } ] },
{ "id" : "kpis.csv", "fields" : [
{ "name" : "Name", "type" : "string" }, { "name" : "Value", "type" : "double" } ] } ]
}
model_details = client.repository.store_model(model='/home/wsuser/work/model.tar.gz', meta_props=mnist_metadata)
model_uid = client.repository.get_model_id(model_details)
# print model uid if needed
# print( model_uid )
Create a batch deployment for the model, providing information such as the hardware specification (size and number of compute nodes). Then get the deployment_uid.
meta_props = {
client.deployments.ConfigurationMetaNames.NAME: "Diet Deployment",
client.deployments.ConfigurationMetaNames.DESCRIPTION: "Diet Deployment",
client.deployments.ConfigurationMetaNames.BATCH: {},
client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {'name': 'S', 'num_nodes': 1}
}
deployment_details = client.deployments.create(model_uid, meta_props=meta_props)
deployment_uid = client.deployments.get_id(deployment_details)
# print deployment id if needed
# print( deployment_uid )
######################################################################################
Synchronous deployment creation for id: 'e787b573-39b4-49cd-8a20-247c59a15b65' started
######################################################################################
ready.
-----------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_id='8a4800be-39bf-4ea4-8b12-b3f0aa85120a'
-----------------------------------------------------------------------------------------------
# List all existing deployments
client.deployments.list()
| | ID | NAME | STATE | CREATED | ARTIFACT_TYPE | SPEC_STATE | SPEC_REPLACEMENT |
|---|---|---|---|---|---|---|---|
| 0 | 8a4800be-39bf-4ea4-8b12-b3f0aa85120a | Diet Deployment | ready | 2024-09-02T13:15:23.620Z | do | supported | |
Create a payload containing inline input data.
Create a new job with this payload and the deployment.
Get the job_uid.
# Import pandas library
import pandas as pd
# Initialize list of lists
diet_food = pd.DataFrame([ ["Roasted Chicken", 0.84, 0, 10],
["Spaghetti W/ Sauce", 0.78, 0, 10],
["Tomato,Red,Ripe,Raw", 0.27, 0, 10],
["Apple,Raw,W/Skin", 0.24, 0, 10],
["Grapes", 0.32, 0, 10],
["Chocolate Chip Cookies", 0.03, 0, 10],
["Lowfat Milk", 0.23, 0, 10],
["Raisin Brn", 0.34, 0, 10],
["Hotdog", 0.31, 0, 10]] , columns = ["name", "unit_cost", "qmin", "qmax"])
diet_food_nutrients = pd.DataFrame([
["Spaghetti W/ Sauce", 358.2, 80.2, 2.3, 3055.2, 11.6, 58.3, 8.2],
["Roasted Chicken", 277.4, 21.9, 1.8, 77.4, 0, 0, 42.2],
["Tomato,Red,Ripe,Raw", 25.8, 6.2, 0.6, 766.3, 1.4, 5.7, 1],
["Apple,Raw,W/Skin", 81.4, 9.7, 0.2, 73.1, 3.7, 21, 0.3],
["Grapes", 15.1, 3.4, 0.1, 24, 0.2, 4.1, 0.2],
["Chocolate Chip Cookies", 78.1, 6.2, 0.4, 101.8, 0, 9.3, 0.9],
["Lowfat Milk", 121.2, 296.7, 0.1, 500.2, 0, 11.7, 8.1],
["Raisin Brn", 115.1, 12.9, 16.8, 1250.2, 4, 27.9, 4],
["Hotdog", 242.1, 23.5, 2.3, 0, 0, 18, 10.4]
] , columns = ["Food", "Calories", "Calcium", "Iron", "Vit_A", "Dietary_Fiber", "Carbohydrates", "Protein"])
diet_nutrients = pd.DataFrame([
["Calories", 2000, 2500],
["Calcium", 800, 1600],
["Iron", 10, 30],
["Vit_A", 5000, 50000],
["Dietary_Fiber", 25, 100],
["Carbohydrates", 0, 300],
["Protein", 50, 100]
], columns = ["name", "qmin", "qmax"])
If you want to enable intermediate solutions for your solve, uncomment the oaas.outputUploadPeriod solve parameter (expressed in minutes). You can also uncomment oaas.outputUploadFiles if you want to get logs at each intermediate solution.
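For reference, here is a minimal sketch of what the solve parameters could look like with intermediate solutions enabled; the one-minute upload period and the .txt file pattern are only illustrative values.
# Illustrative only: solve parameters with intermediate solution uploads enabled.
# "oaas.outputUploadPeriod" is expressed in minutes; "oaas.outputUploadFiles" is a
# regular expression selecting extra files (here .txt log files) to upload each time.
intermediate_solve_parameters = {
    "oaas.outputUploadPeriod": "1",
    "oaas.outputUploadFiles": r".*\.txt",
    "oaas.logAttachmentName": "log.txt",
    "oaas.logTailEnabled": "true"
}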
solve_payload = {
"solve_parameters" : {
#"oaas.outputUploadPeriod": "1",
#"oaas.outputUploadFiles": ".*\.txt",
"oaas.logAttachmentName":"log.txt",
"oaas.logTailEnabled":"true"
},
client.deployments.DecisionOptimizationMetaNames.INPUT_DATA: [
{
"id":"diet_food.csv",
"values" : diet_food
},
{
"id":"diet_food_nutrients.csv",
"values" : diet_food_nutrients
},
{
"id":"diet_nutrients.csv",
"values" : diet_nutrients
}
],
client.deployments.DecisionOptimizationMetaNames.OUTPUT_DATA: [
{
"id":".*\.csv"
}
],
client.deployments.DecisionOptimizationMetaNames.OUTPUT_DATA_REFERENCES: [
{
"id": "log.txt",
"type": "data_asset",
"connection": {},
"location": {"name":"job_${oaas_job_id}_log_${oaas_update_time}.txt"}
}
]
}
job_details = client.deployments.create_job(deployment_uid, solve_payload)
job_uid = client.deployments.get_job_id(job_details)
# print job id if needed
# print( job_uid )
Display job status until it is completed.
The first job of a new deployment might take some time as a compute node must be started.
from time import sleep
while job_details['entity']['decision_optimization']['status']['state'] not in ['completed', 'failed', 'canceled']:
print(job_details['entity']['decision_optimization']['status']['state'] + '...')
if('solve_state' in job_details['entity']['decision_optimization']
and 'details' in job_details['entity']['decision_optimization']['solve_state']
and 'latestOutputUpload' in job_details['entity']['decision_optimization']['solve_state']['details']):
print( " Intermediate captured at: ", job_details['entity']['decision_optimization']['solve_state']['details']['latestOutputUpload'])
else:
print( " No intermediate available" )
sleep(5)
job_details=client.deployments.get_job_details(job_uid)
if job_details['entity']['decision_optimization']['status']['state'] in ['failed']:
print( job_details['entity']['decision_optimization']['status'] )
else:
print( job_details['entity']['decision_optimization']['status']['state'] )
queued...
 No intermediate available
queued...
 No intermediate available
queued...
 No intermediate available
queued...
 No intermediate available
queued...
 No intermediate available
queued...
 No intermediate available
queued...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
running...
 No intermediate available
completed
# Create a dataframe for the solution
solution = pd.DataFrame(job_details['entity']['decision_optimization']['output_data'][0]['values'],
columns = job_details['entity']['decision_optimization']['output_data'][0]['fields'])
solution.head()
| | Name | Value |
|---|---|---|
| 0 | Total Carbohydrates | 256.805764 |
| 1 | PROGRESS_CURRENT_OBJECTIVE | 2.690409 |
| 2 | Total Vit_A | 8518.432542 |
| 3 | Total Iron | 11.278318 |
| 4 | Total Calories | 2000.000000 |
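Each entry of output_data carries its own fields and values, so the same pattern generalizes to every table the job returned. A small sketch, assuming all collected outputs are CSV tables and that one of them has the id solution.csv:
# Build one DataFrame per output table, keyed by its id (e.g. 'solution.csv').
output_tables = {
    item['id']: pd.DataFrame(item['values'], columns=item['fields'])
    for item in job_details['entity']['decision_optimization']['output_data']
}
print(list(output_tables.keys()))
output_tables['solution.csv'].head()  # assumes the solution table was returned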
print( job_details['entity']['decision_optimization']['solve_state']['details']['KPI.Total Calories'] )
2000.0
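The solve_state details contain one entry per published KPI, each key prefixed with KPI., so you can also list them all at once. A minimal sketch:
# List every published KPI from the solve state details (keys prefixed with 'KPI.').
details = job_details['entity']['decision_optimization']['solve_state']['details']
for key, value in details.items():
    if key.startswith('KPI.'):
        print(key[len('KPI.'):], '=', value)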
# Change the input data
diet_nutrients.at[0,'qmin'] = 1500
diet_nutrients.at[0,'qmax'] = 2000
solve_payload = {
client.deployments.DecisionOptimizationMetaNames.INPUT_DATA: [
{
"id":"diet_food.csv",
"values" : diet_food
},
{
"id":"diet_food_nutrients.csv",
"values" : diet_food_nutrients
},
{
"id":"diet_nutrients.csv",
"values" : diet_nutrients
}
],
client.deployments.DecisionOptimizationMetaNames.OUTPUT_DATA: [
{
"id":".*\.csv"
}
]
}
Create a new job.
job_details = client.deployments.create_job(deployment_uid, solve_payload)
job_uid = client.deployments.get_job_id(job_details)
# print job id if needed
# print( job_uid )
Display job status until it is completed.
while job_details['entity']['decision_optimization']['status']['state'] not in ['completed', 'failed', 'canceled']:
print(job_details['entity']['decision_optimization']['status']['state'] + '...')
sleep(5)
job_details=client.deployments.get_job_details(job_uid)
print( job_details['entity']['decision_optimization']['status']['state'])
queued...
running...
completed
Display the KPI Total Calories value for this modified data.
print( job_details['entity']['decision_optimization']['solve_state']['details']['KPI.Total Calories'] )
1500.0
print(client.deployments.get_job_details(job_uid)['entity']['decision_optimization']['status'])
{'completed_at': '2024-09-02T13:17:01.528Z', 'running_at': '2024-09-02T13:16:59.936Z', 'state': 'completed'}
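If you want a rough solve duration, you can derive it from the running_at and completed_at timestamps shown above. A small sketch, assuming both fields are present in the status:
# Approximate job duration computed from the status timestamps.
from datetime import datetime

status = client.deployments.get_job_details(job_uid)['entity']['decision_optimization']['status']
fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
duration = datetime.strptime(status['completed_at'], fmt) - datetime.strptime(status['running_at'], fmt)
print('Solve duration:', duration.total_seconds(), 'seconds')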
Use the following method to delete the deployment.
client.deployments.delete(deployment_uid)
'SUCCESS'
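If you also want to clean up the stored model, the repository delete call below should do it; treat this as an optional step.
# Optional cleanup: also remove the stored model from the repository.
client.repository.delete(model_uid)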
You've successfully completed this notebook!
You've learned how to:
Check out our online documentation for more samples and tutorials: