admin管理员组

文章数量:1661565

一、Hive基础

1. hive和其他对比

(1)Hive中的SQL与传统SQL区别

  • 数据更新:hive之前不支持数据更新(修改和删除),在0.14版本之后支持数据更新,但是默认不支持更新需要配置更改。
  • 执行:传统SQL对于不同的数据库有不同的执行引擎,HSQL底层是mapreduce。
  • 可拓展性:HSQL支持自定义函数
    • UDF:直接应用于select语句,通常程序查询的时候,需要对字段做一些格式化处理(大小写转换)。特点:一进一出,一对一,进来一个字段输出一个字段。如进来一个小写字段输出一个大写字段。
    • UDAF:多对一的场景,通常用在group by聚合阶段。
    • UDTF:一对多。
  • 数据检查:
    • 读时模式:只有hive读的时候才会检查、解析字段和schema(数据结构的表达)。优点:写得快;load data非常迅速,因为在写的过程中是不需要解析数据。
    • 写时模式:缺点:写得慢,需要建立一些索引、压缩、数据一致性、字段检查等等。优点:读的时候会得到优化。

(2)数据库和数据仓库

OLTP:联机事务处理 数据库
OLAP:联机分析处理 数据仓库

(3)hive和mapreduce

  • hive的本质是将SQL语句转化成Mapreduce任务执行。
  • hive想看底层mapreduce逻辑:使用explain

2.Hive数据模型

(1)四个数据模型

  • Hive支持四个数据模型:table、external table、partion、bucket。

(2)分区

(a)Partition
  • 优点:partition是辅助查询,缩小查询范围,加快数据的检索速度和对数据按照一定的规格和条件进行管理。
  • 优点理解:如当有一个select xxx from table where date=‘2017-11-11’ limit 100查询语句,如果表对date字段已分区的话就可以直接定位到20171111的目录 ,这样可以大大提高查询的效率。因为每一个date字段的内容当作文件夹的名字。
  • 可以对多个字段分区。
  • 分区的字段一般选择经常用于where查询条件且范围不是特别大的字段。
  • HDFS目录名:如分区字段为action = insight, day= 20131020 ,对应HDFS目录名是/lbs/mobile_user/action=insight/day=20131020。
(b)动态分区
  • 动态分区理论
    • 静态分区插入的时候知道分区类型,而且每个分区写一个load data,很繁琐。使用动态分区可解决以上问题,其可以根据查询得到的数据动态分配到分区里。其实动态分区与静态分区区别就是不指定分区目录,由系统自己选择。
    • 首先,启动动态分区功能
      set hive.exec.dynamic.partition=true;
    • 注意,动态分区不允许主分区采用动态列而副分区采用静态列,这样将导致所有的主分区都要创建副分区静态列所定义的分区。
      动态分区可以允许所有的分区列都是动态分区列,但是要设置一个参数:
      set hive.exec.dynamic.partition.mode=nostrick;
#开启动态分区,默认是false
set hive.exec.dynamici.partition=true;  

#开启允许所有分区都是动态的,否则必须要有静态分区才能使用
set hive.exec.dynamic.partition.mode=nonstrict; 

  • 创建表时指定动态分区
	drop table if exists `itcast_dw`.`fact_order_refunds`;
	create  table `itcast_dw`.`fact_order_refunds`(
	    id                bigint,
	    orderId           bigint,
	    createTime        string,
	    modifiedTime        string,
	    dw_start_date string,
	  dw_end_date string
	)
	partitioned by (dt string) --按照天分区
	STORED AS PARQUET TBLPROPERTIES('parquet.compression'='SNAPPY');
  • 插入表时指定动态分区字段,需要在最后的字段后面添加上想要设置动态分区的字段,如下是使用指定格式的createTime字段做动态分区。
insert overwrite table itcast_dw.fact_order_refunds 
select
id,
orderId,
createTime,
modifiedTime,
date_format(modifiedTime,'yyyy-MM-dd') as dw_start_date,
'9999-12-31' as dw_end_date,
--此次数据分区按照订单退款的创建时间
date_format(createTime,'yyyyMMdd')
from itcast_ods.itcast_order_refunds
(c)静态分区
  • 添加分区
    alter table ‘demo’. ‘ods_product’ add if not exists partition(dt=‘${dt}’)
  • 删除分区
    alter table ‘demo’. ‘ods_product’ drop partition(dt=‘${dt}’)
  • 对分区表插入数据
insert overwrite table 'demo' . 'dw_product' partition(dt= '2019-12-20')
select
goods_id,goods_status,createtime,modifytime
from 'demo'.'ods_product' where dt= '2019-12-20';

(d)静态分区与动态分区对比(插入方面)
insert into table table_name partition(score=1) select * from table_name2 ;
# 静态分区插入
insert into table table_name partition(score) select * from table_name2 ;
# 动态分区插入

