HBaseのJava Clientを試す

HBaseのJava clientを試したメモです。


以前HBase shellからHBaseを触ってみたので、
HBaseのJava clientをKotlinで試してみました。


Apache HBase ™ Reference Guide

HBaseはclouderaのVirtualBox用のディストリビューションを使用し、
clientはWindow10上からアクセスしてみます。

下記バージョンで試してみます。
・Windows10
VirtualBox 5.2.16
・cdh5.13.0 (HBase 1.2.0)
・hbase-client 1.2.0-cdh5.13.0
・Kotlin 1.2.61

Dependency

Gradleで試します。
リポジトリにclouderaを追加します。

build.gradle

repositories {
    mavenCentral()
    maven {
        url 'https://repository.cloudera.com/cloudera/cloudera-repos/'
    }
}

下記の依存関係を追加しました。

dependencies {
    compile group: 'org.apache.hbase', name: 'hbase-client', version: '1.2.0-cdh5.13.0'
}

Connection

HBaseに接続してみます。

Configurationクラスを作成し、zookeeperのIPとportをセットします。

Configurationクラスを指定してConnectionFactoryでコネクションを作成し、
文字列でネームスペース:テーブル名を指定してTableNameインスタンスを作成します。
getTable()でTableの実装を取得して、テーブルに対して操作していきます。
※HConncection, HTableは1.0以降は@deperecated

    val conf= HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "127.0.0.1")
    conf.set("hbase.zookeeper.property.clientPort","2181")

    ConnectionFactory.createConnection(conf).use {
        val tableName = TableName.valueOf("myspace:people")
        it.getTable(tableName).use { table ->
            // do something
        }
    }

put

putでデータを投入してみます。
RowKeyをbyteで指定して、Putインスタンスを作成します。
addColumn()でcolumn family, column, valueをbyte[]で指定します。
Table.put()でPutインスタンスを指定するとputされます。

hbase clientでは色々な引数をbyte[ ]で指定する必要があるので、
Bytes Utiltyクラスを使用してbyte[ ]に変換しています。
頻繁に使用するのでtoBytes()などはstatic importしておくと楽です。

    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    table.put(put)

get

getでrowを取得してみます。
RowKeyをbyteで指定して、Getインスタンスを作成します。
addColumn()でcolumn family, columnをbyte[ ]で指定します。

結果はResultオブジェクトで取得します。
getValue()でcolumn family, columnを指定して値を取得出来ます。
結果の値もbyte[ ]なのでBytes Utiltyクラスを使用して適切な型に変換します。

    val get = Get(Bytes.toBytes("row1"))
    get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))
    get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))
    val getResult = table.get(get)
    val name = Bytes.toString(getResult.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
    val age = Bytes.toInt(getResult.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
    println("row1 name: $name")
    println("row1 age: $age")

結果

row1 name: Alice
row1 age: 20

exists

existsでrowが存在するか確認してみます。
getの時と同様にGetインスタンスを作成し、table.exists()でbooleanの結果が返ります。

addColumn()でcolumn family, columnを指定するとカラムの存在を確認出来ます。

    val getRow = Get(Bytes.toBytes("row1"))
    val existsRow = table.exists(getRow)
    println("row1 exists?: $existsRow")
    getRow.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))
    val existsCol = table.exists(getRow)
    println("row1 f name exists?: $existsCol")

結果

row1 exists?: true
row1 f name exists?: true

delete

deleteでrowとcolumnを削除してみます。

RowKeyをbyteで指定して、Deleteインスタンスを作成します。
Table.delete()で対象rowを削除します。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    table.put(put)

    // delete (row)
    val deleteRow = Delete(Bytes.toBytes("row1"))
    table.delete(deleteRow)

addColumn()でcolumn family, columnを指定します。
Table.delete()で対象columnを削除します。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    table.put(put)

    // delete (column)
    val delete = Delete(Bytes.toBytes("row1"))
    delete.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))
    table.delete(delete)

scan

scanで複数のrowを取得してみます。
Scanインスタンスを生成し、scan結果をResultScannerで取得します。
ResultScannerをイテレートするとResultインスタンスを取得できるので、ループして各rowの結果を取得します。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    table.put(put)
    val put2 = Put(Bytes.toBytes("row2"))
    put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Bobby"))
    put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33))
    table.put(put2)
    val put3 = Put(Bytes.toBytes("row3"))
    put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Cindy"))
    put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(19))
    table.put(put3)

    // scan
    val scan = Scan()
    val resultScanner = table.getScanner(scan)
    resultScanner.forEach { result ->
        val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        println("scan name: $scanName")
        println("scan age: $scanAge")
    }

