A Batch-Based Trend Anomaly Analysis over an N-Day Window [IoT Big Data Application]

Updated: 2024-10-26 20:21:26

Contents

0 Background

1 Algorithm analysis

2 Problem description

3 Data preparation

4 Problem analysis

5 Summary


0 Background

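In equipment health monitoring, we often need to monitor a device's circuit parameters during operation to reflect its running condition, and to build a condition assessment model from long-term statistics on those parameters. A typical scenario is trend analysis of a device metric: deciding whether a metric has been rising or falling over a period of time. In other words, when a parameter drifts toward deterioration over some period, we need to raise an early warning about that trend. Long-horizon trend warnings of this kind usually call for an equipment data warehouse and batch analysis.

This article uses the operating current of a railway switch machine as the example and presents a trend warning method for it. Within a given window, the method analyzes how the average operating current of the same device in the same phase changes over time. When a device's operating current level in some phase keeps rising overall within one time window, the big data system raises an alarm. The method is implemented here as a batch job.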

1 Algorithm analysis

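The trend warning rules are as follows.

The n daily average points form the window set of average operating current values: P = {P_1, P_2, P_3, ..., P_n}
From it, compute the day-over-day change rates
{d_1, d_2, ..., d_(n-1)}
where d_i = (P_(i+1) - P_i) / P_i
1) Average change rate: Σd_i / (n-1) ≥ 0.1 (configurable). The last day has no following day, so no growth rate is computed for it and there are only n-1 rates. This reflects the average speed of change.
2) Baseline change rate: |max(P_i) - P_base| / P_base ≥ 0.5 (configurable), or |min(P_i) - P_base| / P_base for a falling trend. This reflects how far the peak deviates from the baseline.
3) First-to-last change rate: |P_n - P_1| / P_1 ≥ 0.5 (configurable), that is, the absolute difference between the last day's value and the first day's value, divided by the first day's value. This reflects how far the series has moved from its initial value, also called the fixed-base growth rate.
4) Forward or reverse change rate: within the time window, at most 30% of the points may fail to increase. Reverse change rate = number of points that did not increase / (number of points in the window - 1), and reverse change rate = 1 - forward change rate. A warning is raised when the forward change rate is above 0.7, or equivalently when the reverse change rate is below 0.3.
5) The warning condition is met when 1), 2), 3), and 4) all hold.
Note: the baseline P_base is for now taken as the median of the data in the window.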
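To make rules 1) to 3) concrete, here is a minimal Python sketch that evaluates them for a single window. The function name `window_metrics` and the dictionary keys are illustrative and not part of the original implementation. The sample values are one of the 7-day series used later in this article, and the baseline is taken as the window median, as noted above.

```python
from statistics import median

def window_metrics(values):
    """Rule 1-3 metrics for one window of daily averages (illustrative helper)."""
    if len(values) < 2:
        raise ValueError("need at least two points to compute a change rate")
    # d_i = (P_(i+1) - P_i) / P_i: one rate per adjacent pair, n - 1 in total.
    # Assumes every value is non-zero, which holds for the sample currents.
    rates = [(b - a) / a for a, b in zip(values, values[1:])]
    base = median(values)  # P_base: the window median
    return {
        "rates": rates,
        "avg_change_rate": sum(rates) / len(rates),
        "peak_baseline_rate": abs(max(values) - base) / base,
        "trough_baseline_rate": abs(min(values) - base) / base,
        "first_last_rate": abs(values[-1] - values[0]) / values[0],
    }

window = [5.4, 3.5, 5.8, 8.1, 5.2, 8.5, 13.5]
m = window_metrics(window)
print(round(m["avg_change_rate"], 3))     # 0.261
print(round(m["peak_baseline_rate"], 3))  # 1.328
print(round(m["first_last_rate"], 3))     # 1.5
```

All three values clear their default thresholds (0.1, 0.5, 0.5), so whether this window raises a warning depends on rule 4).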

2 Problem description

We are given a daily table of operating current, partitioned by day, with the structure below (for ease of analysis this article uses a 7-day window):

gw_id: gateway id; sensor_id: device id; switch_dir: switching direction, 0 for normal to reverse, 1 for reverse to normal.

avg_act_ic: the computed metric, the daily average of the operating current; compute_day: the computation date.

gw_id	sensor_id	switch_dir	avg_act_ic	compute_day
1001	123456	0	5.4	2017-07-01
1001	123456	0	3.5	2017-07-02
1001	123456	0	5.8	2017-07-03
1001	123456	0	8.1	2017-07-04
1001	123456	0	5.2	2017-07-05
1001	123456	0	8.5	2017-07-06
1001	123456	0	13.5	2017-07-07
1001	123456	1	3.1	2017-07-01
1001	123456	1	2.8	2017-07-02
1001	123456	1	1.1	2017-07-03
1001	123456	1	5.5	2017-07-04
1001	123456	1	5.3	2017-07-05
1001	123456	1	5.6	2017-07-06
1001	123456	1	2.8	2017-07-07
1001	234567	1	8.1	2017-07-01
1001	234567	1	7.2	2017-07-02
1001	234567	1	6.2	2017-07-03
1001	234567	1	5.1	2017-07-04
1001	234567	1	9.3	2017-07-05
1001	234567	1	4.8	2017-07-06
1001	234567	1	2.3	2017-07-07

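Using the trend warning rules above, derive the result with SQL. For a window that keeps rising, set warning_code to act_ic_growths; for one that keeps falling, set it to act_ic_decline. The final result only needs to add warning_code to the original table.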

3 Data preparation

(1) Prepare the data file

   vim act_ic.txt

1001	123456	0	5.4	2017-07-01
1001	123456	0	3.5	2017-07-02
1001	123456	0	5.8	2017-07-03
1001	123456	0	8.1	2017-07-04
1001	123456	0	5.2	2017-07-05
1001	123456	0	8.5	2017-07-06
1001	123456	0	13.5	2017-07-07
1001	123456	1	3.1	2017-07-01
1001	123456	1	2.8	2017-07-02
1001	123456	1	1.1	2017-07-03
1001	123456	1	5.5	2017-07-04
1001	123456	1	5.3	2017-07-05
1001	123456	1	5.6	2017-07-06
1001	123456	1	2.8	2017-07-07
1001	234567	1	8.1	2017-07-01
1001	234567	1	7.2	2017-07-02
1001	234567	1	6.2	2017-07-03
1001	234567	1	5.1	2017-07-04
1001	234567	1	9.3	2017-07-05
1001	234567	1	4.8	2017-07-06
1001	234567	1	2.3	2017-07-07

(2) Create the table in the Hive data warehouse

drop table if exists act_ic;

create table if not exists act_ic(
       gw_id string,
       sensor_id string,
       switch_dir int,
       avg_act_ic float,
       compute_day string
       )
row format delimited fields terminated by '\t';

(3) Load the data

load data local inpath "/home/centos/dan_test/act_ic.txt" into table act_ic;

(4) Query the data

hive> select * from act_ic;
OK
1001	123456	0	5.4	2017-07-01
1001	123456	0	3.5	2017-07-02
1001	123456	0	5.8	2017-07-03
1001	123456	0	8.1	2017-07-04
1001	123456	0	5.2	2017-07-05
1001	123456	0	8.5	2017-07-06
1001	123456	0	13.5	2017-07-07
1001	123456	1	3.1	2017-07-01
1001	123456	1	2.8	2017-07-02
1001	123456	1	1.1	2017-07-03
1001	123456	1	5.5	2017-07-04
1001	123456	1	5.3	2017-07-05
1001	123456	1	5.6	2017-07-06
1001	123456	1	2.8	2017-07-07
1001	234567	1	8.1	2017-07-01
1001	234567	1	7.2	2017-07-02
1001	234567	1	6.2	2017-07-03
1001	234567	1	5.1	2017-07-04
1001	234567	1	9.3	2017-07-05
1001	234567	1	4.8	2017-07-06
1001	234567	1	2.3	2017-07-07
Time taken: 0.592 seconds, Fetched: 21 row(s)

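4 Problem analysis

Note: before solving the problem, the table of daily average operating current, partitioned by day, must already exist. How that table is computed is not covered in this article.

(1) Build the base table first. Use the lag() function to fetch avg_act_ic from the previous record as a helper column, and compute max(avg_act_ic), the median of avg_act_ic, and the first and last values in the window ordered by time.

The SQL is as follows. To keep the result ordered by day, we number the rows with row_number().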

select *
      ,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
     ,sensor_id
     ,switch_dir
     ,avg_act_ic
     ,compute_day
     ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
     ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
     ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
     ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
from act_ic
) t

The result is as follows:

--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	5.4	0	13.5	5.8	13.5	5.4	1
1001	123456	0	3.5	2017-07-02	5.4	-1.9	13.5	5.8	13.5	5.4	2
1001	123456	0	5.8	2017-07-03	3.5	2.3	13.5	5.8	13.5	5.4	3
1001	123456	0	8.1	2017-07-04	5.8	2.3	13.5	5.8	13.5	5.4	4
1001	123456	0	5.2	2017-07-05	8.1	-2.9	13.5	5.8	13.5	5.4	5
1001	123456	0	8.5	2017-07-06	5.2	3.3	13.5	5.8	13.5	5.4	6
1001	123456	0	13.5	2017-07-07	8.5	5	13.5	5.8	13.5	5.4	7
1001	123456	1	3.1	2017-07-01	3.1	0	5.6	3.1	2.8	3.1	1
1001	123456	1	2.8	2017-07-02	3.1	-0.3	5.6	3.1	2.8	3.1	2
1001	123456	1	1.1	2017-07-03	2.8	-1.7	5.6	3.1	2.8	3.1	3
1001	123456	1	5.5	2017-07-04	1.1	4.4	5.6	3.1	2.8	3.1	4
1001	123456	1	5.3	2017-07-05	5.5	-0.2	5.6	3.1	2.8	3.1	5
1001	123456	1	5.6	2017-07-06	5.3	0.3	5.6	3.1	2.8	3.1	6
1001	123456	1	2.8	2017-07-07	5.6	-2.8	5.6	3.1	2.8	3.1	7
1001	234567	1	8.1	2017-07-01	8.1	0	9.3	6.2	2.3	8.1	1
1001	234567	1	7.2	2017-07-02	8.1	-0.9	9.3	6.2	2.3	8.1	2
1001	234567	1	6.2	2017-07-03	7.2	-1	9.3	6.2	2.3	8.1	3
1001	234567	1	5.1	2017-07-04	6.2	-1.1	9.3	6.2	2.3	8.1	4
1001	234567	1	9.3	2017-07-05	5.1	4.2	9.3	6.2	2.3	8.1	5
1001	234567	1	4.8	2017-07-06	9.3	-4.5	9.3	6.2	2.3	8.1	6
1001	234567	1	2.3	2017-07-07	4.8	-2.5	9.3	6.2	2.3	8.1	7
Time taken: 21.78 seconds, Fetched: 21 row(s)
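For readers less familiar with window functions, the sketch below mimics in plain Python what `lag(avg_act_ic,1,avg_act_ic)`, `first_value`, the unordered `max`, and `row_number()` produce for each `(gw_id, sensor_id, switch_dir)` partition. It only illustrates the semantics and is not how Hive executes the query. The function name `base_rows` and the tuple layout are assumptions made for this example.

```python
from decimal import Decimal
from itertools import groupby
from statistics import median

def base_rows(rows):
    """rows: (gw_id, sensor_id, switch_dir, avg_act_ic, compute_day) tuples."""
    key = lambda r: (r[0], r[1], r[2])
    # PARTITION BY the key, ORDER BY compute_day inside each partition
    ordered = sorted(rows, key=lambda r: (key(r), r[4]))
    out = []
    for _, part in groupby(ordered, key=key):
        part = list(part)
        vals = [r[3] for r in part]
        # lag(x, 1, x): the previous value; the first row falls back to its own value
        lags = [vals[0]] + vals[:-1]
        for rn, (r, lag) in enumerate(zip(part, lags), start=1):
            # appended columns: lag, diff, max, median, last, first, rn
            out.append(r + (lag, r[3] - lag, max(vals), median(vals), vals[-1], vals[0], rn))
    return out

days = ["2017-07-0%d" % d for d in range(1, 8)]
vals = ["5.4", "3.5", "5.8", "8.1", "5.2", "8.5", "13.5"]
rows = [("1001", "123456", 0, Decimal(v), d) for v, d in zip(vals, days)]
for row in base_rows(rows)[:2]:
    print(row)
```

Because the first row's lag defaults to its own value, its difference is 0 and it counts as neither an increase nor a decrease in the later steps.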

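At this point we can also count the differences by sign and derive the forward change rate (the query below counts the negative differences, from which the forward rate follows). To streamline the logic, part of the metric computation is pushed down into the inner query. Note that the average change rate here is computed from the absolute value of each day-over-day change. The SQL is as follows: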

select *
     ,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
     ,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
     ,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
     ,sensor_id
     ,switch_dir
     ,avg_act_ic
     ,compute_day
     ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
     ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
     ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
     ,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
     ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
     ,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	5.4	0	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	1
1001	123456	0	3.5	2017-07-02	5.4	-1.9	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	2
1001	123456	0	5.8	2017-07-03	3.5	2.3	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	3
1001	123456	0	8.1	2017-07-04	5.8	2.3	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	4
1001	123456	0	5.2	2017-07-05	8.1	-2.9	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	5
1001	123456	0	8.5	2017-07-06	5.2	3.3	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	6
1001	123456	0	13.5	2017-07-07	8.5	5	13.5	3.5	5.8	13.5	5.4	7    0.49773696506423515	7
1001	123456	1	3.1	2017-07-01	3.1	0	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	1
1001	123456	1	2.8	2017-07-02	3.1	-0.3	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	2
1001	123456	1	1.1	2017-07-03	2.8	-1.7	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	3
1001	123456	1	5.5	2017-07-04	1.1	4.4	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	4
1001	123456	1	5.3	2017-07-05	5.5	-0.2	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	5
1001	123456	1	5.6	2017-07-06	5.3	0.3	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	6
1001	123456	1	2.8	2017-07-07	5.6	-2.8	5.6	1.1	3.1	2.8	3.1	7    0.8828140656227909	7
1001	234567	1	8.1	2017-07-01	8.1	0	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	1
1001	234567	1	7.2	2017-07-02	8.1	-0.9	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	2
1001	234567	1	6.2	2017-07-03	7.2	-1	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	3
1001	234567	1	5.1	2017-07-04	6.2	-1.1	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	4
1001	234567	1	9.3	2017-07-05	5.1	4.2	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	5
1001	234567	1	4.8	2017-07-06	9.3	-4.5	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	6
1001	234567	1	2.3	2017-07-07	4.8	-2.5	9.3	2.3	6.2	2.3	8.1	7    0.3759421760605059	7
Time taken: 16.968 seconds, Fetched: 21 row(s)

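(2) From the wide table built in step 1, compute each metric according to the algorithm: average change rate, baseline change rate, forward change rate, reverse change rate, and first-to-last change rate. In this SQL, the reverse change rate is the number of negative differences divided by cnt, the number of rows in the window, and the average change rate averages the absolute values of the daily changes.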

select gw_id
     ,sensor_id
     ,switch_dir
     ,avg_act_ic
     ,compute_day
     ,cast(avgvarrate as decimal(6,1)) as avgvarrate  -- average change rate
     ,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate -- forward baseline change rate
     ,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate -- reverse baseline change rate
     ,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate -- first-to-last change rate (fixed-base growth rate)
     ,cast(anti_var_cnt / cnt  as decimal(6,1)) as antivarrate -- reverse change rate
     ,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate  -- forward change rate
FROM(
select *
     ,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
     ,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
     ,row_number() over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as rn
from(
select gw_id
     ,sensor_id
     ,switch_dir
     ,avg_act_ic
     ,compute_day
     ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
     ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
     ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
     ,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
     ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
     ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
     ,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
from act_ic
) t
) m  

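Note: boundary cases need to be considered, for example a zero denominator, or a window containing a single row, where cnt - 1 is 0. The result is as follows: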

--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	3.5	2017-07-02	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	5.8	2017-07-03	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	8.1	2017-07-04	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	5.2	2017-07-05	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	8.5	2017-07-06	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	0	13.5	2017-07-07	0.5	1.3	-0.4	1.5	0.3	0.7
1001	123456	1	3.1	2017-07-01	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	2.8	2017-07-02	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	1.1	2017-07-03	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	5.5	2017-07-04	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	5.3	2017-07-05	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	5.6	2017-07-06	0.9	0.8	-0.6	0.1	0.6	0.4
1001	123456	1	2.8	2017-07-07	0.9	0.8	-0.6	0.1	0.6	0.4
1001	234567	1	8.1	2017-07-01	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	7.2	2017-07-02	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	6.2	2017-07-03	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	5.1	2017-07-04	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	9.3	2017-07-05	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	4.8	2017-07-06	0.4	0.5	-0.6	0.7	0.7	0.3
1001	234567	1	2.3	2017-07-07	0.4	0.5	-0.6	0.7	0.7	0.3
Time taken: 14.807 seconds, Fetched: 21 row(s)
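As a cross-check of the figures above, the following Python sketch recomputes the six metric columns for one partition. It follows the SQL rather than the rule text: the average uses absolute changes and the reverse rate divides by the row count. It assumes that `cast(... as decimal(6,1))` rounds half up, which reproduces the printed values. The helper names `r1` and `step2_metrics` are hypothetical, and returning `None` for boundary cases is one possible choice, not the article's.

```python
from decimal import Decimal, ROUND_HALF_UP
from statistics import median

ONE_DP = Decimal("0.1")

def r1(x):
    """Round to one decimal place, half up (assumed behaviour of the decimal(6,1) cast)."""
    return x.quantize(ONE_DP, rounding=ROUND_HALF_UP)

def step2_metrics(vals):
    """vals: one partition's daily averages as Decimal, ordered by compute_day."""
    cnt = len(vals)
    # Boundary cases: a single-row window has cnt - 1 == 0, and a zero value
    # used as a denominator would make a rate undefined.
    if cnt < 2 or any(v == 0 for v in vals[:-1]):
        return None
    base = median(vals)
    if base == 0:
        return None
    diffs = [b - a for a, b in zip(vals, vals[1:])]
    # zip stops at the shorter list, pairing each diff with its previous value
    avgvarrate = sum(abs(d) / a for d, a in zip(diffs, vals)) / (cnt - 1)
    antivarrate = r1(Decimal(sum(1 for d in diffs if d < 0)) / cnt)
    return (
        r1(avgvarrate),                              # avgvarrate
        r1(abs(max(vals) - base) / base),            # pos_basedvarrate
        r1((min(vals) - base) / base),               # anti_basedvarrate
        r1(abs(vals[-1] - vals[0]) / abs(vals[0])),  # end1stvarrate
        antivarrate,                                 # antivarrate
        1 - antivarrate,                             # posvarrate
    )

s = [Decimal(v) for v in "5.4 3.5 5.8 8.1 5.2 8.5 13.5".split()]
print([str(x) for x in step2_metrics(s)])  # ['0.5', '1.3', '-0.4', '1.5', '0.3', '0.7']
```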

(3) Compare the metrics from step 2 against their thresholds to derive the trend warning code

select gw_id
     ,sensor_id
     ,switch_dir
     ,avg_act_ic
     ,compute_day
     ,case when posvarrate>=0.7 and avgvarrate>0 and end1stvarrate>0
           then if(abs(avgvarrate)>=0.1 and abs(pos_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_growths",null)
           else case when antivarrate>=0.7 and anti_basedvarrate<0
                     then if(abs(avgvarrate)>=0.1 and abs(anti_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_decline",null)  
                     else null end
           end as warning_code
from(
       select gw_id
         ,sensor_id
         ,switch_dir
         ,avg_act_ic
         ,compute_day
         ,cast(avgvarrate as decimal(6,1)) as avgvarrate  -- average change rate
         ,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate -- forward baseline change rate
         ,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate -- reverse baseline change rate
         ,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate -- first-to-last change rate (fixed-base growth rate)
         ,cast(anti_var_cnt / cnt  as decimal(6,1)) as antivarrate -- reverse change rate
         ,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate  -- forward change rate
    FROM(
        select *
             ,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
             ,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
        from(
            select gw_id
                 ,sensor_id
                 ,switch_dir
                 ,avg_act_ic
                 ,compute_day
                 ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
                 ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
                 ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
                 ,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
                 ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
                 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
                 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
                 ,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
            from act_ic
            ) t
        ) m  
) n

The result is as follows:

VERTICES: 06/06  [==========================>>] 100%  ELAPSED TIME: 15.98 s    
--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	act_ic_growths
1001	123456	0	3.5	2017-07-02	act_ic_growths
1001	123456	0	5.8	2017-07-03	act_ic_growths
1001	123456	0	8.1	2017-07-04	act_ic_growths
1001	123456	0	5.2	2017-07-05	act_ic_growths
1001	123456	0	8.5	2017-07-06	act_ic_growths
1001	123456	0	13.5	2017-07-07	act_ic_growths
1001	123456	1	3.1	2017-07-01	NULL
1001	123456	1	2.8	2017-07-02	NULL
1001	123456	1	1.1	2017-07-03	NULL
1001	123456	1	5.5	2017-07-04	NULL
1001	123456	1	5.3	2017-07-05	NULL
1001	123456	1	5.6	2017-07-06	NULL
1001	123456	1	2.8	2017-07-07	NULL
1001	234567	1	8.1	2017-07-01	act_ic_decline
1001	234567	1	7.2	2017-07-02	act_ic_decline
1001	234567	1	6.2	2017-07-03	act_ic_decline
1001	234567	1	5.1	2017-07-04	act_ic_decline
1001	234567	1	9.3	2017-07-05	act_ic_decline
1001	234567	1	4.8	2017-07-06	act_ic_decline
1001	234567	1	2.3	2017-07-07	act_ic_decline
Time taken: 18.042 seconds, Fetched: 21 row(s)
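The nested CASE/IF expression can be hard to read at a glance. The sketch below restates the same branching in Python, with the thresholds pulled out as parameters that default to the values hard-coded in the SQL. The function name `warning_code` and its parameter names are illustrative assumptions. The three calls use the metric rows printed in step (2).

```python
from decimal import Decimal

def warning_code(avgvarrate, pos_based, anti_based, end1st, antivarrate, posvarrate,
                 dir_min=Decimal("0.7"), avg_min=Decimal("0.1"),
                 base_min=Decimal("0.5"), end_min=Decimal("0.5")):
    """Python restatement of the CASE/IF expression; returns a code or None."""
    magnitude_ok = abs(avgvarrate) >= avg_min and abs(end1st) >= end_min
    # Outer CASE: a mostly-increasing window is tested for growth only
    if posvarrate >= dir_min and avgvarrate > 0 and end1st > 0:
        return "act_ic_growths" if magnitude_ok and abs(pos_based) >= base_min else None
    # Inner CASE: otherwise a mostly-decreasing window is tested for decline
    if antivarrate >= dir_min and anti_based < 0:
        return "act_ic_decline" if magnitude_ok and abs(anti_based) >= base_min else None
    return None

D = Decimal
print(warning_code(D("0.5"), D("1.3"), D("-0.4"), D("1.5"), D("0.3"), D("0.7")))  # act_ic_growths
print(warning_code(D("0.9"), D("0.8"), D("-0.6"), D("0.1"), D("0.6"), D("0.4")))  # None
print(warning_code(D("0.4"), D("0.5"), D("-0.6"), D("0.7"), D("0.7"), D("0.3")))  # act_ic_decline
```

The second series changes a lot on average (0.9) but has no consistent direction, since neither rate reaches 0.7, so it raises no warning.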

(4) Filter out the rows whose warning_code is NULL to get the final result

select * 
from (
	select gw_id
		 ,sensor_id
		 ,switch_dir
		 ,avg_act_ic
		 ,compute_day
		 ,case when posvarrate>=0.7 and avgvarrate>0 and end1stvarrate>0
			   then if(abs(avgvarrate)>=0.1 and abs(pos_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_growths",null)
			   else case when antivarrate>=0.7 and anti_basedvarrate<0
						 then if(abs(avgvarrate)>=0.1 and abs(anti_basedvarrate)>=0.5 and abs(end1stvarrate)>=0.5,"act_ic_decline",null)  
						 else null end
			   end as warning_code
	from(
	   select gw_id
		 ,sensor_id
		 ,switch_dir
		 ,avg_act_ic
		 ,compute_day
		 ,cast(avgvarrate as decimal(6,1)) as avgvarrate  -- average change rate
		 ,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate -- forward baseline change rate
		 ,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate -- reverse baseline change rate
		 ,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate -- first-to-last change rate (fixed-base growth rate)
		 ,cast(anti_var_cnt / cnt  as decimal(6,1)) as antivarrate -- reverse change rate
		 ,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate  -- forward change rate
	FROM(
	select *
		 ,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
		 ,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
	from(
		select gw_id
			 ,sensor_id
			 ,switch_dir
			 ,avg_act_ic
			 ,compute_day
			 ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
			 ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
			 ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
			 ,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
			 ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
			 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
			 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
			 ,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
		from act_ic
			) t
		) m  
	) n
) o
where warning_code is not null

The result is as follows:

--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	act_ic_growths
1001	123456	0	3.5	2017-07-02	act_ic_growths
1001	123456	0	5.8	2017-07-03	act_ic_growths
1001	123456	0	8.1	2017-07-04	act_ic_growths
1001	123456	0	5.2	2017-07-05	act_ic_growths
1001	123456	0	8.5	2017-07-06	act_ic_growths
1001	123456	0	13.5	2017-07-07	act_ic_growths
1001	234567	1	8.1	2017-07-01	act_ic_decline
1001	234567	1	7.2	2017-07-02	act_ic_decline
1001	234567	1	6.2	2017-07-03	act_ic_decline
1001	234567	1	5.1	2017-07-04	act_ic_decline
1001	234567	1	9.3	2017-07-05	act_ic_decline
1001	234567	1	4.8	2017-07-06	act_ic_decline
1001	234567	1	2.3	2017-07-07	act_ic_decline
Time taken: 24.369 seconds, Fetched: 14 row(s)

Trend charts:

[Figure: act_ic, 7-day increasing trend]

[Figure: act_ic, 7-day decreasing trend]

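Remark:

For business reasons, the switching direction also has to be written into the warning code, so one more conversion step is needed. The final code is as follows: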

#!/bin/bash
lastday=$(date --date '-1days' +%Y-%m-%d)  # yesterday's date

if [ "$1" != "" ]; then
        lastday=$1
fi

# Read one key from the shared ETL properties file and strip all whitespace from its value
get_prop() {
        hadoop fs -cat /phm/JTTL_ETL_COMMON/etl_process.properties | grep "^$1" | awk -F '=' '{print $2}' | sed 's/[[:space:]]//g'
}

option=$(get_prop option)
posvarrate=$(get_prop posvarrate)
antivarrate=$(get_prop antivarrate)
avgvarrate=$(get_prop avgvarrate)
basedvarrate=$(get_prop basedvarrate)
end1stvarrate=$(get_prop end1stvarrate)

sql="
select * 
from (
	select gw_id
		 ,sensor_id
		 ,switch_dir
		 ,avg_act_ic
		 ,compute_day
		 ,case when posvarrate>=${posvarrate} and avgvarrate>0 and end1stvarrate>0
               then case when switch_dir= 0
                    then if(abs(avgvarrate)>=${avgvarrate} and abs(pos_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'invstopos_act_ic_growths_exe_30dd',null)
                    else if(abs(avgvarrate)>=${avgvarrate} and abs(pos_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'postoinvs_act_ic_growths_exe_30dd',null)
                    end
               else case when antivarrate>=${antivarrate} and anti_basedvarrate<0
                    then case when switch_dir= 0
                              then if(abs(avgvarrate)>=${avgvarrate} and abs(anti_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'invstopos_act_ic_decline_exe_30dd',null)
                              else if(abs(avgvarrate)>=${avgvarrate} and abs(anti_basedvarrate)>=${basedvarrate} and abs(end1stvarrate)>=${end1stvarrate},'postoinvs_act_ic_decline_exe_30dd',null)
                              end
                    else null
                    end
              end as warning_code
	from(
	   select gw_id
		 ,sensor_id
		 ,switch_dir
		 ,avg_act_ic
		 ,compute_day
		 ,cast(avgvarrate as decimal(6,1)) as avgvarrate  -- average change rate
		 ,cast(abs(max_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as pos_basedvarrate -- forward baseline change rate
		 ,cast((min_avg_act_ic - avg_act_ic_m)/avg_act_ic_m as decimal(6,1)) as anti_basedvarrate -- reverse baseline change rate
		 ,cast(abs(avg_act_ic_last - avg_act_ic_first)/abs(avg_act_ic_first) as decimal(6,1)) as end1stvarrate -- first-to-last change rate (fixed-base growth rate)
		 ,cast(anti_var_cnt / cnt  as decimal(6,1)) as antivarrate -- reverse change rate
		 ,1 - cast(anti_var_cnt / cnt as decimal(6,1)) as posvarrate  -- forward change rate
	FROM(
	select *
		 ,sum(case when avg_act_ic_diff < 0 then 1 else 0 end ) over(PARTITION by gw_id,sensor_id,switch_dir) as anti_var_cnt
		 ,(sum(abs(avg_act_ic_diff) / avg_act_ic_lag) over(PARTITION by gw_id,sensor_id,switch_dir)) / (cnt -1) as avgvarrate
	from(
		select gw_id
			 ,sensor_id
			 ,switch_dir
			 ,avg_act_ic
			 ,compute_day
			 ,lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_lag
			 ,cast(avg_act_ic - lag(avg_act_ic,1,avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as decimal(6,1)) as avg_act_ic_diff
			 ,max(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as max_avg_act_ic
			 ,min(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir) as min_avg_act_ic
			 ,(percentile(cast(cast(avg_act_ic as decimal(6,1))*10 as int), 0.5) over(PARTITION by gw_id,sensor_id,switch_dir))/10 as avg_act_ic_m
			 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day desc) as avg_act_ic_last
			 ,first_value(avg_act_ic) over(PARTITION by gw_id,sensor_id,switch_dir order by compute_day) as avg_act_ic_first
			 ,count(1) over(PARTITION by gw_id,sensor_id,switch_dir) as cnt
		from act_ic
			) t
		) m  
	) n
) o
where warning_code is not null
;
";
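if [ "$option" = "hive" ]; then
   # log_dir is not set anywhere in this script; it is assumed to come from the calling environment
   hive -e "$sql" >> "/tmp/${log_dir}.log" 2>&1
fi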
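The script reads its thresholds from a `key=value` properties file through a `grep | awk | sed` pipeline. The Python sketch below shows the same lookup for illustration only. The helper `read_property` and the sample file contents are hypothetical, since the real `etl_process.properties` is not shown in this article. Unlike `grep ^key`, the sketch matches the key exactly rather than by prefix.

```python
def read_property(text, key):
    """Return the value of `key` from key=value lines, with all whitespace removed."""
    for line in text.splitlines():
        name, sep, value = line.partition("=")
        if sep and name.strip() == key:
            return "".join(value.split())
    return None

# Hypothetical contents: the values reuse the thresholds hard-coded in the earlier SQL.
sample = """\
option = hive
posvarrate=0.7
antivarrate=0.7
avgvarrate = 0.1
basedvarrate=0.5
end1stvarrate=0.5
"""

keys = ("posvarrate", "antivarrate", "avgvarrate", "basedvarrate", "end1stvarrate")
thresholds = {k: read_property(sample, k) for k in keys}
print(read_property(sample, "option"))  # hive
print(thresholds["avgvarrate"])         # 0.1
```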

The result is as follows:

--------------------------------------------------------------------------------
OK
1001	123456	0	5.4	2017-07-01	invstopos_act_ic_growths_exe
1001	123456	0	3.5	2017-07-02	invstopos_act_ic_growths_exe
1001	123456	0	5.8	2017-07-03	invstopos_act_ic_growths_exe
1001	123456	0	8.1	2017-07-04	invstopos_act_ic_growths_exe
1001	123456	0	5.2	2017-07-05	invstopos_act_ic_growths_exe
1001	123456	0	8.5	2017-07-06	invstopos_act_ic_growths_exe
1001	123456	0	13.5	2017-07-07	invstopos_act_ic_growths_exe
1001	234567	1	8.1	2017-07-01	postoinvs_act_ic_decline_exe
1001	234567	1	7.2	2017-07-02	postoinvs_act_ic_decline_exe
1001	234567	1	6.2	2017-07-03	postoinvs_act_ic_decline_exe
1001	234567	1	5.1	2017-07-04	postoinvs_act_ic_decline_exe
1001	234567	1	9.3	2017-07-05	postoinvs_act_ic_decline_exe
1001	234567	1	4.8	2017-07-06	postoinvs_act_ic_decline_exe
1001	234567	1	2.3	2017-07-07	postoinvs_act_ic_decline_exe
Time taken: 15.222 seconds, Fetched: 14 row(s)

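The warning codes are as follows:

Code	Warning name
Operating current trend warnings
postoinvs_act_ic_growths_exe	Abnormal sustained increase in operating current during normal-to-reverse switching of ZD6 DC switch machine device XXX over the last XXX days.
postoinvs_act_ic_decline_exe	Abnormal sustained decrease in operating current during normal-to-reverse switching of ZD6 DC switch machine device XXX over the last XXX days.
invstopos_act_ic_growths_exe	Abnormal sustained increase in operating current during reverse-to-normal switching of ZD6 DC switch machine device XXX over the last XXX days.
invstopos_act_ic_decline_exe	Abnormal sustained decrease in operating current during reverse-to-normal switching of ZD6 DC switch machine device XXX over the last XXX days.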

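5 Summary

This article targets IoT application scenarios and, using the operating current of a turnout switch machine as the example, describes a curve trend prediction method in detail. The method captures the characteristics of a trending curve and describes them as statistical features along four dimensions that characterize a rising or falling curve: baseline change rate, first-to-last change rate, average change rate, and forward or reverse change rate. Comparing these four features against thresholds decides whether the curve matches a given trend. The thresholds given here can be configured to suit the business and tuned until the results meet its requirements. The method is implemented as Hive SQL batch processing, and the algorithm has been validated well in practice.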

You are welcome to follow the author's (石榴姐) WeChat official account "我的SQL呀".


Published: 2023-06-14 07:28:00
Link: https://www.elefans.com/category/jswz/34/1451593.html