使用MiniCluster快速配置Hadoop开发环境

2 4月

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2014/04/02/use_minicluster_quick_develop/

前年的时候用过Hadoop,那时候各种资料缺乏,各种摸索以后写出了能用的东西,然后打包仍服务器就再也没有管过。效率什么的谈不上,但是一直能用。

当时花费了很多时间搭建一个环境,各种xml配置,然后引入依赖包,写好以后打包然后命令行测试一下,想想就耽误事儿。

最近需要改动一下之前的程序,然后就纠结了。Hadoop已经2.3.0了,从MR换到了Yarn,虽然老版本兼容,不过还是随大流升级把。

版本问题还好说,不过实在不想在本机装Hadoop了。

在源码里面翻了翻,找到了MiniCluster,问题就解决了。

Hadoop开发环境

网上有很多安装Hadoop的文章,多是本地单机或者伪分布式模式。如果只是想学习一下,或者集群由专人维护,实在没有必要折腾。

当然也可以下载做好的虚拟机文件,直接启动。

对大部分人而言,只要有一个测试少量数据的方法就行了。

而安装完成开发环境以后需要编写相关程序,然后打包运行。

网络上也能找到不少测试专用的工具,不过始终没有真正的环境保险。

从Hadoop的源码中能够找到一些测试,其中就有测试用的环境。然后在Jira找到了相关记录HADOOP-8009

这个Improvement是将使用Hadoop Client和建立最小集群的包包括在一起,然后发布到maven库,分别命名为org.apache.hadoop:hadoop-clientorg.apache.hadoop:hadoop-minicluster

主要的原因是因为Hadoop由几个块组成(common,hdfs,mapred),它们的版本可以有不同。其次是没有单独的客户端API。最主要的原因就是为了测试,特别是版本变化的时候。

这意味着我们可以使用hadoop-minicluster去启动一个最小集群,并且它可以从maven仓库获取。

从Gradle配置开始

我使用Gradle管理项目,初始化模板如下:

apply plugin:'java'
apply plugin:'idea'

repositories {
mavenLocal()
maven {
url'http://maven.oschina.net/content/groups/public/'
}
}

project.ext {
junitVersion ='4.+'
guavaVersion ='16.0.+'
hadoopVersion ='2.3.+'
guavaVersion ='16.0.+'
}

dependencies {
compile"com.google.guava:guava:$guavaVersion",
"org.apache.hadoop:hadoop-common:$hadoopVersion",
"org.apache.hadoop:hadoop-minicluster:$hadoopVersion"
testCompile"junit:junit:$junitVersion"
}

当然maven仓库可以使用中央仓库,我网速堪忧,所以用的oschina镜像。

运行gradle idea建立idea项目文件,依赖确实很吓人…

依赖

其他的暂时不管,先把项目导入IDE吧。

启动最小集群

Hadoop的两大核心是HDFS和MapReduce,这两部分可以单独启动,不过没有特殊需要还是都启动吧。

核心的类是MiniDFSClusterMiniYARNCluster

它们可以用在单元测试中,在@BeforeClass或者@Before中启动,不过我喜欢在main中启动,然后用单元测试提交测试任务。因为虽然是最小集群,启动毕竟还是要几秒钟的。

首先启动HDFS,它的路径就使用一个临时路径

Configuration configuration =new Configuration();
File tempDir = Files.createTempDir();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
new MiniDFSCluster.Builder(configuration).build();

获取临时目录使用的是Google Guava,如果不放心也可以先cleanup一下。

启动效果如下:

Format是自动的,不用管。临时目录可以在最后删除掉。

如果想修改DataNode数量,或者NameNodePort可以直接在Builder后面加

new MiniDFSCluster.Builder(configuration)
.numDataNodes(3)
.nameNodePort(8888)
.build();

接着启动Yarn

MiniYARNCluster yrCluster =new MiniYARNCluster("test",1,1,1);
yrCluster.init(configuration);
yrCluster.start();

可以修改nodeManager的个数,参数分别是

public MiniYARNCluster(java.lang.String testName,
int numNodeManagers,
int numLocalDirs,
int numLogDirs)

因为默认的几个端口都是0,即一个未被占用的端口,如果不知道的话后面的任务提交就麻烦了。所以在MiniYarnCluster启动后把配置写到一个文件中,以便后期读取。

yrCluster.getConfig().writeXml(new FileOutputStream(new File("conf.xml")));

虽然是最小集群,但是所有东西都有,依然可以通过WEB UI查看这个细节

