---
layout: post
title: A WordCount Demo Using the New MapReduce API
category: Tech
tags: Hadoop
keywords:
description: This WordCount job is written against the new MapReduce API and runs on Hadoop 2.7
---

{:toc}

## Overview

MapReduce development can, for now, be broken down into eight steps:

1. Read the input and output paths from the command-line arguments
1. Delete the output directory if it already exists
1. Build the Configuration for the target environment
1. Obtain a Job instance
1. Configure the Job's jar, Mapper class, Combiner class and Reducer class
1. Configure the Job's output key and value types
1. Configure the input and output formats and paths
1. Submit the Job and wait for it to complete

## Environment

- Hadoop 2.7.2 cluster (three nodes: h2m1, h2s1, h2s2)
- JDK 1.7.0_75
- CentOS 6.5

**The job accepts multiple files or multiple directories as input, but files and directories cannot be mixed in a single run.**

## Project setup

Create a Maven project in Eclipse. Development is done on Windows; the job runs on Linux.

Declare the Hadoop and logging dependencies in the Maven `pom.xml` (only the relevant fragment is shown):

```xml
<modelVersion>4.0.0</modelVersion>
<artifactId>brief-hadoop-demo</artifactId>

<properties>
    <hadoop.version>2.7.2</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
    </dependency>
</dependencies>
```

Create four classes: `WordCountV2.java`, `WordCountMapV2.java`, `WordCountReduceV2.java` and `HDFSOper.java`.

`WordCountV2.java`:

```java
package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import cn.followtry.hadoop.demo.hdfs.HDFSOper;

/**
 * brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountV2
 *
 * @author jingzz
 * @since 2016-12-14 10:03
 */
public class WordCountV2 {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs == null || otherArgs.length < 2) {
            System.out.println("Usage:\n  at least two arguments are required; the last one is the output directory, the others are input paths");
            System.exit(-1);
        }
        StringBuilder inputPaths = new StringBuilder();
        String outpathDir;
        int len = otherArgs.length - 1;
        // Join all input paths (every argument except the last) into a comma-separated list
        for (int i = 0; i < len; i++) {
            inputPaths.append(otherArgs[i]);
            if (i < len - 1) {
                inputPaths.append(",");
            }
        }
        outpathDir = otherArgs[len];
        // If the output directory already exists, delete it first
        HDFSOper.rmExistsOutputDir(outpathDir);

        Job job = Job.getInstance(conf, "wordCount v2 demo");
        job.setJarByClass(WordCountV2.class);
        job.setMapperClass(WordCountMapV2.class);
        job.setCombinerClass(WordCountReduceV2.class);
        job.setReducerClass(WordCountReduceV2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, inputPaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(outpathDir));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
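The same driver can also be written on top of Hadoop's `Tool`/`ToolRunner` helpers, which run `GenericOptionsParser` for you before `run()` is called. The sketch below is only an illustration of that variant, not part of the demo: `WordCountTool` is a hypothetical class name, and it assumes exactly one input path and one output path.

```java
package cn.followtry.hadoop.demo.v2.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/** Hypothetical ToolRunner-based variant of the WordCount driver. */
public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Generic options such as -D have already been consumed by ToolRunner,
        // so args only contains the input and output paths.
        Job job = Job.getInstance(getConf(), "wordCount v2 demo (ToolRunner)");
        job.setJarByClass(WordCountTool.class);
        job.setMapperClass(WordCountMapV2.class);
        job.setCombinerClass(WordCountReduceV2.class);
        job.setReducerClass(WordCountReduceV2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}
```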
`WordCountMapV2.java`:

```java
package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountMapV2
 *
 * @author jingzz
 * @since 2016-12-14 10:25
 */
public class WordCountMapV2 extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (StringUtils.isNotEmpty(line)) {
            // Split the line on spaces and emit (word, 1) for every word
            String[] words = line.split(" ");
            for (String word : words) {
                context.write(new Text(word), new IntWritable(ONE));
            }
        }
    }
}
```

`WordCountReduceV2.java`:

```java
package cn.followtry.hadoop.demo.v2.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * brief-hadoop-demo/cn.followtry.hadoop.demo.v2.mr.WordCountReduceV2
 *
 * @author jingzz
 * @since 2016-12-14 10:25
 */
public class WordCountReduceV2 extends Reducer<Text, IntWritable, Text, IntWritable> {

    private static final Logger LOGGER = LoggerFactory.getLogger(WordCountReduceV2.class);

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        int count = 0;
        // Sum up all the counts emitted for this word
        for (IntWritable intValue : values) {
            count += intValue.get();
        }
        LOGGER.info("Word {} occurred {} times", key, count);
        context.write(key, new IntWritable(count));
    }
}
```

`HDFSOper.java`:

```java
package cn.followtry.hadoop.demo.hdfs;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * brief-hadoop-demo/cn.followtry.hadoop.demo.hdfs.HDFSOper
 *
 * @author jingzz
 * @since 2016-12-14 10:26
 */
public class HDFSOper {

    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSOper.class);

    /**
     * Deletes the given output directory if it exists.
     *
     * @author jingzz
     * @param outpathDir the directory to delete
     * @return true if the directory existed and was deleted, false otherwise
     * @throws FileNotFoundException if the path cannot be found
     * @throws IOException on I/O errors
     */
    public static boolean rmExistsOutputDir(String outpathDir) throws FileNotFoundException, IOException {
        boolean hasDel = false;
        Configuration config = new Configuration();
        // Access the cluster through WebHDFS on the master node h2m1
        FileSystem fs = FileSystem.get(URI.create("webhdfs://h2m1:50070"), config);
        Path output = new Path(outpathDir);
        if (hasDel = fs.exists(output)) {
            LOGGER.info("Directory {} already exists, deleting...", outpathDir);
            System.out.println("Directory " + outpathDir + " already exists, deleting...");
            if (hasDel = fs.delete(output, true)) {
                System.out.println("Directory " + outpathDir + " deleted");
                LOGGER.info("Directory {} deleted", outpathDir);
            } else {
                System.out.println("Failed to delete directory " + outpathDir);
                LOGGER.info("Failed to delete directory {}", outpathDir);
            }
        } else {
            System.out.println("Directory " + outpathDir + " does not exist");
            LOGGER.info("Directory {} does not exist", outpathDir);
        }
        return hasDel;
    }
}
```

## Package and deploy

### Package

Right-click the project --> Export --> Java (JAR file) --> Next --> JAR file (Browse to choose the output location) --> Finish.

### Upload the jar to the Hadoop Linux server

## Create the input file and upload it to HDFS

For example, an input file `file1.txt` with the following content:

```
hello world
hello world
hello world2
hello world2
hello world3
hello world4
hello world5
hello world5
hello world5
hello world6
hello world7
hello world8
hello world8
```

Upload the input file by running `hdfs dfs -put -f file1.txt /user/root/input/file1.txt`.
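The upload can also be done from Java with the same `FileSystem` API that `HDFSOper` uses. Below is a minimal sketch of that approach; the class name `UploadInput`, the local path and the WebHDFS address are assumptions that mirror the examples above, so adjust them to your environment.

```java
package cn.followtry.hadoop.demo.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper that uploads the sample input file to HDFS. */
public class UploadInput {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same WebHDFS entry point as in HDFSOper
        FileSystem fs = FileSystem.get(URI.create("webhdfs://h2m1:50070"), conf);
        // Overwrite the target if it already exists, like `hdfs dfs -put -f`
        fs.copyFromLocalFile(false, true,
                new Path("file1.txt"),                    // local source, assumed to be in the working directory
                new Path("/user/root/input/file1.txt")); // HDFS destination
        fs.close();
    }
}
```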
## Run

`hadoop jar wordcount.jar cn.followtry.hadoop.demo.v2.mr.WordCountV2 /user/root/input/file1.txt /user/root/output/`

or

`hadoop jar wordcount.jar cn.followtry.hadoop.demo.v2.mr.WordCountV2 viewfs://hadoop-cluster-jingzz/user/root/input/file1.txt /user/root/output/`

The second form passes the input as a fully qualified path; `hadoop-cluster-jingzz` is the cluster name used in the ViewFS URI.

Part of the execution log:

```
16/12/13 18:10:07 INFO hdfs.HDFSOper: Directory /user/root/output already exists, deleting...
Directory /user/root/output already exists, deleting...
Directory /user/root/output deleted
16/12/13 18:10:07 INFO hdfs.HDFSOper: Directory /user/root/output deleted
16/12/13 18:10:08 INFO input.FileInputFormat: Total input paths to process : 2
16/12/13 18:10:08 INFO mapreduce.JobSubmitter: number of splits:2
16/12/13 18:10:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1481615539888_0016
16/12/13 18:10:09 INFO impl.YarnClientImpl: Submitted application application_1481615539888_0016
16/12/13 18:10:09 INFO mapreduce.Job: The url to track the job: http://h2m1:8088/proxy/application_1481615539888_0016/
16/12/13 18:10:09 INFO mapreduce.Job: Running job: job_1481615539888_0016
16/12/13 18:10:18 INFO mapreduce.Job: Job job_1481615539888_0016 running in uber mode : false
16/12/13 18:10:18 INFO mapreduce.Job:  map 0% reduce 0%
16/12/13 18:10:28 INFO mapreduce.Job:  map 100% reduce 0%
16/12/13 18:10:36 INFO mapreduce.Job:  map 100% reduce 100%
16/12/13 18:10:36 INFO mapreduce.Job: Job job_1481615539888_0016 completed successfully
16/12/13 18:10:36 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=160
		FILE: Number of bytes written=356056
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=436
		HDFS: Number of bytes written=99
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		VIEWFS: Number of bytes read=0
		VIEWFS: Number of bytes written=0
		VIEWFS: Number of read operations=0
		VIEWFS: Number of large read operations=0
		VIEWFS: Number of write operations=0
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=13091
		Total time spent by all reduces in occupied slots (ms)=5876
		Total time spent by all map tasks (ms)=13091
		Total time spent by all reduce tasks (ms)=5876
		Total vcore-seconds taken by all map tasks=13091
		Total vcore-seconds taken by all reduce tasks=5876
		Total megabyte-seconds taken by all map tasks=13405184
		Total megabyte-seconds taken by all reduce tasks=6017024
	Map-Reduce Framework
		Map input records=15
		Map output records=30
		Map output bytes=314
		Map output materialized bytes=166
		Input split bytes=242
		Combine input records=30
		Combine output records=12
		Reduce input groups=11
		Reduce shuffle bytes=166
		Reduce input records=12
		Reduce output records=11
		Spilled Records=24
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=268
		CPU time spent (ms)=2090
		Physical memory (bytes) snapshot=520069120
		Virtual memory (bytes) snapshot=2547580928
		Total committed heap usage (bytes)=281157632
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=0
	File Output Format Counters
		Bytes Written=0
```

Run `hdfs dfs -cat /user/root/output/part-r-00000` to view the result:

```
hello 13
world 2
world2 2
world3 1
world4 1
world5 3
world6 1
world7 1
world8 2
```
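If you would rather check the result from Java than from the shell, the output file can be read back with the `FileSystem` API as well. This is only a sketch; the class name `CatOutput`, the part-file path and the WebHDFS address are assumptions matching the examples above.

```java
package cn.followtry.hadoop.demo.hdfs;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical helper that prints the reducer output, similar to `hdfs dfs -cat`. */
public class CatOutput {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("webhdfs://h2m1:50070"), conf);
        Path result = new Path("/user/root/output/part-r-00000");
        // Each line of the reducer output is "word<TAB>count"
        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(result), "UTF-8"));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        } finally {
            reader.close();
            fs.close();
        }
    }
}
```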