本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
转载自夜明的孤行灯
本文链接地址: https://www.huangyunkun.com/2014/04/02/use_minicluster_quick_develop/
前年的时候用过Hadoop,那时候各种资料缺乏,各种摸索以后写出了能用的东西,然后打包仍服务器就再也没有管过。效率什么的谈不上,但是一直能用。
当时花费了很多时间搭建一个环境,各种xml配置,然后引入依赖包,写好以后打包然后命令行测试一下,想想就耽误事儿。
最近需要改动一下之前的程序,然后就纠结了。Hadoop已经2.3.0了,从MR换到了Yarn,虽然老版本兼容,不过还是随大流升级把。
版本问题还好说,不过实在不想在本机装Hadoop了。
在源码里面翻了翻,找到了MiniCluster,问题就解决了。
Hadoop开发环境
网上有很多安装Hadoop的文章,多是本地单机或者伪分布式模式。如果只是想学习一下,或者集群由专人维护,实在没有必要折腾。
当然也可以下载做好的虚拟机文件,直接启动。
对大部分人而言,只要有一个测试少量数据的方法就行了。
而安装完成开发环境以后需要编写相关程序,然后打包运行。
网络上也能找到不少测试专用的工具,不过始终没有真正的环境保险。
从Hadoop的源码中能够找到一些测试,其中就有测试用的环境。然后在Jira找到了相关记录HADOOP-8009。
这个Improvement是将使用Hadoop Client和建立最小集群的包包括在一起,然后发布到maven库,分别命名为org.apache.hadoop:hadoop-client
和org.apache.hadoop:hadoop-minicluster
。
主要的原因是因为Hadoop由几个块组成(common,hdfs,mapred),它们的版本可以有不同。其次是没有单独的客户端API。最主要的原因就是为了测试,特别是版本变化的时候。
这意味着我们可以使用hadoop-minicluster去启动一个最小集群,并且它可以从maven仓库获取。
从Gradle配置开始
我使用Gradle管理项目,初始化模板如下:
apply plugin:'java'
apply plugin:'idea'
repositories {
mavenLocal()
maven {
url'http://maven.oschina.net/content/groups/public/'
}
}
project.ext {
junitVersion ='4.+'
guavaVersion ='16.0.+'
hadoopVersion ='2.3.+'
guavaVersion ='16.0.+'
}
dependencies {
compile"com.google.guava:guava:$guavaVersion",
"org.apache.hadoop:hadoop-common:$hadoopVersion",
"org.apache.hadoop:hadoop-minicluster:$hadoopVersion"
testCompile"junit:junit:$junitVersion"
}
当然maven仓库可以使用中央仓库,我网速堪忧,所以用的oschina镜像。
运行gradle idea
建立idea项目文件,依赖确实很吓人…
其他的暂时不管,先把项目导入IDE吧。
启动最小集群
Hadoop的两大核心是HDFS和MapReduce,这两部分可以单独启动,不过没有特殊需要还是都启动吧。
核心的类是MiniDFSCluster
和MiniYARNCluster
。
它们可以用在单元测试中,在@BeforeClass或者@Before中启动,不过我喜欢在main中启动,然后用单元测试提交测试任务。因为虽然是最小集群,启动毕竟还是要几秒钟的。
首先启动HDFS,它的路径就使用一个临时路径
Configuration configuration =new Configuration();
File tempDir = Files.createTempDir();
configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tempDir.getAbsolutePath());
new MiniDFSCluster.Builder(configuration).build();
获取临时目录使用的是Google Guava,如果不放心也可以先cleanup一下。
启动效果如下:
Format是自动的,不用管。临时目录可以在最后删除掉。
如果想修改DataNode数量,或者NameNodePort可以直接在Builder后面加
new MiniDFSCluster.Builder(configuration)
.numDataNodes(3)
.nameNodePort(8888)
.build();
接着启动Yarn
MiniYARNCluster yrCluster =new MiniYARNCluster("test",1,1,1);
yrCluster.init(configuration);
yrCluster.start();
可以修改nodeManager的个数,参数分别是
public MiniYARNCluster(java.lang.String testName,
int numNodeManagers,
int numLocalDirs,
int numLogDirs)
因为默认的几个端口都是0,即一个未被占用的端口,如果不知道的话后面的任务提交就麻烦了。所以在MiniYarnCluster启动后把配置写到一个文件中,以便后期读取。
yrCluster.getConfig().writeXml(new FileOutputStream(new File("conf.xml")));
虽然是最小集群,但是所有东西都有,依然可以通过WEB UI查看这个细节
在代码最后加上System.in.read()
来挂起它,最后可以通过回车结束它们,也可以手动关闭。
在最小集群运行WordCount
WordCount绝对算得上Hadoop的Hello World,还有一个更简单的Sleep,但是没有找到。
首先要干的事情是从刚才的conf.xml
读取配置。
Configuration conf =new Configuration();
conf.addResource(new Path(new File("conf.xml").getAbsolutePath().toString()));
然后创建一个Job并配置相关的信息
Job job = Job.getInstance(conf);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
忘记了,还没有复制测试用的数据,直接用FileSystem
FileSystem fs=FileSystem.get(conf);
fs.copyFromLocalFile(false,true,new Path(name), inPath);
路径这些都是随意的,只要前后一致就行了。
运行效果:
改进
首先是输出,因为Logger用的log4j,所以可以用配置关闭绝大部分日志输出。
# Root logger option
log4j.rootLogger=ERROR,stdout
# Level Settings
log4j.logger.org.apache.hadoop.mapreduce.Job =INFO
log4j.logger.com.github.htynkn = INFO
# Direct log messages to stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c:%L - %m%n
这样只有Job信息和其他错误信息会被输出。
对于很多任务的提交代码都是重复的,比如Conf的创建,文件的复制,甚至最后输出的显示,都是可以提取出来的。
建立一个BaseTest
,所有的测试都继承自它。
package com.github.htynkn;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
publicabstractclass BaseTest {
protectedstatic Configuration conf;
protected Path inPath, outPath;
protected FileSystem fs;
protected Job job;
@BeforeClass
publicstaticvoidsetUpBeforeClass()throws Exception {
conf =new Configuration();
conf.addResource(new Path(new File("conf.xml").getAbsolutePath().toString()));
}
@Before
publicvoidsetUp()throws Exception {
job = Job.getInstance(conf);
inPath =new Path("in" + System.currentTimeMillis());
outPath =new Path("out" + System.currentTimeMillis());
fs = FileSystem.get(conf);
fs.delete(inPath,true);
fs.delete(outPath,true);
}
@Test
publicabstractvoiddo_job()throws Exception;
publicvoidcopyFile(String name)throws IOException {
if (new File(name).isDirectory()) {
File[] files =new File(name).listFiles();
for (File file : files) {
fs.copyFromLocalFile(false,true,new Path(file.getAbsolutePath()), inPath);
}
}else {
fs.copyFromLocalFile(false,true,new Path(name), inPath);
}
}
publicvoidcopyFile(String[] filenames)throws IOException {
for (String filename : filenames) {
copyFile(filename);
}
}
public StringgetOutput()throws IOException {
FSDataInputStream inputStream = fs.open(new Path(outPath,"part-r-00000"));
return IOUtils.toString(inputStream);
}
publicvoidprintOutput(int length)throws IOException {
String line ="=================================";
System.out.println(line);
String output = getOutput();
System.out.println(output.length() > length ? output.substring(0, length) : output);
System.out.println(line);
}
publicvoidprintOutput()throws IOException {
printOutput(150);
}
}
复制文件的时候就只需要调用copyFile("data/")
,输出最后结果调用printOutput()
就可以了。
最后附上WordCount和WordCountTest源码
package com.github.htynkn;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.StringTokenizer;
publicclass WordCount {
publicstaticclass TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
privatefinalstatic IntWritable one =new IntWritable(1);
private Text word =new Text();
@Override
protectedvoidmap(Object key, Text value, Context context)throws IOException, InterruptedException {
StringTokenizer itr =new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
publicstaticclass IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result =new IntWritable();
@Override
publicvoidreduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {
int sum =0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
}
package com.github.htynkn;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
publicclass WordCountTest extends BaseTest {
publicvoiddo_job()throws Exception {
copyFile("data/");
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
job.waitForCompletion(true);
printOutput();
}
}
写在最后
其实还有可以直接在命令行运行hadoop-minicluster,不过这样需要开个一个命令行。
如果在main中运行最小集群,在单元测试中提交任务,就只需要开一个IDE就行了。
参考资料
How to Build a Hadoop Data Application
Create hadoop-client and hadoop-minicluster artifacts for downstream projects
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
转载自夜明的孤行灯
本文链接地址: https://www.huangyunkun.com/2014/04/02/use_minicluster_quick_develop/