PageRank怎么使用

40次阅读
没有评论

这篇文章主要讲解了“PageRank 怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“PageRank 怎么使用”吧!

PageRank 是执行多次连接的一个迭代算法,因此它是 RDD 分区操作的很好 demo,算法维护两个数据集

(pageID,listList) 包含每个页面的相邻页面列表。(pageID,rank) 包含每个页面的当前排序值,pageRank 计算过程大致如下:将每个页面的排序值初始化为 1.0 在每次迭代中,对页面 p,向其每个相邻页面 (有直接连接的页面) 发松一个值为 rank(p)/numNeighbors(p)的贡献值。将每个页面的排序值设定为 0.15 + 0.85 *contributionsReceived 其中 2 跟 3 会重复循环几次,在此过程中算法会逐渐收敛于每个页面的实际 PageRank 值,实际操作中一般迭代 10 次。

package com.sowhat.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
* links =(pageID,LinkList)
* ranks = (pageID,rank)
**/
object MyPageRank {
 def main(args: Array[String]): Unit = {
   val conf: SparkConf = new SparkConf().setMaster( local[*] ).setAppName(pagerank)

   // 创建 SparkContext,该对象是提交 spark App 的入口
   val sc = new SparkContext(conf)

   val links: RDD[(String, Seq[String])] = sc.objectFile[(String, Seq[String])](filepwd).partitionBy(new HashPartitioner(100)).persist()
   var ranks: RDD[(String, Double)] = links.mapValues(x = 1.0)

   for (i - 0 until 10) {
     val totalRDD: RDD[(String, (Seq[String], Double))] = links.join(ranks)
     val contributions: RDD[(String, Double)] = totalRDD.flatMap(
       {
         case (pageID, (links, rank)) = links.map(dest = (dest, rank / links.size))
       }
     )
     ranks = contributions.reduceByKey(_ + _).mapValues(v = 0.15 + 0.85 * v)
   }
   ranks.saveAsTextFile(ranks)
 }

}

 

算法从 ranksRDD 的每个元素的值初始化为 1.0 开始,然后每次迭代都都不断的更新 ranks 值,其中主要优化部分如下。

linksRDD 每次迭代都会跟 ranks 发生连接操作,因此将大数据集 links 进行 partitionBy 会节约相当多的网络通信优化开销。跟上面的原因一样,用 persist 可以将数据保存早内存中,以供每次迭代使用。我们在第一次创建 ranks 时,我们用 mapValues 而不是 map() 来保留父 RDD links 的分区方式,这样对第一次连接操作开销减少很多。循环体中 reduceByKey 后使用 mapValues 因为 reduceByKey 已经是哈希分区了,下一次迭代时候效率更快。

建议:为最大化分区相关优化潜在作用,在无需更改元素键的时候尽量使用 mapValues 或 flatMapValues。

本文使用 mdnice 排版

感谢各位的阅读,以上就是“PageRank 怎么使用”的内容了,经过本文的学习后,相信大家对 PageRank 怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!