0%

使用phoenix和hbase搭建数仓基础知识

简介

在之前的项目中采用HDFS和Hive来搭建数据仓库。但是使用HDFS和hive来搭建数据仓库无法做到数据的实时查询,因为hive的本质是mapreduce运算,map和reduce的过程都存储在磁盘,存储在磁盘避免不了磁盘的i/o,随机i/o又是磁盘的主要瓶颈(固态硬盘可从无视),所以这边对使用hbase搭建数仓库,hbase也是基于hdfs上建设的nosql数据库,之所以它能够做到数据的实时查询是因为它以列去存储数据,并且在数据写入到磁盘时已经将数据排序完成,尽量避免了磁盘的随机i/o而磁盘的顺序读取速度又非常的快,几乎可达到内存的速度,其实我目前除了auto sharding以外,个人比较喜欢hbase可以保留不同版本数据的这个功能,在看过我之前的关于数据建立维度里面就提到,数仓的好坏的难点就是在维度的建设方面,而维度最麻烦的就是处理维度的变化,如果相同的维度数据可以保留多个历史版本,这样就完美解决维度变化这个问题,可是目前我在phoenix并没有看到可以查找到历史数据这个功能,但是在网上查询到有人通过对phoenix修改后达到这个目的。相比关系型数据库其能存储海,这是依托由其auto sharding功能,这使nosql在数据存储扩展方面具有得天独厚的优势,在关系型数据库中对数据进行扩展往往意味着对数据分片或分库的重建,如甚于关系型数据库开发的greenplum就必须在搭建时提前考虑好数据的增长量,如果后序增加节点是非常复杂的事,如果你使用的是诸如mongo或hbase来搭建数据仓库的话就完全不要考虑这方面问题,但Hbase也有自己的缺点其时实查询的功能高度依赖rowkey,并且无法创建索引对列中的具体值进行查找(虽然可从使用api提供的过滤器去查找数据),所以我们在设计hbase的rowkey时需要尽可能将查询信息都包含在rowkey中。为了解决这个问题,这里尝试使用Phoenix来当作数仓工具来构建数仓,它将传统的Sql语句转换成hbase的api进行查询,并且支持二级索引和联表统计,和自定义函数等功能,hbase相比hive的另一个优势就是可以完美解决小文件的问题,我们都知道hdfs将数据的元数据都存储在namenode的内存中,因此namenode的内存大小限制了文件的数量,因为即使文件再小都要占用固定的元数据大小的空间,所有hdfs的长处是存储大文件,如果我们的数据是许多小文件构成,那么namenode的内存使用效率就地,我们需要定期自己将小文件合并成块,但是hbase就不需要关系这一点,他可以自动的对数据文件进行合并和拆分,然后再存储到hdfs中

创建

我使用的是cdh来搭建hadoop集群,在官网上下载对应cdh hbase的phoenix版本,phoenix是构建在hbase的数仓工具,之所以我这样说是因为真正存储数据分析的地方是hbase,而hbase又依赖hdfs,就好比hive也数仓而是数仓工具一样,hive大多都依赖hdfs存储数据,hive只提供分析等功能,将解压的包中的service包复制到hbase的各台region server的lib下,重启hbase。以下是官网安装简介

  • download and expand our installation tar
  • copy the phoenix server jar that is compatible with your HBase installation into the lib directory of every region server
  • restart the region servers
  • add the phoenix client jar to the classpath of your HBase client

表操作

hive 分为内部表和外部表,内部表是托管hdfs中的数据,你执行命令删除了hive的数据,hdfs的数据也会删除,而外部表则不将数据拖由hive管理,你删除了hive中的数据,hdfs的数据还将保留。相应的phoenix和sql一样也分为表和视图,表就是将数据拖由phoenix报错,视图对应hive的外部表,但是创建完表后新增的数据只能在hive中进行,如果建立了phoenix和hbase关联后,调用hbase的api新增数据,则新增的数据不会在phoenix中查询的到,到底使用表还是视图?我的理解是你需不需要将数据托管给hive来保管,如果你想那么就使用表
下面我们将phoenix关联hbase,使用我github中的爬虫爬取淘宝数据,会生成一张hbase taobaoList的表,我创建了一个data列簇,有以下字段

