Hbase 基础API

2021-07-10 20:07

阅读:458

标签:clone   顺序   mes   日志   enable   long   新版本   ram   get   

 

本文参考:https://www.cnblogs.com/skyl/p/4803738.html

package com.Hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter

object HbaseDDL {

  def main(args: Array[String]): Unit = {

    // 确认表是否存在

    def ensureHbaseTableExist(tableName:TableName) = {
      // 配置 Hbase
      val hbaseconf = HBaseConfiguration.create()
      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val ifExist = adminHbase.tableExists(tableName)

      ifExist
    }

    // 确认表是否存在 测试
//    val result = ensureHbaseTableExist("user_info1")
//    if (result) {
//      println("表存在")
//    } else {
//      println("表不存在")
//    }


    /**
      * Hbase建表  两个参数
      * @param tableName   形式为 ns:tb  或者  tb    API 创建 namespace 机会不多,一般通过 hbase shell 创建
      * @param columnFamilys  cf1,cf2,cf3
      */
    def createHbaseTable(tableName:String, columnFamilys:String) = {

      // 配置 Hbase
      val hbaseconf = HBaseConfiguration.create()
      //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
      //hbaseconf.set("hbase.zookeeper.quorum", zkConn)
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      // 列簇逗号分隔
      val CFS = columnFamilys.split(",")

      // 表名,判断是否带了namespace,带了则判断是否存在 namespace, 不存在则创建
      val nameSpace = tableName.split(":")(0)


      if (nameSpace != tableName) {
        adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build())
        println("NameSpace 创建成功!")
      }
      // 判断表是否存在,不存在新建,存在则提示
      if (!ensureHbaseTableExist(TableName.valueOf(tableName))) {

        // 实例化 HTableDescriptor
        val htable = new HTableDescriptor(TableName.valueOf(tableName))

        // 循环添加所有列簇
        for ( columnFamily    ns:tb   or    tb   [String]
      * @param columnFamily   列族名  --->   String
      */
    def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={

      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)
      // disable table
      adminHbase.disableTable(tbName)

      // get HTableDescriptor

      val htd = adminHbase.getTableDescriptor(tbName)

      // delete family
      htd.removeFamily(columnFamily.getBytes())

      // apply htd to table
      adminHbase.modifyTable(tbName, htd)

      // enable table
      adminHbase.enableTable(tbName)

      println("删除成功")

      adminHbase.close()

    }

   // deleteHbaseColumnFamily("scTable3", "base")


    /**
      * 给表增加列族    先得到表的 HTableDescriptor, 然后使用 HColumnDescriptor 初始化 新增列,并设置属性
      * 调用 HTableDescriptor 的 addFamily 方法,将初始化好的 HCD 添加到  HTableDescriptor ,然后使用admin 的 modifyTable 方法将修改应用
      * @param tableName
      * @param columnFamily
      */
    def addHbaseColumnFamily(tableName:String, columnFamily:String) ={

      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)
      // disable table
      adminHbase.disableTable(tbName)

      // get HTableDescriptor
      val htd = adminHbase.getTableDescriptor(tbName)

      //
      val hcd = new HColumnDescriptor(columnFamily)
      hcd.setMaxVersions(3)

      // add family
      htd.addFamily(hcd)

      // apply htd to table
      adminHbase.modifyTable(tbName, htd)

      // enable table
      adminHbase.enableTable(tbName)

      println("添加成功")

      adminHbase.close()

    }

    //addHbaseColumnFamily("scTable3", "base")


    /**
      * 修改列簇功能  get 到  HTableDescriptor ,再 get 到 Family ,设置 Family ,admin modifyTable  应用
      * @param tableName
      * @param columnFamily
      */
    def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String)  ={
      val hbaseconf = HBaseConfiguration.create()
      val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
      val adminHbase = HbaseConn.getAdmin()

      val tbName = TableName.valueOf(tableName)

      adminHbase.disableTable(tbName)

      val htd = adminHbase.getTableDescriptor(tbName)

      val modifyCol  = htd.getFamily(columnFamily.getBytes())
      modifyCol.setMaxVersions(3)

      adminHbase.modifyTable(tbName,htd)

      adminHbase.enableTable(tbName)

      adminHbase.close()

      println("修改成功!")
    }

