Hadoop序列化怎么实现

50次阅读
没有评论

这篇文章主要讲解了“Hadoop 序列化怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“Hadoop 序列化怎么实现”吧!

Hadoop I/OData Integrity

Hdfs: % hadoop fs -cat hdfs://namenode/data/a.txt

LocalFS: % hadoop fs -cat file:///tmp/a.txt

generate crc check sum file

%hadoop fs -copyToLocal -crc /data/a.txt file:///data/a.txt

check sum file: .a.txt.crc is a hidden file.

Ref: CRC-32, 循环冗余校验算法,error-detecting.

io.bytes.per.checksum is deprecated, it s dfs.bytes-per-checksum, default is 512, Must not be larger than dfs.stream-buffer-size,which is the size of buffer to stream files. The size of this buffer should probably be a multiple of hardware page size (4096 on Intel x86), and it determines how much data is buffered during read and write operations.

Data Compression

常用算法

读书时,hadoop 支持四种压缩算法, 如果调解空间和效率的话,-1 ~ -9, 代表从最优速度到最优空间. 压缩算法支持在 org.apache.hadoop.io.compress.*.

deflate (.deflate), 就是常用的 gzip, package ..DefaultCodec

Gzip (.gz), 在 deflate 格式加了文件头和尾. 压缩速度 (适中), 解压速度 (适中), 压缩效率 (适中),package ..GzipCodec, both of java and native

bzip2 (.bz2), 压缩速度 (最差), 解压速度 (最差), 压缩效率 (最好),特点是支持可切分 (splitable),对 map-red 非常友好。,package ..BZip2Codec,java only

LZO (.lzo), 压缩速度 (最快), 解压速度 (最快), 压缩效率 (最差),,package com.hadoop.compressiojn.lzo.lzopCodec, native only

如果禁用原生库, 使用 hadoop.native.lib.

如果使用原生库, 可能对象创建的成本较高, 所以可以使用 CodecPool,重复使用这些对象。

对于一个非常大的数据文件,存储如下方案:

使用支持切分的 bzip2

手动切分,并使压缩后的 part 接近于 block size.

使用 Sequence File, 它支持压缩和切分

使用 Avro 数据文件,它也支持压缩和切分,而且增加了很多编程语言的可读写性。

如果 Map-Red 的 output 自动压缩:

conf.setBoolean (mared.output.compress ,true);
conf.setClass(mapred.output.compression.codec ,GzipCodec.class,CompressionCodec.class);

如果 Map-Red 的中间结果的自动压缩:

//or conf.setCompressMapOutput(true);
conf.setBoolean (mared.compress.map.output ,true);
//or conf.setMapOutputComressorClass(GzipCodec.class)
conf.setClass(mapred.map.output.compression.codec ,GzipCodec.class,CompressionCodec.class);

序列化 (Serialization/Deserialization)Writable and WritableComparable

// core class for hadoop
public interface Writable{ void write(DataOutput out) throw IOException;
 void readFields(DataInput in) throw IOException;
public interface Comparable T { int compareTo(T o);
//core class for map-reduce shuffle
public interface WritableComparable T  extends Writable, Comparable T  {
// Sample
public class MyWritableComparable implements WritableComparable {
 // Some data
 private int counter;
 private long timestamp;
 
 public void write(DataOutput out) throws IOException { out.writeInt(counter);
 out.writeLong(timestamp);
 }
 
 public void readFields(DataInput in) throws IOException { counter = in.readInt();
 timestamp = in.readLong();
 }
 
