0 / 0
Go back to the English version of the documentation
Spark SQL 数据跳过
Last updated: 2024年11月21日
Spark SQL 数据跳过

数据跳过功能可根据与每个对象关联的汇总元数据跳过不相关的数据对象或文件,从而显著提高 SQL 查询的性能。

数据跳过功能使用开源 Xskipper 库,通过 Apache Spark 来创建、管理和部署数据跳过索引。 请参阅 Xskipper - 可扩展的数据跳过框架

有关如何使用 Xskipper 的更多详细信息,请参阅:

除了 Xskipper 中的开源功能外,还提供了以下功能:

地理空间数据跳过

您还可以在使用地理空间功能时空库中查询地理空间数据集时使用数据跳过。

  • 要在包含纬度和经度列的数据集中通过数据跳过来简化操作,您可以收集纬度和经度列的最小/最大索引。
  • 通过使用内置的 Xskipper 插件,可以在包含几何列(UDT 列)的数据集中使用数据跳过。

以下部分将展示如何使用地理空间插件。

设置地理空间插件

要使用该插件,请使用 "注册" 模块装入相关实现。 请注意,您只能在由Apache Spark 支持的IBM Analytics Engine中的应用程序中使用Scala,而不能在watsonx.aiStudio 中使用。

  • 对于 Scala:

    import com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory
    import com.ibm.xskipper.stmetaindex.index.STIndexFactory
    import com.ibm.xskipper.stmetaindex.translation.parquet.{STParquetMetaDataTranslator, STParquetMetadatastoreClauseTranslator}
    import io.xskipper._
    
    Registration.addIndexFactory(STIndexFactory)
    Registration.addMetadataFilterFactory(STMetaDataFilterFactory)
    Registration.addClauseTranslator(STParquetMetadatastoreClauseTranslator)
    Registration.addMetaDataTranslator(STParquetMetaDataTranslator)
    
  • 对于 Python:

    from xskipper import Xskipper
    from xskipper import Registration
    
    Registration.addMetadataFilterFactory(spark, 'com.ibm.xskipper.stmetaindex.filter.STMetaDataFilterFactory')
    Registration.addIndexFactory(spark, 'com.ibm.xskipper.stmetaindex.index.STIndexFactory')
    Registration.addMetaDataTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetaDataTranslator')
    Registration.addClauseTranslator(spark, 'com.ibm.xskipper.stmetaindex.translation.parquet.STParquetMetadatastoreClauseTranslator')
    

索引建立

要构建索引,可以使用 addCustomIndex API。 请注意,您只能在由Apache Spark 支持的IBM Analytics Engine中的应用程序中使用Scala,而不能在watsonx.aiStudio 中使用。

  • 对于 Scala:

    import com.ibm.xskipper.stmetaindex.implicits._
    
    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    
    xskipper
      .indexBuilder()
      // using the implicit method defined in the plugin implicits
      .addSTBoundingBoxLocationIndex("location")
      // equivalent
      //.addCustomIndex(STBoundingBoxLocationIndex("location"))
      .build(reader).show(false)
    
  • 对于 Python:

    xskipper = Xskipper(spark, dataset_path)
    
    # adding the index using the custom index API
    xskipper.indexBuilder() \
            .addCustomIndex("com.ibm.xskipper.stmetaindex.index.STBoundingBoxLocationIndex", ['location'], dict()) \
            .build(reader) \
            .show(10, False)
    

支持的功能

支持的地理空间函数列表中包括下列函数:

  • ST_Distance
  • ST_Intersects
  • ST_Contains
  • ST_Equals
  • ST_Crosses
  • ST_Touches
  • ST_Within
  • ST_Overlaps
  • ST_EnvelopesIntersect
  • ST_IntersectsInterior

加密索引

如果使用 Parquet 元数据存储,那么可以选择使用 Parquet Modular Encryption (PME) 来加密元数据。 要实现此目的,可以将元数据本身存储为 Parquet 数据集,然后使用 PME 对其进行加密。 此功能适用于所有输入格式,例如,以 CSV 格式存储的数据集可以使用 PME 来加密其元数据。

在以下部分中,除非另有说明,否则在提及页脚和列等内容时,它们都与元数据对象相关,而不是与索引数据集中的对象相关。

可通过以下方式对索引加密进行模块化和细化:

  • 每个索引都可以加密 (使用每个索引的密钥粒度) 或保留为纯文本
  • “页脚 + 对象名称”列:
    • 元数据对象的“页脚”列,它本身就是一个 Parquet 文件,其中包含:
      • 元数据对象的模式,其中显示所收集的所有索引的类型、参数和列名称。 例如,您可以学会如何在 city 列上定义误报概率为 0.1BloomFilter
      • 原始数据集或表名(如果存在 Hive 元存储表)的完整路径。
    • “对象名称”列存储所有索引对象的名称。
  • “页脚 + 元数据”列可以:
    • 都使用相同密钥进行加密。 这是缺省值。 在此情况下,包含元数据的 Parquet 对象的明文页脚配置处于加密页脚方式,并且“对象名称”列使用所选密钥进行加密。

    • 都为明文。 在此情况下,包含元数据的 Parquet 对象处于明文页脚方式,并且“对象名称”列未加密。

      如果至少有一个索引被标记为“已加密”,那么无论是否启用明文页脚方式,都必须配置页脚密钥。 如果设置了明文页脚,那么页脚密钥仅用于防篡改。 请注意,在这种情况下,“对象名称”列不可防篡改。

      如果配置了页脚密钥,那么必须至少已加密一个索引。

