hadoop之MapReduce

前言

学习MapReduce

输入输出

  • map,输入(k1,v1>),输出(k2,v2)
  • reduce,输入就是将map的输出按key值group起来,(k2,list(v2)),输出需要得到的结果(k3,v3)

对于WordCound程序而言,例如文件

1
2
3
hello hadoop
hello map
hello reduce

默认按行读数据,即每读入一行数据,调用一次map方法,map输入的key值意义不大,可以是行号,可以是字符偏移,默认是字符偏移,即map的输入:

1
2
3
<0,"hello hadoop">
<13,"hello map">
<23,"hello reduce">

即要调用3次map函数,对于WC程序而言,就是统计每个单词出现的次数,那么map的输出就是

1
2
3
<"hello",1>,<"hadoop",1>
<"hello",1>,<"map",1>
<"hello",1>,<"reduce",1>

reduce的输入就是将map输出的值相同的key值group成list,输入

1
2
3
4
<"hello",list(1,1,1)>
<"hadoop",list(1)>
<"map",list(1)>
<"reduce",list(1)>

输出的结果就是WC的计算结果,将list的数据累加就行

由此可见map和reduce的接口很抽象,实现的key,value全靠程序员

代码

Mapper接口,要实现这个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//泛型,map的输入、输出键值对
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
//重写map方法
protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
//context为内部类Context
context.write(key, value);
}
//重要,定义了一个内部类Context对象继承于MapContext
public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
......
}

Reducer接口,需要实现这个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//输入输出
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
//重写reduce方法
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
Iterator i$ = values.iterator();
while(i$.hasNext()) {
VALUEIN value = i$.next();
context.write(key, value);
}
}
//内部类Context继承于ReduceContext
public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Context() {
}
}
......
}

combine方法

减少reduce的IO开销,将map产生的结果在本地先进行一个合并处理

对于WC程序而言,CombinerClass和ReducerClass一样

1
2
3
4
job.setMapperClass(TokenizerMapper.class);
//CombinerClass和ReducerClass一样
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

partition方法

将reduce的输出结果分不同的文件存放,它的计算只依赖于key值

MapReduce的输入输出

  • 能处理很多格式的数据,text、数据库等
  • 输入被分成InputSplit,每个Mapper处理一个InputSplit
  • 每个InputSplit被分成多条记录record,每一个record产生一个key-value

默认的输入处理类是TextInputFormat

  • 处理固定数量的记录,NLineIputFormat
  • 处理二进制数据,BinaryInputFormat
  • 多类不同类型的文件,MultipleInputs
  • 数据库文件,DBInputFormat

输出同理,和输入一样分这么多类型