博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark购物篮分析:关联规则挖掘
阅读量:2492 次
发布时间:2019-05-11

本文共 8093 字,大约阅读时间需要 26 分钟。

1、

2、

3、Spark购物篮分析

过程分析:

import org.apache.spark.SparkConfimport org.apache.spark.SparkContextimport scala.collection.mutable.ListBufferobject FindAssociationRules {  def main(args: Array[String]): Unit = {    val sparkConf = new SparkConf().setAppName("market-basket-analysis").setMaster("local")    val sc = new SparkContext(sparkConf)    val input = "file:///media/chenjie/0009418200012FF3/ubuntu/mba2.txt"    val output = "file:///media/chenjie/0009418200012FF3/ubuntu/mba2"    val transactions = sc.textFile(input)    /*    * a,b,c      a,b,d      b,c      b,c    * */   /* val tests = transactions.flatMap(line => {      println("line=" + line)      val items = line.split(",").toList      // Converting to List is required because Spark doesn't partition on Array (as returned by split method)      //(0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)      println("result=" + items.combinations(2).mkString(","))      val list = ListBuffer.empty[List[String]]      for(i <- 0 to items.size){        list.++= (items.combinations(i).toBuffer)      }      list.toList.filter(xs => !xs.isEmpty)    })    tests.foreach(println)*/    val patterns = transactions.flatMap(line => {      val items = line.split(",").toList      // Converting to List is required because Spark doesn't partition on Array (as returned by split method)      (0 to items.size) flatMap items.combinations filter (xs => !xs.isEmpty)      /*        combinations(n: Int): Iterator[List[A]] 取列表中的n个元素进行组合,返回不重复的组合列表,结果一个迭代器       */      /*      * 上句话等价于:      * val list = ListBuffer.empty[List[String]]        for(i <- 0 to items.size){          list.++= (items.combinations(i).toBuffer)        }        list.toList.filter(xs => !xs.isEmpty)      * 即对a,b,c      * 先取0个元素进行组合,得到不重复的组合列表[],加入list中,list为[[]]      * 再取1个元素进行组合,得到不重复的组合列表[[a],[b],[c]],加入list中,list为[[a],[b],[c]]      * 再取2个元素进行组合,得到不重复的组合列表[[a,b],[a,c],[b,c]],加入list中,list为[[],[a],[b],[c],[a,b],[a,c],[b,c]]      * 再取3个元素进行组合,得到不重复的组合列表[[a,b,c]],加入list中,list为[[],[a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]]      * 然后对其进行过滤,去掉其中为空的列表      * list为[[a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]]      * 最后回到外层的flatMap,会将列表的列表拍扁成列表:      * [a],[b],[c],[a,b],[a,c],[b,c],[a,b,c]      * */    }).map((_, 1))    //到最外面的map,将列表映射为(列表,1)的键值对    /*    * (List(a),1)      (List(b),1)      (List(c),1)      (List(a, b),1)      (List(a, c),1)      (List(b, c),1)      (List(a, b, c),1)      (List(a),1)      (List(b),1)      (List(d),1)      (List(a, b),1)      (List(a, d),1)      (List(b, d),1)      (List(a, b, d),1)      (List(b),1)      (List(c),1)      (List(b, c),1)      (List(b),1)      (List(c),1)      (List(b, c),1)    * */    val combined = patterns.reduceByKey(_ + _)//合并key值相同的键值对    /*    * (List(a, b, c),1)      (List(b),4)      (List(a, b, d),1)      (List(b, d),1)      (List(a, b),2)      (List(a),2)      (List(a, d),1)      (List(b, c),3)      (List(a, c),1)      (List(c),3)      (List(d),1)    *    * */    /*下面开始生成子模式    给定一个频繁模式:(K=List
,V=Frequency) 创建如下的子模式(K2,V2) (K2=K=List
,V2=Tuple(null,V)) 即把K作为K2,Tuple(null,V))作为V2 (K2=List
),V2=Tuple(K,V)) (K2=List
),V2=Tuple(K,V)) ... (K2=List
),V2=Tuple(K,V)) 即把K的每一个元素拿掉一次作为K2,Tuple(K,V))作为V2 */ val subpatterns = combined.flatMap(pattern => { //pattern:(List(a, b, c),1) val result = ListBuffer.empty[Tuple2[List[String], Tuple2[List[String], Int]]] result += ((pattern._1, (Nil, pattern._2)))//即把K作为K2,Tuple(null,V))作为V2 val sublist = for { i <- 0 until pattern._1.size xs = pattern._1.take(i) ++ pattern._1.drop(i + 1) if xs.size > 0 } yield (xs, (pattern._1, pattern._2)) //上段代码等价于: /* for(i <- 0 to pattern._1.size){ val sublist = pattern._1.take(i) ++ pattern._1.drop(i + 1) if(sublist.size > 0) result += new Tuple2(sublist,new Tuple2(pattern._1,pattern._2)) } 即每次去掉一个元素,将剩下的元素集合作为K2 */ result ++= sublist result.toList }) /* * (List(a, b, c),(List(),1)) (List(b, c),(List(a, b, c),1)) (List(a, c),(List(a, b, c),1)) (List(a, b),(List(a, b, c),1)) (List(b),(List(),4)) (List(a, b, d),(List(),1)) (List(b, d),(List(a, b, d),1)) (List(a, d),(List(a, b, d),1)) (List(a, b),(List(a, b, d),1)) (List(b, d),(List(),1)) (List(d),(List(b, d),1)) (List(b),(List(b, d),1)) (List(a, b),(List(),2)) (List(b),(List(a, b),2)) (List(a),(List(a, b),2)) (List(a),(List(),2)) (List(a, d),(List(),1)) (List(d),(List(a, d),1)) (List(a),(List(a, d),1)) (List(b, c),(List(),3)) (List(c),(List(b, c),3)) (List(b),(List(b, c),3)) (List(a, c),(List(),1)) (List(c),(List(a, c),1)) (List(a),(List(a, c),1)) (List(c),(List(),3)) (List(d),(List(),1)) * */ val rules = subpatterns.groupByKey() /* * (List(a, b, c),CompactBuffer((List(),1))) (List(b),CompactBuffer((List(),4), (List(b, d),1), (List(a, b),2), (List(b, c),3))) (List(a, b),CompactBuffer((List(a, b, c),1), (List(a, b, d),1), (List(),2))) (List(b, d),CompactBuffer((List(a, b, d),1), (List(),1))) (List(a, b, d),CompactBuffer((List(),1))) (List(a),CompactBuffer((List(a, b),2), (List(),2), (List(a, d),1), (List(a, c),1))) (List(a, d),CompactBuffer((List(a, b, d),1), (List(),1))) (List(b, c),CompactBuffer((List(a, b, c),1), (List(),3))) (List(a, c),CompactBuffer((List(a, b, c),1), (List(),1))) (List(c),CompactBuffer((List(b, c),3), (List(a, c),1), (List(),3))) (List(d),CompactBuffer((List(b, d),1), (List(a, d),1), (List(),1))) * */ val assocRules = rules.map(in => { println("in=" + in) //in:(List(b),CompactBuffer((List(),4), (List(b, d),1), (List(a, b),2), (List(b, c),3))) val fromCount = in._2.find(p => p._1 == Nil).get//找到[b]的frequency:即(List(),4) println("fromCount=" + fromCount) val toList = in._2.filter(p => p._1 != Nil).toList//将规则集合去掉空的 println("toList=" + toList) //toList:CompactBuffer((List(b, d),1), (List(a, b),2), (List(b, c),3)) if (toList.isEmpty) Nil else { val result = for { t2 <- toList confidence = t2._2.toDouble / fromCount._2.toDouble difference = t2._1 diff in._1 //diff(that: collection.Seq[A]): List[A] 保存列表中那些不在另外一个列表中的元素,即从集合中减去与另外一个集合的交集 } yield (((in._1, difference, confidence))) result } //等价于 /*if (toList.isEmpty) Nil else { val result = ListBuffer.empty[Tuple3[List[String],List[String],Double]] for(t2 <- toList){ println("t2=" + t2) //t2:(List(b, d),1) val confidence = t2._2.toDouble / fromCount._2.toDouble val difference = t2._1 diff in._1 println(Tuple3(in._1, difference, confidence)) result.+=(Tuple3(in._1, difference, confidence)) } result }*/ }) assocRules.foreach(println) /* List() List((List(b),List(d),0.25), (List(b),List(a),0.5), (List(b),List(c),0.75)) List((List(a, b),List(c),0.5), (List(a, b),List(d),0.5)) List((List(b, d),List(a),1.0)) List() List((List(a),List(b),1.0), (List(a),List(d),0.5), (List(a),List(c),0.5)) List((List(a, d),List(b),1.0)) List((List(b, c),List(a),0.3333333333333333)) List((List(a, c),List(b),1.0)) List((List(c),List(b),1.0), (List(c),List(a),0.3333333333333333)) List((List(d),List(b),1.0), (List(d),List(a),1.0)) * */ val formatResult = assocRules.flatMap(f => { f.map(s => (s._1.mkString("[", ",", "]"), s._2.mkString("[", ",", "]"), s._3)) }) /* * ([b],[d],0.25) ([b],[a],0.5) ([b],[c],0.75) ([a,b],[c],0.5) ([a,b],[d],0.5) ([b,d],[a],1.0) ([a],[b],1.0) ([a],[d],0.5) ([a],[c],0.5) ([a,d],[b],1.0) ([b,c],[a],0.3333333333333333) ([a,c],[b],1.0) ([c],[b],1.0) ([c],[a],0.3333333333333333) ([d],[b],1.0) ([d],[a],1.0) * */ formatResult.saveAsTextFile(output) sc.stop() }}

你可能感兴趣的文章
在eclipse上用tomcat部署项目404解决方案
查看>>
web.xml 配置中classpath: 与classpath*:的区别
查看>>
suse如何修改ssh端口为2222?
查看>>
详细理解“>/dev/null 2>&1”
查看>>
suse如何创建定时任务?
查看>>
suse搭建ftp服务器方法
查看>>
centos虚拟机设置共享文件夹并通过我的电脑访问[增加smbd端口修改]
查看>>
文件拷贝(IFileOperation::CopyItem)
查看>>
MapReduce的 Speculative Execution机制
查看>>
大数据学习之路------借助HDP SANDBOX开始学习
查看>>
Hadoop基础学习:基于Hortonworks HDP
查看>>
为什么linux安装程序 都要放到/usr/local目录下
查看>>
Hive安装前扫盲之Derby和Metastore
查看>>
永久修改PATH环境变量的几种办法
查看>>
大数据学习之HDP SANDBOX开始学习
查看>>
Hive Beeline使用
查看>>
Centos6安装图形界面(hdp不需要,hdp直接从github上下载数据即可)
查看>>
CentOS7 中把yum源更换成163源
查看>>
关于yum Error: Cannot retrieve repository metadata (repomd.xml) for repository:xxxxxx.
查看>>
linux下载github中的文件
查看>>