内置时序数据分析处理功能
内置时序数据分析处理功能¶
时序数据通常指按时间序列海量增长的数据,数据拥有一个时间标签,并持续输入到数据库中。 Seabox数据库内置了时序数据的高级分析处理功能,包括内置的时序分析函数,对缺失数据的空隙填充,数据的持续聚集等功能。 这些时序相关功能在包含在数据库的默认安装中,不需要单独开启即可使用。
时序分析函数¶
- time_bucket()
时间桶函数,time_bucket()将时间进行分区,不同于PostgreSQL 的date_trunc函数, time_bucket允许任意时间间隔,而不把时间间隔限制成秒,分钟,小时。返回值是分区的开始时间。
示例:计算每五分钟的cpu平均值:
SELECT time_bucket('5 minutes', time) AS five_min, avg(cpu)
FROM metrics
GROUP BY five_min
ORDER BY five_min DESC LIMIT 10;
- first()
first(value, time)聚合函数允许用户按照time列的顺序得到value列值。 例如:first(temp, time) 会根据聚合组内的时间返回最早的温度值。
示例:根据device_id分组获得第一个温度值
SELECT device_id, first(temp, time)
FROM metrics
GROUP BY device_id;
- last()
last(value, time)聚合函数允许用户按照time列的顺序得到value列值。 顺序与first函数相反。例如:last(temp, time) 会根据聚合组内的时间返回最晚的温度值。
示例:根据device_id分组获得最后一个温度值
SELECT device_id, last(temp, time)
FROM metrics
GROUP BY device_id;
- histogram()
histogram()直方图函数用来表示一组值的分布,该分布通过一组等宽桶(bucket)表示。 直方图函数根据输入的最小边界(min)和最大边界(max) 将数据分区到指定数量的桶中(nbuckets)。
返回值是一个包含n + 2个桶的数组,中间的n个桶用于指定范围内的值个数, 第一个桶用于表示小于最小边界的值的个数,最后一个桶用于表示大于或等于的最大边界的值的个数。 对于中间的n个桶,每个桶的边界是左闭右开[min,max)。
示例:显示readings表的battery_level列的直方图(按device_id分组)。
SELECT device_id, histogram(battery_level, 20, 60, 5)
FROM readings
GROUP BY device_id
LIMIT 10;
-
时序函数综合使用示例
-
建表,插入数据
CREATE TABLE conditions (
timec TIMESTAMP NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
insert into conditions values ( '2022-01-01 09:20:00-08', 'tianjin', 12, 45);
insert into conditions values ( '2022-01-02 09:30:00-08', 'shanghai', 37, 100);
insert into conditions values ( '2022-01-02 09:20:00-08', 'tianjin', 18, 45);
insert into conditions values ( '2022-01-02 09:10:00-08', 'beijing', 18, 45);
insert into conditions values ( '2022-11-01 09:20:00-08', 'beijing', 7, 30);
insert into conditions values ( '2022-11-01 10:40:00-08', 'beijing', 12, 35);
insert into conditions values ( '2022-11-01 11:50:00-08', 'beijing', 18, 40);
insert into conditions values ( '2022-11-01 12:10:00-08', 'beijing', 23, 45);
insert into conditions values ( '2022-11-01 13:10:00-08', 'beijing', 29, 50);
insert into conditions values ( '2022-11-02 09:20:00-08', 'beijing', -12, 10);
insert into conditions values ( '2022-11-02 10:30:00-08', 'beijing', -6, 15);
insert into conditions values ( '2022-11-02 11:40:00-08', 'beijing', null, null);
insert into conditions values ( '2022-11-03 09:50:00-08', 'beijing', null, null);
select * from conditions order by timec;
timec | location | temperature | humidity
---------------------+----------+-------------+----------
2022-01-01 09:20:00 | tianjin | 12 | 45
2022-01-02 09:10:00 | beijing | 18 | 45
2022-01-02 09:20:00 | tianjin | 18 | 45
2022-01-02 09:30:00 | shanghai | 37 | 100
2022-11-01 09:20:00 | beijing | 7 | 30
2022-11-01 10:40:00 | beijing | 12 | 35
2022-11-01 11:50:00 | beijing | 18 | 40
2022-11-01 12:10:00 | beijing | 23 | 45
2022-11-01 13:10:00 | beijing | 29 | 50
2022-11-02 09:20:00 | beijing | -12 | 10
2022-11-02 10:30:00 | beijing | -6 | 15
2022-11-02 11:40:00 | beijing | |
2022-11-03 09:50:00 | beijing | |
(13 rows)
- 使用时序函数进行分析:每天一个时间间隔,统计每个城市的不同指标的 first, last, max, min 值
select location, time_bucket('1day', timec), first(humidity, timec), last(humidity, timec), max(temperature), min(temperature)
from conditions
group by time_bucket('1day', timec), location order by 2;
location | time_bucket | first | last | max | min
----------+---------------------+-------+------+-----+-----
tianjin | 2022-01-01 00:00:00 | 45 | 45 | 12 | 12
tianjin | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
shanghai | 2022-01-02 00:00:00 | 100 | 100 | 37 | 37
beijing | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
beijing | 2022-11-01 00:00:00 | 30 | 50 | 29 | 7
beijing | 2022-11-02 00:00:00 | 10 | | -6 | -12
beijing | 2022-11-03 00:00:00 | | | |
(7 rows)
间隙填充(gapfill)¶
间隙填充实现了对时间序列中缺失的数据进行算法补全的功能
- time_bucket_gapfill()
time_bucket_gapfill函数的工作原理与time_bucket相似,但还会在开始和结束之间的时间间隔 内激活间隙填充。即某个时间桶内没有数据时,也会生成一个新的时间桶。它只能与聚合查询一起使用。
示例:按照 1 秒采集周期进行填值
select time_bucket_gapfill('1 sec', time,1568116800000, 1568116805000) as ts, device_id
from table3
group by ts, device_id
order by device_id, ts;
- locf()
使用上一个时间桶的值对缺失的数值进行填充,必须配合 time_bucket_gapfill 函数使用。
示例:
SELECT time_bucket_gapfill('1 day', time, now() - INTERVAL '1 week', now()) AS day,
device_id,
avg(temperature) AS value,
locf(avg(temperature))
FROM metrics
WHERE time > now () - INTERVAL '1 week'
GROUP BY day, device_id
ORDER BY day;
- interpolate()
对缺失的数值进行插值操作,必须配合 time_bucket_gapfill 函数使用。
示例:
SELECT time_bucket_gapfill('1 day', time, now() - INTERVAL '1 week', now()) AS day,
device_id,
avg(temperature) AS value,
interpolate(avg(temperature))
FROM metrics
WHERE time > now () - INTERVAL '1 week'
GROUP BY day, device_id
ORDER BY day;
-
间隙填充的综合示例
-
创建表,插入时序数据,其中包含5点和7点的数据值,希望通过间隙填充补全6点钟的数据值
CREATE TABLE metrics_tstz(time timestamp, location INT, device_id INT, v1 float, v2 int) distributed by(location);
INSERT INTO metrics_tstz VALUES
(timestamp '2022-01-01 05:00:00 PST', 1, 1, 0.5, 10),
(timestamp '2022-01-01 05:00:00 PST', 1, 2, 0.7, 20),
(timestamp '2022-01-01 05:00:00 PST', 1, 3, 0.9, 30),
(timestamp '2022-01-01 07:00:00 PST', 1, 1, 0.0, 0),
(timestamp '2022-01-01 07:00:00 PST', 1, 2, 1.4, 40),
(timestamp '2022-01-01 07:00:00 PST', 1, 3, 0.9, 30)
;
select * from metrics_tstz ;
time | location | device_id | v1 | v2
---------------------+----------+-----------+-----+----
2022-01-01 05:00:00 | 1 | 1 | 0.5 | 10
2022-01-01 05:00:00 | 1 | 2 | 0.7 | 20
2022-01-01 05:00:00 | 1 | 3 | 0.9 | 30
2022-01-01 07:00:00 | 1 | 1 | 0 | 0
2022-01-01 07:00:00 | 1 | 2 | 1.4 | 40
2022-01-01 07:00:00 | 1 | 3 | 0.9 | 30
(6 rows)
- 执行如下查询,进行间隙填充
SELECT
time_bucket_gapfill(interval '1h',time,timestamp '2022-01-01 05:00:00-8', timestamp '2022-01-01 07:00:00-8'),
device_id,
location,
locf(avg(v1)) AS locf_v1,
locf(min(v2)) AS locf_v2,
interpolate(avg(v1)) AS interpolate_v1,
interpolate(avg(v2)) AS interpolate_v2
FROM metrics_tstz
GROUP BY 1,2,3
ORDER BY 1,2,3;
time_bucket_gapfill | device_id | location | locf_v1 | locf_v2 | interpolate_v1 | interpolate_v2
---------------------+-----------+----------+---------+---------+----------------+----------------
2022-01-01 05:00:00 | 1 | 1 | 0.5 | 10 | 0.5 | 10
2022-01-01 05:00:00 | 2 | 1 | 0.7 | 20 | 0.7 | 20
2022-01-01 05:00:00 | 3 | 1 | 0.9 | 30 | 0.9 | 30
2022-01-01 06:00:00 | 1 | 1 | 0.5 | 10 | 0.25 | 5
2022-01-01 06:00:00 | 2 | 1 | 0.7 | 20 | 1.05 | 30
2022-01-01 06:00:00 | 3 | 1 | 0.9 | 30 | 0.9 | 30
2022-01-01 07:00:00 | 1 | 1 | 0 | 0 | 0 | 0
2022-01-01 07:00:00 | 2 | 1 | 1.4 | 40 | 1.4 | 40
2022-01-01 07:00:00 | 3 | 1 | 0.9 | 30 | 0.9 | 30
(9 rows)
查看如上填充好的数据,它按照device_id和localtion分组,每组填充了06:00:00时刻的数据值, locf_v1填充的是05:00该组的平均值,locf_v2填充的时05:00该组的最小值, interpolate_v1填充的是05:00和07:00该组的中间值,interpolate_v2填充的是05:00和07:00该组的中间值
持续聚集(Continuous Aggregates)¶
对持续生成的时序数据进行预聚集,把现有数据生成一次聚集结果并物化。当需要聚集所有数据时,把预聚集的数据进行二次聚集 并追加新产生数据的聚集结果,生成最终结果集。提高聚集操作的效率。
- CREATE MATERIALIZED VIEW (Continuous Aggregate)
创建持续聚集的物化视图
示例:
CREATE MATERIALIZED VIEW continuous_aggregate_view( timec, minl, sumt, sumh )
WITH (seaboxts.continuous) AS
SELECT time_bucket('1day', timec), min(location), sum(temperature), sum(humidity)
FROM conditions
GROUP BY time_bucket('1day', timec)
- DROP MATERIALIZED VIEW (Continuous Aggregate)
删除持续聚集的物化视图
示例:
DROP MATERIALIZED VIEW continuous_aggregate_view;
- refresh_continuous_aggregate
刷新持续聚集的物化视图
示例:
CALL refresh_continuous_aggregate('continuous_aggregate_view', '2020-01-01', '2020-02-01')
-
持续聚集的综合示例
-
继续使用时序分析函数的数据
select * from conditions order by timec;
timec | location | temperature | humidity
---------------------+----------+-------------+----------
2022-01-01 09:20:00 | tianjin | 12 | 45
2022-01-02 09:10:00 | beijing | 18 | 45
2022-01-02 09:20:00 | tianjin | 18 | 45
2022-01-02 09:30:00 | shanghai | 37 | 100
2022-11-01 09:20:00 | beijing | 7 | 30
2022-11-01 10:40:00 | beijing | 12 | 35
2022-11-01 11:50:00 | beijing | 18 | 40
2022-11-01 12:10:00 | beijing | 23 | 45
2022-11-01 13:10:00 | beijing | 29 | 50
2022-11-02 09:20:00 | beijing | -12 | 10
2022-11-02 10:30:00 | beijing | -6 | 15
2022-11-02 11:40:00 | beijing | |
2022-11-03 09:50:00 | beijing | |
(13 rows)
- 创建一个持续聚集,对数据进行预聚集
create materialized view mat_m2(location, timec, firsth, lasth, maxtemp, mintemp)
WITH (seaboxts.continuous)
as
select location, time_bucket('1day', timec), first(humidity, timec), last(humidity, timec), max(temperature), min(temperature)
from conditions
group by time_bucket('1day', timec), location;
- 查看持续聚集视图的数据,与时序函数实例的结果相同
select * from mat_m2 order by 2;
location | timec | firsth | lasth | maxtemp | mintemp
----------+---------------------+--------+-------+---------+---------
tianjin | 2022-01-01 00:00:00 | 45 | 45 | 12 | 12
tianjin | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
shanghai | 2022-01-02 00:00:00 | 100 | 100 | 37 | 37
beijing | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
beijing | 2022-11-01 00:00:00 | 30 | 50 | 29 | 7
beijing | 2022-11-02 00:00:00 | 10 | | -6 | -12
beijing | 2022-11-03 00:00:00 | | | |
(7 rows)
- 继续向原表中插入时序数据
insert into conditions values ( '2022-12-02 20:10:00-08', 'tianjin', 12, 45);
insert into conditions values ( '2022-12-02 21:20:00-08', 'tianjin', 18, 45);
insert into conditions values ( '2022-12-02 20:30:00-08', 'beijing', 18, 45);
insert into conditions values ( '2022-12-02 21:50:00-08', 'beijing', 7, 30);
- 刷新持续聚集视图,查看包含新数据的聚集结果
CALL refresh_continuous_aggregate('mat_m2', NULL, NULL);
select * from mat_m2 order by 2;
location | timec | firsth | lasth | maxtemp | mintemp
----------+---------------------+--------+-------+---------+---------
tianjin | 2022-01-01 00:00:00 | 45 | 45 | 12 | 12
tianjin | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
shanghai | 2022-01-02 00:00:00 | 100 | 100 | 37 | 37
beijing | 2022-01-02 00:00:00 | 45 | 45 | 18 | 18
beijing | 2022-11-01 00:00:00 | 30 | 50 | 29 | 7
beijing | 2022-11-02 00:00:00 | 10 | | -6 | -12
beijing | 2022-11-03 00:00:00 | | | |
beijing | 2022-12-02 00:00:00 | 45 | 30 | 18 | 7
tianjin | 2022-12-02 00:00:00 | 45 | 45 | 18 | 12
(9 rows)