HBase coprocessor development coding example

An Observer coprocessor runs before or after a specific event (such as a Get or Put), which makes it the equivalent of a trigger in an RDBMS. An Endpoint coprocessor is similar to a stored procedure in an RDBMS: it lets you perform custom computation on the RegionServer instead of on the client.

1 Introduction to Coprocessor

If you want to compute statistics over data in HBase, such as the maximum value of a field, the number of records that meet certain conditions, or the characteristics of records and a classification based on them, the conventional approach is to scan the entire table out of HBase, or to add a Filter for some preliminary filtering, and then do the statistical processing on the client. But doing so has serious side effects: it occupies a lot of network bandwidth (especially obvious with big data volumes), and the resulting RPC pressure is not to be underestimated.
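For contrast, here is a minimal sketch of this conventional client-side approach, written against the HBase 1.x client API used elsewhere in this article; the table and column names (sum_table, info:score) are borrowed from the Endpoint example in section 4. Every matching cell has to cross the network before it can be summed:

package com.hbase.demo.client;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ClientSideSum {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("sum_table"));
        // restrict the scan to one column; each matching cell is still shipped to the client
        Scan scan = new Scan();
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"));
        long sum = 0L;
        ResultScanner rs = table.getScanner(scan);
        try {
            for (Result r : rs) {
                sum += Long.parseLong(Bytes.toString(
                        r.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"))));
            }
        } finally {
            rs.close();
            table.close();
            conn.close();
        }
        System.out.println("sum: " + sum);
    }
}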

The most frequently criticized shortcomings of HBase as a column-oriented database include the inability to easily create secondary indexes and the difficulty of performing sums, counts, sorts, and the like. For example, in older versions (before 0.92), to count the total number of rows in a table you had to use the counter method or run a MapReduce job. Although HBase integrates MapReduce at the data storage layer and can effectively run distributed computations over data tables, in many cases, when doing simple addition or aggregation, moving the computation to the server side reduces network overhead and brings a good performance gain. As a result, HBase introduced coprocessors in 0.92, enabling some exciting new features: easily created secondary indexes, complex filters, and access control.
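For instance, counting the rows of a table without a coprocessor typically means running the RowCounter MapReduce job that ships with HBase (the table name here is the sum_table created later in this article):

$ hbase org.apache.hadoop.hbase.mapreduce.RowCounter sum_table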

Put simply, a coprocessor is HBase's mechanism for running part of the user's logic on the data storage side, that is, on the HBase server. It allows users to run their own code on the HBase server.

2 Classification of coprocessors

Coprocessors come in two scopes: a system coprocessor is loaded globally into all tables and regions on a RegionServer, while a table coprocessor is specified by the user for a single table. The coprocessor framework provides two different kinds of plugin to support flexible behavior. One is the Observer, similar to a relational database trigger; the other is the Endpoint, a dynamic endpoint somewhat like a stored procedure.
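As an illustration of the system-wide scope, a region coprocessor can be loaded for every table by listing its class in hbase-site.xml on the RegionServers. This sketch uses the SumEndPoint class developed later in this article and assumes its jar is already on every RegionServer's classpath:

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>com.hbase.demo.endpoint.SumEndPoint</value>
</property>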

Observers are designed to let users extend the framework by overriding its upcall methods with their own code; the event-triggered callback methods are then invoked by HBase core code. The coprocessor framework handles all the details of the callback invocation, so the coprocessor itself only needs to supply the added or changed functionality.
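As a minimal sketch of that pattern (HBase 1.x API, the same base class as the full example in section 5; the class name NoopObserver is invented for illustration), an Observer subclasses the framework's base class and overrides only the upcall it cares about, and HBase core invokes the override at the corresponding event:

package com.hbase.demo.observer;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

public class NoopObserver extends BaseRegionObserver {

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c,
            Put put, WALEdit edit, Durability durability) throws IOException {
        // custom logic inserted here runs before each Put is applied to the region
    }
}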

An Endpoint is an interface to a dynamic RPC plugin. Its implementation code is installed on the server side and can then be invoked through HBase RPC. The client library provides very convenient methods for calling these dynamic interfaces: a client can call an endpoint at any time, the endpoint's implementation is executed remotely on the target region, and the result is returned to the client. Users can combine these powerful plugin interfaces to add entirely new features to HBase.

3 Use of Protocol Buffer

Since the Endpoint coding example below uses Google's cross-language data interchange standard, Protocol Buffers, let's first take a look at this tool, which is commonly used in RPC systems.

3.1 Introduction to Protocol Buffer

Protocol Buffers is a lightweight, efficient format for storing structured data that can be used to serialize structured data; it is well suited to data storage and to RPC data exchange formats. As a language-neutral, platform-neutral, extensible serialization format, it can be used for communication protocols, data storage, and more. APIs are currently available for C++, Java, and Python.

Why use Protocol Buffers? Consider a system scenario we often encounter in real development: the client program is written in Java and may run on different platforms such as Linux, Windows, or Android, while the server program is developed in C++ on Linux. There are several ways to design the message format for the data exchanged between the two programs, for example:

1. Directly pass byte-aligned struct data in the C/C++ sense. As long as the struct is declared in a fixed-length format, this is very convenient for the C/C++ program: the received data just needs to be cast to the struct type, and even variable-length structs are not very troublesome. When sending, you only need to define a struct variable, set each member, and send the binary data to the remote end as a char*. By contrast, this approach is very cumbersome for Java developers: the received data must first be stored in a ByteBuffer, each field must then be read out one by one in the agreed byte order, and the values must be assigned to the fields of a separate value object so the rest of the program logic can use them (a sketch of this parsing appears after this list). For programs like this, joint debugging can only begin once both the client and the server have finished writing their message-building code, which directly slows down the Java side's development; even in the debug phase, the Java program will keep hitting small errors in how the various fields are spliced together.

2. Use the SOAP protocol (Web Services) as the carrier of the message format. Messages produced this way are text-based and carry a large amount of XML description information, which greatly increases the network I/O burden. And because XML parsing is complex, message-parsing performance drops sharply as well. In summary, this design significantly reduces the overall runtime performance of the system.
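As promised in item 1 above, here is a sketch of the manual ByteBuffer parsing forced on the Java side; the message layout (a little-endian int id followed by a long timestamp) is invented purely for illustration:

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ManualParser {

    public static void main(String[] args) {
        byte[] received = new byte[12];  // pretend these 12 bytes came off the wire
        ByteBuffer buf = ByteBuffer.wrap(received).order(ByteOrder.LITTLE_ENDIAN);
        int id = buf.getInt();           // read each field in the agreed byte order...
        long timestamp = buf.getLong();  // ...and assign it to a domain variable by hand
        System.out.println("id=" + id + " timestamp=" + timestamp);
    }
}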

Protocol Buffers solves the problems of both approaches above well. In addition, Protocol Buffers has one more very important advantage: it can guarantee compatibility between old and new versions of the same message.
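For example, under proto2 semantics a new optional field can be added to an existing message without breaking older programs, because unknown fields are skipped on parse and missing optional fields fall back to their defaults. Here is a sketch based on the SumResponse message defined in section 3.3:

message SumResponse {
    required int64 sum = 1 [default = 0];
    optional int64 count = 2;  // new field: old readers skip it, old writers simply omit it
}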

3.2 Installing Protocol Buffer

// Download protobuf-2.6.1.tar.gz from https://developers.google.com/protocol-buffers/docs/downloads, then unzip it to the target directory
$ tar -xvf protobuf-2.6.1.tar.gz -C app/

// Delete the archive
$ rm protobuf-2.6.1.tar.gz

// Install the C++ compiler packages
$ sudo apt-get install g++

// Compile and install protobuf
$ cd app/protobuf-2.6.1/
$ ./configure
$ make
$ make check
$ sudo make install

// Add the protobuf library to the loader path
$ vim ~/.bashrc
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib
$ source ~/.bashrc

// Verify that the installation succeeded
$ protoc --version

3.3 Writing proto files

First you need to write a proto file that defines the structured data to be handled by the program. A proto file is very similar to data definitions in Java or C. The following code shows the contents of the endpoint.proto file that defines the RPC interface used in this example:

// Define common options
option java_package = "com.hbase.demo.endpoint";  // package name of the generated Java code
option java_outer_classname = "Sum";  // outer class name of the generated Java code
option java_generic_services = true;  // generate abstract service code from the service definition
option optimize_for = SPEED;  // optimization level

// Define the request message
message SumRequest {
    required string family = 1;  // column family
    required string column = 2;  // column name
}

// Define the response message
message SumResponse {
    required int64 sum = 1 [default = 0];  // summation result
}

// Define the RPC service
service SumService {
    // get the summation result
    rpc getSum(SumRequest) returns (SumResponse);
}

3.4 Compiling proto files

// Compile the proto file to generate Java code
$ protoc endpoint.proto --java_out=./

// The generated file is Sum.java; it contains the SumRequest and SumResponse message classes and the abstract SumService service class used in the code below.
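Before wiring the service into HBase, it may help to exercise the generated classes in isolation. The following smoke test (a made-up class, not part of the original example) builds a SumRequest, serializes it, and parses it back with the standard generated-code API:

package com.hbase.demo.endpoint;

import com.google.protobuf.InvalidProtocolBufferException;
import com.hbase.demo.endpoint.Sum.SumRequest;

public class SumSmokeTest {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        // build and serialize a request
        SumRequest request = SumRequest.newBuilder()
                .setFamily("info")
                .setColumn("score")
                .build();
        byte[] bytes = request.toByteArray();
        // parse it back and verify the round trip
        SumRequest parsed = SumRequest.parseFrom(bytes);
        System.out.println(parsed.getFamily() + ":" + parsed.getColumn());
    }
}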

4 Endpoint encoding example

Business logic such as summation and sorting can be placed on the server side; after the server finishes the computation, only the result is sent to the client, which reduces the amount of data transferred. The following example creates an RPC service on the HBase server side: the server sums the values of a specified column of a specified table and returns the result to the client, and the client calls the RPC service and prints the response.

4.1 Server code

First, import the RPC interface file Sum.java generated by Protocol Buffers into the project, then create a new class SumEndPoint for the server-side code:

package com.hbase.demo.endpoint;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;

import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;

/**
 * @author developer
 * Description: server-side code of the HBase coprocessor endpoint
 * Function: extends the RPC interface generated by Protocol Buffers; the server sums the data
 * of the specified column and returns the result to the client.
 */
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {

    private RegionCoprocessorEnvironment env;  // coprocessor environment

    @Override
    public Service getService() {
        return this;
    }

    @Override
    public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
        // define variables
        SumResponse response = null;
        InternalScanner scanner = null;
        // set up the scan object
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(request.getFamily()));
        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
        // scan this region and sum the cell values
        try {
            scanner = env.getRegion().getScanner(scan);
            List<Cell> results = new ArrayList<Cell>();
            boolean hasMore = false;
            long sum = 0L;
            do {
                hasMore = scanner.next(results);
                for (Cell cell : results) {
                    sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
                }
                results.clear();
            } while (hasMore);
            // set the return result
            response = SumResponse.newBuilder().setSum(sum).build();
        } catch (IOException e) {
            ResponseConverter.setControllerException(controller, e);
        } finally {
            if (scanner != null) {
                try {
                    scanner.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        // return the RPC result to the client
        done.run(response);
    }

    // called when the coprocessor is initialized
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("no region loaded");
        }
    }

    // called when the coprocessor is shut down
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }
}

4.2 Client code

Create a new class SumClient in the project as a client test program that calls the RPC service. The code is as follows:

package com.hbase.demo.endpoint;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;

import com.google.protobuf.ServiceException;

import com.hbase.demo.endpoint.Sum.SumRequest;
import com.hbase.demo.endpoint.Sum.SumResponse;
import com.hbase.demo.endpoint.Sum.SumService;

/**
 * @author developer
 * Description: client-side code of the HBase coprocessor endpoint
 * Function: gets the summation result of the specified column of an HBase table from the server
 */
public class SumClient {

    public static void main(String[] args) throws ServiceException, Throwable {
        long sum = 0L;

        // configure HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2222");

        // establish the database connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // get the table
        HTable table = (HTable) conn.getTable(TableName.valueOf("sum_table"));

        // build the request object
        final SumRequest request = SumRequest.newBuilder().setFamily("info").setColumn("score").build();

        // invoke the endpoint on every region and collect the per-region results
        Map<byte[], Long> result = table.coprocessorService(SumService.class, null, null,
                new Batch.Call<SumService, Long>() {
                    @Override
                    public Long call(SumService service) throws IOException {
                        BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
                        service.getSum(null, request, rpcCallback);
                        SumResponse response = (SumResponse) rpcCallback.get();
                        return response.hasSum() ? response.getSum() : 0L;
                    }
                });

        // add up the per-region return values
        for (Long v : result.values()) {
            sum += v;
        }
        // print the result
        System.out.println("sum: " + sum);

        // close resources
        table.close();
        conn.close();
    }
}

4.3 Loading Endpoint

// Package the Sum and SumEndPoint classes into a jar and upload it to HDFS
$ hadoop fs -put endpoint_sum.jar /input

// Modify the HBase configuration file: relaxing table sanity checks keeps the master
// from rejecting the alter command that registers the coprocessor
$ vim app/hbase-1.2.0-cdh5.7.1/conf/hbase-site.xml

<property>
    <name>hbase.table.sanity.checks</name>
    <value>false</value>
</property>

// Restart HBase
$ stop-hbase.sh
$ start-hbase.sh

// Start the hbase shell
$ hbase shell

// Create the table sum_table
hbase> create 'sum_table', 'info'

// Insert test data
hbase> put 'sum_table', 'rowkey01', 'info:score', '95'
hbase> put 'sum_table', 'rowkey02', 'info:score', '98'
hbase> put 'sum_table', 'rowkey02', 'info:age', '20'

// View the data
hbase> scan 'sum_table'


// Load the coprocessor
hbase> disable 'sum_table'
hbase> alter 'sum_table', METHOD => 'table_att', 'coprocessor' => 'hdfs://localhost:9000/input/endpoint_sum.jar|com.hbase.demo.endpoint.SumEndPoint|100'
hbase> enable 'sum_table'


// To unload the coprocessor, first look up its name in the table description, then unset it by command
hbase> disable 'sum_table'
hbase> describe 'sum_table'
hbase> alter 'sum_table', METHOD => 'table_att_unset', NAME => 'coprocessor$1'
hbase> enable 'sum_table'


4.4 Testing

Run the client program SumClient in Eclipse. The output is 193, which matches expectations: the scan covers only the info:score column, and the two values inserted above are 95 and 98 (95 + 98 = 193), while info:age is excluded.


5 Observer encoding example

In general, indexing a database requires a separate data structure to store the index data. In an HBase table, besides locating data by rowkey, you can maintain an additional index table: query the index table first, then query the data table with the results. The following example shows how to use an Observer coprocessor to build a secondary index for an HBase table: the value of column info:name in the data table ob_table is used as the rowkey of the index table index_ob_table, and the value of column info:score in ob_table is stored as the value of column info:score in index_ob_table. Whenever data is inserted into the data table, the corresponding secondary index row is inserted into the index table automatically, which makes querying the business data by name convenient.
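To illustrate the query pattern, the following sketch (connection settings copied from the clients in this article; "carl" is the test row inserted in section 5.3) fetches a score by name with a single Get on the index table instead of scanning ob_table:

package com.hbase.demo.observer;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexLookup {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2222");
        Connection conn = ConnectionFactory.createConnection(conf);
        Table index = conn.getTable(TableName.valueOf("index_ob_table"));
        // the info:name value is the rowkey of the index table
        Result r = index.get(new Get(Bytes.toBytes("carl")));
        byte[] score = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("score"));
        System.out.println("score of carl: " + Bytes.toString(score));
        index.close();
        conn.close();
    }
}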

5.1 Code

In the project, create a new class PutObserver to hold the Observer coprocessor's application logic; the code is as follows:

package com.hbase.demo.observer;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * @author developer
 * Description: application logic code of the HBase coprocessor observer
 * Function: in the HBase table to which this observer is applied, every put operation also
 * writes a row to the secondary index table index_ob_table, using each row's info:name
 * column value as the rowkey and its info:score column value as the value, to improve
 * query efficiency for these fields.
 */
@SuppressWarnings("deprecation")
public class PutObserver extends BaseRegionObserver {

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        // get the secondary index table
        HTableInterface table = e.getEnvironment().getTable(TableName.valueOf("index_ob_table"));
        // get the values from the Put
        List<Cell> cellList1 = put.get(Bytes.toBytes("info"), Bytes.toBytes("name"));
        List<Cell> cellList2 = put.get(Bytes.toBytes("info"), Bytes.toBytes("score"));
        // insert data into the secondary index table
        for (Cell cell1 : cellList1) {
            // the info:name value becomes the rowkey of the secondary index table
            Put indexPut = new Put(CellUtil.cloneValue(cell1));
            for (Cell cell2 : cellList2) {
                // the info:score value becomes the value of column info:score in the index table
                indexPut.add(Bytes.toBytes("info"), Bytes.toBytes("score"), CellUtil.cloneValue(cell2));
            }
            // write the row to the secondary index table
            table.put(indexPut);
        }
        // close the resource
        table.close();
    }
}

5.2 Loading Observer

// Package the PutObserver class into a jar and upload it to HDFS
$ hadoop fs -put observer_put.jar /input

// Start the hbase shell
$ hbase shell

// Create the data table ob_table
hbase> create 'ob_table', 'info'

// Create the secondary index table index_ob_table
hbase> create 'index_ob_table', 'info'

// Load the coprocessor
hbase> disable 'ob_table'
hbase> alter 'ob_table', METHOD => 'table_att', 'coprocessor' => 'hdfs://localhost:9000/input/observer_put.jar|com.hbase.demo.observer.PutObserver|100'
hbase> enable 'ob_table'

// View the data table ob_table
hbase> describe 'ob_table'


5.3 Testing

// Write a client in the Eclipse project and insert test data into the data table ob_table

package com.hbase.demo.observer;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Test {

    public static void main(String[] args) throws IOException {
        // configure HBase
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost");
        conf.set("hbase.zookeeper.property.clientPort", "2222");

        // establish the database connection
        Connection conn = ConnectionFactory.createConnection(conf);
        // get the table
        HTable table = (HTable) conn.getTable(TableName.valueOf("ob_table"));

        // insert test data
        Put put = new Put(Bytes.toBytes("rowkey01"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("carl"));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("score"), Bytes.toBytes("92"));
        table.put(put);

        // close resources
        table.close();
        conn.close();
    }
}

// After inserting the data, view the data table ob_table in the hbase shell
$ hbase shell
hbase> scan 'ob_table'


// View the data in the secondary index table index_ob_table in the hbase shell
hbase> scan 'index_ob_table'

