• <li id="oiayd"><ruby id="oiayd"></ruby></li>

    <acronym id="oiayd"><strong id="oiayd"></strong></acronym>

    <acronym id="oiayd"></acronym>
    <track id="oiayd"><ruby id="oiayd"></ruby></track>
  • <track id="oiayd"><ruby id="oiayd"><menu id="oiayd"></menu></ruby></track>

  • 400-650-7353
    您所在的位置:首页 > IT干货资料 > 大数据 > 【大数据基础知识】Spark常用算子(二)

    【大数据基础知识】Spark常用算子(二)

    • 发布: 大数据培训
    • 来源:大数据干货资料
    • 2021-07-28 10:07:22
    • 阅读()
    • 分享
    • 手机端入口

    1. mapValues

    mapValues算子 ,作用于 [K,V] 格式的RDD上,并且只对V(Value)进行操作,Key值保持不变。

    (1)将[K,V] 格式的List转换为[K,V] 格式的RDD。

    scala> val rdd = sc.makeRDD(List(("Tom",100),("Mike",80)))

    rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[2] at makeRDD at :24

    (2)使用mapValues算子,将value值乘以100,key值保持不变

    scala> val rdd2=rdd.mapValues(_*100)

    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at mapValues at :26

    (3)使用collect算子回收,查看结果

    scala> rdd2.collect

    res0: Array[(String, Int)] = Array((Tom,10000), (Mike,8000))

    2. mapPartitions

    作用于RDD上的每一个分区,传递的函数相当于一个迭代器,有几个分区,就会迭代几次。

    object Test1 {

    def main(args: Array[String]): Unit = {

    val conf=new SparkConf()

    .setMaster("local[*]")

    .setAppName(this.getClass.getSimpleName)

    val sc=new SparkContext(conf)

    val rdd=sc.makeRDD(List(1,2,3,4,5,6),3);

    val values: RDD[Int] = rdd.mapPartitions(t => {

    t.map(_ * 10)

    })

    //打印输出结果

    values.foreach(println)

    }

    }

    使用上面的代码进行测试。输出结果如下:

    可以看到,因为设置了3个分区,所以相应启动了3个任务,在每个分区上进行迭代计算。

    3. filter

    filter算子过滤出所有的满足条件的元素。

    另外fliter算子不会改变分区的数量,所以经过过滤后,即使某些分区没有数据了,但是分区依然存在的。

    scala> val rdd1 = sc.makeRDD(List(1,2,3,4,5,6),3)

    rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at makeRDD at :24

    scala> val rdd2 = rdd1.filter(_>3)

    rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at filter at :26

    scala> rdd2.partitions.size

    res3: Int = 3

    4. sortBy

    sortBy算子按照指定条件进行排序。

    我们使用下面的代码进行测试:

    object Test2 {

    def main(args: Array[String]): Unit = {

    val conf=new SparkConf()

    .setMaster("local[*]")

    .setAppName(this.getClass.getSimpleName)

    val sc=new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("Tom", 80), ("Mike", 90), ("Mary", 85),("John",60)))

    //按value值升序排列

    val res1: RDD[(String, Int)] = rdd.sortBy(_._2)

    res1.collect.foreach(println)

    // 按value值降序排列

    val res2: RDD[(String, Int)] = rdd.sortBy(_._2, false)

    res2.collect.foreach(println)

    }

    }

    升序输出的结果如下:

    降序输出的结果如下:

    有一点需要说明的是,输出结果前,要使用collect算子把结果回收到本地。因为数据是分散在集群各节点的,如果不回收,看到的结果可能是不正确的。

    文章“【大数据基础知识】Spark常用算子(二)”已帮助

    >>本文地址:http://www.seyoho.com/zhuanye/2021/69463.html

    THE END  

    声明:本站稿件版权均属中公教育优就业所有,未经许可不得擅自转载。

    1 您的年龄

    2 您的学历

    3 您更想做哪个方向的工作?

    获取测试结果
    • 大前端大前端
    • 大数据大数据
    • 互联网营销互联网营销
    • JavaJava
    • Linux云计算Linux
    • Python+人工智能Python
    • 嵌入式物联网嵌入式
    • 全域电商运营全域电商运营
    • 软件测试软件测试
    • 室内设计室内设计
    • 平面设计平面设计
    • 电商设计电商设计
    • 网页设计网页设计
    • 全链路UI/UE设计UI设计
    • VR/AR游戏开发VR/AR
    • 网络安全网络安全
    • 新媒体与短视频运营新媒体
    • 直播带货直播带货
    • 智能机器人软件开发智能机器人
     

    快速通道fast track

    近期开班时间TIME

    日韩一级a片无卡顿

  • <li id="oiayd"><ruby id="oiayd"></ruby></li>

    <acronym id="oiayd"><strong id="oiayd"></strong></acronym>

    <acronym id="oiayd"></acronym>
    <track id="oiayd"><ruby id="oiayd"></ruby></track>
  • <track id="oiayd"><ruby id="oiayd"><menu id="oiayd"></menu></ruby></track>