错误思想
举个列子,当我们想要比较 一个 类型为 RDD[(Long, (String, Int))] 的RDD,让它先按Long分组,然后按int的值进行倒序排序,最容易想到的思维就是先分组,然后把Iterable 转换为 list,然后sortby,但是这样却有一个致命的缺点,就是Iterable 在内存中是一个指针,不占内存,而list是一个容器,占用内存,如果Iterable 含有元素过多,那么极易引起OOM
val cidAndSidCountGrouped: RDD[(Long, Iterable[(String, Int)])] = cidAndSidCount.groupByKey() // 4. 排序, 取top10 val result: RDD[(Long, List[(String, Int)])] = cidAndSidCountGrouped.map { case (cid, sidCountIt) => // sidCountIt 排序, 取前10 // Iterable转成容器式集合的时候, 如果数据量过大, 极有可能导致oom (cid, sidCountIt.toList.sortBy(-_._2).take(5)) }
首先,我们要知道,RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.这样能有效避免OOM的风险,但是RDD是全部排序,所以需要针对性的过滤Key值来进行排序
方法一 利用RDD排序特点
//把long(即key值)提取出来 val cids: List[Long] = categoryCountList.map(_.cid.toLong) val buffer: ListBuffer[(Long, List[(String, Int)])] = ListBuffer[(Long, List[(String, Int)])]() //根据每个key来过滤RDD for (cid <- cids) { /* List((15,(632972a4-f811-4000-b920-dc12ea803a41,10)), (15,(f34878b8-1784-4d81-a4d1-0c93ce53e942,8)), (15,(5e3545a0-1521-4ad6-91fe-e792c20c46da,8)), (15,(66a421b0-839d-49ae-a386-5fa3ed75226f,8)), (15,(9fa653ec-5a22-4938-83c5-21521d083cd0,8))) 目标: (9,List((199f8e1d-db1a-4174-b0c2-ef095aaef3ee,9), (329b966c-d61b-46ad-949a-7e37142d384a,8), (5e3545a0-1521-4ad6-91fe-e792c20c46da,8), (e306c00b-a6c5-44c2-9c77-15e919340324,7), (bed60a57-3f81-4616-9e8b-067445695a77,7))) */ val arr: Array[(String, Int)] = cidAndSidCount.filter(cid == _._1) .sortBy(-_._2._2) .take(5) .map(_._2) buffer += ((cid, arr.toList)) } buffer.foreach(println)
这样做也有缺点:即有多少个key,就有多少个Job,占用资源
方法二 利用TreeSet自动排序特性
def statCategoryTop10Session_3(sc: SparkContext, categoryCountList: List[CategroyCount], userVisitActionRDD: RDD[UserVisitAction]) = { // 1. 过滤出来 top10品类的所有点击记录 // 1.1 先map出来top10的品类id val cids = categoryCountList.map(_.cid.toLong) val topCategoryActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(action => cids.contains(action.click_category_id)) // 2. 计算每个品类 下的每个session 的点击量 rdd ((cid, sid) ,1) val cidAndSidCount: RDD[(Long, (String, Int))] = topCategoryActionRDD .map(action => ((action.click_category_id, action.session_id), 1)) // 使用自定义分区器 重点理解分区器的原理 .reduceByKey(new CategoryPartitioner(cids), _ + _) .map { case ((cid, sid), count) => (cid, (sid, count)) } // 3. 排序取top10 //因为已经按key分好了区,所以用Mappartitions ,在每个分区中新建一个TreeSet即可 val result: RDD[(Long, List[SessionInfo])] = cidAndSidCount.mapPartitions((it: Iterator[(Long, (String, Int))]) => { //new 一个TreeSet,并同时指定排序规则 var treeSet: mutable.TreeSet[CategorySession] = new mutable.TreeSet[CategorySession]()(new Ordering[CategorySession] { override def compare(x: CategorySession, y: CategorySession): Int = { if (x.clickCount >= y.clickCount) -1 else 1 } }) var id = 0l iter.foreach({ case (l, session) => { id = l treeSet.add(session) if (treeSet.size > 10) treeSet = treeSet.take(10) } }) Iterator(id, treeSet) }) result.collect.foreach(println) Thread.sleep(1000000) } } /* 根据传入的key值来决定分区号,让相同key进入相同的分区,能够避免多次shuffle */ class CategoryPartitioner(cids: List[Long]) extends Partitioner { // 用cid索引, 作为将来他的分区索引. private val cidWithIndex: Map[Long, Int] = cids.zipWithIndex.toMap // 返回集合的长度 override def numPartitions: Int = cids.length // 根据key返回分区的索引 override def getPartition(key: Any): Int = { key match { // 根据品类id返回分区的索引! 0-9 case (cid: Long, _) => cidWithIndex(cid) } } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
Spark,数据排序,OOM
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。
更新日志
- 【雨果唱片】中国管弦乐《鹿回头》WAV
- APM亚流新世代《一起冒险》[FLAC/分轨][106.77MB]
- 崔健《飞狗》律冻文化[WAV+CUE][1.1G]
- 罗志祥《舞状元 (Explicit)》[320K/MP3][66.77MB]
- 尤雅.1997-幽雅精粹2CD【南方】【WAV+CUE】
- 张惠妹.2007-STAR(引进版)【EMI百代】【WAV+CUE】
- 群星.2008-LOVE情歌集VOL.8【正东】【WAV+CUE】
- 罗志祥《舞状元 (Explicit)》[FLAC/分轨][360.76MB]
- Tank《我不伟大,至少我能改变我。》[320K/MP3][160.41MB]
- Tank《我不伟大,至少我能改变我。》[FLAC/分轨][236.89MB]
- CD圣经推荐-夏韶声《谙2》SACD-ISO
- 钟镇涛-《百分百钟镇涛》首批限量版SACD-ISO
- 群星《继续微笑致敬许冠杰》[低速原抓WAV+CUE]
- 潘秀琼.2003-国语难忘金曲珍藏集【皇星全音】【WAV+CUE】
- 林东松.1997-2039玫瑰事件【宝丽金】【WAV+CUE】