Hadoop中Map

53次阅读
没有评论

这篇文章将为大家详细讲解有关 Hadoop 中 Map-Reduce 如何配置、测试和调试,丸趣 TV 小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

Map-Reduce 之 配置, 测试, 调试

Evironvemnt:

cdh6.1

Configuration  配置文件位置

使用 cdh6.1,该文件位于

/etc/hadoop/conf,其实 /etc/hadoop 下面有几种目录,比如 conf.dist,conf.pseudo,conf.impala

文件列表

hadoop-env.sh,可以控制全局环境变量

core-site.xml, 最重要的是参数 fs.defaultFS

1.value = File:\\\home\, 这个是单机模式 (single-node standalone),hadoop daemon 运行在一个 jvm 进程。主要方便调试。

2.value = hdfs://localhost:8020, 这个是伪分布式 (Pseudo-distributes), 就是每个 daemon 运行在单独的 jvm 进程,但还是都在一台主机上。主要用于学习测试调试等。

3.value = hdfs://host:8020, 集群模式.

hdfs-site.xml, 最重要的是参数 dfs.replication

除了集群模式是 3, 一般都设置为 1.

dfs.namenode.replication.min = 1, 块复制的底线

mapred-site.xml, 最重要的是参数 mapred.job.tracker

也就是 jobtracker 运行在那一台机器上。

yarn-site.xml, 主要用来配置 resourcemanager。

hadoop-metrics.properties, 如果配置了 Ambari, 需要配置此文件,以便于发射监控指标给 Ambari 服务器。

log4j.properties

如果有多个配置文件加载,那么一般情况下,后加载的配置覆盖相同的早加载的配置文件。为了防止不期望的覆盖,配置文件中有 final 的关键字,它可以防止后面的覆盖。

conf 和 jvm 的配置,我们可以把某些配置写入 jvm properties,如果这样做,它是最高优先级的,比 conf 高。

hadoop jar -Ddfs.replication=1

Map-Reduce Sample

首先说主程序,MyWordCount 继承于 Tool 和 Configured, Configured 主要用来帮助 Tool 实现 Configurable.

interface Tool extends Configurable

Configured extends Configurable

一般都会调用 ToolRunner 来运行程序,ToolRunner 内部会调用 GenericOptionsParser,所以你的程序可以添加参数的能力。

这里和 hadoop1 的不同在于 org.apache.hadoop.mapreduce,我记得 1.0,好像是 mapred.

/**
 * write by jinbao
 */
