本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
转载自夜明的孤行灯
本文链接地址: https://www.huangyunkun.com/2014/12/31/translate-from-mapreduce-to-apache-spark/
Spark是一款通用集群计算框架,和Hadoop的MapReduce类似。由于其提供的抽象更简单,性能和功能上比Hadoop强不少。
它已经越来越流行了。
如果是新的项目,或者是为了学习,那么选择Spark完全没问题。
不过对于一些使用了MapReduce的项目来说,迁移就稍微复杂一些了。
Hadoop自身
Hadoop的使用在不断扩大,但同时越来越多的实践证实了MapReduce并不是通用计算范例。
Hadoop的架构本身为其他可能的替代方案提供了场所,比如Impala项目等等。
而对于Hadoop来说,有一部分Hadoop的实现本身和MapReduce本身的抽象并不一致。
- Mapper和Reducer总是使用键值对作为输入输出
- Reducer处理的级别是键
- Mapper和Reducer的对象的生命周期跨越了多个map()和reduce(),同时还支持了setup()和cleanup()
Spark
Spark也是众多替代方案中的一种。
但是对于已经部署在生产环境的项目而言,一句替代方案是不够的,对于一些实时计算的系统更是这样。
好在利用Spark实现类似MapReduce的模型是完全可行的。同时实现本身还可以更简单,并在大部分情况下更快。
对于MapReduce模型本身,使用Spark来实现反而显得更亲近,毕竟Scala的编码风格和API对于本身源于LISP的MapReduce抽象更接近。
键值对和元组
从最基础的例子来看,如果需要计算一个大型文本文件每一行的长度,对于Hadoop MapReduce来说由于输出是键值对,那么就会使用长度作为键,以1为值。
publicclass LineLengthMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
protectedvoidmap(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(line.getLength()),new IntWritable(1));
}
}
LineLengthMapper
由TextInputFormat
提供了输入,对于每一个键值对而言,一行为值,而位置为键。
当然,虽然我很少用到,但是我相信有些任务还是需要用到这个键的。
在Spark
中,对应的功能看起来就是这样的
lines.map(line => (line.length,1))
Spark的核心抽象是RDD,这里的输入仅仅是字符串,而不是键值对。而Scala中的元组(Tuple)和上面的键值对很类似。
map()的输出是一个(int,int)的元组。当RDD包含元组的时候,可以调用一些额外的方法,比如reduceByKey()
,这也是实现MapReduce的一个重要部分。
Spark的reduceByKey
为了得到长度的输出,Hadoop MapReduce还需要一个Reducer来处理。
publicclass LineLengthReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
protectedvoidreduce(IntWritable length, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum =0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(length,new IntWritable(sum));
}
}
而在Spark中对应的是
val counts =lines.map(line => (line.length,1)).reduceByKey(_ + _)
Spark的API中包含了reduce()
方法,不过这个方式是将键值对处理为一个单值,这不是Hadoop MapReduce的行为。相比之下reduceByKey
更接近。
setup() and cleanup()
在MapReduce中,mapper和reducer都可以包含一个setup和cleanup方法。
比如处理数据库的连接
publicclass SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Connection dbConnection;
protectedvoidsetup(Context context) {
dbConnection = ...;
}
protectedvoidcleanup(Context context) {
dbConnection.close();
}
}
而Spark的map()和flatMap()并没有这种东西,我暂时采用的办法是在调用map()前后处理,比如
lines.mapPartitions { valueIterator =>
if (valueIterator.isEmpty) {
Iterator[...]()
}else {
val dbConnection = ...
valueIterator.map { item =>
val transformedItem = ...
if (!valueIterator.hasNext) {
dbConnection.close()
}
transformedItem
}
}
}
其他概念
Hadoop MapReduce中还有一个隐含的概念Combiner,并且还可以一次触发多个输出。
另外一点是MapReduce的Writeable序列化是独有的。
Spark中有一些替代品,比如accumulators
,groupBy
,而序列化可以考虑Java自身的或者使用Kryo等等。
MapReduce抽象本身是强力的,只不过在Hadoop的实现中有了一些特殊化,同时函数式语言更加胜任这种工作。
参考
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
转载自夜明的孤行灯
本文链接地址: https://www.huangyunkun.com/2014/12/31/translate-from-mapreduce-to-apache-spark/