从Hadoop MapReduce到Spark

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2014/12/31/translate-from-mapreduce-to-apache-spark/

Spark是一款通用集群计算框架,和Hadoop的MapReduce类似。由于其提供的抽象更简单,性能和功能上比Hadoop强不少。
它已经越来越流行了。

如果是新的项目,或者是为了学习,那么选择Spark完全没问题。

不过对于一些使用了MapReduce的项目来说,迁移就稍微复杂一些了。

Hadoop自身

Hadoop的使用在不断扩大,但同时越来越多的实践证实了MapReduce并不是通用计算范例。

Hadoop的架构本身为其他可能的替代方案提供了场所,比如Impala项目等等。

而对于Hadoop来说,有一部分Hadoop的实现本身和MapReduce本身的抽象并不一致。

  • Mapper和Reducer总是使用键值对作为输入输出
  • Reducer处理的级别是键
  • Mapper和Reducer的对象的生命周期跨越了多个map()和reduce(),同时还支持了setup()和cleanup()

Spark

Spark也是众多替代方案中的一种。

但是对于已经部署在生产环境的项目而言,一句替代方案是不够的,对于一些实时计算的系统更是这样。

好在利用Spark实现类似MapReduce的模型是完全可行的。同时实现本身还可以更简单,并在大部分情况下更快。

对于MapReduce模型本身,使用Spark来实现反而显得更亲近,毕竟Scala的编码风格和API对于本身源于LISP的MapReduce抽象更接近。

键值对和元组

从最基础的例子来看,如果需要计算一个大型文本文件每一行的长度,对于Hadoop MapReduce来说由于输出是键值对,那么就会使用长度作为键,以1为值。

publicclass LineLengthMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable> {
protectedvoidmap(LongWritable lineNumber, Text line, Context context)
throws IOException, InterruptedException {
context.write(new IntWritable(line.getLength()),new IntWritable(1));
}
}

LineLengthMapperTextInputFormat提供了输入,对于每一个键值对而言,一行为值,而位置为键。

当然,虽然我很少用到,但是我相信有些任务还是需要用到这个键的。

Spark中,对应的功能看起来就是这样的

lines.map(line => (line.length,1))

Spark的核心抽象是RDD,这里的输入仅仅是字符串,而不是键值对。而Scala中的元组(Tuple)和上面的键值对很类似。

map()的输出是一个(int,int)的元组。当RDD包含元组的时候,可以调用一些额外的方法,比如reduceByKey(),这也是实现MapReduce的一个重要部分。

Spark的reduceByKey

为了得到长度的输出,Hadoop MapReduce还需要一个Reducer来处理。

publicclass LineLengthReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> {
protectedvoidreduce(IntWritable length, Iterable<IntWritable> counts, Context context)
throws IOException, InterruptedException {
int sum =0;
for (IntWritable count : counts) {
sum += count.get();
}
context.write(length,new IntWritable(sum));
}
}

而在Spark中对应的是

val counts =lines.map(line => (line.length,1)).reduceByKey(_ + _)

Spark的API中包含了reduce()方法,不过这个方式是将键值对处理为一个单值,这不是Hadoop MapReduce的行为。相比之下
reduceByKey更接近。

setup() and cleanup()

在MapReduce中,mapper和reducer都可以包含一个setup和cleanup方法。

比如处理数据库的连接

publicclass SetupCleanupMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Connection dbConnection;

protectedvoidsetup(Context context) {
dbConnection = ...;
}

protectedvoidcleanup(Context context) {
dbConnection.close();
}
}

而Spark的map()和flatMap()并没有这种东西,我暂时采用的办法是在调用map()前后处理,比如

lines.mapPartitions { valueIterator =>
if (valueIterator.isEmpty) {
Iterator[...]()
}else {
val dbConnection = ...
valueIterator.map { item =>
val transformedItem = ...
if (!valueIterator.hasNext) {
dbConnection.close()
}
transformedItem
}
}
}

其他概念

Hadoop MapReduce中还有一个隐含的概念Combiner,并且还可以一次触发多个输出。

另外一点是MapReduce的Writeable序列化是独有的。

Spark中有一些替代品,比如accumulatorsgroupBy,而序列化可以考虑Java自身的或者使用Kryo等等。

MapReduce抽象本身是强力的,只不过在Hadoop的实现中有了一些特殊化,同时函数式语言更加胜任这种工作。

参考

RDD
accumulators

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2014/12/31/translate-from-mapreduce-to-apache-spark/

发表评论