結果

scan name: Alice
scan age: 20
scan name: Bobby
scan age: 33
scan name: Cindy
scan age: 19

filter

filterで取得する際に条件でフィルタリング出来ます。

row filter

row filterでrowでフィルタリング出来ます。
RowFilterのインスタンスを作成します。第1引数に比較の演算子enumを指定し、第2引数に何で比較するかをComparatorで指定します。
等しいことを比較するためにCompareFilter.CompareOp.EQUALを指定し、byte[ ]で比較するためにBinaryComparatorにrowKeyを指定します。

あとはScanやGetのインスタンスにフィルタをセットして、取得します。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(22))
    table.put(put)
    val put2 = Put(Bytes.toBytes("row2"))
    put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Bobby"))
    put2.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33))
    table.put(put2)
    val put3 = Put(Bytes.toBytes("row3"))
    put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Cindy"))
    put3.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(19))
    table.put(put3)

    // row filter
    val scan = Scan()
    val rowFilter = RowFilter(CompareFilter.CompareOp.EQUAL, BinaryComparator(Bytes.toBytes("row2")))
    scan.filter = rowFilter
    val resultRowFilter = table.getScanner(scan)
    resultRowFilter.forEach { result ->
        val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        println("scan name(row filter): $scanName")
        println("scan age(row filter): $scanAge")
    }

結果

scan name(row filter): Bobby
scan age(row filter): 33
qualifier filter

qualifier filterでcolumnでフィルタリング出来ます。
QualifierFilterのインスタンスを作成します。
等しいことを比較するためにCompareFilter.CompareOp.EQUALを指定し、byte[ ]で比較するためにBinaryComparatorにcolumnを指定します。

同様にScanにフィルタをセットして、取得します。

    // qualifier filter
    val qualifierFilter = QualifierFilter(CompareFilter.CompareOp.EQUAL, BinaryComparator(Bytes.toBytes("name")))
    scan.filter = qualifierFilter
    val resultQualifierFilter = table.getScanner(scan)
    resultQualifierFilter.forEach { result ->
        val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        var scanAge: Int? = null
        if (result.containsColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))) {
            scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        }
        println("scan name(qualifier filter): $scanName")
        println("scan age(qualifier filter): $scanAge")
    }

結果

scan name(qualifier filter): Alice
scan age(qualifier filter): null
scan name(qualifier filter): Bobby
scan age(qualifier filter): null
scan name(qualifier filter): Cindy
scan age(qualifier filter): null
value filter

value filterでvalueでフィルタリング出来ます。
ValueFilterのインスタンスを作成します。
指定した値より大きいことを比較するためにCompareFilter.CompareOp.GREATERを指定し、byte[ ]で比較するためにBinaryComparatorにcolumnを指定します。

同様にScanにフィルタをセットして、取得します。

    // value filter
    val valueFilter = ValueFilter(CompareFilter.CompareOp.GREATER, BinaryComparator(Bytes.toBytes(20)))
    scan.filter = valueFilter
    val resultValueFilter = table.getScanner(scan)
    resultValueFilter.forEach { result ->
        val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        var scanAge: Int? = null
        if (result.containsColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))) {
            scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        }
        println("scan name(value filter): $scanName")
        println("scan age(value filter): $scanAge")
    }

結果

scan name(value filter): Alice
scan age(value filter): 22
scan name(value filter): Bobby
scan age(value filter): 33
scan name(value filter): Cindy
scan age(value filter): null

row mutaition

row mutaitionで1つのrowに対して複数の処理をatomicに実行出来ます。
RowKeyをbyteで指定して、RowMutationsインスタンスを作成します。
add()でatomicに処理したい操作を追加します。
Table.mutateRow()で実行されます。

    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("Alice"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    val delete = Delete(Bytes.toBytes("row1"))
    delete.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))

    // mutateRow
    val rowMutations = RowMutations(Bytes.toBytes("row1"))
    rowMutations.add(put)
    rowMutations.add(delete)
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("country"), Bytes.toBytes("Japan"))
    rowMutations.add(put)
    table.mutateRow(rowMutations)

    // scan
    val scan = Scan()
    val resultMutateRow = table.getScanner(scan)
    resultMutateRow.forEach { result ->
        val scanName = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        val scanAge = Bytes.toInt(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
        val scanCountry = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("country")))
        println("scan name: $scanName")
        println("scan age: $scanAge")
        println("scan country: $scanCountry")
    }