在使用索引加密之前,您应该查看 PME 上的文档并确保您熟悉这些概念。

重要信息: 使用索引加密时,每当在任何 Xskipper API 中配置 key 时,它始终是标签 "从不密钥本身"。

要使用索引加密:

  1. 按照所有这些步骤来确保启用了 PME。 请参阅 PME (PME)

  2. 完成所有常规的 PME 配置,包括密钥管理配置。

  3. 为数据集创建加密元数据:

    1. 按照常规流程创建元数据。
    2. 配置一个页脚密钥。 如果想要设置“明文页脚 + 对象名称”列,请将 io.xskipper.parquet.encryption.plaintext.footer 设置为 true(请参阅下面的样本)。
    3. IndexBuilder 中,针对每个要加密的索引,添加要用于该索引的密钥标签。

    要在查询期间使用元数据或刷新现有元数据,除了确保密钥可访问所需的常规 PME 设置外,无需再进行其他设置(与读取加密数据集所需的配置完全相同)。

样本

以下样本展示了元数据创建过程,方法是使用名为 k1 的密钥作为“页脚 + 对象名称”密钥,使用名为 k2 的密钥作为用于为 temp 加密 MinMax 的密钥并为 city 创建 ValueList(保留为明文)。 请注意,您只能在由Apache Spark 支持的IBM Analytics Engine中的应用程序中使用Scala,而不能在watsonx.aiStudio 中使用。

  • 对于 Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmComf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • 对于 Python:

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
      ("io.xskipper.parquet.mdlocation", md_base_location),
      ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k1") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

如果想要将“页脚 + 对象名称”保留为明文方式(如上所述),那么需要添加配置参数:

  • 对于 Scala:

    // index the dataset
    val xskipper = new Xskipper(spark, dataset_path)
    // Configuring the JVM wide parameters
    val jvmComf = Map(
      "io.xskipper.parquet.mdlocation" -> md_base_location,
      "io.xskipper.parquet.mdlocation.type" -> "EXPLICIT_BASE_PATH_LOCATION")
    Xskipper.setConf(jvmConf)
    // set the footer key
    val conf = Map(
      "io.xskipper.parquet.encryption.footer.key" -> "k1",
      "io.xskipper.parquet.encryption.plaintext.footer" -> "true")
    xskipper.setConf(conf)
    xskipper
      .indexBuilder()
      // Add an encrypted MinMax index for temp
      .addMinMaxIndex("temp", "k2")
      // Add a plaintext ValueList index for city
      .addValueListIndex("city")
      .build(reader).show(false)
    
  • 对于 Python:

    xskipper = Xskipper(spark, dataset_path)
    # Add JVM Wide configuration
    jvmConf = dict([
    ("io.xskipper.parquet.mdlocation", md_base_location),
    ("io.xskipper.parquet.mdlocation.type", "EXPLICIT_BASE_PATH_LOCATION")])
    Xskipper.setConf(spark, jvmConf)
    # configure footer key
    conf = dict([("io.xskipper.parquet.encryption.footer.key", "k1"),
    ("io.xskipper.parquet.encryption.plaintext.footer", "true")])
    xskipper.setConf(conf)
    # adding the indexes
    xskipper.indexBuilder() \
            .addMinMaxIndex("temp", "k1") \
            .addValueListIndex("city") \
            .build(reader) \
            .show(10, False)
    

使用连接实现数据跳过(仅适用于 Spark 3)

如果使用 Spark 3,那么可以在如下连接查询中使用数据跳过:

SELECT *
FROM orders, lineitem 
WHERE l_orderkey = o_orderkey and o_custkey = 800

此示例显示了基于 TPC-H 基准模式(请参阅TPC-H)的星型模式,其中 lineitem 是一个事实表并包含大量记录,而 orders 表是一个维度表,与事实表相比,它包含的记录数量相对较少。

上述查询在包含少量记录的 orders 表上有一个谓词,这意味着当使用 min/max 时,数据跳过功能的作用不大。

动态数据跳过功能可以通过数据跳过来简化像上面那样的查询,方法是先根据 orders 表上的条件提取相关 l_orderkey 值,然后使用此功能将谓词下推到 l_orderkey,以便使用数据跳过索引来过滤不相关的对象。

要使用此功能,请启用以下优化规则。 请注意,您只能在由Apache Spark 支持的IBM Analytics Engine中的应用程序中使用Scala,而不能在watsonx.aiStudio 中使用。

  • 对于 Scala:

      import com.ibm.spark.implicits.
    
      spark.enableDynamicDataSkipping()
    
  • 对于 Python:

        from sparkextensions import SparkExtensions
    
        SparkExtensions.enableDynamicDataSkipping(spark)
    

然后像往常一样使用 Xskipper API,并通过数据跳过来简化您的查询。

例如,在上述查询中,通过使用 min/max 对 l_orderkey 建立索引,便可以跳过 lineitem 表并提高查询性能。

支持旧元数据

Xskipper 无缝支持由 MetaIndexManager 创建的旧元数据。 旧元数据可用于跳过,因为下一次刷新操作会自动执行 Xskipper 元数据更新。

如果您在列出索引或运行 describeIndex 操作时在某个索引前面看到 DEPRECATED_SUPPORTED,那么不推荐使用但仍支持元数据版本,并且跳过将起作用。 下一次刷新操作将自动更新元数据。

父主题: Notebook 和脚本

Generative AI search and answer
These answers are generated by a large language model in watsonx.ai based on content from the product documentation. Learn more