博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HBase读写的几种方式(一)java篇
阅读量:4089 次
发布时间:2019-05-25

本文共 6788 字,大约阅读时间需要 22 分钟。

1.HBase读写的方式概况

主要分为:

  1. 纯Java API读写HBase的方式;
  2. Spark读写HBase的方式;
  3. Flink读写HBase的方式;
  4. HBase通过Phoenix读写的方式;

第一种方式是HBase自身提供的比较原始的高效操作方式,而第二、第三则分别是Spark、Flink集成HBase的方式,最后一种是第三方插件Phoenix集成的JDBC方式,Phoenix集成的JDBC操作方式也能在Spark、Flink中调用。

注意:

这里我们使用HBase2.1.2版本,以下代码都是基于该版本开发的。

2. 纯Java API读写HBase

2.1 连接HBase

这里我们采用静态方式连接HBase,不同于2.1.2之前的版本,无需创建HBase线程池,HBase2.1.2提供的代码已经封装好,只需创建调用即可:

/**  * 声明静态配置  */static Configuration conf = null;static Connection conn = null;static {       conf = HBaseConfiguration.create();       conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");       conf.set("hbase.zookeeper.property.client", "2181");       try{           conn = ConnectionFactory.createConnection(conf);       }catch (Exception e){           e.printStackTrace();       }}

2.2 创建HBase的表

创建HBase表,是通过Admin来执行的,表和列簇则是分别通过TableDescriptorBuilder和ColumnFamilyDescriptorBuilder来构建。

/** * 创建只有一个列簇的表 * @throws Exception */public static void createTable() throws Exception{    Admin admin = conn.getAdmin();    if (!admin.tableExists(TableName.valueOf("test"))){        TableName tableName = TableName.valueOf("test");        //表描述器构造器        TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);        //列族描述器构造器        ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));        //获得列描述器        ColumnFamilyDescriptor cfd = cdb.build();        //添加列族        tdb.setColumnFamily(cfd);        //获得表描述器        TableDescriptor td = tdb.build();        //创建表        admin.createTable(td);    }else {        System.out.println("表已存在");    }    //关闭连接}

2.3 HBase表添加数据

通过put api来添加数据

/** * 添加数据(多个rowKey,多个列族) * @throws Exception */public static void insertMany() throws Exception{    Table table = conn.getTable(TableName.valueOf("test"));    List
puts = new ArrayList
(); Put put1 = new Put(Bytes.toBytes("rowKey1")); put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd")); Put put2 = new Put(Bytes.toBytes("rowKey2")); put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25")); Put put3 = new Put(Bytes.toBytes("rowKey3")); put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg")); Put put4 = new Put(Bytes.toBytes("rowKey4")); put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男")); puts.add(put1); puts.add(put2); puts.add(put3); puts.add(put4); table.put(puts); table.close();}

2.4 删除HBase的列簇或列

/** * 根据rowKey删除一行数据、或者删除某一行的某个列簇,或者某一行某个列簇某列 * @param tableName * @param rowKey * @throws Exception */public static void deleteData(TableName tableName, String rowKey, String rowKey, String columnFamily, String columnName) throws Exception{    Table table = conn.getTable(tableName);    Delete delete = new Delete(Bytes.toBytes(rowKey));    //①根据rowKey删除一行数据    table.delete(delete);        //②删除某一行的某一个列簇内容    delete.addFamily(Bytes.toBytes(columnFamily));        //③删除某一行某个列簇某列的值    delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));    table.close();}

2.5 更新HBase表的列

使用Put api直接替换掉即可

/** * 根据RowKey , 列簇, 列名修改值 * @param tableName * @param rowKey * @param columnFamily * @param columnName * @param columnValue * @throws Exception */public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception{    Table table = conn.getTable(tableName);    Put put1 = new Put(Bytes.toBytes(rowKey));    put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));    table.put(put1);    table.close();}