data.sellNum data.title data.url data.shopName data.price
销售量 商品标题 商品连接 店铺名 商品价格

现在我们运行phoenix客户端 ./sqlline.py ip:2181
其中ip是hbase所在zookeeper集群的ip地址,端口为对应的端口,默认是2181
注意新版本中要在后面加上列的编码不然会导致phoenix列编码和hbase中不一致导致数据查询不出来
phoenix中的列一定要和hbase中列大小对应上,phoenix默认全部都是大写,所以我们使用双引号将小写的列引用起来,除了rowkey 对应的主键名可以自己指定外

1
2

create view "taobaoList"("id" varchar primary key,"data"."sellNum" varchar,"data"."title" varchar,"data"."url" varchar,"data"."shopName" varchar,"data"."price" varchar)column_encoded_bytes=0;

执行 以下命令就可以看见表

1
! tables

查询数据,注意查询数据的条件(如:where id=’1’)中的条件需要用单引号包裹,查询出的字段(如select “id”)用双引号包裹,如果查询的字段是小写的话

1
select * from "taobaoList" limit 1;

grop by

1
select "shopName" ,count(*) from "taobaoList" group by "shopName";

join 加分页

1
select * from "taobaoList" as t1 inner join "taobaoDetailList" t2 on t1."id"=t2."productRowKey" limit 1 offset 20;

二级索引
二级索引也是将二级索引存入在hbase表中,由于hbase使用的是裂存储,索引和sql一样尽量使用覆盖索引,避免回表
首先查看未创建二级索引前的执行计划

1
2
3
4
5
6
7
8
9
explain select * from "taobaoDetailList" where "productRowKey"='100001594042362234';


+----------------------------------------------------------------------------+-----------------+----------------+--------------+
| PLAN | EST_BYTES_READ | EST_ROWS_READ | EST_INFO_TS |
+----------------------------------------------------------------------------+-----------------+----------------+--------------+
| CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER taobaoDetailList | null | null | null |
| SERVER FILTER BY data."productRowKey" = '100001594042362234' | null | null | null |
+----------------------------------------------------------------------------+-----------------+----------------+--------------+

执行计划中现实的是全表扫描,现在创建执行计划,创建执行计划前需要添加配置文件,由于我使用的是4.14.0 phoenix版本根据官网,在所有RegionServer 中添加如下配置即可重启即可CDH只需在hbase配置中搜索hbase-site,然后将值填入到RegionServer的配置项中即可,官网链接https://phoenix.apache.org/secondary_indexing.html

1
2
3
4
<property>
<name>hbase.regionserver.wal.codec</name>
<value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
</property>

下面创建二级覆盖索引include包含你要覆盖的列,这样查询索引时就不要回表,如果不创建覆盖索引就不要添加include后面的代码

1
create index rowkey_index on "taobaoDetailList"("data"."productRowKey") include("id");

现在我们再执行上面的执行计划,发现已经走索引了

1
2
3
4
5
6
+------------------------------------------------------------------------------------------------+-----------------+----------------+--------------+
| PLAN | EST_BYTES_READ | EST_ROWS_READ | EST_INFO_TS |
+------------------------------------------------------------------------------------------------+-----------------+----------------+--------------+
| CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN RANGE SCAN OVER ROWKEY_INDEX ['100001594042362234'] | null | null | null |
| SERVER FILTER BY FIRST KEY ONLY | null | null | null |
+------------------------------------------------------------------------------------------------+-----------------+----------------+--------------+

udf自定义函数

