hadoop

43次阅读
没有评论

丸趣 TV 小编给大家分享一下 hadoop-reduce 的示例分析,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!

Map 的结果,会通过 partition 分发到 Reducer 上,Reducer 做完 Reduce 操作后,通过 OutputFormat,进行输出。

* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
import java.io.IOException;
 * Reduces a set of intermediate values which share a key to a smaller set of
public class Reducer KEYIN,VALUEIN,KEYOUT,VALUEOUT  {
 public class Context 
 extends ReduceContext KEYIN,VALUEIN,KEYOUT,VALUEOUT  {
 public Context(Configuration conf, TaskAttemptID taskid,
 RawKeyValueIterator input, 
 Counter inputKeyCounter,
 Counter inputValueCounter,
 RecordWriter KEYOUT,VALUEOUT  output,
 OutputCommitter committer,
 StatusReporter reporter,
 RawComparator KEYIN  comparator,
 Class KEYIN  keyClass,
 Class VALUEIN  valueClass
 ) throws IOException, InterruptedException {
 super(conf, taskid, input, inputKeyCounter, inputValueCounter,
 output, committer, reporter, 
 comparator, keyClass, valueClass);
 }
 }
 /**
 * Called once at the start of the task.
 */
 protected void setup(Context context
 ) throws IOException, InterruptedException {
 // NOTHING
 }
 /**
 * This method is called once for each key. Most applications will define
 * their reduce class by overriding this method. The default implementation
 * is an identity function.
 */
 @SuppressWarnings(unchecked)
 protected void reduce(KEYIN key, Iterable VALUEIN  values, Context context
 ) throws IOException, InterruptedException { for(VALUEIN value: values) { context.write((KEYOUT) key, (VALUEOUT) value);
 }
 }
 /**
 * Called once at the end of the task.
 */
 protected void cleanup(Context context
 ) throws IOException, InterruptedException {
 // NOTHING
 }
 /**
 * Advanced application writers can use the 
 * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
 * control how the reduce task works.
 */
 public void run(Context context) throws IOException, InterruptedException { setup(context);
 while (context.nextKey()) { reduce(context.getCurrentKey(), context.getValues(), context);
 }
 cleanup(context);
 }
}

Mapper 的结果,可能送到可能的 Combiner 做合并,Combiner 在系统中并没有自己的基类,而是用 Reducer 作为 Combiner 的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。

Mapper 最终处理的结果对 key, value,是需要送到 Reducer 去合并的,合并的时候,有相同 key 的键 / 值对会送到同一个 Reducer 那,哪个 key 到哪个 Reducer 的分配过程,是由 Partitioner 规定的,它只有一个方法,输入是 Map 的结果对 key, value 和 Reducer 的数目,输出则是分配的 Reducer(整数编号)。系统缺省的 Partitioner 是 HashPartitioner,它以 key 的 Hash 值对 Reducer 的数目取模,得到对应的 Reducer。

* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce;
 * Partitions the key space.
public abstract class Partitioner KEY, VALUE  {
 
 /** 
 * Get the partition number for a given key (hence record) given the total 
 * number of partitions i.e. number of reduce-tasks for the job.
 * 
 *  p Typically a hash function on a all or a subset of the key. /p 
 *
 * @param key the key to be partioned.
 * @param value the entry value.
 * @param numPartitions the total number of partitions.
 * @return the partition number for the  code key /code .
 */
 public abstract int getPartition(KEY key, VALUE value, int numPartitions);
 
 * Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce.lib.partition;
import org.apache.hadoop.mapreduce.Partitioner;
/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner K, V  extends Partitioner K, V  { /** Use {@link Object#hashCode()} to partition. */
 public int getPartition(K key, V value,
 int numReduceTasks) { return (key.hashCode()   Integer.MAX_VALUE) % numReduceTasks;
 }
}

Reducer 是所有用户定制 Reducer 类的基类,和 Mapper 类似,它也有 setup,reduce,cleanup 和 run 方法,其中 setup 和 cleanup 含义和 Mapper 相同,reduce 是真正合并 Mapper 结果的地方,它的输入是 key 和这个 key 对应的所有 value 的一个迭代器,同时还包括 Reducer 的上下文。系统中定义了两个非常简单的 Reducer,IntSumReducer 和 LongSumReducer,分别用于对整形 / 长整型的 value 求和。

* Licensed to the Apache Software Foundation (ASF) under one
package org.apache.hadoop.mapreduce.lib.reduce;
import java.io.IOException;
public class IntSumReducer Key  extends Reducer Key,IntWritable,
 Key,IntWritable  { private IntWritable result = new IntWritable();
 public void reduce(Key key, Iterable IntWritable  values, 
 Context context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable val : values) { sum += val.get();
 }
 result.set(sum);
 context.write(key, result);
 }
}

Reduce 的结果,通过 Reducer.Context 的方法 collect 输出到文件中,和输入类似,Hadoop 引入了 OutputFormat。OutputFormat 依赖两个辅助接口:RecordWriter 和 OutputCommitter,来处理输出。RecordWriter 提供了 write 方法,用于输出 key, value 和 close 方法,用于关闭对应的输出。OutputCommitter 提供了一系列方法,用户通过实现这些方法,可以定制 OutputFormat 生存期某些阶段需要的特殊操作。我们在 TaskInputOutputContext 中讨论过这些方法(明显,TaskInputOutputContext 是 OutputFormat 和 Reducer 间的桥梁)。

OutputFormat 和 RecordWriter 分别对应着 InputFormat 和 RecordReader,系统提供了空输出 NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter 只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件 FileOutputFormat 的 SequenceFileOutputFormat 和 TextOutputFormat 输出。

基于文件的输出 FileOutputFormat 利用了一些配置项配合工作,包括 mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat 还依赖于 FileOutputCommitter,通过 FileOutputCommitter 提供一些和 Job,Task 相关的临时文件管理功能。如 FileOutputCommitter 的 setupJob,会在输出路径下创建一个名为_temporary 的临时目录,cleanupJob 则会删除这个目录。

SequenceFileOutputFormat 输出和 TextOutputFormat 输出分别对应输入的 SequenceFileInputFormat 和 TextInputFormat

以上是“hadoop-reduce 的示例分析”这篇文章的所有内容,感谢各位的阅读!相信大家都有了一定的了解,希望分享的内容对大家有所帮助,如果还想学习更多知识,欢迎关注丸趣 TV 行业资讯频道!