Contents
0 Background
1 Algorithm Analysis
2 Problem Description
3 Data Preparation
4 Problem Analysis
5 Summary
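0 Background
In equipment health monitoring, we usually monitor the electrical parameters of equipment while it runs, in order to reflect its operating condition, and build a state-assessment model from long-term statistics of those parameters. A typical scenario is trend analysis of an equipment indicator. We decide whether the indicator increases or decreases over a period of time, in other words whether a parameter is drifting toward deterioration, and raise an early warning when it is. Long-period trend warnings like this usually require an equipment data warehouse, with the analysis done in batch jobs. This article uses the operating current (动作电流) of a railway switch machine (转辙机) as its example and presents a trend-warning method. Within a time window, the method analyzes how the daily average operating current of the same device in the same switching phase changes. When the operating current of a device in a given phase keeps rising overall across one time window, the big-data system raises an alarm. A sustained decline is detected the same way. The method is implemented as a batch job.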
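1 Algorithm Analysis
The trend-warning rules are as follows.
For a window of n daily average points, the set of operating-current averages is $P = \{P_{t_1}, P_{t_2}, \dots, P_{t_n}\}$. Below, $P_i$ is shorthand for $P_{t_i}$. Compute the day-over-day change rates $\{d_1, d_2, \dots, d_{n-1}\}$, where $d_i = \dfrac{P_{t_{i+1}} - P_{t_i}}{P_{t_i}}$. The last day has no following day, so no change rate is computed for it.
A warning is raised when:
1) Average change rate: $\dfrac{1}{n-1}\sum_{i=1}^{n-1} d_i \ge 0.1$ (configurable). It reflects the average speed of change.
2) Baseline change rate: $\dfrac{|\max(P_i) - P_{base}|}{P_{base}} \ge 0.5$ (configurable), or $\dfrac{|\min(P_i) - P_{base}|}{P_{base}} \ge 0.5$. It reflects how far the peak (or trough) deviates from the baseline.
3) Head-to-tail change rate: $\dfrac{|P_n - P_1|}{P_1} \ge 0.5$ (configurable). This is the absolute difference between the last day's value and the first day's value, divided by the first day's value. It reflects how far the series has moved from its initial value, and is also called the fixed-base development speed.
4) Within the time window, at most 30% of the points may fail to increase. Define the reverse change rate as (number of non-increasing points) / (n - 1), and the positive change rate as 1 - reverse change rate. A warning requires a positive change rate ≥ 0.7, or equivalently a reverse change rate ≤ 0.3.
5) The warning condition is met when 1), 2), 3) and 4) hold at the same time.
The rules above are stated for an increasing trend. For a decreasing trend, the SQL below applies the same thresholds to the absolute values and requires the reverse change rate to reach 0.7 instead.
Note: the baseline $P_{base}$ is tentatively taken as the median of the data in the window.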
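A minimal Python sketch of rules 1-4, taken literally from the formulas: signed $d_i$, denominators n - 1, the median as baseline, and no rounding. The function name `trend_indicators` and its dictionary keys are illustrative only, not part of the original. The HiveSQL in section 4 differs in a few details. It averages the absolute day-over-day changes, divides the decline count by the number of points n, and rounds every indicator to one decimal.

```python
from statistics import median


def trend_indicators(p):
    """Rule 1-4 indicators for one window p = [P_t1, ..., P_tn] (n >= 2, no zero values)."""
    n = len(p)
    # d_i = (P_t(i+1) - P_t(i)) / P_t(i) for i = 1..n-1; the last day has no successor
    d = [(p[i + 1] - p[i]) / p[i] for i in range(n - 1)]
    base = median(p)  # P_base: median of the window
    positive = sum(1 for x in d if x > 0) / (n - 1)  # share of increasing steps
    return {
        "avg_change": sum(d) / (n - 1),          # rule 1: average change rate
        "base_max": abs(max(p) - base) / base,   # rule 2: peak vs. baseline
        "base_min": abs(min(p) - base) / base,   # rule 2: trough vs. baseline
        "head_tail": abs(p[-1] - p[0]) / p[0],   # rule 3: head-to-tail change rate
        "positive_rate": positive,               # rule 4: positive change rate
        "reverse_rate": 1 - positive,            # rule 4: reverse change rate
    }


if __name__ == "__main__":
    # gw_id 1001, sensor_id 123456, switch_dir 0 from the sample table in section 2
    for name, value in trend_indicators([5.4, 3.5, 5.8, 8.1, 5.2, 8.5, 13.5]).items():
        print(f"{name}: {value:.4f}")
```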
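2 Problem Description
We are given a daily table of operating current, partitioned by day, with the following structure (to keep the analysis simple, this article uses a 7-day window):
gw_id: gateway ID; sensor_id: device ID; switch_dir: switching direction, 0 = from normal to reverse position (定到反), 1 = from reverse to normal position (反到定).
avg_act_ic: the indicator being analyzed, the daily average operating current; compute_day: the computation date.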
gw_id | sensor_id | switch_dir | avg_act_ic | compute_day |
1001 | 123456 | 0 | 5.4 | 2017-07-01 |
1001 | 123456 | 0 | 3.5 | 2017-07-02 |
1001 | 123456 | 0 | 5.8 | 2017-07-03 |
1001 | 123456 | 0 | 8.1 | 2017-07-04 |
1001 | 123456 | 0 | 5.2 | 2017-07-05 |
1001 | 123456 | 0 | 8.5 | 2017-07-06 |
1001 | 123456 | 0 | 13.5 | 2017-07-07 |
1001 | 123456 | 1 | 3.1 | 2017-07-01 |
1001 | 123456 | 1 | 2.8 | 2017-07-02 |
1001 | 123456 | 1 | 1.1 | 2017-07-03 |
1001 | 123456 | 1 | 5.5 | 2017-07-04 |
1001 | 123456 | 1 | 5.3 | 2017-07-05 |
1001 | 123456 | 1 | 5.6 | 2017-07-06 |
1001 | 123456 | 1 | 2.8 | 2017-07-07 |
1001 | 234567 | 1 | 8.1 | 2017-07-01 |
1001 | 234567 | 1 | 7.2 | 2017-07-02 |
1001 | 234567 | 1 | 6.2 | 2017-07-03 |
1001 | 234567 | 1 | 5.1 | 2017-07-04 |
1001 | 234567 | 1 | 9.3 | 2017-07-05 |
1001 | 234567 | 1 | 4.8 | 2017-07-06 |
1001 | 234567 | 1 | 2.3 | 2017-07-07 |
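Apply the trend-warning rules above in SQL. A sustained increasing trend gets the warning_code act_ic_growths, and a sustained decreasing trend gets act_ic_decline. The final result only needs to add warning_code to the columns of the original table.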
3 Data Preparation
(1) Prepare the data file. Fields must be tab-separated to match the table definition below.
vim act_ic.txt
1001 123456 0 5.4 2017-07-01
1001 123456 0 3.5 2017-07-02
1001 123456 0 5.8 2017-07-03
1001 123456 0 8.1 2017-07-04
1001 123456 0 5.2 2017-07-05
1001 123456 0 8.5 2017-07-06
1001 123456 0 13.5 2017-07-07
1001 123456 1 3.1 2017-07-01
1001 123456 1 2.8 2017-07-02
1001 123456 1 1.1 2017-07-03
1001 123456 1 5.5 2017-07-04
1001 123456 1 5.3 2017-07-05
1001 123456 1 5.6 2017-07-06
1001 123456 1 2.8 2017-07-07
1001 234567 1 8.1 2017-07-01
1001 234567 1 7.2 2017-07-02
1001 234567 1 6.2 2017-07-03
1001 234567 1 5.1 2017-07-04
1001 234567 1 9.3 2017-07-05
1001 234567 1 4.8 2017-07-06
1001 234567 1 2.3 2017-07-07
(2) Create the table in the Hive warehouse
drop table if exists act_ic;
create table if not exists act_ic(
gw_id string,
sensor_id string,
switch_dir int,
avg_act_ic float,
compute_day string
)
row format delimited fields terminated by '\t';
(3) Load the data
load data local inpath "/home/centos/dan_test/act_ic.txt" into table act_ic;
(4) Query the data
hive> select * from act_ic;
OK
1001 123456 0 5.4 2017-07-01
1001 123456 0 3.5 2017-07-02
1001 123456 0 5.8 2017-07-03
1001 123456 0 8.1 2017-07-04
1001 123456 0 5.2 2017-07-05
1001 123456 0 8.5 2017-07-06
1001 123456 0 13.5 2017-07-07
1001 123456 1 3.1 2017-07-01
1001 123456 1 2.8 2017-07-02
1001 123456 1 1.1 2017-07-03
1001 123456 1 5.5 2017-07-04
1001 123456 1 5.3 2017-07-05
1001 123456 1 5.6 2017-07-06
1001 123456 1 2.8 2017-07-07
1001 234567 1 8.1 2017-07-01
1001 234567 1 7.2 2017-07-02
1001 234567 1 6.2 2017-07-03
1001 234567 1 5.1 2017-07-04
1001 234567 1 9.3 2017-07-05
1001 234567 1 4.8 2017-07-06
1001 234567 1 2.3 2017-07-07
Time taken: 0.592 seconds, Fetched: 21 row(s)
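4 Problem Analysis
Note: before solving the problem, you first need the table of daily operating-current averages, computed and partitioned by day. How those statistics are computed is not covered in this article.
(1) Build the base table first. Use lag() to fetch the previous record's avg_act_ic as a helper column. Then compute max(avg_act_ic), the median of avg_act_ic, and the first and last values of the window ordered by time.
SQL code (row_number() numbers each device's rows by day in the rn column):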
select *
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
from act_ic
) t
Result:
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 5.4 0 13.5 5.8 13.5 5.4 1
1001 123456 0 3.5 2017-07-02 5.4 -1.9 13.5 5.8 13.5 5.4 2
1001 123456 0 5.8 2017-07-03 3.5 2.3 13.5 5.8 13.5 5.4 3
1001 123456 0 8.1 2017-07-04 5.8 2.3 13.5 5.8 13.5 5.4 4
1001 123456 0 5.2 2017-07-05 8.1 -2.9 13.5 5.8 13.5 5.4 5
1001 123456 0 8.5 2017-07-06 5.2 3.3 13.5 5.8 13.5 5.4 6
1001 123456 0 13.5 2017-07-07 8.5 5 13.5 5.8 13.5 5.4 7
1001 123456 1 3.1 2017-07-01 3.1 0 5.6 3.1 2.8 3.1 1
1001 123456 1 2.8 2017-07-02 3.1 -0.3 5.6 3.1 2.8 3.1 2
1001 123456 1 1.1 2017-07-03 2.8 -1.7 5.6 3.1 2.8 3.1 3
1001 123456 1 5.5 2017-07-04 1.1 4.4 5.6 3.1 2.8 3.1 4
1001 123456 1 5.3 2017-07-05 5.5 -0.2 5.6 3.1 2.8 3.1 5
1001 123456 1 5.6 2017-07-06 5.3 0.3 5.6 3.1 2.8 3.1 6
1001 123456 1 2.8 2017-07-07 5.6 -2.8 5.6 3.1 2.8 3.1 7
1001 234567 1 8.1 2017-07-01 8.1 0 9.3 6.2 2.3 8.1 1
1001 234567 1 7.2 2017-07-02 8.1 -0.9 9.3 6.2 2.3 8.1 2
1001 234567 1 6.2 2017-07-03 7.2 -1 9.3 6.2 2.3 8.1 3
1001 234567 1 5.1 2017-07-04 6.2 -1.1 9.3 6.2 2.3 8.1 4
1001 234567 1 9.3 2017-07-05 5.1 4.2 9.3 6.2 2.3 8.1 5
1001 234567 1 4.8 2017-07-06 9.3 -4.5 9.3 6.2 2.3 8.1 6
1001 234567 1 2.3 2017-07-07 4.8 -2.5 9.3 6.2 2.3 8.1 7
Time taken: 21.78 seconds, Fetched: 21 row(s)
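To check the step (1) output by hand, the following Python sketch recomputes the same per-partition columns for one (gw_id, sensor_id, switch_dir) group whose values are already ordered by compute_day. It mirrors three details of the query. lag(avg_act_ic, 1, avg_act_ic) defaults to the row's own value, the difference is cast to one decimal, and the median is computed via percentile on values scaled by 10. The function name `base_columns` is hypothetical.

```python
from statistics import median


def base_columns(values):
    """Step-(1) window columns for one (gw_id, sensor_id, switch_dir) partition.

    `values` holds avg_act_ic ordered by compute_day. Each returned tuple is
    (avg_act_ic, avg_act_ic_lag, avg_act_ic_diff, max, median, last, first, rn),
    i.e. the printed columns without gw_id, sensor_id, switch_dir and compute_day.
    """
    # Hive's percentile() only accepts integers, so the SQL scales by 10 and divides back
    m = median([round(v * 10) for v in values]) / 10
    rows = []
    for rn, v in enumerate(values, start=1):
        lag = values[rn - 2] if rn > 1 else v  # lag(avg_act_ic, 1, avg_act_ic): default is the row's own value
        diff = round(v - lag, 1)               # cast(... as decimal(6,1))
        rows.append((v, lag, diff, max(values), m, values[-1], values[0], rn))
    return rows


if __name__ == "__main__":
    for row in base_columns([5.4, 3.5, 5.8, 8.1, 5.2, 8.5, 13.5]):
        print(*row)
```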
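At this point you could also count the number of positive differences and derive the positive change rate. To simplify the later logic, we push part of the indicator calculation down into this query. SQL code: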
select *
,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 5.4 0 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 1
1001 123456 0 3.5 2017-07-02 5.4 -1.9 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 2
1001 123456 0 5.8 2017-07-03 3.5 2.3 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 3
1001 123456 0 8.1 2017-07-04 5.8 2.3 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 4
1001 123456 0 5.2 2017-07-05 8.1 -2.9 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 5
1001 123456 0 8.5 2017-07-06 5.2 3.3 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 6
1001 123456 0 13.5 2017-07-07 8.5 5 13.5 3.5 5.8 13.5 5.4 7 0.49773696506423515 7
1001 123456 1 3.1 2017-07-01 3.1 0 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 1
1001 123456 1 2.8 2017-07-02 3.1 -0.3 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 2
1001 123456 1 1.1 2017-07-03 2.8 -1.7 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 3
1001 123456 1 5.5 2017-07-04 1.1 4.4 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 4
1001 123456 1 5.3 2017-07-05 5.5 -0.2 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 5
1001 123456 1 5.6 2017-07-06 5.3 0.3 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 6
1001 123456 1 2.8 2017-07-07 5.6 -2.8 5.6 1.1 3.1 2.8 3.1 7 0.8828140656227909 7
1001 234567 1 8.1 2017-07-01 8.1 0 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 1
1001 234567 1 7.2 2017-07-02 8.1 -0.9 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 2
1001 234567 1 6.2 2017-07-03 7.2 -1 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 3
1001 234567 1 5.1 2017-07-04 6.2 -1.1 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 4
1001 234567 1 9.3 2017-07-05 5.1 4.2 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 5
1001 234567 1 4.8 2017-07-06 9.3 -4.5 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 6
1001 234567 1 2.3 2017-07-07 4.8 -2.5 9.3 2.3 6.2 2.3 8.1 7 0.3759421760605059 7
Time taken: 16.968 seconds, Fetched: 21 row(s)
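The pushed-down aggregates can be checked the same way. This sketch uses a hypothetical function, `pushed_down_aggregates`. For one ordered partition it computes three values: cnt, the number of negative differences (anti_var_cnt), and avgvarrate = sum(|diff| / lag) / (cnt - 1). Like the SQL, it uses absolute differences. As a result, avgvarrate measures the average size of the day-over-day change rather than its direction. The small gaps from the printed values in the last decimals come from Hive storing avg_act_ic as a 32-bit float.

```python
def pushed_down_aggregates(values):
    """(cnt, anti_var_cnt, avgvarrate) for one partition; `values` is a list ordered by compute_day."""
    cnt = len(values)
    lags = [values[0]] + values[:-1]                         # lag(avg_act_ic, 1, avg_act_ic)
    diffs = [round(v - l, 1) for v, l in zip(values, lags)]  # cast(... as decimal(6,1))
    anti_var_cnt = sum(1 for d in diffs if d < 0)            # days on which the value fell
    avgvarrate = sum(abs(d) / l for d, l in zip(diffs, lags)) / (cnt - 1)
    return cnt, anti_var_cnt, avgvarrate


if __name__ == "__main__":
    series = {
        ("123456", 0): [5.4, 3.5, 5.8, 8.1, 5.2, 8.5, 13.5],
        ("123456", 1): [3.1, 2.8, 1.1, 5.5, 5.3, 5.6, 2.8],
        ("234567", 1): [8.1, 7.2, 6.2, 5.1, 9.3, 4.8, 2.3],
    }
    for key, values in series.items():
        print(key, pushed_down_aggregates(values))
```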
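(2) From the wide table built in step (1), compute each indicator according to the algorithm: average change rate, baseline change rate, positive change rate, reverse change rate and head-to-tail change rate.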
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --average change rate
,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate --positive baseline change rate
,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate --reverse baseline change rate (signed)
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --head-to-tail change rate (fixed-base development speed)
,cast(anti_var_cnt / cnt as decimal(6,1)) as antivarrate --reverse change rate
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --positive change rate
FROM(
select *
,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
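Watch the edge cases. lag() defaults to the current value, so the first day of each window gets a difference of 0. That day therefore adds nothing to the change-rate sum or to the decline count. Also, a window whose first value or median is 0 would make the divisions above undefined. Result: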
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 3.5 2017-07-02 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 5.8 2017-07-03 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 8.1 2017-07-04 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 5.2 2017-07-05 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 8.5 2017-07-06 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 0 13.5 2017-07-07 0.5 1.3 -0.4 1.5 0.3 0.7
1001 123456 1 3.1 2017-07-01 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 2.8 2017-07-02 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 1.1 2017-07-03 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 5.5 2017-07-04 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 5.3 2017-07-05 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 5.6 2017-07-06 0.9 0.8 -0.6 0.1 0.6 0.4
1001 123456 1 2.8 2017-07-07 0.9 0.8 -0.6 0.1 0.6 0.4
1001 234567 1 8.1 2017-07-01 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 7.2 2017-07-02 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 6.2 2017-07-03 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 5.1 2017-07-04 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 9.3 2017-07-05 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 4.8 2017-07-06 0.4 0.5 -0.6 0.7 0.7 0.3
1001 234567 1 2.3 2017-07-07 0.4 0.5 -0.6 0.7 0.7 0.3
Time taken: 14.807 seconds, Fetched: 21 row(s)
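The one-decimal indicators above can be reproduced with Python's `decimal` module. The helper `dec1` stands in for cast(x as decimal(6,1)). It assumes that Hive rounds half up (away from zero) when it reduces the scale, which matches all values printed above. Both `dec1` and `step2_indicators` are hypothetical names. The tuple order follows the output columns: avgvarrate, pos_basedvarrate, anti_basedvarrate, end1stvarrate, antivarrate, posvarrate.

```python
from decimal import Decimal, ROUND_HALF_UP
from statistics import median


def dec1(x):
    """Approximate cast(x as decimal(6,1)): one decimal place, halves rounded away from zero."""
    return Decimal(repr(x)).quantize(Decimal("0.1"), rounding=ROUND_HALF_UP)


def step2_indicators(values):
    """Step-(2) indicators for one partition; `values` is a list ordered by compute_day."""
    cnt = len(values)
    lags = [values[0]] + values[:-1]                          # lag(avg_act_ic, 1, avg_act_ic)
    diffs = [round(v - l, 1) for v, l in zip(values, lags)]   # cast(... as decimal(6,1))
    anti_var_cnt = sum(1 for d in diffs if d < 0)
    avgvarrate = sum(abs(d) / l for d, l in zip(diffs, lags)) / (cnt - 1)
    m = median([round(v * 10) for v in values]) / 10          # percentile(... * 10, 0.5) / 10
    first, last = values[0], values[-1]
    antivarrate = dec1(anti_var_cnt / cnt)
    return (
        dec1(avgvarrate),
        dec1(abs(max(values) - m) / m),
        dec1((min(values) - m) / m),          # signed on purpose: negative means a trough below the median
        dec1(abs(last - first) / abs(first)),
        antivarrate,
        1 - antivarrate,
    )


if __name__ == "__main__":
    print(step2_indicators([8.1, 7.2, 6.2, 5.1, 9.3, 4.8, 2.3]))
```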
(3) Compare the indicators from step (2) against the thresholds to derive the trend warning code
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,case when posvarrate>=0.7 and avgvarrate>0 and end1stvarrate>0
then if(abs(avgvarrate)>=0.1 and abs(pos_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_growths",null)
else case when antivarrate>=0.7 and anti_basedvarrate<0
then if(abs(avgvarrate)>=0.1 and abs(anti_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_decline",null)
else null end
end as warning_code
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --average change rate
,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate --positive baseline change rate
,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate --reverse baseline change rate (signed)
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --head-to-tail change rate (fixed-base development speed)
,cast(anti_var_cnt / cnt as decimal(6,1)) as antivarrate --reverse change rate
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --positive change rate
FROM(
select *
,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
) n
Result:
VERTICES: 06/06 [==========================>>] 100% ELAPSED TIME: 15.98 s
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 act_ic_growths
1001 123456 0 3.5 2017-07-02 act_ic_growths
1001 123456 0 5.8 2017-07-03 act_ic_growths
1001 123456 0 8.1 2017-07-04 act_ic_growths
1001 123456 0 5.2 2017-07-05 act_ic_growths
1001 123456 0 8.5 2017-07-06 act_ic_growths
1001 123456 0 13.5 2017-07-07 act_ic_growths
1001 123456 1 3.1 2017-07-01 NULL
1001 123456 1 2.8 2017-07-02 NULL
1001 123456 1 1.1 2017-07-03 NULL
1001 123456 1 5.5 2017-07-04 NULL
1001 123456 1 5.3 2017-07-05 NULL
1001 123456 1 5.6 2017-07-06 NULL
1001 123456 1 2.8 2017-07-07 NULL
1001 234567 1 8.1 2017-07-01 act_ic_decline
1001 234567 1 7.2 2017-07-02 act_ic_decline
1001 234567 1 6.2 2017-07-03 act_ic_decline
1001 234567 1 5.1 2017-07-04 act_ic_decline
1001 234567 1 9.3 2017-07-05 act_ic_decline
1001 234567 1 4.8 2017-07-06 act_ic_decline
1001 234567 1 2.3 2017-07-07 act_ic_decline
Time taken: 18.042 seconds, Fetched: 21 row(s)
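The CASE expression of step (3) can be mirrored in Python to see which branch each device falls into. This is a sketch only. The function name and keyword parameters are hypothetical, the inputs are the one-decimal values from step (2), and `None` stands for SQL NULL. As in the SQL, a row that enters the growth branch but misses one of its thresholds gets NULL. It is not re-tested for a decline.

```python
from decimal import Decimal


def warning_code(avgvarrate, pos_based, anti_based, end1st, antivarrate, posvarrate,
                 pos_th=Decimal("0.7"), anti_th=Decimal("0.7"), avg_th=Decimal("0.1"),
                 base_th=Decimal("0.5"), end1st_th=Decimal("0.5")):
    """Mirror of the step-(3) CASE expression; returns None where the SQL yields NULL."""
    # outer WHEN: enough rising days -> growth branch
    if posvarrate >= pos_th and avgvarrate > 0 and end1st > 0:
        if abs(avgvarrate) >= avg_th and abs(pos_based) >= base_th and abs(end1st) >= end1st_th:
            return "act_ic_growths"
        return None  # if(..., null): no fall-through to the decline branch
    # ELSE: enough falling days and a trough below the median -> decline branch
    if antivarrate >= anti_th and anti_based < 0:
        if abs(avgvarrate) >= avg_th and abs(anti_based) >= base_th and abs(end1st) >= end1st_th:
            return "act_ic_decline"
    return None


if __name__ == "__main__":
    row = tuple(Decimal(x) for x in ("0.4", "0.5", "-0.6", "0.7", "0.7", "0.3"))
    print(warning_code(*row))
```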
(4) Filter out rows whose warning_code is NULL to get the final result
select *
from (
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,case when posvarrate>=0.7 and avgvarrate>0 and end1stvarrate>0
then if(abs(avgvarrate)>=0.1 and abs(pos_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_growths",null)
else case when antivarrate>=0.7 and anti_basedvarrate<0
then if(abs(avgvarrate)>=0.1 and abs(anti_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_decline",null)
else null end
end as warning_code
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --average change rate
,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate --positive baseline change rate
,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate --reverse baseline change rate (signed)
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --head-to-tail change rate (fixed-base development speed)
,cast(anti_var_cnt / cnt as decimal(6,1)) as antivarrate --reverse change rate
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --positive change rate
FROM(
select *
,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
) n
) o
where warning_code is not null
Result:
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 act_ic_growths
1001 123456 0 3.5 2017-07-02 act_ic_growths
1001 123456 0 5.8 2017-07-03 act_ic_growths
1001 123456 0 8.1 2017-07-04 act_ic_growths
1001 123456 0 5.2 2017-07-05 act_ic_growths
1001 123456 0 8.5 2017-07-06 act_ic_growths
1001 123456 0 13.5 2017-07-07 act_ic_growths
1001 234567 1 8.1 2017-07-01 act_ic_decline
1001 234567 1 7.2 2017-07-02 act_ic_decline
1001 234567 1 6.2 2017-07-03 act_ic_decline
1001 234567 1 5.1 2017-07-04 act_ic_decline
1001 234567 1 9.3 2017-07-05 act_ic_decline
1001 234567 1 4.8 2017-07-06 act_ic_decline
1001 234567 1 2.3 2017-07-07 act_ic_decline
Time taken: 24.369 seconds, Fetched: 14 row(s)
Trend charts (images not included): act_ic over 7 days, increasing trend; act_ic over 7 days, decreasing trend.
Remarks:
The business also requires the switching direction to be encoded in the warning code, so one more transformation is needed. The final code:
#!/bin/bash
lastday=`date --date '-1days' +%Y-%m-%d` # yesterday's date, overridden by the first argument if given (not referenced by the SQL below)
if [ "$1" != "" ];then
lastday=$1
fi;
option=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^option | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`;
posvarrate=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^posvarrate | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`
antivarrate=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^antivarrate | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`
avgvarrate=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^avgvarrate | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`
basedvarrate=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^basedvarrate | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`
end1stvarrate=`hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep ^end1stvarrate | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`
sql="
select *
from (
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,case when posvarrate>=${posvarrate} and avgvarrate>0 and end1stvarrate>0
then case when switch_dir= 0
then if(abs(avgvarrate)>=${avgvarrate} and abs(pos_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'invstopos_act_ic_growths_exe_30dd',null)
else if(abs(avgvarrate)>=${avgvarrate} and abs(pos_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'postoinvs_act_ic_growths_exe_30dd',null)
end
else case when antivarrate>=${antivarrate} and anti_basedvarrate<0
then case when switch_dir= 0
then if(abs(avgvarrate)>=${avgvarrate} and abs(anti_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'invstopos_act_ic_decline_exe_30dd',null)
else if(abs(avgvarrate)>=${avgvarrate} and abs(anti_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'postoinvs_act_ic_decline_exe_30dd',null)
end
else null
end
end as warning_code
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,cast(avgvarrate as decimal(6,1)) as avgvarrate --average change rate
,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate --positive baseline change rate
,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate --reverse baseline change rate (signed)
,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate --head-to-tail change rate (fixed-base development speed)
,cast(anti_var_cnt / cnt as decimal(6,1)) as antivarrate --reverse change rate
,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate --positive change rate
FROM(
select *
,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
from(
select gw_id
,sensor_id
,switch_dir
,avg_act_ic
,compute_day
,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m
) n
) o
where warning_code is not null
;
";
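# log_dir is not set anywhere in this script; define it before this point, otherwise the log goes to /tmp/.log
if [ "$option" = "hive" ];then
hive -e "$sql" >>"/tmp/${log_dir}.log" 2>&1 ;
fi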
Result:
--------------------------------------------------------------------------------
OK
1001 123456 0 5.4 2017-07-01 invstopos_act_ic_growths_exe
1001 123456 0 3.5 2017-07-02 invstopos_act_ic_growths_exe
1001 123456 0 5.8 2017-07-03 invstopos_act_ic_growths_exe
1001 123456 0 8.1 2017-07-04 invstopos_act_ic_growths_exe
1001 123456 0 5.2 2017-07-05 invstopos_act_ic_growths_exe
1001 123456 0 8.5 2017-07-06 invstopos_act_ic_growths_exe
1001 123456 0 13.5 2017-07-07 invstopos_act_ic_growths_exe
1001 234567 1 8.1 2017-07-01 postoinvs_act_ic_decline_exe
1001 234567 1 7.2 2017-07-02 postoinvs_act_ic_decline_exe
1001 234567 1 6.2 2017-07-03 postoinvs_act_ic_decline_exe
1001 234567 1 5.1 2017-07-04 postoinvs_act_ic_decline_exe
1001 234567 1 9.3 2017-07-05 postoinvs_act_ic_decline_exe
1001 234567 1 4.8 2017-07-06 postoinvs_act_ic_decline_exe
1001 234567 1 2.3 2017-07-07 postoinvs_act_ic_decline_exe
Time taken: 15.222 seconds, Fetched: 14 row(s)
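The script reads each threshold from the properties file with `grep ^key | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'`. The Python sketch below shows what that pipeline extracts from `key=value` text. The function name is hypothetical. The sample properties content is also hypothetical, since the article does not show the real file; it uses the default thresholds from section 4. The sketch simplifies in one way: it uses only the first matching line, whereas grep would return every line that starts with the key.

```python
def read_property(text, key):
    """What `grep ^key | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'` yields for the first matching line."""
    for line in text.splitlines():
        if line.startswith(key):                          # grep ^key is a prefix match
            fields = line.split("=")
            value = fields[1] if len(fields) > 1 else ""  # awk -F '=' '{print $2}': second field only
            return "".join(value.split())                 # sed 's/[[:space:]]//g': drop all whitespace
    return ""


if __name__ == "__main__":
    # hypothetical file content; the values are the default thresholds used in section 4
    sample = ("option = hive\nposvarrate = 0.7\nantivarrate = 0.7\n"
              "avgvarrate = 0.1\nbasedvarrate = 0.5\nend1stvarrate = 0.5\n")
    keys = ("option", "posvarrate", "antivarrate", "avgvarrate", "basedvarrate", "end1stvarrate")
    print({k: read_property(sample, k) for k in keys})
```

Because awk keeps only the second `=`-separated field, a value that itself contains `=` would be truncated. That is not an issue for the numeric thresholds used here.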
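The warning codes are as follows:
Code | Warning name |
Operating-current trend warnings | |
postoinvs_act_ic_growths_exe | Within the last XXX days, the operating current of ZD6 DC switch machine XXX kept increasing abnormally while switching from normal to reverse position. |
postoinvs_act_ic_decline_exe | Within the last XXX days, the operating current of ZD6 DC switch machine XXX kept decreasing abnormally while switching from normal to reverse position. |
invstopos_act_ic_growths_exe | Within the last XXX days, the operating current of ZD6 DC switch machine XXX kept increasing abnormally while switching from reverse to normal position. |
invstopos_act_ic_decline_exe | Within the last XXX days, the operating current of ZD6 DC switch machine XXX kept decreasing abnormally while switching from reverse to normal position. |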
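5 Summary
Targeting IoT scenarios, this article uses the operating current of railway switch machines as an example and presents a detailed method for warning on curve trends. The method captures the characteristics of a trending curve and describes them with statistical features. Four features characterize an increasing or decreasing curve: the baseline change rate, the head-to-tail change rate, the average change rate, and the reverse or positive change rate. Comparing these four features against thresholds decides whether the series matches a given trend. Readers can configure the thresholds given here to fit their business, and tune them iteratively until the results meet their requirements. The method is implemented as HiveSQL batch jobs and has been validated well in practice.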
You are welcome to follow 石榴姐's WeChat official account "我的SQL呀".