(3)分区、分桶

  • Bucket
    • Hive会针对某一个列进行桶的组织,通常对列值做hash。
    • 例如:mysql存储的数据有限,一般记录存到上亿或十亿以上性能不太好。因为一张表太大了,达到mysql支持的上限了。要想存完整数据,就要进行分库(把一张表拆分成多个表)。例如userid作为row key:分32库,userid % 32= 余数num(桶号)。
    • 分桶后表名,如一个表student:student -> student_0, student_1, …
    • HDFS可以存海量数据,为什么要分桶?一是优化查询,原本两个大表join分库后会变成两个小表join,两个分区表做join会自动激活map端的map-side join。二是方便采样。
    • HDFS目录名:如分区字段userid分为32桶,对应HDFS目录名是 /lbs/mobile_user/action=insight/day=20131020/part-00000,类似reduce输出的路径。
    • 创建分桶举例:hive中table可以拆分成partition,table和partition可以通过‘CLUSTERED BY’进一步分bucket,bucket中的数据可以通过‘SORT BY’排序。如 create table bucket_user (id int,name string)clustered by (id) into 4 buckets;
    • 分桶相关配置:‘set hive.enforce.bucketing = true’ 可以自动控制上一轮reduce的数量从而适配bucket的个数,当然,用户也可以自主设置mapred.reduce.tasks去适配bucket个数。
    • 查看sampling数据:
      • hive> select * from student tablesample(bucket 1 out of 2 on id);
      • tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
      • 参数x和y:y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
  • 两个优化:
    • Table -> Partition -> Bucket
    • Table -> Bucket

(4)内部表table和外部表external table:

  • 最大差别:在删除表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!如果外部表不小心删除是可以恢复过来的,建议创建表尽量创建外部表,能够避免误操作删除导致数据丢失的风险。
    • 特点:在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而内部表则不一样;
    • 内部表数据是hive自身管理,外部表数据由HDFS管理。
  • 外部表和内部表存储位置都可以由自己指定,可以指定除/user/hive/warehouse以外的路径;默认default数据库,默认hive在hdfs中存在的位置/user/hive/warehouse.
  • 外部表修改数据结构需要msck repair table itcast_ods.itcast_orders;命令。
    • 例如外部表使用kettle直接把数据导入hdfs对应路径,kettle再使用SQL组件(msck repair),即可加载到hive表;内部表没试过不知道。
  • 外部表声明时加external 。

(5)查看hive表的存储路径

show create table tablename;

3.Hive数据类型

(1)数据类型

  • 原生类型
    • TINYINT
    • SMALLINT
    • INT
    • BIGINT
    • BOOLEAN
    • FLOAT
    • DOUBLE
    • STRING
    • BINARY(Hive 0.8.0以上才可用)
    • TIMESTAMP(Hive 0.8.0以上才可用)
  • 复合类型
    • Arrays:ARRAY<data_type>
    • Maps:MAP<primitive_type, data_type>
    • Structs:STRUCT<col_name: data_type[COMMENT col_comment],……>
    • Union:UNIONTYPE<data_type, data_type,……>

(2)复合类型有什么用?

  • 例如数据

    • name score
    • 张三 “数学”:80, “语文”:90,“英语”:90
    • 李四 “数学”:60, “语文”:80,“英语”:90
    select score from table
    {
         "数学":80, "语文":90,"英语":90}
    {
         "数学":60, "语文":80,"英语":90}
    #map数据类型
    
    select name,score['数学'] from table
    张三 80
    李四 60
    

4.hive不存储数据

  • 数据存储在HDFS上,_hive本身部存储数据。
    • 数据是存储在hdfs上,hive本身不存储数据,构建表的逻辑(元数据)存在指定数据库(mysql )。

二、Hive优化

1.调整并发度,Map和Reduce的优化。

(1)Map的优化

  • 全局配置:可能影响其他任务。通过hdfs-site.xml配置文件可以设置set dfs.block.size(=128),从而影响map个数。

    • 作业会通过input的目录产生一个或者多个map任务。
    • 调整block size可以影响map个数:block大小直接影响map切分的split份数,一个split对接一个map,切分地细一点split个数会增加从而直接影响到map个数增加。
    • 这个修改是全局的block size修改,可能其他任务会受到block size不合适的影响。所以为了不影响其他任务,可以使用提交任务的脚本通过配置去设置。
  • Map越多越好吗?是不是保证每个map处理接近文件块的大小?

  • 局部配置:使用提交任务的脚本通过配置去设置

    • 如何合并小文件,减少map数?
    set mapred.max.split.size=100000000;
    set mapred.min.split.size.per.node=100000000;
    set mapred.min.split.size.per.rack=100000000;
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
    
    • 如何适当的增加map数?
    set mapred.map.tasks=10;
    #直接设置map个数,但是不一定生效,只是参考。
    
    • 数据倾斜问题解决方案:负载均衡
      • Map端聚合 hive.map.aggr=true 。 Mr中的Combiners.

