如何在Spark键值对数据中,对指定的Key进行输出/筛选/模式匹配

在用键值对RDD进行操作时,经常会遇到不知道如何筛选出想要数据的情况,这里提供了一些解决方法



1、对固定的Key数据进行查询

代码说明:

  1. SparkConf:配置 Spark 应用程序的一些基本信息。
  2. SparkContext:创建 Spark 上下文以在 Spark 中执行操作。
  3. parallelize:生成一个包含多个键值对的初始 RDD。
  4. filter:使用 filter 方法筛选出 key 等于指定值的元素。
  5. collect:收集结果并在驱动程序上进行输出。
  6. foreach:用来遍历和打印过滤后的结果。
import org.apache.spark.{SparkConf, SparkContext}

object KeyFilterExample {
  def main(args: Array[String]): Unit = {
    // 初始化 SparkContext
    val conf = new SparkConf().setAppName("Key Filter Example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 创建一个示例 RDD,包含 key-value 键值对
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)))

    // 定义要筛选的特定 key
    val specifiedKey = "a"

    // 使用 filter 操作输出指定的 key 值
    val filteredRdd = rdd.filter { case (key, _) => key == specifiedKey }

    // 输出结果
    filteredRdd.collect().foreach { case (key, value) =>
      println(s"Key: $key, Value: $value")
    }

    // 停止 SparkContext
    sc.stop()
  }
}



2、对不固定的Key数据进行模糊查询

代码说明:

  1. SparkConfSparkContext:与之前示例相同,用于初始化 Spark 应用。
  2. parallelize:生成一个包含多个键值对的初始 RDD。
  3. filter:使用 Scala 的模式匹配功能来筛选出以字母 'a' 开头的 keys。
    • case (key, _) if key.startsWith("a"):当 key 以 'a' 开头时,返回 true,否则返回 false
  4. collect:收集结果并在驱动程序上进行输出。
  5. foreach:遍历并打印过滤后的结果。
import org.apache.spark.{SparkConf, SparkContext}

object PatternMatchingKeyExample {
  def main(args: Array[String]): Unit = {
    // 初始化 SparkContext
    val conf = new SparkConf().setAppName("Pattern Matching Key Example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 创建一个示例 RDD,包含 key-value 键值对
    val rdd = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("apricot", 3),
                                  ("berry", 4), ("avocado", 5)))

    // 使用 filter 操作与模式匹配筛选以 'a' 开头的 keys
    val patternMatchedRdd = rdd.filter { 
      case (key, _) if key.startsWith("a") => true
      case _ => false
    }

    // 输出结果
    patternMatchedRdd.collect().foreach { case (key, value) =>
      println(s"Key: $key, Value: $value")
    }

    // 停止 SparkContext
    sc.stop()
  }
}