Hbase 基础API
2021-07-10 20:07
阅读:520
标签:clone 顺序 mes 日志 enable long 新版本 ram get
本文参考:https://www.cnblogs.com/skyl/p/4803738.html
package com.Hbase import org.apache.hadoop.hbase._ import org.apache.hadoop.hbase.client._ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp import org.apache.hadoop.hbase.filter.SingleColumnValueFilter object HbaseDDL { def main(args: Array[String]): Unit = { // 确认表是否存在 def ensureHbaseTableExist(tableName:TableName) = { // 配置 Hbase val hbaseconf = HBaseConfiguration.create() //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183" //hbaseconf.set("hbase.zookeeper.quorum", zkConn) val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val ifExist = adminHbase.tableExists(tableName) ifExist } // 确认表是否存在 测试 // val result = ensureHbaseTableExist("user_info1") // if (result) { // println("表存在") // } else { // println("表不存在") // } /** * Hbase建表 两个参数 * @param tableName 形式为 ns:tb 或者 tb API 创建 namespace 机会不多,一般通过 hbase shell 创建 * @param columnFamilys cf1,cf2,cf3 */ def createHbaseTable(tableName:String, columnFamilys:String) = { // 配置 Hbase val hbaseconf = HBaseConfiguration.create() //val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183" //hbaseconf.set("hbase.zookeeper.quorum", zkConn) val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() // 列簇逗号分隔 val CFS = columnFamilys.split(",") // 表名,判断是否带了namespace,带了则判断是否存在 namespace, 不存在则创建 val nameSpace = tableName.split(":")(0) if (nameSpace != tableName) { adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build()) println("NameSpace 创建成功!") } // 判断表是否存在,不存在新建,存在则提示 if (!ensureHbaseTableExist(TableName.valueOf(tableName))) { // 实例化 HTableDescriptor val htable = new HTableDescriptor(TableName.valueOf(tableName)) // 循环添加所有列簇 for ( columnFamily ns:tb or tb [String] * @param columnFamily 列族名 ---> String */ def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) // disable table adminHbase.disableTable(tbName) // get HTableDescriptor val htd = adminHbase.getTableDescriptor(tbName) // delete family htd.removeFamily(columnFamily.getBytes()) // apply htd to table adminHbase.modifyTable(tbName, htd) // enable table adminHbase.enableTable(tbName) println("删除成功") adminHbase.close() } // deleteHbaseColumnFamily("scTable3", "base") /** * 给表增加列族 先得到表的 HTableDescriptor, 然后使用 HColumnDescriptor 初始化 新增列,并设置属性 * 调用 HTableDescriptor 的 addFamily 方法,将初始化好的 HCD 添加到 HTableDescriptor ,然后使用admin 的 modifyTable 方法将修改应用 * @param tableName * @param columnFamily */ def addHbaseColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) // disable table adminHbase.disableTable(tbName) // get HTableDescriptor val htd = adminHbase.getTableDescriptor(tbName) // val hcd = new HColumnDescriptor(columnFamily) hcd.setMaxVersions(3) // add family htd.addFamily(hcd) // apply htd to table adminHbase.modifyTable(tbName, htd) // enable table adminHbase.enableTable(tbName) println("添加成功") adminHbase.close() } //addHbaseColumnFamily("scTable3", "base") /** * 修改列簇功能 get 到 HTableDescriptor ,再 get 到 Family ,设置 Family ,admin modifyTable 应用 * @param tableName * @param columnFamily */ def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String) ={ val hbaseconf = HBaseConfiguration.create() val HbaseConn = ConnectionFactory.createConnection(hbaseconf) val adminHbase = HbaseConn.getAdmin() val tbName = TableName.valueOf(tableName) adminHbase.disableTable(tbName) val htd = adminHbase.getTableDescriptor(tbName) val modifyCol = htd.getFamily(columnFamily.getBytes()) modifyCol.setMaxVersions(3) adminHbase.modifyTable(tbName,htd) adminHbase.enableTable(tbName) adminHbase.close() println("修改成功!") } //modifyHbaseTableColumnFamily("scTable3", "info") /** * 插入数据,五个参数 * @param tableName * @param columnFamily * @param column * @param rowkey * @param value */ def putDataHbaseTable(tableName:String, columnFamily:String, column:String, rowkey:String, value:String) ={ val hbaseconf = HBaseConfiguration.create() // table val hTable = new HTable(hbaseconf, tableName) // row key val putData = new Put(rowkey.getBytes()) // put value putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes()) /** * 插入方式 * ASYNC_WAL : 当数据变动时,异步写WAL日志 * SYNC_WAL : 当数据变动时,同步写WAL日志 * FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘 * SKIP_WAL : 不写WAL日志 * USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL */ putData.setDurability(Durability.SYNC_WAL) // put data to table hTable.put(putData) println("插入数据成功!") // close table hTable.close() } //putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala") def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String, column:String = null)={ val hbaseConf = HBaseConfiguration.create() val hTable = new HTable(hbaseConf, tableName) // 初始化 Delete ,表名后可接时间戳 val deletaData = new Delete(rowkey.getBytes()) /** * 1).删除指定列的 最新版本 的数据:Delete addColumn (byte[] family, byte[] qualifier) * 2).删除指定列的 指定版本 的数据:Delete addColumn (byte[] family, byte[] qualifier, long timestamp ) * 3).删除指定列的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier) * 4).删除指定列的,时间戳 小于等于 给定时间戳的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier, long timestamp ) * 5).删除指定列族的所有列的 所有版本 数据:Delete addFamily (byte[] family) 默认使用当前时间的时间戳,时间戳大于当前时间的数据删除不掉 * 6).删除指定列族的所有列中 时间戳 小于等于 指定时间戳 的所有数据:Delete addFamily (byte[] family, long timestamp) * 7).删除指定列族中 所有列的时间戳 等于 指定时间戳的版本数据:Delete addFamilyVersion (byte[] family, long timestamp) */ deletaData.addColumn(columnFamily.getBytes(),column.getBytes()) //deletaData.addColumns(columnFamily.getBytes(),column.getBytes()) //deletaData.addFamily(columnFamily.getBytes()) hTable.delete(deletaData) println("删除成功") hTable.close() } // deleteDataHbaseTable("scTable3", "rk0002", "info") def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={ val hbaseCOnf = HBaseConfiguration.create() val hTable = new HTable(hbaseCOnf, tableName) val getData = new Get(rowkey.getBytes()) /** * 1). Get addFamily(byte[] family) 指定希望获取的列族 * 2). Get addColumn(byte[] family, byte[] qualifier) 指定希望获取的列 * 3). Get setTimeRange(long minStamp, long maxStamp) 设置获取数据的 时间戳范围 * 4). Get setTimeStamp(long timestamp) 设置获取数据的时间戳 * 5). Get setMaxVersions(int maxVersions) 设定获取数据的版本数 * 6). Get setMaxVersions() 设定获取数据的所有版本 * 7). Get setFilter(Filter filter) 为Get对象添加过滤器 * 8). void setCacheBlocks(boolean cacheBlocks) 设置该Get获取的数据是否缓存在内存中 */ //getData.addFamily(columnFamily.getBytes()) //getData.addColumn(columnFamily.getBytes(), column.getBytes()) //getData.setTimeStamp("1535680422860".toLong) getData.setMaxVersions() val results = hTable.get(getData) for (result
Hbase 基础API
标签:clone 顺序 mes 日志 enable long 新版本 ram get
原文地址:https://www.cnblogs.com/mlxx9527/p/9668733.html
评论
亲,登录后才可以留言!