Code examples: fast data lookups with HBase in a project


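Our project previously used DDB to look up detail records. Because its cost was too high, we are now considering the open-source HBase instead, and I am taking the opportunity to record some HBase code examples. My earlier posts introduced how HBase works, covering row keys, columns, column families, and the storage model; see those posts for background. This post looks only at the HBase Java API.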

 

1. Connection setup: obtaining a Connection

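    /**
     * HBase configuration and the shared Connection.
     * A Connection is heavyweight and thread-safe: create it once and reuse it.
     */
    private Configuration conf = null;
    private Connection connection = null;

    /**
     * Constructor: creates the shared Connection from the given configuration.
     */
    public HBaseService(Configuration conf)
    {
        this.conf = conf;
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
            log.error("Failed to create HBase connection", e);
        }
    }

The HBaseService is registered as a Spring @Bean; the ZooKeeper quorum and the maximum cell size are read from the application properties:
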
    @Value("${HBase.nodes}")
    private String nodes;

    @Value("${HBase.maxsize}")
    private String maxsize;

    @Bean
    public HBaseService getHbaseService(){
        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        // ZooKeeper quorum of the HBase cluster
        conf.set("hbase.zookeeper.quorum", nodes);
        // Maximum size of a single KeyValue (cell) accepted by the client
        conf.set("hbase.client.keyvalue.maxsize", maxsize);
        conf.setInt("hbase.rpc.timeout", 20000);
        conf.setInt("hbase.client.operation.timeout", 30000);
        // HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD is the key "hbase.client.scanner.timeout.period";
        // set it only once, because a later set() silently overwrites an earlier one
        conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 20000);
        conf.setInt("hbase.client.pause", 50);
        conf.setInt("hbase.client.retries.number", 15);

        return new HBaseService(conf);
    }

 

Creating tables:

    /**
     * Create a table.
     * @author gxy
     * @date 2018/7/3 17:50
     * @since 1.0.0
     * @param tableName table name
     * @param columnFamily list of column family names
     * @return true if the table already exists or was created, false on error
     */
    public boolean createTable(String tableName, List<String> columnFamily)
    {
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            // A table can have several column families
            List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());

            for(String cf : columnFamily)   // one descriptor per column family
            {
                // Default settings keep 1 version per cell; call setMaxVersions(n) on the builder to keep more
                familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
            }

            TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))   // table descriptor
                    .setColumnFamilies(familyDescriptors)
                    .build();

            if (admin.tableExists(TableName.valueOf(tableName))) {
                log.debug("table Exists!");
            } else {
                admin.createTable(tableDescriptor);
                log.debug("create table Success!");
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("Failed to create table {0}",tableName),e);
            return false;
        }finally {
            close(admin,null,null);
        }
        return true;
    }

    /**
     * Create a pre-split table.
     * @param tableName table name
     * @param columnFamily list of column family names
     * @param splitKeys split keys (region boundaries); n keys give n + 1 regions
     * @return true if the table already exists or was created, false otherwise
     */
    public boolean createTableBySplitKeys(String tableName, List<String> columnFamily, byte[][] splitKeys) {
        Admin admin = null;
        try {
            if (StringUtils.isBlank(tableName) || columnFamily == null || columnFamily.size() == 0)
            {
                log.error("===Parameters tableName|columnFamily should not be null,Please check!===");
                return false;
            }
            admin = connection.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                return true;
            } else {
                List<ColumnFamilyDescriptor> familyDescriptors = new ArrayList<>(columnFamily.size());

                for(String cf : columnFamily)
                {
                    familyDescriptors.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
                }

                TableDescriptor tableDescriptor = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)).setColumnFamilies(familyDescriptors).build();

                // Create the table with the given split keys
                admin.createTable(tableDescriptor, splitKeys);
                log.info("===Create Table " + tableName + " Success!columnFamily:" + columnFamily.toString() + "===");
            }
        } catch (IOException e) {
            log.error("===Create Table " + tableName + " failed===",e);
            return false;
        }finally {
            close(admin,null,null);
        }

        return true;
    }

    /**
     * Build custom split keys for pre-splitting, sorted in HBase byte order.
     * n distinct keys produce n + 1 regions.
     */
    public static byte[][] getSplitKeys(String[] keys){
        if(keys==null){
            // Default: 9 split keys, i.e. 10 regions
            keys = new String[] {  "1|", "2|", "3|", "4|", "5|", "6|", "7|", "8|", "9|" };
        }
        // Sort ascending by unsigned byte order; the TreeSet also drops duplicate keys,
        // which createTable would otherwise reject
        TreeSet<byte[]> rows = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
        for(String key : keys){
            rows.add(Bytes.toBytes(key));
        }
        // Size the result from the set, not from keys.length, so duplicates leave no null entries
        return rows.toArray(new byte[0][]);
    }
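To make the "9 split keys, 10 regions" rule concrete, here is a small pure-Java sketch that does not touch HBase. regionIndex mimics how a row key is assigned to a region: region i covers the keys from split key i-1 (inclusive) up to split key i (exclusive), compared as unsigned bytes. saltedRowKey is a hypothetical salting scheme of my own, not something from the project: it prefixes the business id with a digit 0 to 9 and "|" so that rows spread over the ten default regions instead of piling into one.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Pure-Java sketch, not the HBase API: models how a row key maps to a region
// for a sorted array of split keys, plus a hypothetical salting scheme.
class SaltedRegions {

    // Same boundaries as the default of getSplitKeys(null): 9 split keys, 10 regions
    static final byte[][] DEFAULT_SPLITS =
            toBytes("1|", "2|", "3|", "4|", "5|", "6|", "7|", "8|", "9|");

    static byte[][] toBytes(String... keys) {
        byte[][] out = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            out[i] = keys[i].getBytes(StandardCharsets.UTF_8);
        }
        return out;
    }

    // Region 0 covers [empty key, splits[0]), region i covers [splits[i-1], splits[i]),
    // and the last region has no end key. Keys compare as unsigned bytes.
    static int regionIndex(byte[][] sortedSplits, byte[] rowKey) {
        int region = 0;
        for (byte[] split : sortedSplits) {
            if (Arrays.compareUnsigned(rowKey, split) < 0) {
                break;  // rowKey sorts before this boundary
            }
            region++;
        }
        return region;
    }

    static int regionIndex(String rowKey) {
        return regionIndex(DEFAULT_SPLITS, rowKey.getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical salting: "<digit 0-9>|<id>", the digit derived from the id's hash,
    // so the salt selects one of the 10 default regions
    static String saltedRowKey(String id) {
        int salt = Math.floorMod(id.hashCode(), 10);
        return salt + "|" + id;
    }

    public static void main(String[] args) {
        for (String id : new String[] {"order-1001", "order-1002", "user-42"}) {
            String rowKey = saltedRowKey(id);
            System.out.println(rowKey + " -> region " + regionIndex(rowKey));
        }
    }
}
```

A key such as "5" (without the "|") sorts before "5|" and therefore falls into region 4, so salt prefixes have to follow the split-key format exactly. The cost of salting is on the read side: a point lookup must recompute the salt from the id, and a scan over a range of ids has to be repeated for each of the ten salt values.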

 

Adding and updating data:

    /**
     * Add or update data in a table
     * @author gxy
     * @date 2018/7/3 17:26
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @param familyName column family name
     * @param columns array of column names
     * @param values array of column values
     */
    public void putData(String tableName,String rowKey, String familyName, String[] columns, String[] values) {
        // Get the table
        Table table= null;
        try {
            table=getTable(tableName);
            putDataIntoHbase(table,rowKey,tableName,familyName,columns,values);

        } catch (Exception e) {
            log.error(MessageFormat.format("Failed to add or update data, tableName:{0}, rowKey:{1}, familyName:{2}"
                    ,tableName,rowKey,familyName),e);
        }finally {
            close(null,null,table);
        }
    }

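    /**
     * Add or update data in a table (several columns of one row).
     * A Put is an upsert: writing a column that already exists adds a new version of it.
     * @author gxy
     * @date 2018/7/3 17:26
     * @since 1.0.0
     * @param table Table (owned and closed by the caller)
     * @param rowKey rowKey
     * @param tableName table name
     * @param familyName column family name
     * @param columns array of column names
     * @param values array of column values
     */
    private void putDataIntoHbase(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) {
        try {
            if(columns == null || values == null || columns.length == 0 || columns.length != values.length){
                // table.put() rejects a Put without columns, so fail early with a clear message
                throw new IllegalArgumentException("columns and values must be non-empty arrays of the same length");
            }
            // Row key of the Put
            Put put = new Put(Bytes.toBytes(rowKey));
            byte[] family = Bytes.toBytes(familyName);

            for(int i=0;i<columns.length;i++){
                if(columns[i] == null || values[i] == null){
                    throw new IllegalArgumentException(MessageFormat.format("Column name and value must not be null, column:{0}, value:{1}"
                            ,columns[i],values[i]));
                }
                put.addColumn(family, Bytes.toBytes(columns[i]), Bytes.toBytes(values[i]));
            }

            table.put(put);
            log.debug("putData add or update data Success,rowKey:" + rowKey);
            // The table is not closed here: putData() closes it in its finally block
        } catch (Exception e) {
            log.error(MessageFormat.format("Failed to add or update data, tableName:{0}, rowKey:{1}, familyName:{2}"
                    ,tableName,rowKey,familyName),e);
        }
    }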

    /**
     * Set the value of a single cell (one column)
     * @author gxy
     * @date 2018/7/4 10:20
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @param familyName column family name
     * @param column1 column name
     * @param value1 column value
     */
    public void setColumnValue(String tableName, String rowKey, String familyName, String column1, String value1){
        Table table=null;
        try {
            // Get the table
            table=getTable(tableName);
            // Row key of the Put
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(column1), Bytes.toBytes(value1));

            table.put(put);
            log.debug("add data Success!");
        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to set cell value, tableName:{0}, rowKey:{1}, familyName:{2}, column:{3}"
                    ,tableName,rowKey,familyName,column1),e);
        }finally {
            close(null,null,table);
        }
    }

Deleting data:

    /**
     * Delete the specified cell (all versions of one column in one row)
     * @author gxy
     * @date 2018/7/4 11:41
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @param familyName column family name
     * @param columnName column name
     * @return boolean
     */
    public boolean deleteColumn(String tableName, String rowKey, String familyName, String columnName)
    {
        Table table=null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            if(admin.tableExists(TableName.valueOf(tableName)))  // only if the table exists
            {
                // Get the table
                table=getTable(tableName);
                Delete delete = new Delete(Bytes.toBytes(rowKey));
                // Column to delete: addColumns() removes ALL versions of the column,
                // addColumn() would remove only the latest version
                delete.addColumns(Bytes.toBytes(familyName), Bytes.toBytes(columnName));

                table.delete(delete);
                log.debug(MessageFormat.format("familyName({0}):columnName({1}) is deleted!",familyName,columnName));
            }

        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to delete column, tableName:{0}, rowKey:{1}, familyName:{2}, column:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
            return false;
        }finally {
            close(admin,null,table);
        }
        return true;
    }

    /**
     * Delete the row with the given rowKey
     * @author gxy
     * @date 2018/7/4 13:26
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @return boolean
     */
    public boolean deleteRow(String tableName, String rowKey) {
        Table table=null;
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            if(admin.tableExists(TableName.valueOf(tableName))){
                // Get the table
                table=getTable(tableName);
                // No column specified: deletes the whole row (all families, all versions)
                Delete delete = new Delete(Bytes.toBytes(rowKey));

                table.delete(delete);
                log.debug(MessageFormat.format("row({0}) is deleted!",rowKey));
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to delete row, tableName:{0}, rowKey:{1}"
                    ,tableName,rowKey),e);
            return false;
        }finally {
            close(admin,null,table);
        }
        return true;
    }

    /**
     * Delete the given column family from a table
     * @author gxy
     * @date 2018/7/4 13:26
     * @since 1.0.0
     * @param tableName table name
     * @param columnFamily column family
     * @return boolean
     */
    public boolean deleteColumnFamily(String tableName, String columnFamily) {
        Admin admin = null;
        try {
            admin = connection.getAdmin();

            if(admin.tableExists(TableName.valueOf(tableName)))
            {
                // Schema change: removes the family, and all data stored in it, from the table
                admin.deleteColumnFamily(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
                log.debug(MessageFormat.format("familyName({0}) is deleted!",columnFamily));
            }
        }catch (IOException e) {

            log.error(MessageFormat.format("Failed to delete column family, tableName:{0}, columnFamily:{1}",tableName,columnFamily),e);
            return false;
        }finally {
            close(admin,null,null);
        }
        return true;
    }

    /**
     * Delete a table
     * @author gxy
     * @date 2018/7/3 18:02
     * @since 1.0.0
     * @param tableName table name
     * @return true if the table was deleted or did not exist, false on error
     */
    public boolean deleteTable(String tableName){
        Admin admin = null;
        try {
            admin = connection.getAdmin();
            TableName name = TableName.valueOf(tableName);

            if(admin.tableExists(name))
            {
                // A table must be disabled before it can be deleted;
                // disableTable() throws if the table is already disabled, so check first
                if(admin.isTableEnabled(name)){
                    admin.disableTable(name);
                }
                admin.deleteTable(name);
                log.debug(tableName + " is deleted!");
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to delete table, tableName:{0}"
                    ,tableName),e);
            return false;
        }finally {
            close(admin,null,null);
        }
        return true;
    }

Querying data:

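    /**
     * Get a Table handle. Table instances are lightweight and not thread-safe:
     * get one per operation from the shared Connection and close it afterwards.
     * @param tableName table name
     * @return Table
     * @throws IOException IOException
     */
    private Table getTable(String tableName) throws IOException
    {
        return connection.getTable(TableName.valueOf(tableName));
    }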

    /**
     * List the names of all tables
     */
    public List<String> getAllTableNames()
    {
        List<String> result = new ArrayList<>();

        Admin admin = null;
        try {
            admin = connection.getAdmin();
            TableName[] tableNames = admin.listTableNames();

            for(TableName tableName : tableNames)
            {
                result.add(tableName.getNameAsString());
            }
        }catch (IOException e) {
            log.error("Failed to list table names",e);
        }finally {
            close(admin,null,null);
        }

        return result;
    }

    /**
     * Scan all data in the given table.
     * This is a full table scan that loads every row into memory; use it only on small tables.
     * @author gxy
     * @date 2018/7/3 18:21
     * @since 1.0.0
     * @param tableName table name
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScanner(String tableName){
        Scan scan = new Scan();
        return this.queryData(tableName,scan);
    }

    /**
     * Scan the rows from startRowKey (inclusive) to stopRowKey (exclusive).
     * If either key is blank, the whole table is scanned.
     * @author gxy
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName table name
     * @param startRowKey start rowKey (inclusive)
     * @param stopRowKey stop rowKey (exclusive)
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScanner(String tableName, String startRowKey, String stopRowKey){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(startRowKey) && StringUtils.isNoneBlank(stopRowKey)){
            scan.withStartRow(Bytes.toBytes(startRowKey));
            scan.withStopRow(Bytes.toBytes(stopRowKey));
        }

        return this.queryData(tableName,scan);
    }
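Two details of this range scan are easy to get wrong: the stop row is exclusive, and row keys are compared byte by byte, not numerically. The sketch below models a table as a TreeMap ordered by unsigned bytes (an in-memory stand-in, not the HBase client) and applies the same start/stop rule.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// In-memory stand-in for a table: rows sorted by unsigned byte order, which is how
// HBase orders row keys. A model for illustration, not the HBase client API.
class RangeScanModel {

    private static final Comparator<byte[]> UNSIGNED_BYTES = Arrays::compareUnsigned;

    private final NavigableMap<byte[], String> rows = new TreeMap<>(UNSIGNED_BYTES);

    void put(String rowKey, String value) {
        rows.put(rowKey.getBytes(StandardCharsets.UTF_8), value);
    }

    // Mirrors scan.withStartRow(start) plus scan.withStopRow(stop):
    // the start row is inclusive, the stop row is exclusive
    List<String> scan(String startRow, String stopRow) {
        byte[] start = startRow.getBytes(StandardCharsets.UTF_8);
        byte[] stop = stopRow.getBytes(StandardCharsets.UTF_8);
        List<String> keys = new ArrayList<>();
        for (byte[] key : rows.subMap(start, true, stop, false).keySet()) {
            keys.add(new String(key, StandardCharsets.UTF_8));
        }
        return keys;
    }

    public static void main(String[] args) {
        RangeScanModel table = new RangeScanModel();
        for (String key : new String[] {"row1", "row2", "row10", "row3"}) {
            table.put(key, "v");
        }
        // Byte order puts "row10" between "row1" and "row2"
        System.out.println(table.scan("row1", "row3"));  // [row1, row10, row2]
    }
}
```

Here scan("row1", "row3") returns row1, row10 and row2, because "row10" sorts before "row2". If ids must come back in numeric order, left-padding them to a fixed width in the row key (row0001, row0010, ...) avoids the surprise.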

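    /**
     * Query rows whose row key starts with the given prefix
     * @author gxy
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName table name
     * @param prefix row key prefix
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScannerPrefixFilter(String tableName, String prefix){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(prefix)){
            // Start reading at the prefix; without a start row the scan would begin at the
            // first row of the table. PrefixFilter then keeps only rows with the prefix and
            // can end the scan once rows sort past it.
            scan.withStartRow(Bytes.toBytes(prefix));
            Filter filter = new PrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }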
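A PrefixFilter on its own only decides which rows to keep; unless the Scan also has a start row, reading begins at the first row of the table. A tighter approach bounds the scan itself: start row = prefix, stop row = the smallest key greater than every key with that prefix. In the HBase 2.x client, Scan.setRowPrefixFilter(byte[]) sets such bounds for you; the hypothetical helper below only shows the idea in plain Java: increment the last byte that is not 0xFF and drop everything after it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper (not an HBase method): computes an exclusive stop row that is
// greater than every row key starting with the given prefix.
class PrefixStopRow {

    static byte[] stopRowForPrefix(byte[] prefix) {
        // Walk back over trailing 0xFF bytes: they cannot be incremented
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (prefix[i] != (byte) 0xFF) {
                byte[] stop = Arrays.copyOf(prefix, i + 1);
                stop[i]++;  // increment the last incrementable byte, drop the rest
                return stop;
            }
        }
        // Empty prefix or all 0xFF: no upper bound (an empty stop row scans to the end)
        return new byte[0];
    }

    public static void main(String[] args) {
        byte[] prefix = "user_42|".getBytes(StandardCharsets.UTF_8);
        byte[] stop = stopRowForPrefix(prefix);
        System.out.println(new String(stop, StandardCharsets.UTF_8));  // user_42}
    }
}
```

With such a helper, a prefix scan could be written as scan.withStartRow(prefix) plus scan.withStopRow(stopRowForPrefix(prefix)), so only the regions that can contain the prefix are read at all.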

    /**
     * Query with a column prefix filter: only columns whose name (qualifier) starts with
     * prefix are returned. The filter works inside each row, so every row is still read.
     * @author gxy
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName table name
     * @param prefix column name prefix
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScannerColumnPrefixFilter(String tableName, String prefix){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(prefix)){
            Filter filter = new ColumnPrefixFilter(Bytes.toBytes(prefix));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }

    /**
     * Query rows whose row key contains the given keyword.
     * A substring match cannot narrow the scan range, so the whole table is scanned.
     * @author gxy
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName table name
     * @param keyword keyword the row key must contain
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScannerRowFilter(String tableName, String keyword){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(keyword)){
            // EQUAL keeps the rows for which SubstringComparator reports a match
            Filter filter = new RowFilter(CompareOperator.EQUAL,new SubstringComparator(keyword));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }
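The per-row check behind this method can be modelled in a few lines. As far as I can tell from the HBase source, SubstringComparator lower-cases both the keyword and the value before calling contains(), so the match is case-insensitive. The sketch below is a plain-Java model of that check, not the HBase class; the comment in main points out the cost: every row in the scan range is tested, so the filter saves network traffic but not reads on the region servers.

```java
import java.util.Locale;

// Plain-Java model of the per-row check done by a RowFilter with a SubstringComparator.
// Not the HBase classes; assumes the comparator lower-cases both sides.
class SubstringRowMatch {

    static boolean rowMatches(String rowKey, String keyword) {
        String needle = keyword.toLowerCase(Locale.ROOT);
        return rowKey.toLowerCase(Locale.ROOT).contains(needle);
    }

    public static void main(String[] args) {
        String[] rows = {"2|Order-1001", "3|order-2002", "4|user-42"};
        for (String row : rows) {
            // Every row in the scan range is tested; unlike a prefix, a substring
            // cannot be turned into start/stop rows
            System.out.println(row + " matches ORDER: " + rowMatches(row, "ORDER"));
        }
    }
}
```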

    /**
     * Query data whose column names (qualifiers) contain the given keyword.
     * Only matching columns are returned, and every row of the table is read.
     * @author gxy
     * @date 2018/7/4 18:21
     * @since 1.0.0
     * @param tableName table name
     * @param keyword keyword the column name must contain
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    public Map<String,Map<String,String>> getResultScannerQualifierFilter(String tableName, String keyword){
        Scan scan = new Scan();

        if(StringUtils.isNoneBlank(keyword)){
            // EQUAL keeps the columns for which SubstringComparator reports a match
            Filter filter = new QualifierFilter(CompareOperator.EQUAL,new SubstringComparator(keyword));
            scan.setFilter(filter);
        }

        return this.queryData(tableName,scan);
    }



    /**
     * Query data by table name and a Scan (range and filter conditions)
     * @author gxy
     * @date 2018/7/4 16:13
     * @since 1.0.0
     * @param tableName table name
     * @param scan scan range and filters
     * @return java.util.Map<java.lang.String,java.util.Map<java.lang.String,java.lang.String>>
     */
    private Map<String,Map<String,String>> queryData(String tableName,Scan scan){
        // <rowKey, data of that row>; LinkedHashMap keeps the row-key order returned by the scan
        Map<String,Map<String,String>> result = new LinkedHashMap<>();

        ResultScanner rs = null;
        // Get the table
        Table table= null;
        try {
            table = getTable(tableName);
            rs = table.getScanner(scan);
            for (Result r : rs) {
                if (r.isEmpty()) {
                    continue;  // listCells() returns null for an empty Result
                }
                // Data of one row: <qualifier, value>. Only the qualifier is used as key,
                // so equal qualifiers in different column families overwrite each other
                Map<String,String> columnMap = new HashMap<>();
                String rowKey = Bytes.toString(r.getRow());
                for (Cell cell : r.listCells()) {
                    columnMap.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }

                result.put(rowKey,columnMap);
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to scan table, tableName:{0}"
                    ,tableName),e);
        }finally {
            close(null,rs,table);
        }

        return result;
    }

    /**
     * Get the data of exactly one row by tableName and rowKey
     * @author gxy
     * @date 2018/7/3 16:07
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey row key
     * @return java.util.Map<java.lang.String,java.lang.String> the row's data as qualifier -> value
     */
    public Map<String,String> getRowData(String tableName, String rowKey){
        // Returned <qualifier, value> pairs
        Map<String,String> result = new HashMap<>();

        Get get = new Get(Bytes.toBytes(rowKey));
        // Get the table
        Table table= null;
        try {
            table = getTable(tableName);
            Result hTableResult = table.get(get);   // the Get wraps the rowKey; the Result holds the row's cells
            if (hTableResult != null && !hTableResult.isEmpty()) {
                for (Cell cell : hTableResult.listCells()) {
                    // A cell also carries its family (getFamilyArray()/Offset()/Length()) and
                    // timestamp (getTimestamp()); only qualifier -> value is kept here
                    result.put(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        }catch (IOException e) {
            log.error(MessageFormat.format("Failed to get row, tableName:{0}, rowKey:{1}"
                    ,tableName,rowKey),e);
        }finally {
            close(null,null,table);
        }

        return result;
    }

    /**
     * Get the value of one cell by tableName, rowKey, familyName and column
     * @author gxy
     * @date 2018/7/4 10:58
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @param familyName column family name
     * @param columnName column name
     * @return java.lang.String
     */
    public String getColumnValue(String tableName, String rowKey, String familyName, String columnName){
        String str = null;
        Get get = new Get(Bytes.toBytes(rowKey));
        // Fetch only this column instead of the whole row
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
        // Get the table
        Table table= null;
        try {
            table = getTable(tableName);
            Result result = table.get(get);
            if (result != null && !result.isEmpty()) {
                Cell cell = result.getColumnLatestCell(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
                if(cell != null){
                    str = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("Failed to get cell value, tableName:{0}, rowKey:{1}, familyName:{2}, columnName:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
        }finally {
            close(null,null,table);
        }

        return str;
    }

    /**
     * Get several versions of one cell by tableName, rowKey, familyName and column
     * @author gxy
     * @date 2018/7/4 11:16
     * @since 1.0.0
     * @param tableName table name
     * @param rowKey rowKey
     * @param familyName column family name
     * @param columnName column name
     * @param versions number of versions to read
     * @return java.util.List<java.lang.String> values, newest first
     */
    public List<String> getColumnValuesByVersion(String tableName, String rowKey, String familyName, String columnName,int versions) {
        // Returned values
        List<String> result = new ArrayList<>(versions);

        // Get the table
        Table table= null;
        try {
            table = getTable(tableName);
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            // Number of versions to read. The family never returns more versions than its
            // VERSIONS setting allows (1 by default), whatever value is passed here
            get.readVersions(versions);
            Result hTableResult = table.get(get);
            if (hTableResult != null && !hTableResult.isEmpty()) {
                for (Cell cell : hTableResult.listCells()) {
                    result.add(Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
                }
            }
        } catch (IOException e) {
            log.error(MessageFormat.format("Failed to get cell versions, tableName:{0}, rowKey:{1}, familyName:{2}, columnName:{3}"
                    ,tableName,rowKey,familyName,columnName),e);
        }finally {
            close(null,null,table);
        }

        return result;
    }
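getColumnValuesByVersion only returns several values if the column family keeps several versions. Since HBase 0.96 a column family keeps one version by default, so on a family created with default settings readVersions(3) still returns a single value; the family needs to be created with something like ColumnFamilyDescriptorBuilder.newBuilder(...).setMaxVersions(3). The toy model below (plain Java, not the HBase API; CellVersionsModel is a hypothetical name) shows how the family limit and the per-Get limit combine: each put adds a timestamped version, the family retains at most maxVersions of them, and a read returns up to n, newest first.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;

// Toy model of one HBase cell (row + family + qualifier) with versions.
// Not the HBase API: it only illustrates family VERSIONS vs. Get.readVersions(n).
class CellVersionsModel {

    private final int maxVersions;  // like ColumnFamilyDescriptorBuilder.setMaxVersions(...)
    // timestamp -> value, newest first (HBase returns versions by descending timestamp)
    private final TreeMap<Long, String> versions = new TreeMap<>(Collections.reverseOrder());

    CellVersionsModel(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    // A put on an existing column adds a new version instead of overwriting in place
    void put(long timestamp, String value) {
        versions.put(timestamp, value);
        // Versions beyond the family limit are no longer returned
        // (HBase removes them physically at flush/compaction)
        while (versions.size() > maxVersions) {
            versions.pollLastEntry();  // the oldest version
        }
    }

    // Like get.readVersions(n): at most n of the retained versions, newest first
    List<String> readVersions(int n) {
        List<String> out = new ArrayList<>();
        for (String value : versions.values()) {
            if (out.size() >= n) {
                break;
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        CellVersionsModel keepThree = new CellVersionsModel(3);
        CellVersionsModel keepOne = new CellVersionsModel(1);  // the default
        long ts = 1;
        for (String v : new String[] {"v1", "v2", "v3", "v4"}) {
            keepThree.put(ts, v);
            keepOne.put(ts, v);
            ts++;
        }
        System.out.println(keepThree.readVersions(3));  // [v4, v3, v2]
        System.out.println(keepOne.readVersions(3));    // [v4]
    }
}
```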

Closing resources:

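    /**
     * Close the Admin, ResultScanner and Table used by one operation.
     * The shared Connection is not closed here; it lives as long as the service.
     */
    private void close(Admin admin, ResultScanner rs, Table table)
    {
        if(admin != null){
            try {
                admin.close();   // close the Admin
            } catch (IOException e) {
                log.error("Failed to close Admin",e);
            }
        }

        if(rs != null){     // close the ResultScanner
            rs.close();
        }

        if(table != null){
            try {
                table.close();  // close the Table
            } catch (IOException e) {
                log.error("Failed to close Table",e);
            }
        }
    }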
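Table, ResultScanner and Admin all implement Closeable in the HBase 2.x client, so the manual close(admin, rs, table) helper can also be replaced with try-with-resources, which closes them even when an exception is thrown. The shared Connection is the exception: it should stay open for the life of the service. The sketch uses stand-in resources (FakeResource and CloseOrderDemo are hypothetical names) so it runs without a cluster; the equivalent HBase code is shown in a comment.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for Table and ResultScanner, to show try-with-resources semantics
// without an HBase cluster. Not HBase classes.
class CloseOrderDemo {

    static final List<String> closed = new ArrayList<>();

    static class FakeResource implements AutoCloseable {
        private final String name;

        FakeResource(String name) {
            this.name = name;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    static void query() {
        // With the HBase client this would be:
        // try (Table table = connection.getTable(TableName.valueOf(tableName));
        //      ResultScanner rs = table.getScanner(scan)) { for (Result r : rs) { ... } }
        try (FakeResource table = new FakeResource("table");
             FakeResource scanner = new FakeResource("scanner")) {
            // ... iterate the scanner here ...
        }
    }

    public static void main(String[] args) {
        query();
        System.out.println(closed);  // [scanner, table]
    }
}
```

Resources are closed in reverse order of opening, so the scanner is closed before the table, the same order the manual helper uses.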

 