和hive一样phoenix一样支持自定义函数,下面这个自定义函数就是将taobaoList中的sellNum字段转换成数字,从淘宝上爬取下来的交易笔数都是如 1万笔 800笔这样的,我们需要将其转换成数字
首先添加phoenix下bin目录下的hbase-site.xml配置文件中的配置信息 hbase.rootdir改成真实的hbase在hdfs的路径即可,hbase.dynamic.jars.dir为你后续编写的udf函数存放的路径,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<property>
<name>phoenix.functions.allowUserDefinedFunctions</name>
<value>true</value>
</property>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>${hbase.tmp.dir}/hbase</value>
<description>The directory shared by region servers and into
which HBase persists. The URL should be 'fully-qualified'
to include the filesystem scheme. For example, to specify the
HDFS directory '/hbase' where the HDFS instance's namenode is
running at namenode.example.org on port 9000, set this value to:
hdfs://namenode.example.org:9000/hbase. By default, we write
to whatever ${hbase.tmp.dir} is set too -- usually /tmp --
so change this configuration or else all data will be lost on
machine restart.</description>
</property>
<property>
<name>hbase.dynamic.jars.dir</name>
<value>${hbase.rootdir}/lib</value>
<description>
The directory from which the custom udf jars can be loaded
dynamically by the phoenix client/region server without the need to restart. However,
an already loaded udf class would not be un-loaded. See
HBASE-1936 for more details.
</description>
</property>

鉴于我都是存放在hdfs下所以我的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<property>
<name>phoenix.functions.allowUserDefinedFunctions</name>
<value>true</value>
</property>
<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://hztnode1:8020/hbase</value>
</property>
<property>
<name>hbase.dynamic.jars.dir</name>
<value>hdfs://hztnode1:8020/hbase/lib</value>
</property>

创建udf代码实现udf类,具体代码说明如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88

package com.liu;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.function.ScalarFunction;
import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PVarchar;

import java.io.UnsupportedEncodingException;
import java.util.List;


/**
* 功能: TODO(用一句话描述类的功能)
*
* ──────────────────────────────────────────
* version 变更日期 修改人 修改说明
* ------------------------------------------
* V1.0.0 2020/7/9 Liush 初版
* ──────────────────────────────────────────
*/
//指定函数名和参数
@FunctionParseNode.BuiltInFunction(
name ="F_SELLNUM" ,
args = {
@FunctionParseNode.Argument(allowedTypes = {PVarchar.class}),
}
)
public class SellNumUDF extends ScalarFunction {
@Override
public String getName() {
return "F_SELLNUM";
}


public SellNumUDF(List<Expression> children) {
super(children);
}



@Override
public boolean evaluate(Tuple tuple, ImmutableBytesWritable immutableBytesWritable) {
//获取到参数
Expression arg1 =this.getChildren().get(0);
//从表达式中或获取参数
if(arg1.evaluate(tuple,immutableBytesWritable)){
if(immutableBytesWritable.getLength()>0){
//得到参数字节码
byte[] valueByte=immutableBytesWritable.copyBytes();
try {
String sellNum=new String(valueByte,"UTF-8");
if(sellNum.contains("万笔")){
String num=sellNum.substring(0,sellNum.indexOf("万笔"));
int numInt=(int)(Float.valueOf(num)*10000);
//将处理后的数据返回,如果执行的是类似sum运算的话返回的必须是数字类型
immutableBytesWritable.set(PInteger.INSTANCE.toBytes(numInt));
return true;
}
if(sellNum.contains("笔")){
String num=sellNum.substring(0,sellNum.indexOf("笔"));
int numInt=Integer.valueOf(num);
immutableBytesWritable.set(PInteger.INSTANCE.toBytes(numInt));
return true;
}



} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}

}

}
return false;
}

@Override
public PDataType getDataType() {
return PInteger.INSTANCE;
}
}

打包我采用maven打包,使用maven打包插件,此打包插件可指定工程的配置文件路径等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>make-assembly</id>
<!-- 绑定到package生命周期 -->
<phase>package</phase>
<goals>
<!-- 只运行一次 -->
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<!-- 配置描述符文件 -->
<!-- <descriptor>src/main/assembly/assembly.xml</descriptor>-->
<!-- 也可以使用Maven预配置的描述符-->
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>

最后将打完打jar包放入,之前配置文件中配置的hdfs的路径下,现在我们在shell中执行sql

1
select f_sellnum("sellNum"),"sellNum" from "taobaoList" limit 1;

我们查询到

1
2
3
4
5
+----------------------------+----------+
| F_SELLNUM(data."sellNum") | sellNum |
+----------------------------+----------+
| 39000 | 3.9万笔 |
+----------------------------+----------+