Use Spark to predict business area for car rental company with ibm-watsonx-ai
¶
This notebook contains steps and code to create a predictive model, and deploy it on WML. This notebook introduces commands for pipeline creation, model training, model persistance to Watson Machine Learning repository, model deployment, and scoring.
Some familiarity with Python is helpful. This notebook uses Python 3.11 and Apache® Spark 3.4.
You will use car_rental_training dataset.
Learning goals¶
The learning goals of this notebook are:
- Load a CSV file into an Apache® Spark DataFrame.
- Explore data.
- Prepare data for training and evaluation.
- Create an Apache® Spark machine learning pipeline.
- Train and evaluate a model.
- Persist a pipeline and model in Watson Machine Learning repository.
- Deploy a model for online scoring using Wastson Machine Learning API.
- Score sample scoring data using the Watson Machine Learning API.
Contents¶
This notebook contains the following parts:
Note: This notebook works correctly with kernel Python 3.11 with Spark 3.4
, please do not change kernel.
1. Set up the environment¶
Before you use the sample code in this notebook, you must perform the following setup tasks:
- Create a Watson Machine Learning (WML) Service instance (a free plan is offered and information about how to create the instance can be found here).
!pip install wget | tail -n 1
!pip install pyspark==3.4.3 | tail -n 1
!pip install -U ibm-watsonx-ai | tail -n 1
Connection to WML¶
Authenticate the Watson Machine Learning service on IBM Cloud. You need to provide platform api_key
and instance location
.
You can use IBM Cloud CLI to retrieve platform API Key and instance location.
API Key can be generated in the following way:
ibmcloud login
ibmcloud iam api-key-create API_KEY_NAME
In result, get the value of api_key
from the output.
Location of your WML instance can be retrieved in the following way:
ibmcloud login --apikey API_KEY -a https://cloud.ibm.com
ibmcloud resource service-instance WML_INSTANCE_NAME
In result, get the value of location
from the output.
Tip: Your Cloud API key
can be generated by going to the Users section of the Cloud console. From that page, click your name, scroll down to the API Keys section, and click Create an IBM Cloud API key. Give your key a name and click Create, then copy the created key and paste it below. You can also get a service specific url by going to the Endpoint URLs section of the Watson Machine Learning docs. You can check your instance location in your Watson Machine Learning (WML) Service instance details.
You can also get service specific apikey by going to the Service IDs section of the Cloud Console. From that page, click Create, then copy the created key and paste it below.
Action: Enter your api_key
and location
in the following cell.
api_key = 'PASTE YOUR PLATFORM API KEY HERE'
location = 'PASTE YOUR INSTANCE LOCATION HERE'
from ibm_watsonx_ai import Credentials
credentials = Credentials(
api_key=api_key,
url='https://' + location + '.ml.cloud.ibm.com'
)
from ibm_watsonx_ai import APIClient
client = APIClient(credentials)
Working with spaces¶
First of all, you need to create a space that will be used for your work. If you do not have space already created, you can use Deployment Spaces Dashboard to create one.
- Click New Deployment Space
- Create an empty space
- Select Cloud Object Storage
- Select Watson Machine Learning instance and press Create
- Copy
space_id
and paste it below
Tip: You can also use SDK to prepare the space for your work. More information can be found here.
Action: Assign space ID below
space_id = 'PASTE YOUR SPACE ID HERE'
You can use list
method to print all existing spaces.
client.spaces.list(limit=10)
To be able to interact with all resources available in Watson Machine Learning, you need to set space which you will be using.
client.set.default_space(space_id)
'SUCCESS'
Test Spark¶
try:
from pyspark.sql import SparkSession
except:
print('Error: Spark runtime is missing. If you are using Watson Studio change the notebook runtime to Spark.')
raise
2. Load and explore data¶
In this section you will load the data as an Apache Spark DataFrame and perform a basic exploration.
Read data into Spark DataFrame from file.
import os
from wget import download
sample_dir = 'spark_sample_model'
if not os.path.isdir(sample_dir):
os.mkdir(sample_dir)
filename = os.path.join(sample_dir, 'car_rental_training_data.csv')
if not os.path.isfile(filename):
filename = download('https://github.com/IBM/watson-machine-learning-samples/raw/master/cloud/data/cars-4-you/car_rental_training_data.csv', out=sample_dir)
spark = SparkSession.builder.getOrCreate()
df_data = spark.read\
.format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
.option('header', 'true')\
.option('inferSchema', 'true')\
.option("delimiter", ";")\
.load(filename)
df_data.take(3)
Explore data¶
df_data.printSchema()
root |-- ID: integer (nullable = true) |-- Gender: string (nullable = true) |-- Status: string (nullable = true) |-- Children: integer (nullable = true) |-- Age: double (nullable = true) |-- Customer_Status: string (nullable = true) |-- Car_Owner: string (nullable = true) |-- Customer_Service: string (nullable = true) |-- Satisfaction: integer (nullable = true) |-- Business_Area: string (nullable = true) |-- Action: string (nullable = true)
As you can see, the data contains eleven fields. Business_Area
field is the one you would like to predict using feedback data in Customer_Service
field.
print("Number of records: " + str(df_data.count()))
Number of records: 486
Let's see distribution of target field.
df_data.select('Business_Area').groupBy('Business_Area').count().show(truncate=False)
+----------------------------------+-----+ |Business_Area |count| +----------------------------------+-----+ |Service: Accessibility |26 | |Product: Functioning |150 | |Service: Attitude |24 | |Service: Orders/Contracts |32 | |Product: Availability/Variety/Size|42 | |Product: Pricing and Billing |24 | |Product: Information |8 | |Service: Knowledge |180 | +----------------------------------+-----+
3. Create an Apache Spark machine learning model¶
In this section you will learn how to:
3.1 Prepare data for model training and evaluation¶
In this subsection you will split your data into: train and test data set.
train_data, test_data = df_data.select("ID", "Customer_Service", "Business_Area").randomSplit([0.8, 0.2], 24)
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))
Number of training records: 401 Number of testing records : 85
3.2 Create the pipeline¶
In this section you will create an Apache Spark machine learning pipeline and then train the model.
from pyspark.ml.feature import StringIndexer, IndexToString, HashingTF, IDF, Tokenizer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, Model
from pyspark.sql.types import *
In the first data preprocessing step, create features from Customer_Service
field.
tokenizer = Tokenizer(inputCol="Customer_Service", outputCol="words")
hashing_tf = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='hash')
idf = IDF(inputCol=hashing_tf.getOutputCol(), outputCol="features", minDocFreq=5)
In the following step, use the StringIndexer transformer to convert Business_Area
to numeric.
string_indexer_label = StringIndexer(inputCol="Business_Area", outputCol="label").fit(train_data)
Add decision tree model to predict Business_Area
.
dt_area = DecisionTreeClassifier(labelCol="label", featuresCol=idf.getOutputCol())
Finally, setup transformer to convert the indexed labels back to original labels.
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=string_indexer_label.labels)
pipeline = Pipeline(stages=[tokenizer, hashing_tf, idf, string_indexer_label, dt_area, label_converter])
3.3 Train the model¶
In this subsection you will train model and evaluate its accuracy.
model = pipeline.fit(train_data)
predictions = model.transform(test_data)
predictions.select('Customer_Service','Business_Area','predictedLabel').show(3)
24/03/06 14:19:34 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB +--------------------+--------------------+------------------+ | Customer_Service| Business_Area| predictedLabel| +--------------------+--------------------+------------------+ |I do not underst...|Product: Pricing ...|Service: Knowledge| |I was penalty cha...|Product: Pricing ...|Service: Knowledge| |My experience was...|Product: Pricing ...|Service: Knowledge| +--------------------+--------------------+------------------+ only showing top 3 rows
predictions.printSchema()
root |-- ID: integer (nullable = true) |-- Customer_Service: string (nullable = true) |-- Business_Area: string (nullable = true) |-- words: array (nullable = true) | |-- element: string (containsNull = true) |-- hash: vector (nullable = true) |-- features: vector (nullable = true) |-- label: double (nullable = false) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = false) |-- predictedLabel: string (nullable = true)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %3.2f" % accuracy)
24/03/06 14:19:41 WARN DAGScheduler: Broadcasting large task binary with size 4.1 MiB Accuracy = 0.54
Note: Accuracy of the model is low, however based on customer comment more than one Business Area could be selected. In such cases top k (for example k=3) would be more suited for model evaluation.
4. Persist model¶
In this section you will learn how to store your pipeline and model in Watson Machine Learning repository by using python client libraries.
Note: Apache® Spark 3.4 is required.
Save training data in your Cloud Object Storage¶
ibm-cos-sdk library allows Python developers to manage Cloud Object Storage (COS).
import ibm_boto3
from ibm_botocore.client import Config
Action: Put credentials from Object Storage Service in Bluemix here.
cos_credentials = {
"apikey": "***",
"cos_hmac_keys": {
"access_key_id": "***",
"secret_access_key": "***"
},
"endpoints": "***",
"iam_apikey_description": "***",
"iam_apikey_name": "***",
"iam_role_crn": "***",
"iam_serviceid_crn": "***",
"resource_instance_id": "***"
}
connection_apikey = cos_credentials['apikey']
connection_resource_instance_id = cos_credentials["resource_instance_id"]
connection_access_key_id = cos_credentials['cos_hmac_keys']['access_key_id']
connection_secret_access_key = cos_credentials['cos_hmac_keys']['secret_access_key']
Action: Define the service endpoint we will use.
Tip: You can find this information in Endpoints section of your Cloud Object Storage intance's dashbord.
service_endpoint = 'https://s3.us.cloud-object-storage.appdomain.cloud'
You also need IBM Cloud authorization endpoint to be able to create COS resource object.
auth_endpoint = 'https://iam.cloud.ibm.com/identity/token'
We create COS resource to be able to write data to Cloud Object Storage.
cos = ibm_boto3.resource('s3',
ibm_api_key_id=cos_credentials['apikey'],
ibm_service_instance_id=cos_credentials['resource_instance_id'],
ibm_auth_endpoint=auth_endpoint,
config=Config(signature_version='oauth'),
endpoint_url=service_endpoint)
Now you will create bucket in COS and copy training dataset
for model from car_rental_training_data.csv.
from uuid import uuid4
bucket_id = str(uuid4())
score_filename = "car_rental_training_data.csv"
buckets = ["car-rental-" + bucket_id]
for bucket in buckets:
if not cos.Bucket(bucket) in cos.buckets.all():
print('Creating bucket "{}"...'.format(bucket))
try:
cos.create_bucket(Bucket=bucket)
except ibm_boto3.exceptions.ibm_botocore.client.ClientError as e:
print('Error: {}.'.format(e.response['Error']['Message']))
Creating bucket "car-rental-86243bdc-fc02-4318-9dbf-8b282fe956d9"...
bucket_obj = cos.Bucket(buckets[0])
print('Uploading data {}...'.format(score_filename))
with open(filename, 'rb') as f:
bucket_obj.upload_fileobj(f, score_filename)
print('{} is uploaded.'.format(score_filename))
Uploading data car_rental_training_data.csv... car_rental_training_data.csv is uploaded.
Create connections to a COS bucket¶
datasource_type = client.connections.get_datasource_type_id_by_name('bluemixcloudobjectstorage')
conn_meta_props= {
client.connections.ConfigurationMetaNames.NAME: "COS connection - spark",
client.connections.ConfigurationMetaNames.DATASOURCE_TYPE: datasource_type,
client.connections.ConfigurationMetaNames.PROPERTIES: {
'bucket': buckets[0],
'access_key': connection_access_key_id,
'secret_key': connection_secret_access_key,
'iam_url': auth_endpoint,
'url': service_endpoint
}
}
conn_details = client.connections.create(meta_props=conn_meta_props)
Creating connections... SUCCESS
Note: The above connection can be initialized alternatively with api_key
and resource_instance_id
.
The above cell can be replaced with:
conn_meta_props= {
client.connections.ConfigurationMetaNames.NAME: f"Connection to Database - {db_name} ",
client.connections.ConfigurationMetaNames.DATASOURCE_TYPE: client.connections.get_datasource_type_id_by_name(db_name),
client.connections.ConfigurationMetaNames.DESCRIPTION: "Connection to external Database",
client.connections.ConfigurationMetaNames.PROPERTIES: {
'bucket': bucket_name,
'api_key': cos_credentials['apikey'],
'resource_instance_id': cos_credentials['resource_instance_id'],
'iam_url': 'https://iam.cloud.ibm.com/identity/token',
'url': 'https://s3.us.cloud-object-storage.appdomain.cloud'
}
}
conn_details = client.connections.create(meta_props=conn_meta_props)
connection_id = client.connections.get_id(conn_details)
4.2 Save the pipeline and model¶
training_data_references = [
{
"id":"car-rental-training",
"type": "connection_asset",
"connection": {
"id": connection_id
},
"location": {
"bucket": buckets[0],
"file_name": score_filename,
}
}
]
saved_model = client.repository.store_model(
model=model,
meta_props={
client.repository.ModelMetaNames.NAME:"CARS4U - Business Area Prediction Modeljj",
client.repository.ModelMetaNames.TYPE: "mllib_3.4",
client.repository.ModelMetaNames.SOFTWARE_SPEC_ID: client.software_specifications.get_id_by_name('spark-mllib_3.4'),
client.repository.ModelMetaNames.TRAINING_DATA_REFERENCES: training_data_references,
client.repository.ModelMetaNames.LABEL_FIELD: "Business_Area",
},
training_data=train_data,
pipeline=pipeline)
Get saved model metadata from Watson Machine Learning.
published_model_id = client.repository.get_model_id(saved_model)
print("Model Id: " + str(published_model_id))
Model Id: 81a8fb9a-d082-4a6b-80cf-d1e40fb16e57
Model Id can be used to retrive latest model version from Watson Machine Learning instance.
Below you can see stored model details.
client.repository.get_model_details(published_model_id)
5. Deploy model in the IBM Cloud¶
In this section you will learn how to create model deployment in the IBM Cloud and retreive information about scoring endpoint.
deployment_details = client.deployments.create(
published_model_id,
meta_props={
client.deployments.ConfigurationMetaNames.NAME: "CARS4U - Business Area Prediction Model deployment",
client.deployments.ConfigurationMetaNames.ONLINE: {}
}
)
####################################################################################### Synchronous deployment creation for uid: '81a8fb9a-d082-4a6b-80cf-d1e40fb16e57' started ####################################################################################### initializing Note: online_url and serving_urls are deprecated and will be removed in a future release. Use inference instead. ready ------------------------------------------------------------------------------------------------ Successfully finished deployment creation, deployment_uid='33b650d3-90af-4aca-94e2-87c5c6b69e8b' ------------------------------------------------------------------------------------------------
deployment_details
6. Score¶
fields = ['ID', 'Gender', 'Status', 'Children', 'Age', 'Customer_Status','Car_Owner', 'Customer_Service', 'Business_Area', 'Satisfaction']
values = [3785, 'Male', 'S', 1, 17, 'Inactive', 'Yes', 'The car should have been brought to us instead of us trying to find it in the lot.', 'Product: Information', 0]
import json
payload_scoring = {"input_data": [{"fields": fields,"values": [values]}]}
scoring_response = client.deployments.score(client.deployments.get_id(deployment_details), payload_scoring)
print(json.dumps(scoring_response, indent=3))
{ "predictions": [ { "fields": [ "ID", "Gender", "Status", "Children", "Age", "Customer_Status", "Car_Owner", "Customer_Service", "Business_Area", "Satisfaction", "words", "hash", "features", "label", "rawPrediction", "probability", "prediction", "predictedLabel" ], "values": [ [ 3785, "Male", "S", 1.0, 17.0, "Inactive", "Yes", "The car should have been brought to us instead of us trying to find it in the lot.", "Product: Information", 0.0, [ "the", "car", "should", "have", "been", "brought", "to", "us", "instead", "of", "us", "trying", "to", "find", "it", "in", "the", "lot." ], [ 262144, [ 27576, 30950, 33123, 49013, 52351, 71578, 91878, 95889, 109156, 212044, 219087, 227152, 227431, 250855, 253475 ], [ 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0 ] ], [ 262144, [ 27576, 30950, 33123, 49013, 52351, 71578, 91878, 95889, 109156, 212044, 219087, 227152, 227431, 250855, 253475 ], [ 1.5094101471187573, 1.5305439699644374, 4.204692619390966, 3.5115454388310208, 0.0, 1.136639684257349, 3.288401887516811, 1.2986891158031049, 6.0014396301300605, 4.050541939563708, 1.747956846569662, 0.0, 3.5985568158206505, 1.8220648187233839, 1.8693177035739295 ] ], 7.0, [ 127.0, 91.0, 14.0, 21.0, 21.0, 13.0, 17.0, 6.0 ], [ 0.4096774193548387, 0.29354838709677417, 0.04516129032258064, 0.06774193548387097, 0.06774193548387097, 0.041935483870967745, 0.054838709677419356, 0.01935483870967742 ], 0.0, "Service: Knowledge" ] ] } ] }
7. Clean up¶
If you want to clean up all created assets:
- experiments
- trainings
- pipelines
- model definitions
- models
- functions
- deployments
please follow up this sample notebook.
8. Summary and next steps¶
You successfully completed this notebook! You learned how to use Apache Spark machine learning as well as Watson Machine Learning for model creation and deployment. Check out our Online Documentation for more samples, tutorials, documentation, how-tos, and blog posts.
Authors¶
Amadeusz Masny, Python Software Developer in Watson Machine Learning at IBM
Mateusz Szewczyk, Software Engineer at Watson Machine Learning
Copyright © 2020-2024 IBM. This notebook and its source code are released under the terms of the MIT License.