2.6 HBase查询

HBase查询分为get、scan、scan和filter结合。filter过滤器又分为RowFilter(rowKey过滤器)、SingleColumnValueFilter(列值过滤器)、ColumnPrefixFilter(列名前缀过滤器)。

/** * 根据rowKey查询数据 * @param tableName * @param rowKey * @throws Exception */public static void getResult(TableName tableName, String rowKey) throws Exception{    Table table = conn.getTable(tableName);    //获得一行    Get get = new Get(Bytes.toBytes(rowKey));    Result set = table.get(get);    Cell[] cells = set.rawCells();    for (Cell cell: cells){        System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" +        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));    }    table.close();}//过滤器 LESS <  LESS_OR_EQUAL <=   EQUAL =   NOT_EQUAL <>   GREATER_OR_EQUAL >=   GREATER >   NO_OP 排除所有/** * @param tableName * @throws Exception */public static void scanTable(TableName tableName) throws Exception{    Table table = conn.getTable(tableName);        //①全表扫描    Scan scan1 = new Scan();    ResultScanner rscan1 = table.getScanner(scan1);        //②rowKey过滤器    Scan scan2 = new Scan();    //str$ 末尾匹配,相当于sql中的 %str  ^str开头匹配,相当于sql中的str%    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));    scan2.setFilter(filter);    ResultScanner rscan2 = table.getScanner(scan2);        //③列值过滤器    Scan scan3 = new Scan();    //下列参数分别为列族,列名,比较符号,值    SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),               CompareOperator.EQUAL, Bytes.toBytes("spark"));    scan3.setFilter(filter3);    ResultScanner rscan3 = table.getScanner(scan3);        //列名前缀过滤器    Scan scan4 = new Scan();    ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));    scan4.setFilter(filter4);    ResultScanner rscan4 = table.getScanner(scan4);        //过滤器集合    Scan scan5 = new Scan();    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);    SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),              CompareOperator.EQUAL, Bytes.toBytes("spark"));    ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));    list.addFilter(filter51);    list.addFilter(filter52);    scan5.setFilter(list);    ResultScanner rscan5 = table.getScanner(scan5);        for (Result rs : rscan){        String rowKey = Bytes.toString(rs.getRow());        System.out.println("row key :" + rowKey);        Cell[] cells = rs.rawCells();        for (Cell cell: cells){            System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"                    + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"                    + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));        }        System.out.println("-------------------------------------------");    }}

3.总结

 查看Spark上读写HBase

  查看flink上读写HBase

github地址:

参考资料:

https://hbase.apache.org/book.html

转载地址:http://cfcii.baihongyu.com/

你可能感兴趣的文章
Spring AOP + Redis + 注解实现redis 分布式锁
查看>>
poj 1976 A Mini Locomotive (dp 二维01背包)
查看>>
《计算机网络》第五章 运输层 ——TCP和UDP 可靠传输原理 TCP流量控制 拥塞控制 连接管理
查看>>
《PostgreSQL技术内幕:查询优化深度探索》养成记
查看>>
剑指_复杂链表的复制
查看>>
FTP 常见问题
查看>>
shell 快捷键
查看>>
MODULE_DEVICE_TABLE的理解
查看>>
No devices detected. Fatal server error: no screens found
查看>>
db db2_monitorTool IBM Rational Performace Tester
查看>>
postgresql监控工具pgstatspack的安装及使用
查看>>
swift中单例的创建及销毁
查看>>
获取App Store中App的ipa包
查看>>
UIImage存为本地文件与UIImage转换为NSData
查看>>
[转]打印质数的各种算法
查看>>
[转]javascript with延伸的作用域是只读的吗?
查看>>
php的autoload与global
查看>>
IE不支持option的display:none属性
查看>>
[分享]mysql内置用于字符串型ip地址和整数型ip地址转换函数
查看>>
Https加密及攻防
查看>>