在代码最后加上System.in.read()来挂起它,最后可以通过回车结束它们,也可以手动关闭。

在最小集群运行WordCount

WordCount绝对算得上Hadoop的Hello World,还有一个更简单的Sleep,但是没有找到。

首先要干的事情是从刚才的conf.xml读取配置。

Configuration conf =new Configuration();
conf.addResource(new Path(new File("conf.xml").getAbsolutePath().toString()));

然后创建一个Job并配置相关的信息

Job job = Job.getInstance(conf);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);

忘记了,还没有复制测试用的数据,直接用FileSystem

FileSystem fs=FileSystem.get(conf);
fs.copyFromLocalFile(false,true,new Path(name), inPath);

路径这些都是随意的,只要前后一致就行了。

运行效果:

运行效果

改进

首先是输出,因为Logger用的log4j,所以可以用配置关闭绝大部分日志输出。

# Root logger option
log4j.rootLogger=ERROR,stdout

# Level Settings
log4j.logger.org.apache.hadoop.mapreduce.Job =INFO
log4j.logger.com.github.htynkn = INFO

# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c:%L - %m%n

这样只有Job信息和其他错误信息会被输出。

对于很多任务的提交代码都是重复的,比如Conf的创建,文件的复制,甚至最后输出的显示,都是可以提取出来的。

建立一个BaseTest,所有的测试都继承自它。

package com.github.htynkn;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.IOException;

publicabstractclass BaseTest {

protectedstatic Configuration conf;
protected Path inPath, outPath;
protected FileSystem fs;
protected Job job;

@BeforeClass
publicstaticvoidsetUpBeforeClass()throws Exception {
conf =new Configuration();
conf.addResource(new Path(new File("conf.xml").getAbsolutePath().toString()));
}

@Before
publicvoidsetUp()throws Exception {
job = Job.getInstance(conf);

inPath =new Path("in" + System.currentTimeMillis());
outPath =new Path("out" + System.currentTimeMillis());
fs = FileSystem.get(conf);

fs.delete(inPath,true);
fs.delete(outPath,true);
}

@Test
publicabstractvoiddo_job()throws Exception;

publicvoidcopyFile(String name)throws IOException {
if (new File(name).isDirectory()) {
File[] files =new File(name).listFiles();
for (File file : files) {
fs.copyFromLocalFile(false,true,new Path(file.getAbsolutePath()), inPath);
}
}else {
fs.copyFromLocalFile(false,true,new Path(name), inPath);
}
}

publicvoidcopyFile(String[] filenames)throws IOException {
for (String filename : filenames) {
copyFile(filename);
}
}

public StringgetOutput()throws IOException {
FSDataInputStream inputStream = fs.open(new Path(outPath,"part-r-00000"));
return IOUtils.toString(inputStream);
}

publicvoidprintOutput(int length)throws IOException {
String line ="=================================";
System.out.println(line);
String output = getOutput();
System.out.println(output.length() > length ? output.substring(0, length) : output);
System.out.println(line);
}

publicvoidprintOutput()throws IOException {
printOutput(150);
}
}

复制文件的时候就只需要调用copyFile("data/"),输出最后结果调用printOutput()就可以了。

最后附上WordCount和WordCountTest源码

package com.github.htynkn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.StringTokenizer;

publicclass WordCount {
publicstaticclass TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
privatefinalstatic IntWritable one =new IntWritable(1);
private Text word =new Text();

@Override
protectedvoidmap(Object key, Text value, Context context)throws IOException, InterruptedException {
StringTokenizer itr =new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}

publicstaticclass IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result =new IntWritable();

@Override
publicvoidreduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {
int sum =0;

for (IntWritable val : values) {
sum += val.get();
}

result.set(sum);
context.write(key, result);
}
}
}
package com.github.htynkn;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

publicclass WordCountTest extends BaseTest {
publicvoiddo_job()throws Exception {
copyFile("data/");

job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);

printOutput();
}
}

写在最后

其实还有可以直接在命令行运行hadoop-minicluster,不过这样需要开个一个命令行。

如果在main中运行最小集群,在单元测试中提交任务,就只需要开一个IDE就行了。

参考资料

How to Build a Hadoop Data Application
Create hadoop-client and hadoop-minicluster artifacts for downstream projects

本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

转载自夜明的孤行灯

本文链接地址: https://www.huangyunkun.com/2014/04/02/use_minicluster_quick_develop/

发表评论

电子邮件地址不会被公开。