Phoenix supports MapReduce jobs, and the official documentation includes an example. Let's code it up and run it.

1. Create a Maven project

Add the following to pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.7.0-HBase-1.1</version>
    </dependency>
</dependencies>

<!-- optional: phoenix-core is also available from Maven Central -->
<repositories>
    <repository>
        <id>apache</id>
        <url>https://repository.apache.org/content/repositories/releases/</url>
    </repository>
</repositories>

2. Create a StockWritable Java class

The code is as follows:

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Created by dq on 10/7/16.
 */
public class StockWritable implements DBWritable, Writable {

    private String stockName;
    private int year;
    private double[] recordings;
    private double maxPrice;

    public String getStockName() {
        return stockName;
    }

    public void setStockName(String stockName) {
        this.stockName = stockName;
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public double[] getRecordings() {
        return recordings;
    }

    public void setRecordings(double[] recordings) {
        this.recordings = recordings;
    }

    public double getMaxPrice() {
        return maxPrice;
    }

    public void setMaxPrice(double maxPrice) {
        this.maxPrice = maxPrice;
    }

    // The Writable methods are not used by the Phoenix input/output
    // formats, so they can stay empty.
    @Override
    public void write(DataOutput dataOutput) throws IOException {
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
    }

    // Binds this record to the upsert statement that Phoenix generates
    // from the columns passed to PhoenixMapReduceUtil.setOutput().
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, this.stockName);
        preparedStatement.setDouble(2, this.maxPrice);
    }

    // Populates the fields from one row of the input SELECT query.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.stockName = resultSet.getString("STOCK_NAME");
        this.year = resultSet.getInt("RECORDING_YEAR");
        this.recordings = (double[]) resultSet.getArray("RECORDINGS_QUARTER").getArray();
    }
}

3. Mapper and Reducer

Create a PhoenixMapRed class, implement map() and reduce(), and add a main method:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;

import java.io.IOException;

/**
 * Created by dq on 10/7/16.
 */
public class PhoenixMapRed {

    public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {

        private final Text stock = new Text();
        private final DoubleWritable price = new DoubleWritable();

        @Override
        protected void map(NullWritable key, StockWritable value, Context context)
                throws IOException, InterruptedException {
            String name = value.getStockName();
            double[] recordings = value.getRecordings();

            // Start below any possible price. (Double.MIN_VALUE is the
            // smallest *positive* double, not the most negative, so it would
            // silently break on negative values.)
            double maxRecording = Double.NEGATIVE_INFINITY;
            for (double d : recordings) {
                maxRecording = Math.max(maxRecording, d);
            }

            stock.set(name);
            price.set(maxRecording);
            context.write(stock, price);
        }
    }

    public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double maxPrice = Double.NEGATIVE_INFINITY;
            for (DoubleWritable val : values) {
                maxPrice = Math.max(maxPrice, val.get());
            }

            StockWritable stockWritable = new StockWritable();
            stockWritable.setStockName(key.toString());
            stockWritable.setMaxPrice(maxPrice);
            context.write(NullWritable.get(), stockWritable);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "stock");

        String selectQuery = "SELECT STOCK_NAME,RECORDING_YEAR,RECORDINGS_QUARTER FROM STOCK";

        // Wire the input SELECT and the output table/columns to the job.
        PhoenixMapReduceUtil.setInput(job, StockWritable.class, "STOCK", selectQuery);
        PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING");

        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);

        // Ship the HBase/Phoenix dependency jars with the job.
        TableMapReduceUtil.addDependencyJars(job);
        job.waitForCompletion(true);
    }
}

Note that the call PhoenixMapReduceUtil.setOutput(job, "STOCK_STATS", "STOCK_NAME,MAX_RECORDING") differs from the official documentation; the documented version appears to be older code and throws an error at runtime.

4. Create the tables and load the data. The original skips over this step; a sketch is given below.
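For reference only, here is a minimal sketch of what this step might look like, done through Phoenix's JDBC driver (the same statements can be run in sqlline instead). The table and column names are taken from the MapReduce code above, but the column types, the primary keys, the JDBC URL jdbc:phoenix:localhost, the sample row, and the StockSetup class name are all assumptions, not the author's actual setup.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

// Hypothetical helper, not part of the original post: creates the input
// and output tables and loads one sample row. Types/keys are assumptions.
public class StockSetup {

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement()) {

            // Input table: one row per stock per year, quarterly prices
            // stored as a Phoenix DOUBLE ARRAY (read back in
            // StockWritable.readFields via getArray("RECORDINGS_QUARTER")).
            stmt.execute("CREATE TABLE IF NOT EXISTS STOCK ("
                    + "STOCK_NAME VARCHAR NOT NULL, "
                    + "RECORDING_YEAR INTEGER NOT NULL, "
                    + "RECORDINGS_QUARTER DOUBLE ARRAY "
                    + "CONSTRAINT pk PRIMARY KEY (STOCK_NAME, RECORDING_YEAR))");

            // Output table written by the reducer through PhoenixOutputFormat.
            stmt.execute("CREATE TABLE IF NOT EXISTS STOCK_STATS ("
                    + "STOCK_NAME VARCHAR NOT NULL, "
                    + "MAX_RECORDING DOUBLE "
                    + "CONSTRAINT pk PRIMARY KEY (STOCK_NAME))");

            // One sample row; Phoenix has no INSERT statement, UPSERT
            // covers both inserts and updates.
            stmt.executeUpdate("UPSERT INTO STOCK VALUES ('AAPL', 2009, "
                    + "ARRAY[85.88, 91.04, 88.50, 90.30])");

            // Phoenix connections default to autoCommit=false.
            conn.commit();
        }
    }
}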

5. Compile

The build is a little fiddly, and the official example does not spell it out. Before compiling with javac, add phoenix-4.8.0-HBase-1.1-client.jar to the CLASSPATH. Compilation produces four .class files, which are then packaged into a .jar with the jar command.

export CLASSPATH=/usr/local/hbase/lib/phoenix-4.8.0-HBase-1.1-client.jar:$CLASSPATH

javac PhoenixMapRed.java
# produces 4 files: PhoenixMapRed.class PhoenixMapRed$StockMapper.class PhoenixMapRed$StockReducer.class StockWritable.class

# quote the inner-class names so the shell does not expand the $ as a variable
jar -cvf PhoenixMapRed.jar PhoenixMapRed.class 'PhoenixMapRed$StockMapper.class' 'PhoenixMapRed$StockReducer.class' StockWritable.class

6. Run

If you run hadoop jar PhoenixMapRed.jar PhoenixMapRed directly at this point, it throws an exception:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/HBaseConfiguration
at PhoenixMapRed.main(PhoenixMapRed.java:73)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.HBaseConfiguration
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

This time, add phoenix-4.8.0-HBase-1.1-client.jar to HADOOP_CLASSPATH as well, then run the job again:

export HADOOP_CLASSPATH=/usr/local/hbase/lib/phoenix-4.8.0-HBase-1.1-client.jar:$HADOOP_CLASSPATH
hadoop jar PhoenixMapRed.jar PhoenixMapRed
# ... the job now runs normally

7. Check the results

Run a query in Phoenix's sqlline shell and you will see the results:

0: jdbc:phoenix:localhost> select * from stock_stats;
+-------------+----------------+
| STOCK_NAME | MAX_RECORDING |
+-------------+----------------+
| AAPL | 200.26 |
| CSCO | 27.98 |
| GOOG | 697.37 |
| MSFT | 35.96 |
| YHOO | 41.22 |
+-------------+----------------+
5 rows selected (0.298 seconds)
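If you would rather verify the output programmatically than in sqlline, here is a small JDBC sketch along the same lines (again assuming a local Phoenix at jdbc:phoenix:localhost; StockStatsCheck is a hypothetical name, not part of the original post):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Hypothetical check: reads the reducer's output table back via JDBC.
public class StockStatsCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT STOCK_NAME, MAX_RECORDING FROM STOCK_STATS")) {
            while (rs.next()) {
                System.out.printf("%s -> %.2f%n", rs.getString(1), rs.getDouble(2));
            }
        }
    }
}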

With that, the example runs successfully.

References

1. Phoenix Map Reduce
2. MapReduce (Hadoop-2.6.0) + HBase-1.0.1.1 class not found exception