Demo Notebook for Reading & Writing from PySpark to dashDB

In order to connect this notebook to one of your dashDB systems, insert a credentials cell here. To do so, click "Find and Add Data" at the top right of the screen, then select "Connection" and click "Insert to code" for the dashDB system of your choice. Make sure you have a dashDB connection set up in your project beforehand.

Make sure the inserted properties are labelled credentials_1.
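The inserted cell defines a Python dict named credentials_1. As a minimal sketch of the shape this notebook relies on (only the keys used later are shown; the values here are placeholders, not real credentials):

# Placeholder sketch of the credentials dict produced by "Insert to code";
# only the keys used later in this notebook are shown, values are dummies.
credentials_1 = {
    "jdbcurl": "jdbc:db2://<hostname>:50000/<database>",
    "username": "<your_username>",
    "password": "<your_password>"
}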

In [1]:
 

Reading dashDB table data into PySpark data frames

In [2]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Load the SAMPLES.TRANSPORTATION table from dashDB via JDBC
transportation = sqlContext.read.jdbc(credentials_1["jdbcurl"],
                                      "SAMPLES.TRANSPORTATION",
                                      properties={"user": credentials_1["username"], "password": credentials_1["password"]})
transportation.show()
+---------+--------------------+
|JWTR_CODE|           JWTR_DESC|
+---------+--------------------+
|       bb|  N/A (Never Worked)|
|       01|  Car, truck, or van|
|       02|  Bus or trolley bus|
|       03|Streetcar or trol...|
|       04|              Subway|
|       05|            Railroad|
|       06|           Ferryboat|
|       07|             Taxicab|
|       08|          Motorcycle|
|       09|             Bicycle|
|       10|                Walk|
|       11|      Work From Home|
|       12|               Other|
+---------+--------------------+
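The result is a regular PySpark data frame, so the usual transformations apply. A small sketch using the columns shown above:

In [ ]:
# Drop the "N/A (Never Worked)" code and list the remaining commute modes
commute_modes = transportation.filter(transportation.JWTR_CODE != "bb").select("JWTR_DESC")
commute_modes.show(5)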

Writing PySpark data frames to dashDB tables

Set up Spark env for PySpark writing to dashDB

We need to run some Scala logic to configure the JDBC dialect for dashDB correctly. PixieDust provides a way to do this, so first we make sure we have the latest PixieDust package installed:

In [12]:
!pip install --user --upgrade pixiedust
print("Please restart the kernel and then run through the notebook again (skipping this cell).")
Requirement already up-to-date: pixiedust in /gpfs/global_fs01/sym_shared/YPProdSpark/user/sa53-b3957c2b380967-819c8cb83d28/.local/lib/python2.7/site-packages
Requirement already up-to-date: mpld3 in /gpfs/global_fs01/sym_shared/YPProdSpark/user/sa53-b3957c2b380967-819c8cb83d28/.local/lib/python2.7/site-packages (from pixiedust)
Requirement already up-to-date: lxml in /gpfs/global_fs01/sym_shared/YPProdSpark/user/sa53-b3957c2b380967-819c8cb83d28/.local/lib/python2.7/site-packages (from pixiedust)
Please restart the kernel and then run through the notebook again (skipping this cell).
In [3]:
import pixiedust
Pixiedust database opened successfully
Pixiedust version 0.70

Do you have more than 20 string columns in a single data frame that you want to write to dashDB? If so, change the maxStringColumnLength value in the next cell to roughly 640,000 divided by the number of string columns. For example, for a data frame with 50 string columns you should set maxStringColumnLength=12800.

In [4]:
# Maximum VARCHAR length used for string columns written to dashDB
maxStringColumnLength = 32000
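If you prefer not to hard-code the value, here is a hedged sketch that derives it from a data frame's schema using the 640,000 budget mentioned above (the helper name is made up for illustration and is not part of the original notebook):

In [ ]:
# Hypothetical helper: 640,000 divided by the number of string columns,
# capped at the 32,000 default used above.
from pyspark.sql.types import StringType

def suggested_max_string_length(df, budget=640000, cap=32000):
    n_strings = sum(1 for f in df.schema.fields if isinstance(f.dataType, StringType))
    return cap if n_strings == 0 else min(cap, budget // n_strings)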

The next cell sets up the custom dashDB dialect (with a proper string type mapping) using the PixieDust Scala bridge:

In [5]:
%%scala cl=dialect global=true
import org.apache.spark.sql.jdbc._
import org.apache.spark.sql.types.{StringType, BooleanType, DataType}

// Custom dialect for dashDB JDBC URLs: map Spark StringType to VARCHAR(maxStringColumnLength)
// and BooleanType to CHAR(1) when tables are created through JDBC.
object dashDBCustomDialect extends JdbcDialect {
    override def canHandle(url: String): Boolean = url.startsWith("jdbc:db2")
    override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
            case StringType => Option(JdbcType("VARCHAR(" + maxStringColumnLength + ")", java.sql.Types.VARCHAR))
            case BooleanType => Option(JdbcType("CHAR(1)", java.sql.Types.CHAR))
            case _ => None
    }
}
JdbcDialects.registerDialect(dashDBCustomDialect)

Creating and writing data frames

Create some test data frames that we are going to write:

In [6]:
sqlContext=SQLContext(sc)
mydfWithString = sqlContext.createDataFrame(
[(2010, 'Camping Equipment', 3),
 (2010, 'Golf Equipment', 1),
 (2010, 'Mountaineering Equipment', 1),
 (2010, 'Outdoor Protection', 2),
 (2010, 'Personal Accessories', 2),
 (2011, 'Camping Equipment', 4),
 (2011, 'Golf Equipment', 5),
 (2011, 'Mountaineering Equipment',2),
 (2011, 'Outdoor Protection', 4),
 (2011, 'Personal Accessories', 2),
 (2012, 'Camping Equipment', 5),
 (2012, 'Golf Equipment', 5),
 (2012, 'Mountaineering Equipment', 3),
 (2012, 'Outdoor Protection', 5),
 (2012, 'Personal Accessories', 3),
 (2013, 'Camping Equipment', 8),
 (2013, 'Golf Equipment', 5),
 (2013, 'Mountaineering Equipment', 3),
 (2013, 'Outdoor Protection', 8),
 (2013, 'Personal Accessories', 4)],
["year","zone","unique_customers"])

mydfWithoutString = sqlContext.createDataFrame(
[(2010, 3),
 (2010, 1),
 (2010, 1),
 (2010, 2),
 (2010, 2),
 (2011, 4),
 (2011, 5),
 (2011, 2),
 (2011, 4),
 (2011, 2),
 (2012, 5),
 (2012, 5),
 (2012, 3),
 (2012, 5),
 (2012, 3),
 (2013, 8),
 (2013, 5),
 (2013, 3),
 (2013, 8),
 (2013, 4)],
["year","unique_customers"])

Now write the data frame that doesn't hold a string column. This will work even if you have skipped the registration of the dashDB dialect via the PixieDust Scala bridge above:

In [7]:
mydfWithoutString.write.jdbc(credentials_1["jdbcurl"],
                             "TEST_DATAFRAME_WITHOUT_STRING",
                             properties={"user": credentials_1["username"], "password": credentials_1["password"]},
                             mode="overwrite")

Now write the data frame that holds a string column. This requires the registration of the dashDB dialect via the PixieDust Scala bridge above:

In [8]:
mydfWithString.write.jdbc(credentials_1["jdbcurl"],
                          "TEST_DATAFRAME_WITH_STRING",
                          properties={"user": credentials_1["username"], "password": credentials_1["password"]},
                          mode="overwrite")
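To verify the write, a small sketch that reads the freshly created table back over the same JDBC connection:

In [ ]:
# Read back the table written above and show a few rows
written = sqlContext.read.jdbc(credentials_1["jdbcurl"],
                               "TEST_DATAFRAME_WITH_STRING",
                               properties={"user": credentials_1["username"], "password": credentials_1["password"]})
written.show(5)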
In [ ]: