
Built-in Time-Series Data Analysis Features

Time-series data generally refers to data that grows continuously along a time axis: each record carries a time label and is continuously ingested into the database. The Seabox database ships with advanced analysis and processing capabilities for time-series data, including built-in time-series analysis functions, gap filling for missing data, and continuous aggregation. These features are included in the default installation and can be used without being enabled separately.

Time-Series Analysis Functions

  1. time_bucket()

The time-bucket function time_bucket() partitions time into intervals (buckets). Unlike PostgreSQL's date_trunc function, time_bucket accepts an arbitrary interval rather than being limited to fixed units such as seconds, minutes, or hours. The return value is the start time of the bucket.

Example: compute the average cpu value for each five-minute interval:

SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)
 FROM metrics
 GROUP BY five_min
 ORDER BY five_min DESC LIMIT 10;
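
To contrast with date_trunc, here is a minimal sketch; the commented results assume buckets are aligned to whole multiples of the interval, as described above, so verify against your Seabox version:

SELECT time_bucket('5 minutes', TIMESTAMP '2022-01-01 09:07:30');
 -- bucket start: 2022-01-01 09:05:00
SELECT date_trunc('minute', TIMESTAMP '2022-01-01 09:07:30');
 -- date_trunc can only truncate to a fixed unit: 2022-01-01 09:07:00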
  2. first()

The first(value, time) aggregate function returns a value from the value column ordered by the time column. For example, first(temp, time) returns the temperature of the earliest time within each aggregate group.

Example: get the first temperature value for each device_id:

SELECT device_id, first(temp, time)
 FROM metrics
 GROUP BY device_id;
  3. last()

The last(value, time) aggregate function returns a value from the value column ordered by the time column, with the opposite ordering to first. For example, last(temp, time) returns the temperature of the latest time within each aggregate group.

Example: get the last temperature value for each device_id:

SELECT device_id, last(temp, time)
 FROM metrics
 GROUP BY device_id;
  4. histogram()

The histogram() function represents the distribution of a set of values as a group of equal-width buckets. Given a lower bound (min) and an upper bound (max), it partitions the data into the specified number of buckets (nbuckets).

The return value is an array of n + 2 buckets: the middle n buckets count the values that fall within the specified range, the first bucket counts the values below the lower bound, and the last bucket counts the values greater than or equal to the upper bound. Each of the middle n buckets covers a half-open interval, closed on the left and open on the right, so together they cover [min, max).

Example: show a histogram of the battery_level column of the readings table, grouped by device_id.

SELECT device_id, histogram(battery_level, 20, 60, 5)
 FROM readings
 GROUP BY device_id
 LIMIT 10;
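
For the call above, the bucket layout follows directly from the definition: with min = 20, max = 60 and nbuckets = 5, each interior bucket is (60 - 20) / 5 = 8 units wide and the returned array has 5 + 2 = 7 elements. The breakdown below is derived from those rules, not taken from actual output:

-- histogram(battery_level, 20, 60, 5) returns an array of 7 counts:
--   element 1: battery_level <  20      (below the lower bound)
--   element 2: [20, 28)
--   element 3: [28, 36)
--   element 4: [36, 44)
--   element 5: [44, 52)
--   element 6: [52, 60)
--   element 7: battery_level >= 60      (at or above the upper bound)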
  5. Comprehensive example using the time-series functions

  • Create the table and insert data

CREATE TABLE conditions (
   timec       TIMESTAMP         NOT NULL,
   location    TEXT              NOT NULL,
   temperature DOUBLE PRECISION  NULL,
   humidity    DOUBLE PRECISION  NULL
 );
insert into conditions values ( '2022-01-01 09:20:00-08', 'tianjin', 12, 45);
insert into conditions values ( '2022-01-02 09:30:00-08', 'shanghai', 37, 100);
insert into conditions values ( '2022-01-02 09:20:00-08', 'tianjin', 18, 45);
insert into conditions values ( '2022-01-02 09:10:00-08', 'beijing', 18, 45);
insert into conditions values ( '2022-11-01 09:20:00-08', 'beijing', 7, 30);
insert into conditions values ( '2022-11-01 10:40:00-08', 'beijing', 12, 35);
insert into conditions values ( '2022-11-01 11:50:00-08', 'beijing', 18, 40);
insert into conditions values ( '2022-11-01 12:10:00-08', 'beijing', 23, 45);
insert into conditions values ( '2022-11-01 13:10:00-08', 'beijing', 29, 50);
insert into conditions values ( '2022-11-02 09:20:00-08', 'beijing', -12, 10);
insert into conditions values ( '2022-11-02 10:30:00-08', 'beijing', -6, 15);
insert into conditions values ( '2022-11-02 11:40:00-08', 'beijing', null, null);
insert into conditions values ( '2022-11-03 09:50:00-08', 'beijing', null, null);

select * from conditions order by timec;
        timec        | location | temperature | humidity 
---------------------+----------+-------------+----------
 2022-01-01 09:20:00 | tianjin  |          12 |       45
 2022-01-02 09:10:00 | beijing  |          18 |       45
 2022-01-02 09:20:00 | tianjin  |          18 |       45
 2022-01-02 09:30:00 | shanghai |          37 |      100
 2022-11-01 09:20:00 | beijing  |           7 |       30
 2022-11-01 10:40:00 | beijing  |          12 |       35
 2022-11-01 11:50:00 | beijing  |          18 |       40
 2022-11-01 12:10:00 | beijing  |          23 |       45
 2022-11-01 13:10:00 | beijing  |          29 |       50
 2022-11-02 09:20:00 | beijing  |         -12 |       10
 2022-11-02 10:30:00 | beijing  |          -6 |       15
 2022-11-02 11:40:00 | beijing  |             |         
 2022-11-03 09:50:00 | beijing  |             |         
(13 rows)
  • Analyze with the time-series functions: using one-day time buckets, compute the first, last, max, and min values of the different metrics for each city
select location, time_bucket('1day', timec), first(humidity, timec), last(humidity, timec), max(temperature), min(temperature)
from conditions
group by time_bucket('1day', timec), location order by 2;
 location |     time_bucket     | first | last | max | min 
----------+---------------------+-------+------+-----+-----
 tianjin  | 2022-01-01 00:00:00 |    45 |   45 |  12 |  12
 tianjin  | 2022-01-02 00:00:00 |    45 |   45 |  18 |  18
 shanghai | 2022-01-02 00:00:00 |   100 |  100 |  37 |  37
 beijing  | 2022-01-02 00:00:00 |    45 |   45 |  18 |  18
 beijing  | 2022-11-01 00:00:00 |    30 |   50 |  29 |   7
 beijing  | 2022-11-02 00:00:00 |    10 |      |  -6 | -12
 beijing  | 2022-11-03 00:00:00 |       |      |     |    
(7 rows)

Gap Filling (gapfill)

Gap filling provides the ability to algorithmically fill in missing data in a time series.

  1. time_bucket_gapfill()

The time_bucket_gapfill function works like time_bucket, but additionally activates gap filling within the interval between the given start and finish times: a time bucket is generated even when it contains no data. It can only be used in aggregate queries.

Example: fill values at a 1-second sampling interval

select time_bucket_gapfill('1 sec', time, 1568116800000, 1568116805000) as ts, device_id
 from table3
 group by ts, device_id
 order by device_id, ts;
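
The example above passes numeric epoch bounds for table3. As a complementary sketch, assuming the metrics table from the earlier examples with a timestamp-typed time column, the query below generates one bucket per hour over a fixed window; hours with no rows still appear, with NULL aggregate values, which locf() and interpolate() below can then fill:

SELECT time_bucket_gapfill(INTERVAL '1 hour', time,
                           TIMESTAMP '2022-01-01 00:00:00',
                           TIMESTAMP '2022-01-01 06:00:00') AS hour,
       avg(cpu)
  FROM metrics
 WHERE time >= TIMESTAMP '2022-01-01 00:00:00'
   AND time <  TIMESTAMP '2022-01-01 06:00:00'
 GROUP BY hour
 ORDER BY hour;
 -- buckets with no matching rows are returned with avg = NULL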
  2. locf()

locf() (last observation carried forward) fills a missing value with the value from the previous time bucket. It must be used together with the time_bucket_gapfill function.

Example:

SELECT time_bucket_gapfill('1 day', time, now() - INTERVAL '1 week', now()) AS day,
   device_id,
   avg(temperature) AS value,
   locf(avg(temperature))
 FROM metrics
 WHERE time > now () - INTERVAL '1 week'
 GROUP BY day, device_id
 ORDER BY day;
  3. interpolate()

interpolate() fills a missing value by interpolating between the neighboring values. It must be used together with the time_bucket_gapfill function.

Example:

SELECT time_bucket_gapfill('1 day', time, now() - INTERVAL '1 week', now()) AS day,
   device_id,
   avg(temperature) AS value,
   interpolate(avg(temperature))
 FROM metrics
 WHERE time > now () - INTERVAL '1 week'
 GROUP BY day, device_id
 ORDER BY day;
  4. Comprehensive gap-filling example

  • Create a table and insert time-series data containing values at 05:00 and 07:00; gap filling will be used to fill in the values for 06:00

CREATE TABLE metrics_tstz(time timestamp, location INT, device_id INT, v1 float, v2 int) distributed by(location);
INSERT INTO metrics_tstz VALUES
    (timestamp '2022-01-01 05:00:00 PST', 1, 1, 0.5, 10),
    (timestamp '2022-01-01 05:00:00 PST', 1, 2, 0.7, 20),
    (timestamp '2022-01-01 05:00:00 PST', 1, 3, 0.9, 30),
    (timestamp '2022-01-01 07:00:00 PST', 1, 1, 0.0, 0),
    (timestamp '2022-01-01 07:00:00 PST', 1, 2, 1.4, 40),
    (timestamp '2022-01-01 07:00:00 PST', 1, 3, 0.9, 30)
;

select * from metrics_tstz ;
        time         | location | device_id | v1  | v2 
---------------------+----------+-----------+-----+----
 2022-01-01 05:00:00 |        1 |         1 | 0.5 | 10
 2022-01-01 05:00:00 |        1 |         2 | 0.7 | 20
 2022-01-01 05:00:00 |        1 |         3 | 0.9 | 30
 2022-01-01 07:00:00 |        1 |         1 |   0 |  0
 2022-01-01 07:00:00 |        1 |         2 | 1.4 | 40
 2022-01-01 07:00:00 |        1 |         3 | 0.9 | 30
(6 rows)
  • Run the following query to perform gap filling
SELECT
  time_bucket_gapfill(interval '1h', time, timestamp '2022-01-01 05:00:00-8', timestamp '2022-01-01 07:00:00-8'),
  device_id,
  location,
  locf(avg(v1)) AS locf_v1,
  locf(min(v2)) AS locf_v2,
  interpolate(avg(v1)) AS interpolate_v1,
  interpolate(avg(v2)) AS interpolate_v2
FROM metrics_tstz
GROUP BY 1,2,3
ORDER BY 1,2,3;
 time_bucket_gapfill | device_id | location | locf_v1 | locf_v2 | interpolate_v1 | interpolate_v2 
---------------------+-----------+----------+---------+---------+----------------+----------------
 2022-01-01 05:00:00 |         1 |        1 |     0.5 |      10 |            0.5 |             10
 2022-01-01 05:00:00 |         2 |        1 |     0.7 |      20 |            0.7 |             20
 2022-01-01 05:00:00 |         3 |        1 |     0.9 |      30 |            0.9 |             30
 2022-01-01 06:00:00 |         1 |        1 |     0.5 |      10 |           0.25 |              5
 2022-01-01 06:00:00 |         2 |        1 |     0.7 |      20 |           1.05 |             30
 2022-01-01 06:00:00 |         3 |        1 |     0.9 |      30 |            0.9 |             30
 2022-01-01 07:00:00 |         1 |        1 |       0 |       0 |              0 |              0
 2022-01-01 07:00:00 |         2 |        1 |     1.4 |      40 |            1.4 |             40
 2022-01-01 07:00:00 |         3 |        1 |     0.9 |      30 |            0.9 |             30
(9 rows)

In the filled result above, the data is grouped by device_id and location, and a row for 06:00:00 has been filled in for each group. locf_v1 is filled with the group's average at 05:00, and locf_v2 with the group's minimum at 05:00. interpolate_v1 and interpolate_v2 are filled with the values midway between the group's 05:00 and 07:00 values; for device_id 1, for example, interpolate_v1 = (0.5 + 0.0) / 2 = 0.25 and interpolate_v2 = (10 + 0) / 2 = 5.

Continuous Aggregates

Continuous aggregates pre-aggregate continuously generated time-series data: the existing data is aggregated once and the result is materialized. When all the data needs to be aggregated, the pre-aggregated results are aggregated a second time and combined with the aggregation of newly arrived data to produce the final result set, which makes aggregation operations much more efficient.

  1. CREATE MATERIALIZED VIEW (Continuous Aggregate)

Creates a continuous-aggregate materialized view.

Example:

CREATE MATERIALIZED VIEW continuous_aggregate_view( timec, minl, sumt, sumh )
 WITH (seaboxts.continuous) AS
SELECT time_bucket('1day', timec), min(location), sum(temperature), sum(humidity)
 FROM conditions
 GROUP BY time_bucket('1day', timec);
  2. DROP MATERIALIZED VIEW (Continuous Aggregate)

Drops a continuous-aggregate materialized view.

Example:

DROP MATERIALIZED VIEW continuous_aggregate_view;
  3. refresh_continuous_aggregate

Refreshes a continuous-aggregate materialized view over a given time window.

Example:

CALL refresh_continuous_aggregate('continuous_aggregate_view', '2020-01-01', '2020-02-01');
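
The window bounds may also be passed as NULL to refresh the view over its entire time range, as the comprehensive example below does:

CALL refresh_continuous_aggregate('continuous_aggregate_view', NULL, NULL);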
  4. Comprehensive continuous-aggregate example

  • Reuse the data from the time-series analysis functions example

   select * from conditions order by timec;
        timec        | location | temperature | humidity 
---------------------+----------+-------------+----------
 2022-01-01 09:20:00 | tianjin  |          12 |       45
 2022-01-02 09:10:00 | beijing  |          18 |       45
 2022-01-02 09:20:00 | tianjin  |          18 |       45
 2022-01-02 09:30:00 | shanghai |          37 |      100
 2022-11-01 09:20:00 | beijing  |           7 |       30
 2022-11-01 10:40:00 | beijing  |          12 |       35
 2022-11-01 11:50:00 | beijing  |          18 |       40
 2022-11-01 12:10:00 | beijing  |          23 |       45
 2022-11-01 13:10:00 | beijing  |          29 |       50
 2022-11-02 09:20:00 | beijing  |         -12 |       10
 2022-11-02 10:30:00 | beijing  |          -6 |       15
 2022-11-02 11:40:00 | beijing  |             |         
 2022-11-03 09:50:00 | beijing  |             |         
(13 rows)
  • Create a continuous aggregate to pre-aggregate the data
create materialized view mat_m2(location, timec, firsth, lasth, maxtemp, mintemp)
WITH (seaboxts.continuous)
as
select location, time_bucket('1day', timec), first(humidity, timec), last(humidity, timec), max(temperature), min(temperature)
from conditions
group by time_bucket('1day', timec), location;
  • Query the continuous-aggregate view; the results match those of the time-series functions example
select * from mat_m2 order by 2;
 location |        timec        | firsth | lasth | maxtemp | mintemp 
----------+---------------------+--------+-------+---------+---------
 tianjin  | 2022-01-01 00:00:00 |     45 |    45 |      12 |      12
 tianjin  | 2022-01-02 00:00:00 |     45 |    45 |      18 |      18
 shanghai | 2022-01-02 00:00:00 |    100 |   100 |      37 |      37
 beijing  | 2022-01-02 00:00:00 |     45 |    45 |      18 |      18
 beijing  | 2022-11-01 00:00:00 |     30 |    50 |      29 |       7
 beijing  | 2022-11-02 00:00:00 |     10 |       |      -6 |     -12
 beijing  | 2022-11-03 00:00:00 |        |       |         |        
(7 rows)
  • Insert more time-series data into the base table
insert into conditions values ( '2022-12-02 20:10:00-08', 'tianjin', 12, 45);
insert into conditions values ( '2022-12-02 21:20:00-08', 'tianjin', 18, 45);
insert into conditions values ( '2022-12-02 20:30:00-08', 'beijing', 18, 45);
insert into conditions values ( '2022-12-02 21:50:00-08', 'beijing', 7, 30);
  • Refresh the continuous-aggregate view and check the aggregate results, which now include the new data
CALL refresh_continuous_aggregate('mat_m2', NULL, NULL);
select * from mat_m2 order by 2;
 location |        timec        | firsth | lasth | maxtemp | mintemp 
----------+---------------------+--------+-------+---------+---------
 tianjin  | 2022-01-01 00:00:00 |     45 |    45 |      12 |      12
 tianjin  | 2022-01-02 00:00:00 |     45 |    45 |      18 |      18
 shanghai | 2022-01-02 00:00:00 |    100 |   100 |      37 |      37
 beijing  | 2022-01-02 00:00:00 |     45 |    45 |      18 |      18
 beijing  | 2022-11-01 00:00:00 |     30 |    50 |      29 |       7
 beijing  | 2022-11-02 00:00:00 |     10 |       |      -6 |     -12
 beijing  | 2022-11-03 00:00:00 |        |       |         |        
 beijing  | 2022-12-02 00:00:00 |     45 |    30 |      18 |       7
 tianjin  | 2022-12-02 00:00:00 |     45 |    45 |      18 |      12
(9 rows)