一直觉得Spark的算子调用超级厉害,弄不懂怎么样让传入map的函数作用在一个个值之上的,比如Spark的算子调用模式:
rdd.map(word=>(word,1)).reduceByKey((a,b)=>a+b).foreach(println)
两年来,我一直搞不明白,为何xxx.map(word=>(word,1))就可以把xxx里面的内容全部都变成map返回。可能是因为做惯了面向对象java,对这种面相函数的编程有点晕。现在想想,真的是太简单的,不就是:word=>(word,1) 这个匿名函数处理xxx对象中的属性嘛!
这里用Scala模仿SparkContext 的map,filter 算子调用写一个例子,让我们体会下Scala面向函数编程的快感:
import scala.collection.mutable.ListBuffer/*** 定义SparkContext*/class SparkContext {//传入一个k可变的List,创建RDDdef creatRDD(list:ListBuffer[Int]):RDD={return new RDD(list)}}/*** 定义RDD* @param list*/class RDD(list:ListBuffer[Int]){/*** 定义map算子* @param f map算子传入的处理函数* @return*/def map(f:Int=>Int):RDD={var li = ListBuffer[Int]();for(i<-list){val j =f(i)li+=j;}return new RDD(li);}/*** 定义过来filter算子* @param f* @return*/def filter(f:Int=>Boolean):RDD={var li = ListBuffer[Int]();for(i<-list){val j =f(i)if(j){li+=i;}}return new RDD(li);}/*** 遍历方法*/def show():Unit={for(l<-list){println(l)}}}object SparkContext{def apply(): SparkContext = new SparkContext()def main(args: Array[String]): Unit = {var sc = SparkContext()val list = ListBuffer(1,2,3,4,5)val rdd = sc.creatRDD(list)println("--------------初始值------------")rdd.show();println("--------------每个元素+1------------")val rdd2 = rdd.map((x:Int)=>x+1);rdd2.show()println("---------------每个元素+9-----------")val rdd3 = rdd.map((x:Int)=>x+9);rdd3.show()println("----------------取出大于等于4的元素----------")val rdd4 = rdd.filter((x:Int)=>x>=4)rdd4.show()}}
当然,可能用的参数不太正确,我用的是可变的List,不过也成功实现了map和filter的调用模式。
运行结果如下
--------------初始值------------12345--------------每个元素+1------------23456---------------每个元素+9-----------1011121314----------------取出大于等于4的元素----------45Process finished with exit code 0

