Data skipping for Spark SQL
Last updated: Nov 21, 2024

Data skipping can significantly boost the performance of SQL queries by skipping over irrelevant data objects or files based on summary metadata associated with each object.
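
The idea can be illustrated with a minimal pure-Python sketch. It is not the Xskipper implementation; the ObjectSummary class, the object names, and the values are hypothetical and stand in for per-object min/max summary metadata on a single column. An object is read only if its value range can overlap the range requested by the query predicate.

```python
from dataclasses import dataclass


@dataclass
class ObjectSummary:
    """Hypothetical summary metadata: min and max of one column in one object."""
    name: str
    min_value: float
    max_value: float


def objects_to_scan(summaries, lower, upper):
    """Return the names of objects whose [min, max] range overlaps [lower, upper]."""
    return [s.name for s in summaries
            if s.max_value >= lower and s.min_value <= upper]


# Illustrative metadata for three objects of a data set (made-up values)
summaries = [
    ObjectSummary("part-0.parquet", 0.0, 10.0),
    ObjectSummary("part-1.parquet", 11.0, 25.0),
    ObjectSummary("part-2.parquet", 26.0, 40.0),
]

# Query predicate: temp BETWEEN 12 AND 20
selected = objects_to_scan(summaries, 12.0, 20.0)
print(selected)
```

Only the objects returned by objects_to_scan need to be read. The others cannot contain matching rows, so they are skipped without being opened.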

Data skipping uses the open source Xskipper library for creating, managing and deploying data skipping indexes with Apache Spark. See Xskipper - An Extensible Data Skipping Framework.

For more details on how to work with Xskipper see:

In addition to the open source features in Xskipper, the following features are also available:

Geospatial data skipping

You can also use data skipping when querying geospatial data sets using geospatial functions from the spatio-temporal library.

  • To benefit from data skipping in data sets with latitude and longitude columns, you can collect the min/max indexes on the latitude and longitude columns.
  • Data skipping can be used in data sets with a geometry column (a UDT column) by using a built-in Xskipper plugin.
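
As a rough illustration of why bounding information helps with geospatial queries, the following pure-Python sketch keeps one bounding box per object and reads an object only if its box intersects the query area. This is a conceptual sketch, not the plugin's code: the BoundingBox class, the object names, and the coordinates are hypothetical, and boxes that cross the antimeridian are ignored.

```python
from dataclasses import dataclass


@dataclass
class BoundingBox:
    """Hypothetical per-object summary: min/max of latitude and longitude."""
    min_lat: float
    max_lat: float
    min_lng: float
    max_lng: float

    def intersects(self, other):
        """True if the two boxes overlap in both latitude and longitude."""
        return (self.min_lat <= other.max_lat and other.min_lat <= self.max_lat
                and self.min_lng <= other.max_lng and other.min_lng <= self.max_lng)


# Illustrative metadata: one bounding box per data object (made-up values)
object_boxes = {
    "europe.parquet": BoundingBox(36.0, 71.0, -10.0, 40.0),
    "australia.parquet": BoundingBox(-44.0, -10.0, 113.0, 154.0),
}

# Query area: a small box in western Europe
query_box = BoundingBox(48.0, 49.0, 2.0, 3.0)

candidates = [name for name, box in object_boxes.items()
              if box.intersects(query_box)]
print(candidates)
```

An object whose box does not intersect the query area cannot contain a matching location, so it can be skipped. Objects that do intersect still have to be read and filtered row by row.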

The next sections show you how to work with the geospatial plugin.

Setting up the geospatial plugin

To use the plugin, load the relevant implementations using the Registration module. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.

  • For Scala:

    import com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory
    import com.ibm.xskipper.stmetaindex.index.STIndexFactory
    import com.ibm.xskipper.stmetaindex.translation.parquet.{STParquetMetaDataTranslator, STParquetMetadatastoreClauseTranslator}
    import io.xskipper._
    
    Registration.addIndexFactory(STIndexFactory)
    Registration.addMetadataFilterFactory(STMetaDataFilterFactory)
    Registration.addClauseTranslator(STParquetMetadatastoreClauseTranslator)
    Registration.addMetaDataTranslator(STParquetMetaDataTranslator)
    
  • For Python:

    from xskipper import Xskipper
    from xskipper import Registration
    
    Registration.addMetadataFilterFactory(spark, 'com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory')
    Registration.addIndexFactory(spark, 'com.ibm.xskipper.stmetaindex.index.STIndexFactory')
    Registration.addMetaDataTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetaDataTranslator')
    Registration.addClauseTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetadatastoreClauseTranslator')
    

Index building

To build an index, you can use the addCustomIndex API. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.

  • For Scala:

    import com.ibm.xskipper.stmetaindex.implicits._
    
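    // index the dataset; `reader` below is assumed to be a DataFrameReader
    // configured for the data set, for example spark.read.format("parquet")
    val xskipper = new Xskipper(spark, dataset_path)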
    
    xskipper
      .indexBuilder()
      // using the implicit method defined in the plugin implicits
      .addSTBoundingBoxLocationIndex("location")
      // equivalent
      //.addCustomIndex(STBoundingBoxLocationIndex("location"))
      .build(reader).show(false)
    
  • For Python:

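    # `reader` below is assumed to be a DataFrameReader configured for the
    # data set, for example spark.read.format("parquet")
    xskipper = Xskipper(spark, dataset_path)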
    
    # adding the index using the custom index API
    xskipper.indexBuilder() \
            .addCustomIndex("com.ibm.xskipper.stmetaindex.index.STBoundingBoxLocationIndex", ['location'], dict()) \
            .build(reader) \
            .show(10, False)
    

Supported functions

The list of supported geospatial functions includes the following:

  • ST_Distance
  • ST_Intersects
  • ST_Contains
  • ST_Equals
  • ST_Crosses
  • ST_Touches
  • ST_Within
  • ST_Overlaps
  • ST_EnvelopesIntersect
  • ST_IntersectsInterior

Encrypting indexes

If you use a Parquet metadata store, the metadata can optionally be encrypted using Parquet Modular Encryption (PME). This is possible because the metadata itself is stored as a Parquet data set, so PME can be used to encrypt it. This feature applies to all input formats; for example, a data set stored in CSV format can have its metadata encrypted using PME.

In the following section, unless specified otherwise, when referring to footers, columns, and so on, these are with respect to metadata objects, and not to objects in the indexed data set.

Index encryption is modular and granular in the following way:

  • Each index can either be encrypted (with a per-index key granularity) or left in plain text
  • Footer + object name column:
    • The footer of the metadata object, which is itself a Parquet file, contains, among other things:
      • Schema of the metadata object, which reveals the types, parameters and column names for all indexes collected. For example, you can learn that a BloomFilter is defined on column city with a false-positive probability of 0.1.
      • Full path to the original data set or a table name in case of a Hive metastore table.
    • Object name column stores the names of all indexed objects.
  • Footer + object name column can either be:
    • Both encrypted using the same key. This is the default. In this case, the Parquet objects comprising the metadata are in encrypted footer mode, and the object name column is encrypted using the selected key.

    • Both in plain text. In this case, the Parquet objects comprising the metadata are in plain text footer mode, and the object name column is not encrypted.

      If at least one index is marked as encrypted, then a footer key must be configured regardless of whether plain text footer mode is enabled or not. If plain text footer is set then the footer key is used only for tamper-proofing. Note that in that case the object name column is not tamper proofed.

      If a footer key is configured, then at least one index must be encrypted.
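
The two rules above can be summarized in a short pure-Python check. This is only a sketch of the stated constraints, not Xskipper code: the function validate_encryption_config and the index names are hypothetical, and the values are key labels, never key material.

```python
def validate_encryption_config(index_keys, footer_key=None):
    """Check the footer key rules for a planned set of indexes.

    index_keys maps a (hypothetical) index name to the label of the key used
    to encrypt it, or to None if the index is left in plain text.
    footer_key is the label of the footer key, or None if not configured.
    """
    any_encrypted = any(label is not None for label in index_keys.values())
    if any_encrypted and footer_key is None:
        raise ValueError("an encrypted index requires a footer key")
    if footer_key is not None and not any_encrypted:
        raise ValueError("a footer key requires at least one encrypted index")
    return True


# Encrypted MinMax on temp (key label k2), plain text ValueList on city,
# footer key label k1
ok = validate_encryption_config(
    {"minmax_temp": "k2", "valuelist_city": None}, footer_key="k1")
print(ok)
```

A configuration with no encrypted index and no footer key also passes the check, which corresponds to metadata that is not encrypted at all.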

Before using index encryption, you should check the documentation on PME and make sure you are familiar with the concepts.

Important: When using index encryption, whenever a key is configured in any Xskipper API, it is always the key label, NEVER the key itself.

To use index encryption:

  1. Follow all the steps to make sure PME is enabled. See PME.

  2. Perform all regular PME configurations, including Key Management configurations.

  3. Create encrypted metadata for a data set:

    1. Follow the regular flow for creating metadata.
    2. Configure a footer key. If you wish to leave the footer + object name column in plain text, set io.xskipper.parquet.encryption.plaintext.footer to true (see the samples below).
    3. In IndexBuilder, for each index you want to encrypt, add the label of the key to use for that index.

    To use metadata during query time or to refresh existing metadata, no setup is necessary other than the regular PME setup required to make sure the keys are accessible (literally the same configuration needed to read an encrypted data set).

Samples

The following samples show metadata creation using a key named k1 as a footer + object name key, and a key named k2 as a key to encrypt a MinMax for temp, while also creating a ValueList for city, which is left in plain text. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.

  • For Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmConf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • For Python:

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
      ("io.xskipper.parquet.mdlocation", md_base_location),
      ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k2") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

If you want the footer + object name to be left in plain text mode (as mentioned above), you need to add the configuration parameter:

  • For Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmConf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1",
      "io.xskipper.parquet.encryption.plaintext.footer" -> "true")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • For Python:

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
      ("io.xskipper.parquet.mdlocation", md_base_location),
      ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key and plain text footer mode
    conf = dict([
      ("io.xskipper.parquet.encryption.footer.key", "k1"),
      ("io.xskipper.parquet.encryption.plaintext.footer", "true")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k2") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

Data skipping with joins (for Spark 3 only)

With Spark 3, you can use data skipping in join queries such as:

SELECT *
FROM orders, lineitem 
WHERE l_orderkey = o_orderkey and o_custkey = 800

This example shows a star schema based on the TPC-H benchmark schema (see TPC-H), where lineitem is a fact table that contains many records, while orders is a dimension table with relatively few records compared to the fact table.

The above query has a predicate only on the orders table, which contains a small number of records, so min/max data skipping on that table alone brings little benefit. The large lineitem table has no direct predicate that data skipping could use.

Dynamic data skipping is a feature that enables queries such as the above to benefit from data skipping. It first extracts the relevant l_orderkey values based on the condition on the orders table, and then uses them to push down a predicate on l_orderkey that uses data skipping indexes to filter out irrelevant objects.
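
The two steps can be sketched in plain Python. This is a conceptual illustration only, not how the optimization rule is implemented: the helper functions, the sample rows, the object names, and the min/max values are all hypothetical.

```python
# Small dimension table (orders), held in memory for illustration
orders = [
    {"o_orderkey": 1, "o_custkey": 800},
    {"o_orderkey": 2, "o_custkey": 17},
    {"o_orderkey": 57, "o_custkey": 800},
]

# Hypothetical min/max metadata on l_orderkey for each lineitem object
lineitem_minmax = {
    "lineitem-0.parquet": (1, 20),
    "lineitem-1.parquet": (21, 40),
    "lineitem-2.parquet": (41, 60),
}


def relevant_keys(rows, custkey):
    """Step 1: collect the join keys that satisfy the dimension predicate."""
    return {row["o_orderkey"] for row in rows if row["o_custkey"] == custkey}


def objects_for_keys(minmax, keys):
    """Step 2: keep only objects whose [min, max] range can contain a key."""
    return sorted(name for name, (lo, hi) in minmax.items()
                  if any(lo <= key <= hi for key in keys))


keys = relevant_keys(orders, 800)
to_scan = objects_for_keys(lineitem_minmax, keys)
print(to_scan)
```

In this sketch the second lineitem object is never read, because none of the order keys selected from the orders table can fall within its range.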

To use this feature, enable the following optimization rule. Note that you can only use Scala in applications in IBM Analytics Engine powered by Apache Spark, not in watsonx.ai Studio.

  • For Scala:

      import com.ibm.spark.implicits._
    
      spark.enableDynamicDataSkipping()
    
  • For Python:

        from sparkextensions import SparkExtensions
    
        SparkExtensions.enableDynamicDataSkipping(spark)
    

Then use the Xskipper API as usual and your queries will benefit from using data skipping.

For example, in the above query, indexing l_orderkey using min/max enables skipping over irrelevant objects in the lineitem table and improves query performance.

Support for older metadata

Xskipper supports older metadata created by the MetaIndexManager seamlessly. Older metadata can be used for skipping as updates to the Xskipper metadata are carried out automatically by the next refresh operation.

If you see DEPRECATED_SUPPORTED in front of an index when listing indexes or running a describeIndex operation, the metadata version is deprecated but is still supported and skipping will work. The next refresh operation will update the metadata automatically.
