Run the following sequence of cells at least once in your notebook environment to install the required packages:
- pyarrow - provides rich, powerful features for working with columnar data
- sqlparse - a non-validating SQL parser
- ibmcloudsql - the Data Engine client library
!pip install pyarrow
!pip install sqlparse
!pip uninstall --yes autoai-libs tensorflow-text numba
Requirement already satisfied: pyarrow in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (5.0.0) Requirement already satisfied: numpy>=1.16.6 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from pyarrow) (1.20.3) Collecting sqlparse Downloading sqlparse-0.4.2-py3-none-any.whl (42 kB) |████████████████████████████████| 42 kB 503 kB/s eta 0:00:01 Installing collected packages: sqlparse Successfully installed sqlparse-0.4.2 Found existing installation: autoai-libs 1.13.1 Uninstalling autoai-libs-1.13.1: Successfully uninstalled autoai-libs-1.13.1 Found existing installation: tensorflow-text 2.7.3 Uninstalling tensorflow-text-2.7.3: Successfully uninstalled tensorflow-text-2.7.3 Found existing installation: numba 0.54.1 Uninstalling numba-0.54.1: Successfully uninstalled numba-0.54.1
!pip install --upgrade ibmcloudsql
Collecting ibmcloudsql Downloading ibmcloudsql-0.5.10.tar.gz (57 kB) |████████████████████████████████| 57 kB 4.0 MB/s eta 0:00:01 Installing build dependencies ... done Getting requirements to build wheel ... done Preparing wheel metadata ... done Collecting sqlparse>=0.4.2 Downloading sqlparse-0.4.3-py3-none-any.whl (42 kB) |████████████████████████████████| 42 kB 1.2 MB/s eta 0:00:01 Collecting backoff==1.10.0 Downloading backoff-1.10.0-py2.py3-none-any.whl (31 kB) Requirement already satisfied: typing-extensions in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (4.1.1) Collecting deprecated Downloading Deprecated-1.2.13-py2.py3-none-any.whl (9.6 kB) Requirement already satisfied: importlib-metadata in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (4.8.2) Requirement already satisfied: requests>=2.2.0 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (2.26.0) Requirement already satisfied: pyarrow in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (5.0.0) Requirement already satisfied: numpy>=1.20.3 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (1.20.3) Collecting pre-commit Downloading pre_commit-2.20.0-py2.py3-none-any.whl (199 kB) |████████████████████████████████| 199 kB 67.2 MB/s eta 0:00:01 Requirement already satisfied: ibm-cos-sdk>=2.10.0 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (2.11.0) Requirement already satisfied: packaging in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (21.3) Requirement already satisfied: ibm-cos-sdk-core>=2.10.0 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (2.11.0) Requirement already satisfied: python-dateutil in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (2.8.2) Requirement already satisfied: pandas>=1.1.0 in 
/opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibmcloudsql) (1.3.4) Collecting isodate Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB) |████████████████████████████████| 41 kB 489 kB/s eta 0:00:01 Requirement already satisfied: ibm-cos-sdk-s3transfer==2.11.0 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibm-cos-sdk>=2.10.0->ibmcloudsql) (2.11.0) Requirement already satisfied: jmespath<1.0.0,>=0.7.1 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibm-cos-sdk>=2.10.0->ibmcloudsql) (0.10.0) Requirement already satisfied: urllib3<1.27,>=1.26.7 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from ibm-cos-sdk-core>=2.10.0->ibmcloudsql) (1.26.7) Requirement already satisfied: pytz>=2017.3 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from pandas>=1.1.0->ibmcloudsql) (2021.3) Requirement already satisfied: six>=1.5 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from python-dateutil->ibmcloudsql) (1.15.0) Requirement already satisfied: idna<4,>=2.5 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from requests>=2.2.0->ibmcloudsql) (3.3) Requirement already satisfied: certifi>=2017.4.17 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from requests>=2.2.0->ibmcloudsql) (2022.6.15) Requirement already satisfied: charset-normalizer~=2.0.0 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from requests>=2.2.0->ibmcloudsql) (2.0.4) Requirement already satisfied: wrapt<2,>=1.10 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from deprecated->ibmcloudsql) (1.12.1) Requirement already satisfied: zipp>=0.5 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from importlib-metadata->ibmcloudsql) (3.6.0) Requirement already satisfied: pyparsing!=3.0.5,>=2.0.2 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from packaging->ibmcloudsql) (3.0.4) Collecting virtualenv>=20.0.8 Downloading virtualenv-20.16.5-py3-none-any.whl (8.8 
MB) |████████████████████████████████| 8.8 MB 74.8 MB/s eta 0:00:01 Requirement already satisfied: toml in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from pre-commit->ibmcloudsql) (0.10.2) Collecting nodeenv>=0.11.1 Downloading nodeenv-1.7.0-py2.py3-none-any.whl (21 kB) Requirement already satisfied: pyyaml>=5.1 in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from pre-commit->ibmcloudsql) (5.4.1) Collecting identify>=1.0.0 Downloading identify-2.5.5-py2.py3-none-any.whl (98 kB) |████████████████████████████████| 98 kB 7.7 MB/s eta 0:00:01 Collecting cfgv>=2.0.0 Downloading cfgv-3.3.1-py2.py3-none-any.whl (7.3 kB) Requirement already satisfied: setuptools in /opt/ibm/conda/miniconda3.9/lib/python3.9/site-packages (from nodeenv>=0.11.1->pre-commit->ibmcloudsql) (58.0.4) Collecting platformdirs<3,>=2.4 Downloading platformdirs-2.5.2-py3-none-any.whl (14 kB) Collecting filelock<4,>=3.4.1 Downloading filelock-3.8.0-py3-none-any.whl (10 kB) Collecting distlib<1,>=0.3.5 Downloading distlib-0.3.6-py2.py3-none-any.whl (468 kB) |████████████████████████████████| 468 kB 48.9 MB/s eta 0:00:01 Building wheels for collected packages: ibmcloudsql Building wheel for ibmcloudsql (PEP 517) ... done Created wheel for ibmcloudsql: filename=ibmcloudsql-0.5.10-py3-none-any.whl size=59142 sha256=f1988663a36aaa3baa92ec30ff59d5f59f8e843db321ce529052291eb587819d Stored in directory: /home/spark/shared/.cache/pip/wheels/05/2c/f5/0a787ef991cdd05edc4b66b54b251cd5cc6b19202e83ec197d Successfully built ibmcloudsql Installing collected packages: platformdirs, filelock, distlib, virtualenv, nodeenv, identify, cfgv, sqlparse, pre-commit, isodate, deprecated, backoff, ibmcloudsql Successfully installed backoff-1.10.0 cfgv-3.3.1 deprecated-1.2.13 distlib-0.3.6 filelock-3.8.0 ibmcloudsql-0.5.10 identify-2.5.5 isodate-0.6.1 nodeenv-1.7.0 platformdirs-2.5.2 pre-commit-2.20.0 sqlparse-0.4.3 virtualenv-20.16.5
Import (load) the following packages:
- ibmcloudsql - IBM Data Engine python client
- sqlparse - a non-validating SQL parser
- pandas - an open source data manipulation and analysis tool
- pygments - a syntax highlighter library used to pretty print the SQL statements
import ibmcloudsql
import sqlparse
import pandas as pd
import getpass
import pprint
from pygments import highlight
from pygments.lexers import get_lexer_by_name
from pygments.formatters import HtmlFormatter, Terminal256Formatter
lexer = get_lexer_by_name("sql", stripall=True)
formatter = Terminal256Formatter(style='vim')
apikey=''
instancecrn=''
targeturl=''
You need an API key for an IBM Cloud identity. This single key gives you access both to the Cloud Object Storage (COS) bucket where SQL results are written and to your Data Engine instance. To create an API key, log on to the IBM Cloud console and go to Manage -> Access (IAM), select API Keys, and click the Create an IBM Cloud API key button. Give the key a custom name and click Create. In the next dialog click Show, copy the key to your clipboard, and paste it below in this notebook.
You need the instance CRN of the Data Engine instance. If you don't have a Data Engine instance yet, create one first. If you already have one, you can find it in the IBM Cloud console dashboard. Make sure you select the right resource group for Group. In the Services section you can see the different types of services created for the selected group; Data Engine instances have an icon like the one at the top of this notebook. Select the Data Engine instance that you want to use. In the Data Engine dashboard page that opens, find the section titled Overview with the deployment details and copy the text after CRN: click the button to copy the CRN to your clipboard and paste it here into the notebook.
You need to specify the location on COS where your query results should be written. This is because the Data Engine instance needs to store the queried data on COS when it processes an ETL SQL statement.
This COS location comprises three pieces of information that you can find in the Cloud Object Storage UI for your instance in the IBM Cloud console. You need to provide it as a target_cos_url using the format cos://<endpoint>/<bucket>/[<prefix>].
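Assembling the three parts into a target_cos_url is plain string handling; here is a minimal sketch (the endpoint, bucket, and prefix values below are hypothetical — substitute the ones from your own COS instance):

```python
def make_target_cos_url(endpoint, bucket, prefix=""):
    """Assemble a target_cos_url of the form cos://<endpoint>/<bucket>/[<prefix>]."""
    url = "cos://{}/{}/".format(endpoint, bucket)
    if prefix:
        # Normalize the optional prefix so the URL always ends with "/"
        url += prefix.strip("/") + "/"
    return url

# Hypothetical values for illustration only:
example_target_url = make_target_cos_url("us-south", "my-sql-results", "query_output")
print(example_target_url)  # cos://us-south/my-sql-results/query_output/
```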
If you want to use the Cloud Object Storage bucket associated with your Watson Studio project as the target location for your query results, you can use the project token for access and construct the target_cos_url as follows:
Only follow the instructions in this section when you want to write your SQL query results to the bucket that has been created for the project for which you have created this notebook. In any other case proceed directly with section 2.2.
Inserting the project token:
Click the More option in the toolbar above (the three stacked dots) and select Insert project token. If no project access token exists yet, you are directed to the project settings, where you scroll to Access tokens and click New token. Give the token a custom name and make sure you select Editor as the Access role for project. After you have created your access token, come back to this notebook, select the empty cell below, and again select Insert project token from the toolbar at the top. This will add a new cell at the top of your notebook with content that looks like this:
# @hidden_cell
# The project token is an authorization token that is used to access project resources like data sources, connections, and used by platform APIs.
from project_lib import Project
project = Project(project_id='<some id>', project_access_token='<some access token>')
pc = project.project_context
Leave that cell content as inserted and run the cell. Then proceed with the following cell:
cos_bucket = project.get_metadata()['entity']['storage']['properties']
targeturl="cos://" + cos_bucket['bucket_region'] + "/" + cos_bucket['bucket_name'] + "/"
targeturl
Now let's instantiate and configure a Data Engine client. During instantiation the client accesses the Data Engine instance you selected and returns the link to the Data Engine service instance web console. You can use this link later to author an SQL statement interactively.
In addition to the parameters entered interactively here, there are two more optional parameters: max_concurrent_jobs (from version 0.4) and max_tries. For details, see the usage documentation.
if apikey == '':
apikey=getpass.getpass('Enter IBM Cloud API Key: ')
else:
apikey=getpass.getpass('Enter a new IBM Cloud API Key or leave empty to use the previous one: ') or apikey
if instancecrn == '':
instancecrn=input('Enter Data Engine instance CRN to use: ')
else:
instancecrn=input('Enter new Data Engine instance CRN to use (leave empty to use ' + instancecrn + '): ') or instancecrn
if targeturl == '':
targeturl=input('Enter target URL for SQL results: ')
else:
targeturl=input('Enter new target URL for SQL results (leave empty to use ' + targeturl + '): ') or targeturl
sqlClient = ibmcloudsql.SQLQuery(apikey, instancecrn, client_info='Data Engine Starter Notebook', target_cos_url=targeturl, max_concurrent_jobs=4, max_tries=3 )
#sqlClient.configure() # use this if you want to change the API key or Data Engine CRN later
sqlClient.logon()
Enter IBM Cloud API Key: ········ Enter Data Engine instance CRN to use: crn:v1:bluemix:public:sql-query:us-south:a/d86af7367f70fba4f306d3c19c938f2f:d1b2c005-e3d8-48c0-9247-e9726a7ed510:: Enter target URL for SQL results: cos://us-south/sqltempregional/
print('\nYour Data Engine web console link:\n')
sqlClient.sql_ui_link()
Your Data Engine web console link: https://dataengine.cloud.ibm.com/sqlquery/?instance_crn=crn:v1:bluemix:public:sql-query:us-south:a/d86af7367f70fba4f306d3c19c938f2f:d1b2c005-e3d8-48c0-9247-e9726a7ed510::
'https://dataengine.cloud.ibm.com/sqlquery/?instance_crn=crn:v1:bluemix:public:sql-query:us-south:a/d86af7367f70fba4f306d3c19c938f2f:d1b2c005-e3d8-48c0-9247-e9726a7ed510::'
In order to work with your data using Data Engine you need to know the schema of your data. Since ibmcloudsql version 0.4, you can query the schema of the data directly. Let's have a look at the employees.parquet dataset provided with Data Engine as a sample dataset. Note that the first invocation uses the parameter dry_run=True, which is done here to demonstrate that you can get a preview of the statement being executed.
NOTE: If you want to work with a Hive table, there is another API, describe_table(), that returns the schema of your defined table.
sqlClient.get_schema_data("cos://us-geo/sql/employees.parquet", type="parquet", dry_run=True)
sqlClient.get_schema_data("cos://us-geo/sql/employees.parquet", type="parquet")
Now that we know the schema of our data sets, we can create an SQL statement to be executed on this data. You provide the SQL statement as a string. The subsequent cell contains a sample SQL statement on the example datasets listed above that are provided with Data Engine. You can either use that sample statement for the subsequent steps or copy and paste your own statement, which you may have authored using the Data Engine web console (link above) of your Data Engine service instance.
sql=input('Enter your SQL statement (leave empty to use a simple sample SQL)')
if sql == '':
sql='SELECT o.OrderID, c.CompanyName, e.FirstName, e.LastName FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o, \
cos://us-geo/sql/employees.parquet STORED AS PARQUET e, cos://us-geo/sql/customers.parquet STORED AS PARQUET c \
WHERE e.EmployeeID = o.EmployeeID AND c.CustomerID = o.CustomerID AND o.ShippedDate > o.RequiredDate AND o.OrderDate > "1998-01-01" \
ORDER BY c.CompanyName'
if " INTO " not in sql:
sql += ' INTO {}myQueryResult STORED AS CSV'.format(targeturl)
formatted_sql = sqlparse.format(sql, reindent=True, indent_tabs=True, keyword_case='upper')
lexer = get_lexer_by_name("sql", stripall=True)
formatter = Terminal256Formatter(style='tango')
result = highlight(formatted_sql, lexer, formatter)
from IPython.core.display import display, HTML
print('\nYour SQL statement is:\n')
print(result)
sqlClient.reset_()
(sqlClient.select_("o.OrderID, c.CompanyName, e.FirstName, e.LastName")
.from_cos_("cos://us-geo/sql/orders.parquet", format_type="parquet", alias="o")
.from_cos_("cos://us-geo/sql/employees.parquet", format_type="parquet", alias="e")
.from_cos_("cos://us-geo/sql/customers.parquet", alias="c")
.where_('e.EmployeeID = o.EmployeeID AND c.CustomerID = o.CustomerID AND o.ShippedDate > o.RequiredDate AND o.OrderDate > "1998-01-01"')
.order_by_("c.CompanyName")
.store_at_(targeturl + "myResult", format_type="csv")
)
sqlClient.print_sql()
Note: sql_magic stores the constructed SQL string in the SQL client object so that you can use it (e.g. to submit() it for execution) in subsequent method calls on the client object.
There are three options to execute an SQL statement, as described in the documentation.
- Synchronously with result dataframe - submit, wait (until the query is completed), and return the queried data as a dataframe: run_sql()
- Asynchronously - submit, and return control immediately (along with the job_id): submit_sql() (submit() when using sql_magic)
- Synchronously with optional result dataframe - submit, wait (until the query is completed), and return a tuple (data, job_id), where data is the optional result dataframe: execute_sql() (run() when using sql_magic)

The last option is useful to help avoid Python runtime memory overload. Another alternative for dealing with big result sets is to paginate the result, i.e. use the pagesize option; for details see the paginated result section.
A Data Engine instance with the free Lite plan can process one query at a time. A Standard plan instance can by default process 5 concurrent queries, a limit that can be increased via support ticket. When the maximum has been reached, the Python SDK retries (with progressive backoff timing) as many times as specified in the max_tries parameter when initializing the client.
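The retry behavior can be pictured with a small standalone sketch. Note this is an illustration of progressive backoff in general, not the SDK's actual implementation; the function and the failure simulation below are made up for the example:

```python
import time

def with_retries(fn, max_tries=3, base_delay=0.01):
    """Call fn(); on failure retry up to max_tries times in total,
    doubling the delay between attempts (progressive backoff)."""
    for attempt in range(1, max_tries + 1):
        try:
            return fn()
        except RuntimeError:
            if attempt == max_tries:
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))

# Simulate a service that rejects the first two submissions:
attempts = []
def flaky_submit():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("too many concurrent queries")
    return "done"

print(with_retries(flaky_submit, max_tries=3))  # done
```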
This method provides a synchronous SQL execution mechanism. In the subsequent cell the above SQL statement is submitted. The method waits for the execution to finish. The queried data is returned synchronously as a dataframe.
result_df = sqlClient.run_sql(sql)
if isinstance(result_df, str):
print(result_df)
result_df.head(10)
In the subsequent call you see how easy it can be to visualize your result.
ax = result_df.FirstName.value_counts().plot(kind='bar', title="Orders per Employee")
ax.set_xlabel("First Name")
ax.set_ylabel("Order Count");
In the subsequent cell the SQL statement created earlier is submitted. The method returns right away without waiting for the job execution to complete. submit_sql() runs the given SQL string, while submit() runs the SQL string generated internally via the sql_magic module.
jobId = sqlClient.submit_sql(sql)
print("SQL job submitted and running in the background. jobId = " + jobId)
print("Job status for " + jobId + ": " + sqlClient.get_job(jobId)['status'])
Use the wait_for_job()
method as a blocking call until your job has finished:
job_status = sqlClient.wait_for_job(jobId)
print("Job " + jobId + " terminated with status: " + job_status)
if job_status == 'failed':
details = sqlClient.get_job(jobId)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
Use the get_result()
method to retrieve a dataframe for the SQL result set:
result_df = sqlClient.get_result(jobId)
print("OK, we have a dataframe for the SQL result that has been stored by Data Engine in " + sqlClient.get_job(jobId)['resultset_location'])
print("Internal SQL string created earlier using sql_magic:\n ")
sqlClient.print_sql()
jobId= sqlClient.submit()
print("\nSQL job submitted and running in the background. jobId = " + jobId)
job_status = sqlClient.wait_for_job(jobId)
print("Job " + jobId + " terminated with status: " + job_status)
if job_status == 'failed':
details = sqlClient.get_job(jobId)
print("Error: {}\nError Message: {}".format(details['error'], details['error_message']))
The synchronous execution of an SQL statement with run_sql() can cause Python runtime memory overload when the result set is big. Therefore a synchronous execution method is provided that lets you control whether the result set is returned as a dataframe or only stored on cloud object storage.
execute_sql() submits the SQL statement, waits (until the query is completed), and returns a named tuple (data, job_id), with data being a dataframe optionally filled with the queried data. The option get_result controls whether the data is returned in the dataframe data or only stored on cloud object storage. Setting get_result=False is recommended if the result set is expected to be big. An alternative for dealing with big result sets is pagination, which is possible using the pagesize option as described later.
result = sqlClient.execute_sql(sql, get_result=True)
display(result.data)
The equivalent method to run an SQL statement that was generated with sql_magic is run().
There is another alternative for dealing with large result sets that avoids Python runtime memory overload when reading the result as a dataframe: you can return the result in small enough "chunks" or "pages". All of the SQL execution methods introduced previously provide the optional parameter pagesize. When it is set, the result set is written as multiple objects, each having as many rows as specified in pagesize. Since this is implemented using the Data Engine SQL syntax clause PARTITIONED EVERY <num> ROWS, your query must not already contain another PARTITIONED clause when you set the pagesize parameter.
The following cells demonstrate the usage of the optional pagesize parameter.
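Because pagesize is translated into a PARTITIONED EVERY clause, a simple client-side guard can catch the conflict before submitting. This check is purely illustrative (the real SDK may or may not validate this for you):

```python
def can_use_pagesize(sql):
    """Return True if the statement contains no PARTITIONED clause,
    i.e. it is safe to combine with the pagesize parameter."""
    return "PARTITIONED" not in sql.upper()

print(can_use_pagesize("SELECT 1 INTO cos://x/y STORED AS CSV"))          # True
print(can_use_pagesize("SELECT 1 INTO cos://x/y PARTITIONED INTO 3 OBJECTS"))  # False
```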
pagination_sql='SELECT OrderID, c.CustomerID CustomerID, CompanyName, City, Region, PostalCode \
FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o, \
cos://us-geo/sql/customers.parquet STORED AS PARQUET c \
WHERE c.CustomerID = o.CustomerID \
INTO {}paginated_orders STORED AS PARQUET'.format(targeturl)
formatted_etl_sql = sqlparse.format(pagination_sql, reindent=True, indent_tabs=True, keyword_case='upper')
result = highlight(formatted_etl_sql, lexer, formatter)
print('\nExample Statement is:\n')
print(result)
jobId = sqlClient.submit_sql(pagination_sql, pagesize=10)
job_status = sqlClient.wait_for_job(jobId)
print("Job " + jobId + " terminated with status: " + job_status)
job_details = sqlClient.get_job(jobId)
if job_status == 'failed':
print("Error: {}\nError Message: {}".format(job_details['error'], job_details['error_message']))
Let's check how many pages of 10 rows each have been written:
print("Number of pages written by job {}: {}".format(jobId, len(sqlClient.list_results(jobId))))
The following cell retrieves the first page of the result as a dataframe. The desired page is specified via the optional parameter pagenumber of the get_result() method.
pagenumber=1
sqlClient.get_result(jobId, pagenumber=pagenumber).head(100)
The following cell gets the next page. Run it multiple times to retrieve the subsequent pages, one page after another.
pagenumber+=1
sqlClient.get_result(jobId, pagenumber).head(100)
The method get_job() provides you with some execution-related details. This way you can figure out the status of the job, the resultset_location on COS, the query start_time and end_time, and key performance metrics such as bytes_read.
job_details = sqlClient.get_job(jobId)
pprint.pprint(job_details)
etl_sql='SELECT OrderID, c.CustomerID CustomerID, CompanyName, ContactName, ContactTitle, Address, City, Region, PostalCode, Country, Phone, Fax, \
EmployeeID, OrderDate, RequiredDate, ShippedDate, ShipVia, Freight, ShipName, ShipAddress, \
ShipCity, ShipRegion, ShipPostalCode, ShipCountry FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o, \
cos://us-geo/sql/customers.parquet STORED AS PARQUET c \
WHERE c.CustomerID = o.CustomerID \
INTO {}customer_orders STORED AS PARQUET PARTITIONED BY (ShipCountry, ShipCity)'.format(targeturl)
formatted_etl_sql = sqlparse.format(etl_sql, reindent=True, indent_tabs=True, keyword_case='upper')
result = highlight(formatted_etl_sql, lexer, formatter)
print('\nExample ETL Statement is:\n')
print(result)
jobId = sqlClient.submit_sql(etl_sql)
print("SQL job submitted and running in the background. jobId = " + jobId)
job_status = sqlClient.wait_for_job(jobId)
print("Job " + jobId + " terminated with status: " + job_status)
job_details = sqlClient.get_job(jobId)
if job_status == 'failed':
print("Error: {}\nError Message: {}".format(job_details['error'], job_details['error_message']))
Now let's have a look at the cloud object storage location where the result objects are stored. The following cell uses the get_cos_summary()
method to print a summary of the objects that have been written by the previous ETL SQL statement.
resultset_location = job_details['resultset_location']
sqlClient.get_cos_summary(resultset_location)
Note the total_volume value. We will reference it for comparison in the next steps. Also note the number in total_objects, which indicates that the ETL produced 69 partitions (71 objects minus two bookkeeping objects).
In the following cell we use the list_results() method to print a list of these 71 objects that have been written by the above ETL SQL statement. Note that the partition columns and their values are now part of the object names. When you take a closer look at the cloud object storage URL of an object, you will notice that it contains the values of the partitioning columns, e.g. ShipCountry=Argentina/ShipCity=Buenos Aires. All records in this result partition have the value Argentina for column ShipCountry and Buenos Aires for ShipCity.
The fact that the result data is partitioned this way, and that this is made externally visible by encoding it in the object names, can be leveraged, for example, by a query engine to optimize execution performance. This type of partitioning, called hive-style partitioning, is the basis for optimizing the execution of an SQL statement using predicates that match the partitioning columns.
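The partition values can be recovered from an object name with plain string handling. A minimal sketch (the object key below is modeled on the example in the text; the jobid value is made up):

```python
def partition_values(object_key):
    """Extract hive-style partition columns (name=value path segments)
    from a cloud object storage key."""
    values = {}
    for segment in object_key.split("/"):
        if "=" in segment:
            name, _, value = segment.partition("=")
            values[name] = value
    return values

# Hypothetical jobid; the partition segments follow the example above.
key = "customer_orders/jobid=1234/ShipCountry=Argentina/ShipCity=Buenos Aires/part-0000.parquet"
print(partition_values(key))
# {'jobid': '1234', 'ShipCountry': 'Argentina', 'ShipCity': 'Buenos Aires'}
```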
pd.set_option('display.max_colwidth', None)
result_objects_df = sqlClient.list_results(jobId)
print("List of objects written by ETL SQL:")
result_objects_df.head(200)
Now let's take a look at the result data with the get_result() method. Note that the result dataframe contains the two partitioning columns. The values for these have been put together by get_result() from the object names above, because in hive-style partitioning the partition column values are not stored in the objects but rather in the object names.
sqlClient.get_result(jobId).head(100)
The following cell runs an SQL query against the partitioned data produced by the previous ETL SQL statement. The query uses WHERE predicates on the columns the dataset is partitioned by. This allows for performance optimization during query execution: the query engine physically reads only the objects that match these predicate values, exploiting the fact that the predicate columns match the partitioning columns.
optimized_sql='SELECT * FROM {} STORED AS PARQUET WHERE ShipCountry = "Austria" AND ShipCity="Graz" \
INTO {} STORED AS PARQUET'.format(resultset_location, targeturl)
formatted_optimized_sql = sqlparse.format(optimized_sql, reindent=True, indent_tabs=True, keyword_case='upper')
result = highlight(formatted_optimized_sql, lexer, formatter)
print('\nRunning SQL against the previously produced hive style partitioned objects as input:\n')
print(result)
jobId = sqlClient.submit_sql(optimized_sql)
job_status = sqlClient.wait_for_job(jobId)
print("Job " + jobId + " terminated with status: " + job_status)
job_details = sqlClient.get_job(jobId)
if job_status == 'failed':
print("Error: {}\nError Message: {}".format(job_details['error'], job_details['error_message']))
In the following cell we use get_job() to verify from the job details of the optimized SQL we just ran that hive-style partitioning has been leveraged. Note the bytes_read value, which is significantly lower than the total_volume value of the queried data set: the number is 6408 bytes, while the total volume of the input data is 408 KB. The number of bytes_read matches the size of the partition customer_orders/jobid=6ff7b1d0-b69c-4d1b-8ebf-968d69436f56/ShipCountry=Argentina/ShipCity=Buenos Aires/ .
This reveals that the execution engine leverages the hive-style partitioning to optimize performance by reading only the partitions that match the filter predicate. This I/O avoidance is the reason for the increased query performance and the lower query cost.
sqlClient.get_job(jobId)
Partitioning data is also worth considering when writing big result sets, as it enables parallel writing and avoids memory problems.
The result objects are written to cloud object storage by default with a virtual subfolder structure following this naming convention:
cos://endpoint/bucketname/resultSetName/jobid=<JOB_ID>/_SUCCESS
cos://endpoint/bucketname/resultSetName/jobid=<JOB_ID>
cos://endpoint/bucketname/resultSetName/jobid=<JOB_ID>/part-<xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx>.<format>
where cos://endpoint/bucketname is the target URL and resultSetName is the result set name picked by the user, as specified in the SQL INTO clause. The first two objects are of zero byte size and are merely used for bookkeeping purposes. The first object, _SUCCESS, indicates that all objects of the given data set have been successfully written. The second object is empty as well and denotes the name prefix of all objects belonging to a specific data set. You may think of it as the "root" in a naming hierarchy for all result-set-related objects. The third object can in fact exist more than once if a PARTITIONED INTO or PARTITIONED EVERY clause has been used.
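The naming convention above can be taken apart mechanically. A sketch that splits a result object URL into its result set name, job id, and trailing part name (the URL below uses hypothetical endpoint, bucket, and job id values):

```python
def parse_result_object(url):
    """Split a result object URL following the
    cos://endpoint/bucketname/resultSetName/jobid=<JOB_ID>/... convention
    into its result set name, job id, and trailing part name (if any)."""
    parts = url.split("/")
    info = {"resultset": None, "jobid": None, "part": None}
    for i, p in enumerate(parts):
        if p.startswith("jobid="):
            info["jobid"] = p[len("jobid="):]
            info["resultset"] = parts[i - 1]
            if i + 1 < len(parts):
                info["part"] = parts[i + 1]
    return info

# Hypothetical URL for illustration:
url = "cos://us-south/mybucket/myResult/jobid=abc-123/part-0000.csv"
print(parse_result_object(url))
# {'resultset': 'myResult', 'jobid': 'abc-123', 'part': 'part-0000.csv'}
```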
The following cell runs a simple SQL statement and lists the result objects for it. Since it uses a PARTITIONED INTO 3 OBJECTS clause, you can see three data objects being written.
pd.set_option('display.max_colwidth', None)
sql="SELECT * FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET INTO {} STORED AS PARQUET PARTITIONED INTO 3 OBJECTS".format(targeturl)
jobId = sqlClient.submit_sql(sql)
sqlClient.wait_for_job(jobId)
sqlClient.list_results(jobId).head(100)
You can delete the result set from Cloud Object Storage using the delete_result() method, which deletes all cloud object storage objects related to the specified result set.
sqlClient.delete_result(jobId)
As described above, an SQL job always writes a folder structure with three (or more, when partitioned) objects to the cloud object store. The fact that the jobId is generated into the names of the objects ensures that when the same SQL statement is executed multiple times, the result set is not overwritten but stored in execution-specific objects. If this is not desired, and the result is supposed to be overwritten each time the SQL statement is executed, the JOBPREFIX NONE clause can be used. With this optional clause the jobId is no longer part of the object name.
But note that this in turn implies that each time the same target path - as specified in the INTO clause - is used, and the result data set objects are therefore "overwritten". The following sample demonstrates the effect of the JOBPREFIX NONE clause on the naming of the result objects. Note that the jobId is no longer part of the object names.
sql="SELECT * FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET LIMIT 100 INTO {}first100orders.parquet JOBPREFIX NONE STORED AS PARQUET".format(targeturl)
jobId = sqlClient.submit_sql(sql)
sqlClient.wait_for_job(jobId)
sqlClient.list_results(jobId).head(100)
With this, your result object name is closer to the path you actually specified in the INTO clause, but it is still not exactly the same. When you write a single-partitioned result there is only one object with the data, and you may want a single result object without any virtual folder structure. For this case you can use the method rename_exact_result() on the jobId of a job that has a single-partitioned result.
sqlClient.rename_exact_result(jobId)
sqlClient.list_results(jobId).head(100)
After applying this method to the job's result, the list_results() method shows exactly one object - the one containing the result data - with the result set name as specified in the INTO clause.
The set of APIs that allow you to interact with jobs is provided via this link. Many of them are useful when you are launching many SQL statements, e.g.:
- get_jobs_with_status()
- myjobs()
- get_jobs()
- export_job_history()
The next cell lists the most recent 30 jobs in your instance of Data Engine:
pd.set_option('display.max_colwidth', None)
#You can change the value None for display.max_colwidth to a positive integer if you want to truncate the cell content to shrink the overall table display size.
job_history_df = sqlClient.get_jobs()
job_history_df.head(100)
If you need a longer history of your jobs, you need to periodically export the current job history to a location on COS. This can be done conveniently with the method export_job_history()
sqlClient.export_job_history(targeturl + "my_job_history/", "job_export_" , ".parquet")
Each time you run this function, it performs a delta check of the currently available recent history of 30 jobs against the jobs already stored at the specified location and writes an additional parquet partition containing only the new jobs that had not been exported previously:
# Run another SQL:
sql="SELECT * FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET LIMIT 100 INTO {} STORED AS PARQUET".format(targeturl)
jobId = sqlClient.submit_sql(sql)
sqlClient.wait_for_job(jobId)
#Export job history again:
sqlClient.export_job_history(targeturl + "my_job_history/", "job_export_" , ".parquet")
# Query exported job history:
pd.set_option('display.max_colwidth', 20)
sql = "SELECT * FROM {}my_job_history/ STORED AS PARQUET INTO {} STORED AS PARQUET".format(targeturl, targeturl)
sqlClient.run_sql(sql)
Assume you want to automate your ETL process and have to submit many SQL statements at once. Be aware that there is a limit to the number of queries that can be served by a single Data Engine instance at a time: 1 for the Lite plan and 5 for the Standard plan. If some of your jobs are long-running, it may happen that some jobs have not yet completed when the session ends.
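One simple way to stay within the concurrency limit is to submit statements in batches no larger than your plan's slot count and wait for each batch to finish before submitting the next. This is a minimal sketch, not part of the library itself; it assumes a client object with the submit_sql() and wait_for_job() methods shown earlier:

```python
def submit_in_batches(client, statements, max_concurrent=5):
    """Submit SQL statements in batches of at most max_concurrent jobs,
    waiting for each batch to complete before starting the next one."""
    all_job_ids = []
    for i in range(0, len(statements), max_concurrent):
        batch = statements[i:i + max_concurrent]
        # Submit the whole batch, then block until every job in it finishes
        job_ids = [client.submit_sql(stmt) for stmt in batch]
        for job_id in job_ids:
            client.wait_for_job(job_id)
        all_job_ids.extend(job_ids)
    return all_job_ids

# Usage with the sqlClient from the earlier cells, e.g. on the Standard plan:
# submit_in_batches(sqlClient, [sql_stmt1, sql_stmt2, sql_stmt3], max_concurrent=5)
```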
Since ibmcloudsql
0.4, if you're using Watson Studio, there is also the possibility to retry jobs that did not complete within the session in which they were started. At a later point in time, when you start a new session, you may want to restart those jobs that did not complete. You do so by connecting to Project-Lib and saving the list of submitted jobs in a file in the project bucket. When you re-run the session, the execution status of all jobs is updated and the jobs that did not complete in the previous session are re-executed.
The code in the following cell demonstrates this behavior. Re-run the cell a couple of times and check the file "tracked_jobs1.json" in the project bucket to observe the behavior.
This can be used in Watson Studio with ProjectLib activated, using a file stored as an asset in the project, or with a file from a local machine.
REF: The API docs are provided via this link.
file_name="tracked_jobs1.json"
sqlClient.connect_project_lib(project, file_name)
sql_stmt1='SELECT o.OrderID FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o LIMIT 5 '
sql_stmt2='SELECT o.OrderID FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o LIMIT 10 '
sql_stmt3='SELECT o.OrderID FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o LIMIT 15 '
sql_stmt4='SELECT o.OrderID FROM cos://us-geo/sql/orders.parquet STORED AS PARQUET o LIMIT 20'
jobs = [ ]
jobs.append(sqlClient.submit_and_track_sql(sql_stmt1))
jobs.append(sqlClient.submit_and_track_sql(sql_stmt2))
jobs.append(sqlClient.submit_and_track_sql(sql_stmt3))
jobs.append(sqlClient.submit_and_track_sql(sql_stmt4))
Here, given a list of job IDs, you can run them all, and each is checked against those saved in the file_name
stored in ProjectLib.
sqlClient.process_failed_jobs_until_all_completed(jobs)
So far, all SQL statements used cloud object storage URLs to specify the input data of an SQL statement. This can be cumbersome. That's why ibmcloudsql has dedicated functions for the Hive metastore support of Data Engine. This enables the use of table names in SQL statements rather than cloud object storage URLs. The Hive metastore is a catalog which holds, for each table, all related metadata including the table name, the schema, the backing cloud object storage objects, partitioning information, etc.
NOTE: Using this requires the Standard plan for Data Engine.
One can create tables, list tables, drop tables, and so on. The APIs for catalog management can be found here.
Let's first create a table for a simple sample dataset, customers.csv, provided by Data Engine. The create_table()
method creates an object in the Hive metastore containing the table name and the schema of the table. When the optional parameter schema
is not specified, the schema is automatically discovered from the data on COS.
sqlClient.create_table("customers", cos_url="cos://us-geo/sql/customers.csv", format_type="csv", force_recreate=True)
With describe_table()
you can retrieve the schema information in the catalog.
customers_schema = sqlClient.describe_table("customers")
customers_schema.head(100)
In the following cell we run the create_table()
again but this time we specify the schema explicitly:
sqlClient.create_table("customers", cos_url="cos://us-geo/sql/customers.csv", format_type="csv", force_recreate=True,
schema="(customerID string, companyName string, contactName string, contact_Title string, address string, city string)")
When you have a hive-partitioned dataset on COS (i.e., the sub-folder hierarchy adheres to the hive naming convention), you can create a partitioned table using the create_partitioned_table()
method.
df = sqlClient.show_tables()
try:
    found = df[df["tableName"].str.contains("customers_partitioned")]
except Exception:
    found = []
if len(found) > 0:
    sqlClient.drop_table("customers_partitioned")
sqlClient.create_partitioned_table("customers_partitioned", cos_url="cos://us-geo/sql/customers_partitioned.csv", format_type="csv")
customers_partitioned_schema = sqlClient.describe_table("customers_partitioned")
customers_partitioned_schema.head(100)
Note: After creating a partitioned table, it is always initially empty until you have added the partitions to it. A convenient way to do this is via the recover_table_partitions()
method.
sqlClient.recover_table_partitions("customers_partitioned")
You can get a list of created tables with show_tables()
.
sqlClient.show_tables()
As an alternative to the above convenience methods for managing Hive tables, you can use DDL statements and run them via submit_sql()
. This gives you the full set of Hive table management commands and options, which goes beyond what the convenience methods cover.
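As an illustrative sketch, the cell below builds a CREATE TABLE DDL string and shows how it would be submitted. The table name, column list, and COS location are placeholders, and the exact DDL syntax supported by Data Engine should be checked against its documentation:

```python
# Hypothetical table name and COS location; adjust to your own bucket.
table_name = "my_csv_table"
cos_url = "cos://us-south/mybucket/data/"

ddl = (
    "CREATE TABLE {} (id int, name string) "
    "USING CSV LOCATION {}"
).format(table_name, cos_url)

print(ddl)
# In a live session you would submit the statement with:
# jobId = sqlClient.submit_sql(ddl)
# sqlClient.wait_for_job(jobId)
```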
In this section we use a custom Spark application context that we configure with Data Engine as the table catalog. You have to run the Notebook with a Spark runtime environment to exercise this section. The section is divided into two parts:
Note: The steps in this sub-section are only required if you're not using one of the Watson Studio Spark environments or Analytics Engine serverless Spark. In those services, the Data Engine libraries are already configured out of the box, and you can immediately proceed with section 10.2 below.
Download the Hive client library for Spark to connect to the Hive metastore in Data Engine. The download directory needs to be passed to Data Engine's Spark session builder later on:
hive_client_dir="/tmp/dataengine_jars"
!mkdir -p {hive_client_dir}
!wget https://us.sql-query.cloud.ibm.com/download/catalog/hive-metastore-standalone-client-3.1.2-sqlquery.jar -O {hive_client_dir}/dataengine-hive-client.jar
Download and install the Data Engine session builder libraries that allow you to create a Spark session readily configured for Data Engine with a single line of code:
dataengine_spark_version="1.0.10"
!wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine-spark-integration-{dataengine_spark_version}.jar -O user-libs/spark2/dataengine-spark.jar
!wget https://us.sql-query.cloud.ibm.com/download/catalog/dataengine_spark-{dataengine_spark_version}-py3-none-any.whl -O /tmp/dataengine_spark-{dataengine_spark_version}-py3-none-any.whl
!pip install --user --force-reinstall /tmp/dataengine_spark-{dataengine_spark_version}-py3-none-any.whl
Also make sure that you run the cells to set instancecrn
and apikey
in section 2.2 Setting the Data Engine parameters again after restarting the kernel.
Now set up a Spark session with the Hive client library that you just downloaded into the hive_client_dir
directory:
from dataengine import SparkSessionWithDataengine
session_builder = SparkSessionWithDataengine.enableDataengine(instancecrn, apikey, "public", hive_client_dir)
spark = session_builder.appName("Spark DataEngine integration").getOrCreate()
In case you want to run only this notebook section with a fresh notebook runtime, make sure that you first run the cells to set instancecrn
and apikey
in section 2.2 Setting the Data Engine parameters, because these variables are needed to configure the Hive client with your Data Engine instance.
Set up a Data Engine session for Spark (don't run the following cell if you went through section 10.1 using your own Spark runtime outside of Watson Studio or Analytics Engine, because in 10.1 we already configured your spark
session):
from dataengine import SparkSessionWithDataengine
session_builder = SparkSessionWithDataengine.enableDataengine(instancecrn, apikey, "public")
spark = session_builder.appName("Spark DataEngine integration").getOrCreate()
For informational purposes, you can now introspect the Hive metastore parameters of the Data Engine Spark session. As you can see, it is configured to connect to Data Engine's URI as the Hive metastore:
for conf in spark.sparkContext.getConf().getAll():
    key = conf[0]
    value = "***" if apikey == conf[1] else conf[1]
    if key.startswith("spark.hive.metastore"):
        print(key, value)
Prepare a table my_customers
in your own bucket from the customers sample data:
if not targeturl.endswith('/'):
    targeturl += "/"
jobId = sqlClient.submit_sql("SELECT * FROM cos://us-geo/sql/customers.csv INTO {}my_customers.parquet JOBPREFIX NONE STORED AS PARQUET".format(targeturl))
sqlClient.wait_for_job(jobId)
sqlClient.rename_exact_result(jobId)
sqlClient.create_table("my_customers", cos_url="{}my_customers.parquet".format(targeturl), format_type="parquet", force_recreate=True)
List all tables in Data Engine catalog:
spark.sql('show tables').show(truncate=False)
Run a Spark SQL on a table in Data Engine catalog:
spark_df = spark.sql('select count(*), country from my_customers group by country')
spark_df.show(truncate=True)
In this notebook you learned how you can use the ibmcloudsql
library in a Python notebook to submit SQL queries on data in IBM Cloud Object Storage and how you can interact with the query results. If you want to automate such an SQL job execution as part of your cloud solution, apart from the solution using ProjectLib, you can use the IBM Cloud Functions framework. There is a dedicated SQL function available that lets you set up a cloud function to run SQL statements with IBM Data Engine. You can find the documentation for doing this here.
Torsten Steinbach is IBM's CTO for Big Data in Cloud.
Tuan M. HoangTrong is a research staff member in the Distributed AI, TimeSeries Group.
Copyright © IBM Corp. 2020-2024. This notebook and its source code are released under the terms of the MIT License.