HBaseのJava Clientを試す
HBaseのJava clientを試したメモです。
以前HBase shellからHBaseを触ってみたので、
HBaseのJava clientをKotlinで試してみました。
Apache HBase ™ Reference Guide
HBaseはclouderaのVirtualBox用のディストリビューションを使用し、
clientはWindow10上からアクセスしてみます。
下記バージョンで試してみます。
・Windows10
・VirtualBox 5.2.16
・cdh5.13.0 (HBase 1.2.0)
・hbase-client 1.2.0-cdh5.13.0
・Kotlin 1.2.61
Dependency
Gradleで試します。
リポジトリにclouderaを追加します。
build.gradle
repositories { mavenCentral() maven { url 'https://repository.cloudera.com/cloudera/cloudera-repos/' } }
下記の依存関係を追加しました。
dependencies { compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.0-cdh5.13.0' }
Connection
HBaseに接続してみます。
Configurationクラスを作成し、zookeeperのIPとportをセットします。
Configurationクラスを指定してConnectionFactoryでコネクションを作成し、
文字列でネームスペース:テーブル名を指定してTableNameインスタンスを作成します。
getTable()でTableの実装を取得して、テーブルに対して操作していきます。
※HConncection, HTableは1.0以降は@deperecated
val conf= HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "127.0.0.1") conf.set("hbase.zookeeper.property.clientPort","2181") ConnectionFactory.createConnection(conf).use { val tableName = TableName.valueOf("myspace:people") it.getTable(tableName).use { table -> // do something } }
put
putでデータを投入してみます。
RowKeyをbyteで指定して、Putインスタンスを作成します。
addColumn()でcolumn family, column, valueをbyte[]で指定します。
Table.put()でPutインスタンスを指定するとputされます。
hbase clientでは色々な引数をbyte[ ]で指定する必要があるので、
Bytes Utiltyクラスを使用してbyte[ ]に変換しています。
頻繁に使用するのでtoBytes()などはstatic importしておくと楽です。
val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) table.put(put)
get
getでrowを取得してみます。
RowKeyをbyteで指定して、Getインスタンスを作成します。
addColumn()でcolumn family, columnをbyte[ ]で指定します。
結果はResultオブジェクトで取得します。
getValue()でcolumn family, columnを指定して値を取得出来ます。
結果の値もbyte[ ]なのでBytes Utiltyクラスを使用して適切な型に変換します。
val get = Get(Bytes.toBytes("row1")) get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name")) get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age")) val getResult = table.get(get) val name = Bytes.toString(getResult.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) val age = Bytes.toInt(getResult.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) println("row1 name: $name") println("row1 age: $age")
結果
row1 name: Alice
row1 age: 20
exists
existsでrowが存在するか確認してみます。
getの時と同様にGetインスタンスを作成し、table.exists()でbooleanの結果が返ります。
addColumn()でcolumn family, columnを指定するとカラムの存在を確認出来ます。
val getRow = Get(Bytes.toBytes("row1")) val existsRow = table.exists(getRow) println("row1 exists?: $existsRow") getRow.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name")) val existsCol = table.exists(getRow) println("row1 f name exists?: $existsCol")
結果
row1 exists?: true row1 f name exists?: true
delete
deleteでrowとcolumnを削除してみます。
RowKeyをbyteで指定して、Deleteインスタンスを作成します。
Table.delete()で対象rowを削除します。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) table.put(put) // delete (row) val deleteRow = Delete(Bytes.toBytes("row1")) table.delete(deleteRow)
addColumn()でcolumn family, columnを指定します。
Table.delete()で対象columnを削除します。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) table.put(put) // delete (column) val delete = Delete(Bytes.toBytes("row1")) delete.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name")) table.delete(delete)
scan
scanで複数のrowを取得してみます。
Scanインスタンスを生成し、scan結果をResultScannerで取得します。
ResultScannerをイテレートするとResultインスタンスを取得できるので、ループして各rowの結果を取得します。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) table.put(put) val put2 = Put(Bytes.toBytes("row2")) put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Bobby")) put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33)) table.put(put2) val put3 = Put(Bytes.toBytes("row3")) put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Cindy")) put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(19)) table.put(put3) // scan val scan = Scan() val resultScanner = table.getScanner(scan) resultScanner.forEach { result -> val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) println("scan name: $scanName") println("scan age: $scanAge") }
結果
scan name: Alice scan age: 20 scan name: Bobby scan age: 33 scan name: Cindy scan age: 19
filter
filterで取得する際に条件でフィルタリング出来ます。
row filter
row filterでrowでフィルタリング出来ます。
RowFilterのインスタンスを作成します。第1引数に比較の演算子のenumを指定し、第2引数に何で比較するかをComparatorで指定します。
等しいことを比較するためにCompareFilter.CompareOp.EQUALを指定し、byte[ ]で比較するためにBinaryComparatorにrowKeyを指定します。
あとはScanやGetのインスタンスにフィルタをセットして、取得します。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(22)) table.put(put) val put2 = Put(Bytes.toBytes("row2")) put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Bobby")) put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33)) table.put(put2) val put3 = Put(Bytes.toBytes("row3")) put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Cindy")) put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(19)) table.put(put3) // row filter val scan = Scan() val rowFilter = RowFilter(CompareFilter.CompareOp.EQUAL, BinaryComparator(Bytes.toBytes("row2"))) scan.filter = rowFilter val resultRowFilter = table.getScanner(scan) resultRowFilter.forEach { result -> val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) println("scan name(row filter): $scanName") println("scan age(row filter): $scanAge") }
結果
scan name(row filter): Bobby scan age(row filter): 33
qualifier filter
qualifier filterでcolumnでフィルタリング出来ます。
QualifierFilterのインスタンスを作成します。
等しいことを比較するためにCompareFilter.CompareOp.EQUALを指定し、byte[ ]で比較するためにBinaryComparatorにcolumnを指定します。
同様にScanにフィルタをセットして、取得します。
// qualifier filter val qualifierFilter = QualifierFilter(CompareFilter.CompareOp.EQUAL, BinaryComparator(Bytes.toBytes("name"))) scan.filter = qualifierFilter val resultQualifierFilter = table.getScanner(scan) resultQualifierFilter.forEach { result -> val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) var scanAge: Int? = null if (result.containsColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))) { scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) } println("scan name(qualifier filter): $scanName") println("scan age(qualifier filter): $scanAge") }
結果
scan name(qualifier filter): Alice scan age(qualifier filter): null scan name(qualifier filter): Bobby scan age(qualifier filter): null scan name(qualifier filter): Cindy scan age(qualifier filter): null
value filter
value filterでvalueでフィルタリング出来ます。
ValueFilterのインスタンスを作成します。
指定した値より大きいことを比較するためにCompareFilter.CompareOp.GREATERを指定し、byte[ ]で比較するためにBinaryComparatorにcolumnを指定します。
同様にScanにフィルタをセットして、取得します。
// value filter val valueFilter = ValueFilter(CompareFilter.CompareOp.GREATER, BinaryComparator(Bytes.toBytes(20))) scan.filter = valueFilter val resultValueFilter = table.getScanner(scan) resultValueFilter.forEach { result -> val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) var scanAge: Int? = null if (result.containsColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))) { scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) } println("scan name(value filter): $scanName") println("scan age(value filter): $scanAge") }
結果
scan name(value filter): Alice scan age(value filter): 22 scan name(value filter): Bobby scan age(value filter): 33 scan name(value filter): Cindy scan age(value filter): null
row mutaition
row mutaitionで1つのrowに対して複数の処理をatomicに実行出来ます。
RowKeyをbyteで指定して、RowMutationsインスタンスを作成します。
add()でatomicに処理したい操作を追加します。
Table.mutateRow()で実行されます。
val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) val delete = Delete(Bytes.toBytes("row1")) delete.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name")) // mutateRow val rowMutations = RowMutations(Bytes.toBytes("row1")) rowMutations.add(put) rowMutations.add(delete) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("country"), Bytes.toBytes("Japan")) rowMutations.add(put) table.mutateRow(rowMutations) // scan val scan = Scan() val resultMutateRow = table.getScanner(scan) resultMutateRow.forEach { result -> val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name"))) val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) val scanCountry = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("country"))) println("scan name: $scanName") println("scan age: $scanAge") println("scan country: $scanCountry") }
結果
scan name: null
scan age: 20
scan country: Japan
increment
incrementで指定した数インクリメント、デクリメント出来ます。
RowKeyをbyteで指定して、Incrementインスタンスを作成します。
addColumn()でcolumn family, column, インクリメントする値を指定します。
マイナスの値を指定するとデクリメントされます。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33L)) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("stamina"), Bytes.toBytes(100L)) table.put(put) // increment val increment = Increment(Bytes.toBytes("row1")) increment.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), 1L) increment.addColumn(Bytes.toBytes("f"), Bytes.toBytes("stamina"), -10L) val result1 = table.increment(increment) val age1 = Bytes.toLong(result1.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) val stamina1 = Bytes.toLong(result1.getValue(Bytes.toBytes("f"), Bytes.toBytes("stamina"))) println("age: $age1") println("stamina: $stamina1") table.increment(increment) val result2 = table.increment(increment) val age2 = Bytes.toLong(result2.getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) val stamina2 = Bytes.toLong(result2.getValue(Bytes.toBytes("f"), Bytes.toBytes("stamina"))) println("age: $age2") println("stamina: $stamina2")
結果
age: 34 stamina: 90 age: 36 stamina: 70
cas
casで現在の値を確認し、特定の値の場合に更新、削除の操作が出来ます。
更新する値で更新用のPutインスタンスを作成します。
Table.checkAndPut()で、rowKey, column family, column, 期待する値, 更新用のPut, を指定します。
現在の値が引数で指定した期待する値の場合、Putインスタンスで指定した値に更新されます。
// prepare val put = Put(Bytes.toBytes("row1")) put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20)) table.put(put) // check and put val update = Put(Bytes.toBytes("row1")) update.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(18)) val updateResult = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20), update) println("update result: $updateResult") val get = Get(Bytes.toBytes("row1")) get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age")) val age = Bytes.toInt(table.get(get).getValue(Bytes.toBytes("f"), Bytes.toBytes("age"))) println("row1 age: $age")
Table.checkAndDelete()で同様に期待する値の場合に削除出来ます。
// check and delete val delete = Delete(Bytes.toBytes("row1")) val deleteResult = table.checkAndDelete(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(18), delete) println("delete result: $deleteResult") val exists = table.exists(get) println("row1 exists?: $exists")
結果
update result: true row1 age: 18 delete result: true row1 exists?: false
サンプルコードは下記に上げました。
*****************************************************************************************************
[メモ]
Windowsからのアクセス
Windows10からvirtual box上のcloudera VMにアクセスするのに苦労したのでメモです。
上記のプログラムを実行しようとすると下記の様に接続でストップしてしまいました。
18/09/19 02:38:45 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x2f943d71 connecting to ZooKeeper ensemble=127.0.0.1:2181 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:host.name=DESKTOP-KD61VDN 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_111 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.home=C:\Program Files\Java\jdk1.8.0_111\jre 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.class.path=C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar; ・・・・ 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0_111\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\WinAVR-20100110\bin;・・・・ : : : : 18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=90000 watcher=hconnection-0x2f943d710x0, quorum=127.0.0.1:2181, baseZNode=/hbase 18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) 18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session 18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1657787b4230099, negotiated timeout = 40000
アクセスできるように、/etc/hostsに下記設定を追記します。
これで接続出来るようになります。
/etc/hosts
127.0.0.1 quickstart.cloudera
終わり。