package com.jinbao.hadoop.mapred.unittest;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

 */ public class MyWordCount extends Configured implements Tool {  * @param args  * @throws Exception   */ public static void main(String[] args) throws Exception { try {ToolRunner.run(new MyWordCount(), args); } catch (Exception e) {e.printStackTrace(); @Override public int run(String[] args) throws Exception {if(args.length != 2){System.err.printf( usage: %s, [generic options]  input   output  \n ,getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; Configuration conf = new Configuration(); Job job = Job.getInstance(conf, word counting job.setJarByClass(MyWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1]));
 */ public static class TokenizerMapper extends Mapper Object,Text,Text,IntWritable  {private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString()); while ( itr.hasMoreTokens()){word.set(itr.nextToken()); context.write(word, one); public static class SumReducer extends Reducer Text,IntWritable, Text, IntWritable {private static IntWritable result = new IntWritable(); public void reduce(Text key, Iterable IntWritable  values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val:values){sum += val.get(); result.set(sum); context.write(key, result); }

MapReduce Web UI
MRv1: http://jobtracker-host:50030
MRv2: http://resourcemgr-host:8088/cluster
  application 细节, 可以到 job history 里边去看。

单元测试 -MRUnit

这是一个专门针对 map-reduce 单元测试的工具包

需要下载依赖

1. junit, 这个 eclipse 已经自带了,hadoop 的 lib 下面也有。

2. mockito,这个下面的包里有。

3. powermock,下载连接 here

4. MRUnit, 去 apache 家找 here。

下面上我的程序:

package com.jinbao.hadoop.mapred.unittest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.io.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;

public class MyWordCountTest { private MapDriver Object, Text, Text, IntWritable  mapDriver; private ReduceDriver Text, IntWritable, Text, IntWritable  reduceDriver; private MapReduceDriver Object, Text, Text, IntWritable, Text, IntWritable  mapReduceDriver; @Before public void setUp() {MyWordCount.TokenizerMapper mapper = new MyWordCount.TokenizerMapper(); MyWordCount.SumReducer reducer = new MyWordCount.SumReducer(); mapDriver = MapDriver.newMapDriver(mapper); reduceDriver = ReduceDriver.newReduceDriver(reducer); mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer); @Test public void testMapper() throws IOException {mapDriver.withInput(new LongWritable(), new Text(test input from unit test)); ArrayList Pair Text,IntWritable  outputRecords = new ArrayList Pair Text,IntWritable (); outputRecords.add( new Pair Text,IntWritable (new Text( test),new IntWritable(1) ) ); outputRecords.add( new Pair Text,IntWritable (new Text( input),new IntWritable(1) ) ); outputRecords.add( new Pair Text,IntWritable (new Text( from),new IntWritable(1) ) ); outputRecords.add( new Pair Text,IntWritable (new Text( unit),new IntWritable(1) ) ); outputRecords.add( new Pair Text,IntWritable (new Text( test),new IntWritable(1) ) ); mapDriver.withAllOutput(outputRecords); mapDriver.runTest(); @Test public void testReducer() throws IOException {reduceDriver.withInput(new Text( input), new ArrayList IntWritable (Arrays.asList(new IntWritable(1), new IntWritable(3))) ); reduceDriver.withOutput(new Text( input), new IntWritable(4)); reduceDriver.runTest(); @Test public void testMapperReducer() throws IOException {mapReduceDriver.withInput(new LongWritable(), new Text(test input input input input input test) ); ArrayList Pair Text,IntWritable  outputRecords = new ArrayList Pair Text,IntWritable (); outputRecords.add( new Pair Text,IntWritable (new Text( input),new IntWritable(5) ) ); outputRecords.add( new Pair Text,IntWritable (new Text( test),new IntWritable(2) ) ); mapReduceDriver.withAllOutput(outputRecords); mapReduceDriver.runTest();}

Run MRUnit

上图直接运行 @Test 方法就可以解决 90% 以上的问题, 否则你的 UnitTest 覆盖率太低, 那么后期在 cluster 出问题, 就 debug 成本比较高了.

Run Locally

Eclipse 里边配置 Debug Configuration:
/home/cloudera/workspace/in /home/cloudera/workspace/out
注意:job runner 运行的都是本地目录, 使用 toolrunner 默认是启动一个 standalone 的 jvm 来运行 hadoop, 另外, 只能有 0 或 1 个 reduce. 这个不是问题, 只要非常方便的调试就可以了.

YARN 里边默认是 mapreduce.framework.name 必须设置为 local,不过这都是默认的,不需要管它。

Run in Cluster

导出 jar, 我都是用 eclipse 来干,用 ant,命令行等都可以,看喜好了。
如果你的 jar 包有依赖, 那么也要把依赖包到处在某个 lib 里边, 并且 minifest 里边配置 main class 是哪一个. 这个 package 和 war 打包没什么区别

%hadoop fs -copyFromLocal /home/cloudera/word.txt data/in
%hadoop jar wordcount.jar data/in data/out

IsolationRunner and Remote Debugger

 前提:keep.failed.task.files,该选项默认为  false,表示对于失败的 task,其运行的临时数据和目录是不会被保存的。这是一个 per job 的配置,运行 job 的时候加上这个选项。如何重跑: 
  当 fail 的 task 环境具备以后,就可以对单独的 task 进行重跑了。重跑的方式为:
1.  上到 task 出错的 tasktracker 机器   上
2.  在该 tasktracker 上找到 fail 的 task 运行时的目录环境  1.  在  tasktracker 中,对于每一个 task 都会有一个单独的执行环境,其中包括其 work 目录,其对应的中间文件,以及其运行时需要用到的配置文件等
2.  这些   目录是由 tasktracker 的配置决定,配置选项为: mapred.local.dir.  该选项可能是一个逗号分隔的路径 list,每个  list 都是 tasktracker 对在其上执行的 task 建立工作目录的根目录。比如如果 mapred.local.dir=/disk1 /mapred/local,/disk2/mapred/local,那么 task 的执行环境就是 mapred.local.dir /taskTracker/jobcache/job-ID/task-attempt-ID
3.  找到该 task 的执行工作目录后,就可以进入到   该目录下,然后其中就会有该 task 的运行环境,通常包括一个 work 目录,一个 job.xml 文件,以及一个 task 要进行操作的数据文件 (对 map 来   说是 split.dta,对 reduce 来说是 file.out)。4.  找到环境以后,就可以重跑 task 了。 1. cd work
2. hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
• ◦  这样,IsolationRunner 就会读取 job.xml 的配置(这里的 job.xml 相当   于提交客户端的 hadoop-site.xml 配置文件与命令行 - D 配置的接合),然后对该 map 或者 reduce 进行重新运行。1.  到这里为止,已经实现了 task 单独重跑,但是还是没有解决对其进行单步断点 debug。这里利用到的其实是 jvm 的远程  debug 的功能。方式如下: 1.  在重跑 task 之前,export 一个环境变   量:export HADOOP_OPTS= -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888 
2.  这   样,hadoop 的指令就会通过 8888 端口将 debug 信息发送出去
3.  然后在自己本地的开发环境 IDE 中(比如  eclipse),launch 一个远程调试,并在代码中打一个断点,就可以对在 tasktracker 上运行的独立 map 或者 reduce task 进行远程单步调试了。详细可以去到这个 blog 看看。http://blog.csdn.net/cwyspy/article/details/10004995

Note: 非常不幸,在最近的版本里面,IsolationRunner 已经不能使用,所以在 hadoop2 里边,需要找到失败节点后,把问题文件拷贝出来,进行单机调试。

合并结果集

根据 Reduce 个数, 可以会有多个 part 的结果集, 那么可以使用下面命令来合并

% hadoop fs -getmerge max-temp max-temp-local

% sort max-temp-local | tail

Tuning a Job

Number of mappers

Number of reducers

Combiners

Intermediate compression

Custom serialization

Shuffle tweaks

MapReduce Workflows

In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.

ChainMapper and ChainReducer

It s a Map*/Reduce model, which means multiple mappers work as a chain, and after last mapper, output will go to reducer. this sounds reduced network IO.

Though called ChainReducer , actually only a Reducer working for ChainMapper, so gets the name.

Mapper1- Mapper2- MapperN- Reducer

JobControl

MR has a class JobControl, but as I test it s really not maintained well. 

Simply to use:

if(Run(job1)

   Run(job2)

Apache Oozie

Oozie 是一种 Java Web 应用程序,它运行在 Java servlet 容器——即 Tomcat——中,并使用数据库来存储以下内容:

工作流定义

当前运行的工作流实例,包括实例的状态和变量

Oozie 工作流是放置在控制依赖 DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop 的 Map/Reduce 作业、Pig 作业等),其中指定了动作执行的顺序。我们会使用 hPDL(一种 XML 流程定义语言)来描述这个图。

hPDL 是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end 和 fail 节点)以及控制工作流执行路径的机制(decision、fork 和 join 节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie 为以下类型的动作提供支持:Hadoop map-reduce、Hadoop 文件系统、Pig、Java 和 Oozie 的子工作流(SSH 动作已经从 Oozie schema 0.2 之后的版本中移除了)

关于“Hadoop 中 Map-Reduce 如何配置、测试和调试”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。