hadoop中如何利用mapreduce实现wordcount和电影评分预测

12次阅读
没有评论

这篇文章将为大家详细讲解有关 hadoop 中如何利用 mapreduce 实现 wordcount 和电影评分预测,文章内容质量较高,因此丸趣 TV 小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

mapreduce 中 map 指映射,map 指的是归约。

mapreduce 是一个 key-value 来处理数据的编程模型,它使用 map 将一组 key-value 映射为另一组 key-value

通过底层传递给 reduce,在 reduce 中,它将所有 map 过程传递过来的 key-value 进行归约,相同的 key 值,value 值会放在一起。mapreduce 内部还会对 reduce 过程中的 key 值进行一次排序。

一.WordCount

public class WordCount
 //
 public static final String HDFS =  hdfs://localhost:8888 
 public static final Pattern DELIMITER = Pattern.compile(\\b([a-zA-Z]+)\\b 
 
 // 自定义 Map 类型执行   映射 这一部分
 public static class Map extends Mapper LongWritable, Text, Text, IntWritable 
 {
 //mapreduce 中,Text 相当于 String 类型,IntWritable 相当于 Int 类型
 //LongWritable 是实现了 WritableComparable 的一个数据类型。 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();
 
 @Override
 // 重写父类 map() 函数
 public void map(LongWritable key, Text value,
 Context context)
 throws IOException, InterruptedException
 {
 // 读取一行数据
 String line = value.toString();
 // 将该行字符全部变为小写
 line = line.toLowerCase();
 // 根据定义好的正则表达式拆分一行字符串。 Matcher matcher = DELIMITER.matcher(line);
 while(matcher.find()){
 // 将分解的一个个单词类型转化为 Text。 word.set(matcher.group());
 // 将相应的 key-value 值传入。key 值为单词,value 值为 1.
 context.write(word,one);
 }
 }
 }
 
 // 自定义 Combine 过程先对本地进行的 map 进行一次 reduce 过程,减少传递给主机的数据量.
 public static class Combine extends Reducer  Text, IntWritable, Text, IntWritable 
 {
 @Override
 public void reduce(Text key, Iterable IntWritable  values, Context context) throws IOException, InterruptedException {
 int sum = 0;
 // 遍历同一个 key 值的所有 value, 所有的 value 放在同一个 Iterable 中。 for (IntWritable line : values)
 { sum += line.get();
 }
 IntWritable value = new IntWritable(sum);
 // 将 key-value 按照指定的输出格式输出。 context.write(key, value);
 }
 }
 
 public static class Reduce extends Reducer  Text, IntWritable, Text, IntWritable 
 {
 @Override
 public void reduce(Text key, Iterable IntWritable  values, Context context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable line : values)
 { sum += line.get();
 }
 IntWritable value = new IntWritable(sum);
 context.write(key, value);
 
 
 }
 }
 public static void main(String[] args) throws Exception
 { JobConf conf = WordCount.config();
 String input =  data/1.txt 
 String output = HDFS +  /user/hdfs/wordcount 
 // 自定义 HDFS 文件操作工具类
 HdfsDAO hdfs = new HdfsDAO(WordCount.HDFS, conf);
 // 移除存在的文件否则会报文件生成文件已存在的错误
 hdfs.rmr(output);
 Job job = new Job(conf);
 job.setJarByClass(WordCount.class);
 
 // 设置输出的 key 值类型
 job.setOutputKeyClass(Text.class);
 // 设置输出的 value 值类型
 job.setOutputValueClass(IntWritable.class);
 
 job.setMapperClass(WordCount.Map.class);
 job.setCombinerClass(WordCount.Combine.class);
 job.setReducerClass(WordCount.Reduce.class);
 
 job.setInputFormatClass(TextInputFormat.class);
 // 设置输出的格式,这里使用的是自定义的 FileOutputFormat 类,见下文。 job.setOutputFormatClass(ParseTextOutputFormat.class);
 FileInputFormat.setInputPaths(job, new Path(input));
 FileOutputFormat.setOutputPath(job, new Path(output));
 
 
 
 
 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
 
 
 public static JobConf config() { JobConf conf = new JobConf(WordCount.class);
 conf.setJobName( WordCount 
 conf.addResource( classpath:/hadoop/core-site.xml 
 conf.addResource( classpath:/hadoop/hdfs-site.xml 
 conf.addResource( classpath:/hadoop/mapred-site.xml 
// conf.set( io.sort.mb ,  
 return conf;
 }
 

}

自定义文件输出格式

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;

public class ParseTextOutputFormat K, V  extends FileOutputFormat K, V {  protected static class LineRecordWriter K, V  extends RecordWriter K, V  {  private static final String utf8 =  UTF-8  private static final byte[] newline;  static {  try { newline =  \n .getBytes(utf8);  } catch (UnsupportedEncodingException uee) {  throw new IllegalArgumentException( can t find   + utf8 +   encoding  }  }  protected DataOutputStream out;  private final byte[] keyValueSeparator;  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {  this.out = out;  try { this.keyValueSeparator = keyValueSeparator.getBytes(utf8);  } catch (UnsupportedEncodingException uee) {  throw new IllegalArgumentException( can t find   + utf8 +   encoding  }  }  public LineRecordWriter(DataOutputStream out) {  this(out,  \t  }  /**  * Write the object to the byte stream, handling Text as a special  * case.  * @param o the object to print  * @throws IOException if the write throws, we pass it on  */  private void writeObject(Object o) throws IOException { if (o instanceof Text) { Text to = (Text) o;  out.write(to.getBytes(), 0, to.getLength());  } else { out.write(o.toString().getBytes(utf8));  }  }  public synchronized void write(K key, V value)  throws IOException {  boolean nullKey = key == null || key instanceof NullWritable;  boolean nullValue = value == null || value instanceof NullWritable;  if (nullKey   nullValue) {  return;  }  if (!nullKey) { writeObject(key);  }  if (!(nullKey || nullValue)) { out.write(keyValueSeparator);  }  if (!nullValue) { writeObject(value);  }  out.write(newline);  }  public synchronized   void close(TaskAttemptContext context) throws IOException { out.close();  }  }  public RecordWriter K, V    getRecordWriter(TaskAttemptContext job  ) throws IOException, InterruptedException { Configuration conf = job.getConfiguration();  boolean isCompressed = getCompressOutput(job);  String keyValueSeparator= conf.get( mapred.textoutputformat.separator ,   :  CompressionCodec codec = null;  String extension =   if (isCompressed) {  Class ? extends CompressionCodec  codecClass =   getOutputCompressorClass(job, GzipCodec.class);  codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);  extension = codec.getDefaultExtension();  }  Path file = getDefaultWorkFile(job, extension);  FileSystem fs = file.getFileSystem(conf);  if (!isCompressed) { FSDataOutputStream fileOut = fs.create(file, false);  return new LineRecordWriter K, V (fileOut, keyValueSeparator);  } else { FSDataOutputStream fileOut = fs.create(file, false);  return new LineRecordWriter K, V (new DataOutputStream  (codec.createOutputStream(fileOut)),  keyValueSeparator);  }  }    }

二. 电影评分预测

整个算法的实现中使用了 slop one 算法来预测评分,此处自定义的输出类与上文一致。

输入文件格式为 userId::movieId::score

package main.java.org.conan.myhadoop.recommend;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
import main.java.org.conan.myhadoop.hdfs.HdfsDAO;
public class Recommend {
 public static final String HDFS =  hdfs://localhost:8888 
 public static final Pattern DELIMITER = Pattern.compile([\t,] 
 
 public static final Pattern STRING = Pattern.compile([\t,:] 
 
// public final static int movieListLength = 100000;
// public static int []movieList = new int[movieListLength];
 public static List movieList = new ArrayList();
 
 public static Map userScore = new HashMap();
 public static void main(String[] args) throws Exception {
 Map String, String  path = new HashMap String, String 
 String in =  logfile/4.txt 
 String out = HDFS +  /user/hdfs/recommend  +  /step5 
 
// path.put( data ,  logfile/small.csv 
 
// path.put( data ,  logfile/ratings.dat 
 if(args.length == 2){ in = args[0];
 out = HDFS + args[1];
 System.out.println(out);
 }
 // 设置数据输入路径
 path.put(data , in);
 
 // 设置第一步输入文件路径
 path.put( Step1Input , HDFS +  /user/hdfs/recommend 
 
 // 设置第一步结果输出路径
 path.put(Step1Output , path.get( Step1Input) +  /step1 
 
 // 设置第二步输入文件路径
 path.put(Step2Input , path.get( Step1Output));
 
 // 设置第二步结果输出路径
 path.put(Step2Output , path.get( Step1Input) +  /step2 
 
 // 设置第三步输入文件路径
 path.put(Step3Input1 , path.get( data));
// path.put( Step3Input2 ,  logfile/movie/movies.dat 
 // 设置第三步结果输出路径
 path.put(Step3Output , path.get( Step1Input) +  /step3 
// path.put(Step3Input2 , path.get( Step2Output));
// path.put(Step3Output2 , path.get( Step1Input) +  /step3_2 
// 
 // 设置第四步输入文件路径 1
 path.put(Step4Input1 , path.get( Step2Output));
 
 // 设置第四步输入文件路径 2
 path.put(Step4Input2 , path.get( Step3Output));
 // 设置第四步结果输出路径
 path.put(Step4Output , path.get( Step1Input) +  /step4 
// 
 // 设置第五步输入文件路径
 path.put(Step5Input , path.get( Step4Output));
// path.put(Step5Input2 , path.get( Step3Output2));
 // 设置第五步结果输出路径
 path.put(Step5Output , out);
 
 // 第一步,根据给出的用户评分文件,求出每个用户对物品的评分矩阵
 Step1.run(path);
 
 // 根据第一步的输出结果计算物品评分的同现矩阵
 Step2.run(path);
 
 // 获取所有用户评过分的电影,并输出每位用户对每部电影的评分,未评过则记为 0
 Step3.run(path);
 
 // 根据第二步与第三步的结果计算出每位用户对每部电影的评分
 Step4.run(path);
 
 // 整理输出格式。 Step5.run(path);
 
 System.exit(0);
 }
 public static JobConf config() { JobConf conf = new JobConf(Recommend.class);
 conf.setJobName( Recommand 
 conf.addResource( classpath:/hadoop/core-site.xml 
 conf.addResource( classpath:/hadoop/hdfs-site.xml 
 conf.addResource( classpath:/hadoop/mapred-site.xml 
// conf.set( io.sort.mb ,  
 return conf;
 }
}
// 求出用户对物品的评分矩阵,即得出用户对电影   的评分矩阵 
// 每一行数据代表一个用户对所有打分电影的结果
//key 值为 userId, value 值为 movieID:score,movieId:score