    //modifyHbaseTableColumnFamily("scTable3", "info")

    /**
      * 插入数据,五个参数
      * @param tableName
      * @param columnFamily
      * @param column
      * @param rowkey
      * @param value
      */
    def putDataHbaseTable(tableName:String, columnFamily:String, column:String,
                          rowkey:String, value:String) ={
      val hbaseconf = HBaseConfiguration.create()

      // table
      val hTable = new HTable(hbaseconf, tableName)
      // row key
      val putData = new Put(rowkey.getBytes())
      // put value
      putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes())

      /**
        * 插入方式
        * ASYNC_WAL : 当数据变动时,异步写WAL日志
        * SYNC_WAL : 当数据变动时,同步写WAL日志
        * FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘
        * SKIP_WAL : 不写WAL日志
        * USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL
        */
      putData.setDurability(Durability.SYNC_WAL)

      // put data to table
      hTable.put(putData)

      println("插入数据成功!")
      // close table
      hTable.close()
    }

    //putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala")

    def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String,
                             column:String = null)={

      val hbaseConf = HBaseConfiguration.create()

      val hTable = new HTable(hbaseConf, tableName)

      // 初始化 Delete ,表名后可接时间戳
      val deletaData = new Delete(rowkey.getBytes())

      /**
        *   1).删除指定列的 最新版本 的数据:Delete addColumn (byte[] family, byte[] qualifier)
        *   2).删除指定列的 指定版本 的数据:Delete addColumn (byte[] family, byte[] qualifier, long timestamp )
        *   3).删除指定列的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier)
        *   4).删除指定列的,时间戳 小于等于 给定时间戳的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier, long timestamp )
        *   5).删除指定列族的所有列的 所有版本 数据:Delete addFamily (byte[] family)    默认使用当前时间的时间戳,时间戳大于当前时间的数据删除不掉
        *   6).删除指定列族的所有列中 时间戳 小于等于 指定时间戳 的所有数据:Delete addFamily (byte[] family, long timestamp)
        *   7).删除指定列族中 所有列的时间戳 等于 指定时间戳的版本数据:Delete addFamilyVersion (byte[] family, long timestamp)
        */
      deletaData.addColumn(columnFamily.getBytes(),column.getBytes())
      //deletaData.addColumns(columnFamily.getBytes(),column.getBytes())
      //deletaData.addFamily(columnFamily.getBytes())
      hTable.delete(deletaData)

      println("删除成功")
      hTable.close()

    }

   // deleteDataHbaseTable("scTable3", "rk0002", "info")

    def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={
      val hbaseCOnf =  HBaseConfiguration.create()

      val hTable = new HTable(hbaseCOnf, tableName)

      val getData = new Get(rowkey.getBytes())

      /**
        *    1). Get addFamily(byte[] family) 指定希望获取的列族
        *    2). Get addColumn(byte[] family, byte[] qualifier) 指定希望获取的列
        *    3). Get setTimeRange(long minStamp, long maxStamp) 设置获取数据的 时间戳范围
        *    4). Get setTimeStamp(long timestamp) 设置获取数据的时间戳
        *    5). Get setMaxVersions(int maxVersions) 设定获取数据的版本数
        *    6). Get setMaxVersions() 设定获取数据的所有版本
        *    7). Get setFilter(Filter filter) 为Get对象添加过滤器
        *    8). void setCacheBlocks(boolean cacheBlocks) 设置该Get获取的数据是否缓存在内存中
        */
      //getData.addFamily(columnFamily.getBytes())

      //getData.addColumn(columnFamily.getBytes(), column.getBytes())

      //getData.setTimeStamp("1535680422860".toLong)

      getData.setMaxVersions()
      val results = hTable.get(getData)

      for (result 

  

Hbase 基础API

标签:clone   顺序   mes   日志   enable   long   新版本   ram   get   

原文地址:https://www.cnblogs.com/mlxx9527/p/9668733.html


评论


亲,登录后才可以留言!