 public int compareTo(MyWritableComparable o) {
 int thisValue = this.value;
 int thatValue = o.value;
 return (thisValue   thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
 }
 public int hashCode() {
 final int prime = 31;
 int result = 1;
 result = prime * result + counter;
 result = prime * result + (int) (timestamp ^ (timestamp   32));
 return result
 }
//optimize for stream comparasion
public interface RawComparator T  extends Comparator T {
 // s1 start position, l1, length of bytes
 public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2);
public class WritableComparator implements RawComparator{}

Comparator RawComparator WritableComparator

WritableComparator 提供了原始 compator 的 compare 反序列化对象的实现,性能较差。不过它作为 RawComparator 实例的工厂:

RawComparator IntWritable comparator = WritableComparator.get(IntWritable.class);

// 注册一个经过优化的比较算子。Register an optimized comparator for a WritableComparable implementation.

static void define(Class c, WritableComparator comparator);

// 获得一个 WritableComparable 的比较算子. Get a comparator for a WritableComparable implementation.

static WritableComparator get(Class ? extends WritableComparable

public MyWritableComparator extends WritableComparator{
 static{ define(MyWritableComparable.class, new MyWritableComparator());
 }
 public MyWritableComparator { super(MyWritableComparable.class);
 }
 @Override
 public int compare(byte[] b1, int s1,int l1,byte[] b2,int s2,int l2){ }
}

 

注: 要使 static initializer 被调用,除非有该类的实例被创建,或某静态方法或成员被访问。或者直接强制,代码如:

Class.forName(package.yourclass 它会强制初始化静态 initializer.

Java Primitive Data Type wrapped by WritableExtends from WritableComparable

BooleanWritable, 1

ByteWritable, 1,

BytesWritable,

IntWritable,4

VIntWritable,1~5

FloatWritable,4,

LongWritable,8,

VLongWritable,1~9

DoubleWritable,8

NullWritable,Immutable singletone.

Text,4~

MD5Hash,

ObjectWritable,

GenericWritable

Extends from Writable only

ArrayWritable

TwoDArrayWritable

AbstractMapWritable

     MapWritable

     SortedMapWritable

[Text]

值得一提的是 Text 的序列化方式是 Zero-compressed encoding, 这个看过一些资料,其实是一种编码方式,意图是省略掉高位 0 所占用的空间,对于小数,它能节省空间,对于大数会额外占用空间。相比压缩,它能比较快速。其实类似于 VIntWritable, VLongWritable 的编码方式。

– 如何选择变长和定长数值呢?

1. 定长适合分布非常均匀的数值(如 hash),变长适合分布非常不均匀的数值。

2. 变长可以节省空间,而且可以在 VIntWritable 和 VLongWritable 之间转换。

– Text 和 String 的区别

1。String 是 char 序列,Text 是 UTF- 8 的 byte 序列.

UTF- 8 类不能对字符串大于 32767 的进行 utf- 8 编码。

(Indexing) 索引:对于 ASCII 来说,Text 和 String 是一样的,对于 Unicode 就不同了。String 类的长度是其所含 char 编码单元的长度,然而 Text 是 UTF- 8 的字节码的长度。CodePointAt 表示一个真正的 Unicode 字符,它可以是 2char,4bytes 的 unicode。

Iteration(迭代): 将 Text 转换 ByteBuffer, 然后反复调用 bytesToCodePoint() 静态方法,可以取到整型的 Unicode.

Mutable(易变性):  可以 set,类似 writable 和 StringBuffer,getLength() 返回有效字串长度,getbytes().length, 返回空间大小。

[BytesWritable]

这是二进制数组的封装,类似于 windows 下的 BSTR,都是前面一个整型表示字节长度,后面是字节的二进制流。

它也是 mutable,getLength() != getBytes().length

[NullWritable]

NullWritable 是 Writable 的一个特殊类型。它的序列化长度为 0,其实只是一个占位符,既不读入,也不写出。只是存在于程序体中。

Immutable, 是一个 singleton。

[ObjectWritable] 

ObjectWritable 是 Java 的 Array, String, 以及 Primitive 类型的通用封装 (注:不包含 Integer)。它的序列化则使用 java 的类型序列化,写入类型信息等,比较占用空间。

通过两个特殊的构造:

public ObjectWritable(Object instance);

public ObjectWritable(Class declaredClass,Object instance);

举例子:

ObjectWritable objectw = new ObjectWritable(int.class,5);

[GenericWritable]

首先这是一个抽象类,需要被具象化才能使用。

观察下面这个实列,它以一种 Union 方式,显示的代理一个 Writable 实例,解决了 Reduce 函数的参数声明问题。

public class MyGenericWritable extends GenericWritable { private static Class ? extends Writable [] CLASSES = null;
 static { CLASSES = (Class ? extends Writable []) new Class[] {
 IntWritable.class,
 Text.class
 //add as many different Writable class as you want
 };
 }

 @Override  protected Class ? extends Writable [] getTypes() {  return CLASSES;  }  @Override  public String toString() { return  MyGenericWritable [getTypes()=  + Arrays.toString(getTypes()) +  ]  }  // override hashcode(); public class Reduce extends Reducer Text, MyGenericWritable, Text, Text  { public void reduce(Text key, Iterable MyGenericWritable  values, Context context) throws IOException, InterruptedException {}

[ArrayWritable /TwoDArrayWritable]

ArrayWritable aw = new ArrayWriable(Text.class);

[MapWritable / SortedMapWritable]

实现了 java.util.Map Writable,Writable 和 SortedMap…

它的 serialize, 使用先写 map classname,id , 然后后边每个类的类型,以 id 来替代,节省空间。这些都在父类 AbstractMapWritable 中实现。

集合小结:

1. 如果是单类型的列表, 使用 ArrayWritable 就足够了

2。如果是把不同类型的 Writable 存储在一个列表中:

— 可以使用 GenerickWritable, 把元素封装在一个 ArrayWritable,这个貌似只能同一类型。

 public class MyGenericWritable extends GenericWritable { private static Class ? extends Writable [] CLASSES = null;
 static { CLASSES = (Class ? extends Writable []) new Class[] {
 ArrayWritable.class,
 //add as many different Writable class as you want
 };
 }

 @Override  protected Class ? extends Writable [] getTypes() {  return CLASSES;  }

— 可以使用写一个仿照 MapWritable 的 ListWritable

    // 注意实现 hashcode,equals,toString, comparTo (if possible)

    //hashcode 尤其重要,HashPartitioner 通常用 hashcode 来选择 reduce 分区,所以为你的类写一个比较好的 hashcode 非常必要。

    public class ListWritable extends ArrayList Writable implements Writable {

    }

/**
 * @author cloudera
 *
 */
public class ListWritable extends ArrayList Writable  implements Writable {
 private List Writable  list = new ArrayList Writable 
 public void set(Writable writable){list.add(writable);
 @Override
 public void readFields(DataInput in) throws IOException {int nsize = in.readInt();
 Configuration conf = new Configuration();
 Text className = new Text();
 while(nsize-- 0){
 Class theClass = null;
 try {className.readFields(in);
 theClass = Class.forName(className.toString());
 } catch (ClassNotFoundException e) {
 // TODO Auto-generated catch block
 e.printStackTrace();
 Writable w = (Writable)ReflectionUtils.newInstance(theClass,conf);
 w.readFields(in);
 add(w);
 @Override
 public void write(DataOutput out) throws IOException {
 Writable w = null;
 out.writeInt(size());
 for(int i = 0;i size();i++){w = get(i);
 new Text(w.getClass().getName()).write(out);
 w.write(out);
}

感谢各位的阅读,以上就是“Hadoop 序列化怎么实现”的内容了,经过本文的学习后,相信大家对 Hadoop 序列化怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!