結果

scan name: null
scan age: 20
scan country: Japan

increment

incrementで指定した数インクリメント、デクリメント出来ます。
RowKeyをbyteで指定して、Incrementインスタンスを作成します。
addColumn()でcolumn family, column, インクリメントする値を指定します。
マイナスの値を指定するとデクリメントされます。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(33L))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("stamina"), Bytes.toBytes(100L))
    table.put(put)

    // increment
    val increment = Increment(Bytes.toBytes("row1"))
    increment.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), 1L)
    increment.addColumn(Bytes.toBytes("f"), Bytes.toBytes("stamina"), -10L)

    val result1 = table.increment(increment)
    val age1 = Bytes.toLong(result1.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
    val stamina1 = Bytes.toLong(result1.getValue(Bytes.toBytes("f"), Bytes.toBytes("stamina")))
    println("age: $age1")
    println("stamina: $stamina1")

    table.increment(increment)
    val result2 = table.increment(increment)
    val age2 = Bytes.toLong(result2.getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
    val stamina2 = Bytes.toLong(result2.getValue(Bytes.toBytes("f"), Bytes.toBytes("stamina")))
    println("age: $age2")
    println("stamina: $stamina2")

結果

age: 34
stamina: 90
age: 36
stamina: 70

cas

casで現在の値を確認し、特定の値の場合に更新、削除の操作が出来ます。
更新する値で更新用のPutインスタンスを作成します。
Table.checkAndPut()で、rowKey, column family, column, 期待する値, 更新用のPut, を指定します。
現在の値が引数で指定した期待する値の場合、Putインスタンスで指定した値に更新されます。

    // prepare
    val put = Put(Bytes.toBytes("row1"))
    put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20))
    table.put(put)

    // check and put
    val update = Put(Bytes.toBytes("row1"))
    update.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(18))
    val updateResult = table.checkAndPut(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(20), update)
    println("update result: $updateResult")

    val get = Get(Bytes.toBytes("row1"))
    get.addColumn(Bytes.toBytes("f"), Bytes.toBytes("age"))
    val age = Bytes.toInt(table.get(get).getValue(Bytes.toBytes("f"), Bytes.toBytes("age")))
    println("row1 age: $age")

Table.checkAndDelete()で同様に期待する値の場合に削除出来ます。

    // check and delete
    val delete = Delete(Bytes.toBytes("row1"))
    val deleteResult = table.checkAndDelete(Bytes.toBytes("row1"), Bytes.toBytes("f"), Bytes.toBytes("age"), Bytes.toBytes(18), delete)
    println("delete result: $deleteResult")

    val exists = table.exists(get)
    println("row1 exists?: $exists")

結果

update result: true
row1 age: 18
delete result: true
row1 exists?: false

サンプルコードは下記に上げました。

github.com

*****************************************************************************************************

[メモ]

Windowsからのアクセス

Windows10からvirtual box上のcloudera VMにアクセスするのに苦労したのでメモです。

上記のプログラムを実行しようとすると下記の様に接続でストップしてしまいました。

18/09/19 02:38:45 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x2f943d71 connecting to ZooKeeper ensemble=127.0.0.1:2181
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:host.name=DESKTOP-KD61VDN
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_111
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.home=C:\Program Files\Java\jdk1.8.0_111\jre
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.class.path=C:\Program Files\Java\jdk1.8.0_111\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.8.0_111\jre\lib\deploy.jar; ・・・・
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Client environment:java.library.path=C:\Program Files\Java\jdk1.8.0_111\bin;C:\WINDOWS\Sun\Java\bin;C:\WINDOWS\system32;C:\WINDOWS;C:\WinAVR-20100110\bin;・・・・
                            :
                            :
                            :
                            :
18/09/19 02:38:49 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=90000 watcher=hconnection-0x2f943d710x0, quorum=127.0.0.1:2181, baseZNode=/hbase
18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
18/09/19 02:38:50 INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x1657787b4230099, negotiated timeout = 40000

アクセスできるように、/etc/hostsに下記設定を追記します。
これで接続出来るようになります。

/etc/hosts

127.0.0.1 quickstart.cloudera

https://stackoverflow.com/questions/31427077/unknown-host-exception-when-using-spring-data-hadoop-to-connect-to-cloudera-quic


終わり。