Spark自定义排序与分区
2020-12-10 03:18
标签:sorted style set key spark col 出现 继承 底层原理 随着信息时代的不断发展,数据成了时代主题,今天的我们徜徉在数据的海洋中;由于数据的爆炸式增长,各种数据计算引擎如雨后春笋般冲击着这个时代。作为时下最主流的计算引擎之一 Spark也是从各方面向时代展示自己的强大能力。Spark无论是在数据处理还是数据分析、挖掘方面都展现出了强大的主导能力。其分布式计算能力受到越来越多的青睐。本文将介绍spark的排序以及分区。 在spark中定义了封装了很多高级的api,在我们的日常开发中使用这些api能获得不少的便利。但是有的时候这些默认的规则并不足以实现我们的目的,这时候需要我们了解其底层原理,编写一套适合我们需求的处理逻辑。下面通过代码简单介绍一下spark如何自定义排序。 对于自定义排序有多种方式实现: 1、User类继承Ordered使User类变成可排序的类。在spark中由于我们虽然测试是在本地测试,但是他会模拟集群模式,所以我们自定义的object在运行时会shuffle有网络传输会涉及序列化的问题。所以需要同时继承Serializable。 2、使用case class样例类: 不需要继承序列化类,case class默认已经实现序列化。 3、定义样例类隐式排序规则 主程序代码: 4、某些特殊数据类型不需要自定义,使用原生api更方便。 5、将排序规则添加到隐士转换中 1、combineByKey 在reduceByKey、groupByKey等算子都基于combineByKey算子实现。这是一个底层的算子,可以自定义一些规则,比较灵活。 参数解释: (1)、相同key的value放入一个分区 (2)、局部聚合 (3)、全局聚合 (4)、分区数(可以设置分区数) (5)、是否进行map端局部聚合 (6)、序列化参数 conbineByKey是一个较为底层的api,一般情况下可能不会用到它,但是当一些高级api满足不了我们的需求的时候它给我们提供了解决便利。 2、自定义分区器 在spark计算中不可避免的会涉及到shuffle,数据会根据不同的规则有分区器分发到不同的分区中。所以分区器决定了上游的数据发送到哪个下游。以不同专业学生数据计算不同专业的学生成绩。分组取topN : (1)、自定义分区器 (2)、使用自定义分区器 通过这样自定义分区器后,数据通过shuffle之后每个分区的数据就是一个专业的学生数据,对这个分区的数据排序后取出前N个就是所需结果了。但是这个程序中还是会出现一个问题,当数据量太大的时候可能会导致内存溢出的情况,因为我们是将数据放到了list中进行排序,而list是存放于内存中。所以会导致内存溢出。那么怎么才能避免这个情况呢。我们可以在mapPartitions内部定义一个集合,不加载所有数据。,每次将这个集合排序后最小的值移除,通过多次循环后最终集合中剩下的就是需要的结果。 无论是排序还是分区,在spark中都封装了高级的api共我们使用,但是他不会适用于所有情况,只会适用与部分情况,而通过对这些api的底层实现了解,通过自定义规则可以编辑一套适合于我们需求的程序。这样一来可以大大提高效率。没有什么能适配万物,随机应变才是取胜之道。 Spark自定义排序与分区 标签:sorted style set key spark col 出现 继承 底层原理 原文地址:https://www.cnblogs.com/lsbigdata/p/10933494.htmlSpark自定义排序与分区
前言:
一、Spark自定义排序
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object CustomSort1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("CustomSort1").setMaster("local[*]")
val sc = new SparkContext(conf)
//排序规则:首先按照颜值的降序,如果颜值相等,再按照年龄的升序
val users= Array("laoduan 30 99", "laozhao 29 9999", "laozhang 28 98", "laoyang 28 99")
//将Driver端的数据并行化变成RDD
val lines: RDD[String] = sc.parallelize(users)
//切分整理数据
val userRDD: RDD[User] = lines.map(line => {
val fields = line.split(" ")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toInt
//(name, age, fv)
new User(name, age, fv)
})
//不满足要求
//tpRDD.sortBy(tp => tp._3, false)
//将RDD里面装的User类型的数据进行排序
val sorted: RDD[User] = userRDD.sortBy(u => u)
val r = sorted.collect()
println(r.toBuffer)
sc.stop()
}
}
class User(val name: String, val age: Int, val fv: Int) extends Ordered[User] with Serializable {
override def compare(that: User): Int = {
if(this.fv == that.fv) {
this.age - that.age
} else {
-(this.fv - that.fv)
}
}
override def toString: String = s"name: $name, age: $age, fv: $fv"
}
case class Man(age: Int, fv: Int) extends Ordered[Man] {}
object SortRules {
implicit object OrderingUser extends Ordering[User] {
override def compare(x: User, y: User): Int = {
if(x.fv == y.fv) {
x.age - y.age
} else {
y.fv - x.fv
}
}
}
}
//切分整理数据
val tpRDD: RDD[(String, Int, Int)] = lines.map(line => {
val fields = line.split(" ")
val name = fields(0)
val age = fields(1).toInt
val fv = fields(2).toInt
(name, age, fv)
})
//排序(传入了一个排序规则,不会改变数据的格式,只会改变顺序)
import SortRules.OrderingUser
val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => User(tp._2, tp._3))
//充分利用元组的比较规则,元组的比较规则:先比第一,相等再比第二个
val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => (-tp._3, tp._2))
//Ordering[(Int, Int)]最终比较的规则格式
//on[(String, Int, Int)]未比较之前的数据格式
//(t =>(-t._3, t._2))怎样将规则转换成想要比较的格式
implicit val rules = Ordering[(Int, Int)].on[(String, Int, Int)](t =>(-t._3, t._2))
val sorted: RDD[(String, Int, Int)] = tpRDD.sortBy(tp => tp)
二、Spark自定义分区器
Rdd.combineByKey(x=>x,(m:Int,n:Int)=>m+n,(a:Int,B:Int)=>a+b,new HashPartition(2),true,null)
//自定义分区器:majors:专业集合
class MajorParitioner(majors: Array[String]) extends Partitioner {
//相当于主构造器(new的时候回执行一次)
//用于存放规则的一个map
val rules = new mutable.HashMap[String, Int]()
var i = 0
for(major majors) {
//rules(major) = i
rules.put(major, i)
i += 1
}
//返回分区的数量(下一个RDD有多少分区)
override def numPartitions: Int = majors.length
//根据传入的key计算分区标号
//key是一个元组(String, String)
override def getPartition(key: Any): Int = {
//获取key
val major= key.asInstanceOf[(String, String)]._1
//根据规则计算分区编号
rules(major)
}
}
//调用自定义的分区器,并且按照指定的分区器进行分区
val majorPatitioner = new MajorParitioner(subjects);
//partitionBy按照指定的分区规则进行分区
//调用partitionBy时RDD的Key是(String, String)
val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(majorPatitioner )
//如果一次拿出一个分区(可以操作一个分区中的数据了)
val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
//将迭代器转换成list,然后排序,在转换成迭代器返回
it.toList.sortBy(_._2).reverse.take(topN).iterator
})
//
val r: Array[((String, String), Int)] = sorted.collect()
三、总结