(2)Reduce的优化

  • Reduce的优化

    • 提交SQL的时候可以设置hive.exec.reducers.bytes.per.reducer:reduce任务处理的数据量。
      • 它会直接影响到reduce个数,因为公式reduce个数=InputFileSize / bytes per reducer=所有节点总大小 / 每一个节点处理大小。
      • 但它不能确定reduce个数而是会影响reduce个数。
    • 确定调整reduce的个数:
      • 设置reduce处理的数据量
      • set mapred.reduce.tasks=10
    • 如果这两个配置hive.exec.reducers.bytes.per.reducer和mapred.reduce.tasks都设置,以下面mapred.reduce.tasks配置为准。
  • 一个Reduce,并发小,需要优化。distinct和orderby都可能导致一个reduce。

    • 没有group by
    # good case
    # groupby做聚合,增加并发。
    select pt,count(1)
    from popt_tbaccountcopy_mes
    where pt = '2012-07-04' group by pt;
    
    # bad case
    # 没有group by,一个reduce来运行,没有做优化。
    select count(1)
    from popt_tbaccountcopy_mes
    where pt = '2012-07-04';
    
    • order by(可以使用distribute by和sort by)
    • 笛卡尔积
    # bad case
    # 做join笛卡尔积不带on的话,也是一个reduce来运行
    select xxx from A_table A
    join B_table B 
    where A.userid=B.userid
    
    # good  case
    select xxx from A_table A
    join B_table B 
    on A.userid=B.userid
    
    • distinct:如果数据量很大,慎用distinct,尽量用group by
      当数据量很大时,下图第二种方法groupby比第一种distinct快。

2.partition、笛卡尔积、Map join、Union all

  • 分区裁剪(partition)

    • Where中的分区条件,会提前生效,不必特意做子查询,直接Join和GroupBy
  • 笛卡尔积

    • join的时候不加on条件或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积(如同上面reduce的优化中笛卡儿积的例子)
  • Map join

    • /*+ MAPJOIN(tablelist) */,必须是小表,不要超过1G,或者50万条记录。通过这个参数来标识哪个表是小表。
    • Map Join类似一种检索配置文件的形式。如A.join(B),如果A是小表、B是大表,考虑是否可以将A放入内存,对B进行遍历就可以了。
    SELECT /*+ MAPJOIN(b) */ a.key, a.value
    FROM a
    JOIN b ON a.key = b.key;
    # 指明b是小表,做join时把一个表当作小表提前放入内存是一个优化。
    # MAPJION会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多.
    
  • Union all

    • 先做union all再做join或group by等操作可以有效减少MR过程,尽管是多个Select,最终只有一个mr。
    # 数据
    T1:
    A
    B
    C
    
    T2:
    B
    C
    D
    
    # Union操作:合并去重
    A
    B
    C
    D
    
    Union all操作:不去重。操作简单,没有太多性能消耗,性能明显高于union。
    # 所以当确认不存在有重复记录或者允许结果有重复记录时,最好用union all。
    A
    B
    C
    B
    C
    D
    

3.来自同表加载一次、merge、Multi-Count Distinct

  • Multi-insert & multi-group by:来自同表加载一次

    • 从一份基础表中按照不同的维度,一次组合出不同的数据
    # 如果两个不同任务的数据源来自一个表的话,想从公共的表取出数据写到另外一个表里面去,语句如下
    # from一个总表,生成两个子表。
    FROM from_statement
    INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1)] select_statement1 group by key1
    INSERT OVERWRITE TABLE tablename2 [PARTITION(partcol2=val2 )] select_statement2 group by key2
    
    • eg:业务需求: 创建一张表user_stat, 把用户年龄的最大值和最小值存在这张表中。
      这种方法可能是大家常用的一种方法,从同一张表种取数据,然后多次插入到同一张表的不同分区 多次查询 多次插入

      解决了多次插入的问题,但是还是向同一张表种做多次查询

      如果我们的数据来自同一张表,我们可以加载一次就ok。如果业务中多次读取某张表的时候,可以先from把这张表加载进来,也可以用with as
  • Automatic merge

    • 执行hive任务时会生成一些临时的中间文件,小的文件会比较多,可以通过配置一些参数让多个小文件合并。
    • 当文件大小比阈值小时,hive会启动一个mr进行合并
    • Map阶段合并:hive.merge.mapfiles = true 是否和并 Map 输出文件,默认为 True。这个是提交任务直接去配置的。
    • Reduce阶段合并:hive.merge.mapredfiles = false 是否合并 Reduce 输出文件,默认为 False
    • 每一个任务预期合并的文件size大小:hive.merge.size.per.task = 25610001000 合并文件的大小
  • Multi-Count Distinct

    • 想开启这个优化必须设置参数:set hive.groupby.skewindata=true;可以帮助解决数据倾斜问题。
    select dt, count(distinct uniq_id), count(distinct ip)
    from ods_log where dt=20170301 group by dt
    

    设置这个参数set hive.groupby.skewindata=true有什么用?
    1、如果一个SQL有group by,它会对每一个key做聚合,然后会生成一个mapreduce:SQL(group by key) -> 1个MR Job。

    2、map阶段: 相同的ke

本文标签: 理论Hive