手里有一张txt数据表,现在要入库(Phoenix + HBase),因为需要对数据做一定的处理后再写入数据库,加上数据量不小,所以就想着用 MapReduce 来加速一下入库过程。

拿到一条数据处理后就可以直接入库了,那就没有必要写 Reducer 了,直接全部在 Mapper 里边完成,所以很自然地定义 DBWritable 类,作为 Mapper 的 OutputKeyClass 就行,如下这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
* Created by dq on 10/27/16.
*/
class TestDBWritable implements Writable, DBWritable {

int id;
String name;

//getter and setter ...

@Override
public void write(DataOutput dataOutput) throws IOException {

}

@Override
public void readFields(DataInput dataInput) throws IOException {

}

@Override
public void write(PreparedStatement preparedStatement) throws SQLException {

preparedStatement.setInt(1, id);
preparedStatement.setString(2, name);

}

@Override
public void readFields(ResultSet resultSet) throws SQLException {

}

}

坑在这里:其中,write(DataOutput dataOutput), readFields(DataInput dataInput) 是 Writable 接口里边的函数,由于我是写数据库的,就简单的想着没有必要实现了,留空不处理,我也不读数据库,readFields(Result resultSet)就也不写啦。

Mapper 类是这样写的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static class TestMapper extends Mapper<LongWritable, Text,NullWritable, TestDBWritable> {

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

TestDBWritable t = new TestWritable();

//....

context.write(NullWritable.get(), t);

//....

}
}

然后是定义 Job

1
2
3
4
//....
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(TestDBWritable.class);
//....

满心欢喜地赶紧运行,还好没错,好有成就感地去查一下数据库,发现里边只是多了一个 id 为0,name 为 NULL 的记录罢了, 擦,发生了什么?还好之前搭建的是可以本地调试的环境,打个断点,看一下TestDBWritable 类的 write() 函数执行过程,发现 id 只是初始值,name 全部为NULL! 顺着context.write()一路查下去,也没有发现 map 输出的时候把值给丢了呀,但进去 write 就变空值,这是肿么回事?

后来想了一下是不是因为 Writable 的两个函数没有实现的原因,带着侥幸心理实现这两个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void write(DataOutput dataOutput) throws IOException {

dataOutput.writeInt(id);
dataOutput.writeUTF(name);

}

@Override
public void readFields(DataInput dataInput) throws IOException {
this.id = dataInput.readInt();
this.name = dataInput.readUTF();

}

为了验证一下是不是执行了这两个函数,打上了断点调试一下,发现果然,在执行 context.write()后,执行的值 Writable 的 write 函数,然而并没有接着执行 DBWritable 的 write 函数,而是直到所有的输入数据都执行完毕后,首先执行 readFields 函数,然后又 write。单看此执行路径,虽然没有定义 reducer,但 Hadoop 还是给我们执行了一个默认的,所以在 map 输出的时候会执行写入缓冲区的 write()函数,在 reducer 里边会执行 read() 。所以如果不实现那两个函数,Reducer 取到的就是空值。

在网上搜索了一写其他资料,发现情况确实如此,可以参看[1]。

因为 Mapper 的 outkey 是 NullWritable,所有的数据都会发送到同一个节点上进行 Reduce[2],速度非常慢,这无疑是与初衷相背的,更好的解决办法是参照[1]中的第四条,将 reduce 的数量设置为0,这样 mapper 就不再执行 Writable 的两个函数,会直接写入数据库。

1
job.setNumReduceTasks(0);

总结:
1.如果不设置 Reducer,Hadoop 还是会默认执行一个的,所以最好明确设置 Reducer 的数量为0,这样可以在 Map 阶段就执行输出。
2.Map 的 OutputKeyClass 为 NullWritable 的时候,所有的 Reduce 都会发送给一个结点计算,所以不要用 NullWritable 作为Map 的OutputKeyClass.

参考:

[1]:Mapreduce不设置reduce,只执行map的输出结果
[2]:What’s the <key, value> output of map function if I use context.write (NullWritable.get(),new Text(1))?