关于 HBase 协处理器 Coprocessor 是什么,作用是什么,就不在多说,很多博、文档都有相关说明,Coprocessor 分为两类,一类是 Observer,另一类是 Endpoint,在看来很多博客后,发现都是比较老的一些内容,在学习过程中有不少疑惑,所以总结下来。此文只是针对 EndPoint 给出一个流程。

基本环境

Hbase: v1.1.2
JDK: v1.8
Maven: v3.3.9
protobuf: v2.6.1

EndPoint 的定义在 Hbase 0.96后采用的是 Google 的开源protobuf RPC, 因此本文的说明只适用于高于 0.96 的 HBase 版本,老的版本请看对应的教程。

Mac 下 Protobuf 安装

采用 HomeBrew 即可轻松安装,命令

1
home brew install protobuf

定义 RPC

新建一个 *.proto 文件,键入以下内容,本文的目的是实现一个建立关于 Mac 列的索引,简单起见,直接返回值 buildOk = 1 表示建立成功。关于 protobuf 的更多定义可以参见官方文档https://developers.google.com/protocol-buffers/.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
option java_package = "lreis.bigdata.indoor.index.coprocessor";

option java_outer_classname = "BuildMacIndex";
option java_generic_services = true;
option optimize_for = SPEED;

message buildMacIndexRequest{
int64 onlyForPresent ;
}

message buildMacIndexResponse {
optional int64 buildOk = 1;
}


service BuildMacIndexService {
rpc buildMacIndex(buildMacIndexRequest)
returns(buildMacIndexResponse);
}

定以好之后就可以编译该文件了, 运行此命令会生成一个 BuildMacIndex 的类。

1
protoc --java_out = your_project_dest your.proto

pom.xml

由于本人使用 Maven 建立的工程,所以要完成EndPoint 的开发,需要加入以下一些依赖才可以。** 需要注意的是编译目标的版本是1.6,否则Hbase 不能正确加载生成的 Endpoint. **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hbase.version>1.1.2</hbase.version>
</properties>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>

EndPoint 实现

** EndPoint 必须要继承自 protobuf 中定义的 BuildMacIndexService 类,并且实现 CoprocessorService 和 Coprocessor 接口,实现其中的四个方法即可。**

  • start() 方法
    协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。

  • stop()方法
    Coprocessor 接口还定义了 stop() 接口函数。该函数在 Region 被关闭时调用,用来进行协处理器的清理工作。在本文中,我们没有任何清理工作,因此该方法一般不需要做什么。

  • getService()方法
    oprocessorService 接口。该接口仅仅定义了一个接口函数 getService()。我们仅需要将本实例返回即可。HBase 的 RegionServer 在接受到客户端的调用请求时,将调用该接口获取实现了 RPC Service 的实例,因此本函数一般情况下就是返回自身实例即可。

  • 当 Endpoint 类继承自 RPC Service 类的时候,会需要实现 proto 文件中定义buildMacIndex方法,这也是该 EndPoint 作用代码实现的方法,并且最后完成的Response 必须满足 proto 中定义的类型. 简单起见,只简单返回一个值。

1
2
3
4
5
6
7
8
9
10
11
12
13
public void buildMacIndex(RpcController controller, BuildMacIndex.buildMacIndexRequest request, RpcCallback<BuildMacIndex.buildMacIndexResponse> done) {


Long l = request.getOnlyForPresent();// proto 中 request 里边定义xxx,就用 getXxx 得到调用时候传过来的参数

// .... 其他逻辑

BuildMacIndex.buildMacIndexResponse.Builder resBuilder = BuildMacIndex.buildMacIndexResponse.newBuilder();

resBuilder.setBuildOK(100L); //返回 xxx,就用 SetXxx 进行赋值

done.run(resBuilder.build());//完成
}

以上就完成了一个 EndPoint 的定义及实现,运行 mvn clean compile 就可以打包成 jar 文件了。

JAR文件上传及加载到 Table

将编译的 jar 文件上传到 $HBASE_HOME/lib 目录,如果是完全分布式,每个节点都需要有哦。

配置文件全局加载(所有 Table 都有效)

编辑 hbase-site.xml, 添加以下内容,如果加载多个,用分号隔开每个类就行了,最后需要重启 HBase 集群。

1
<property> <name>hbase.coprocessor.region.classes</name> <value>lreis.bigdata.indoor.index.coprocessor.server.BuildMacIndexEndpoint</value> </property>

命令行加载方式

1
import 'lreis.bigdata.indoor.index.coprocessor.server.BuildMacIndexEndpoint' #记住,有引号的

如果显示

1
=> Java::LreisBigdataIndoorIndexCoprocessorServer::BuildMacIndexEndpoint

则说明加载成功,接着运行下面的命令

1
alter 'idx_mac', 'coprocessor' => '|lreis.bigdata.indoor.index.coprocessor.server.BuildMacIndexEndpoint|1001|'

如果显示如下,则说明修改表节定义成功

1
2
3
4
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 2.3210 seconds
  • 最后就可以在客户端或者 shell 的方式调用 Coprocessor 了, 简单给一个 Java API 的调用方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
conn = DbcFactory.getConnection().getConnection();
Admin admin = conn.getAdmin();
Table table = conn.getTable(TableName.valueOf("idx_mac"));
final BuildMacIndex.buildMacIndexRequest req = BuildMacIndex.buildMacIndexRequest.newBuilder().build();

Map<byte[], Long> tmpRet = table.coprocessorService(BuildMacIndex.BuildMacIndexService.class, null, null, new Batch.Call<BuildMacIndex.BuildMacIndexService, Long>() {
@Override
public Long call(BuildMacIndex.BuildMacIndexService instance) throws IOException {

ServerRpcController controller = new ServerRpcController();
BlockingRpcCallback<BuildMacIndex.buildMacIndexResponse> rpc = new BlockingRpcCallback<BuildMacIndex.buildMacIndexResponse>();
instance.buildMacIndex(controller, req, rpc);

BuildMacIndex.buildMacIndexResponse resp = rpc.get();
return resp.getRowCount();
}
});
long ret = 0;
for (long l : tmpRet.values())
ret += l;
System.out.println("lines: " + ret);
} catch (IOException e) {
e.printStackTrace();
}

参考