我们搭建好集群后,也运行了hadoop本身自带提供的单词测试程序,现在我们用Eclipse和mavenlai8手动编写一下单词计数程序并提交到hadoop上运行。
一、环境准备
参考我之前的博文搭建好hadoop完全分布式环境并且启动。主备eclipse和maven.
二、新建一个maven项目
用eclipse新建一个maven羡慕,在pom.xml中添加如下依赖:
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.8.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.8.5</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.8.5</version></dependency>
因为要打包成可执行jar并且有第三方依赖,需要添加如下build
<build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.6</source><target>1.6</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>1.4</version><configuration><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><execution><!-- 执行package的phase --><phase>package</phase><!-- 为这个phase绑定goal --><goals><goal>shade</goal></goals><configuration><!-- 过滤掉以下文件,不打包 :解决包重复引用导致的打包错误--><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><!-- 打成可执行的jar包 的主方法入口--><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.suibibk.App</mainClass></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
注意修改主方法入口,也就是main方法所在类,这样子程序就可以直接maven install打包了。
三、编写Mapper、Reducer和启动类
Mapreduce程序围绕着分而治之的思想来的,分就是Mapper程序,治就是Reducer程序,然后用一个启动类将job提交给集群运行即可。
1、项目结构

2、启动类
package com.suibibk;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class App {/*** 1. 业务逻辑相关信息通过job对象定义与实现 2. 将绑定好的job提交给集群去运行*/public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(App.class);job.setMapperClass(MyMapper.class);job.setReducerClass(MyReducer.class);// 设置业务逻辑Mapper类的输出key和value的数据类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 指定要处理的数据所在的位置FileSystem fs = FileSystem.get(conf);String inputPath = args[0];Path input = new Path(inputPath);if(fs.exists(input)) {FileInputFormat.addInputPath(job, input);}// 指定处理完成之后的结果所保存的位置String outputPath = args[1];Path output = new Path(outputPath);//需要先删除,不然第二次执行会报错fs.delete(output, true);FileOutputFormat.setOutputPath(job, output);// 向yarn集群提交这个jobboolean res = job.waitForCompletion(true);System.exit(res ? 0 : 1);}}
注意在hadoop2中FileInputFormat所属的包为: org.apache.hadoop.mapreduce.lib.input.FileInputFormat。out也一样,不要搞错了,我这里直接把导入的包也黏贴上来。
3、Mapper
package com.suibibk;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{// map方法的生命周期: 框架每传一行数据就被调用一次protected void map(LongWritable key, Text value,Context context) throws IOException ,InterruptedException {String line = value.toString(); // 行数据转换为stringString[] words = line.split(" "); // 行数据分隔单词for (String word : words) { // 遍历数组,输出<单词,1>context.write(new Text(word), new IntWritable(1));}}}
4、Reducer
package com.suibibk;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{// 生命周期:框架每传递进来一个kv 组,reduce方法被调用一次@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {int count = 0; // 定义一个计数器for (IntWritable value : values) { // 遍历所有v,并累加到count中count += value.get();}context.write(key, new IntWritable(count));}}
四、提交测试
1、项目右键执行maven install(package也可以)
然后再target中获得jar包。
2、上传到hadoop集群的一台机中
我这里是上传到worker1中。
4、执行测试
测试之前得先准备一下输入文件,这里用file2.txt来,然后执行如下命令:
hadoop jar wordcount-0.0.1-SNAPSHOT.jar /input/file2.txt /output
执行成功后查看结果:
hadoop hdfs -cat /output/*
会发现跟hadoop提供的例子结果一样。
完成。

