hadoop之IO操作

前言

学习hadoop的IO

main

  • 数据完整性检查
  • 数据压缩
  • 数据序列化
  • 基于文件的数据结构

数据完整性

hadoop采用CRC-32校验和检查数据完整性

本地文件系统的数据完整性

在创建文件a的同时,会创建隐藏的文件a.crc记录了文件的校验和,默认每512个字节就产生32位的校验和

可以自定义配置core-site.xml

1
2
3
4
<property>   
<name>io.bytes.per.checksum</name>  
<value>512</value>  
</property>

也可以自定义是否启用校验和

1
2
3
4
5
6
7
8
9
<property>   
<name>fs.file.impl</name>  
<value>org.apache.hadoop.fs.LocalFileSystem</value>  
</property>
<!--上面启用校验和,下面禁用-->
<property>   
<name>fs.file.impl</name>  
<value>org.apache.hadoop.fs.RawLocalFileSystem</value>  
</property>

任何在配置文件中修改的配置都可以通过Java代码在Configuration类中局部的配置

HDFS的数据完整性

HDFS使用ChecksumFileSystem类来校验

RawLocalFileSystem和ChecksumFileSystem一起使用,可以达到LocalFileSystem的效果

Java中可以通过函数setVerifyChecksum(false)来局部的禁止校验

数据压缩

hadoop支持的压缩算法

hadoop_007

其中LZO不在hadoop的发布版本中,需要单独下载

  • 利用CompressionCodec压缩和解压流
  • 利用CompressionCodecFactory推断压缩算法

在MapReduce中使用压缩

  • 如果输入数据被压缩,通过使用CompressionCodecFactory自动推断Codec对象,MapReduce程序会自动将输入数据解压
  • 如果希望MapReduce输出压缩文件,设置属性mapred.output.compresstrue,将mapred.output.compression.codec属性设置为要使用的Codec类

IO序列化(重要)

  • 序列化是将内存中的对象转化为字节流,反序列化则是将字节流恢复到内存中
  • 系列化的目的,1是进程间的通信,2是数据的持久化
  • hadoop利用RPC实现进程间的通信
  • 由于Java本身序列化的弊端,hadoop实现了一套自己的序列化机制

序列化的核心,Writable接口

1
2
3
4
5
6
public interface Writable {
//将对象序列化到输出流,DataOutput、DataInput均为二进制流
void write(DataOutput out) throws IOException;
//输入流反序列化
void readFields(DataInput in) throws IOException;
}

接口的继承关系图

hadoop_008

由图可以看出,hadoop的基本数据类型都是继承于WritableComparable接口,即这种类型既是可序列化的,又是可比较大小的。而其它的存储数据结构ArrayWritable和MapWritable只继承于Writable接口

Comparable和Comparator

1
2
3
4
public interface Comparable<T>{
//隐含了一个this指针,this和o对象比较
public int compareTo(T o);
}

1
2
3
4
public interface Comparator<T>{
//是一个工具类,实现两个对象的比较
int compare(T o1,T o2);
}

直接在字节流层面上进行比较

1
2
3
public interface RawComparator<T> extends Comparator<T> {
int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

hadoop数据类型

hadoop数据类型就是对Java数据类型的封装,如下图所示:

hadoop_009

自定义数据类型,实现WritableComparable接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class TextPair implements WritableComparable<TextPair> {
private Text first;
private Text second;
public TextPair(Text f, Text s) {
first = f;
second = s;
}
public int compareTo(TextPair o) {
int cmp = first.compareTo(o.first);
if (cmp != 0) {
return cmp;
}
return second.compareTo(o.second);
}
public void write(DataOutput dataOutput) throws IOException {
first.write(dataOutput);
second.write(dataOutput);
}
public void readFields(DataInput dataInput) throws IOException {
first.readFields(dataInput);
first.readFields(dataInput);
}
}

序列化框架

  • hadoop支持不同的序列化框架,可以在配置文件中配置
  • 实现序列化框架需要实现Serialization接口
  • 默认的序列框架是org.apache.hadoop.io.serializer.WritableSerialization

基于文件的数据接口

  • 解决hadoop中小文件太多浪费资源的问题,SequenceFile
  • 小文件的文件名为key,文件内容为value写入SequenceFile文件