Job调度
job调度说明¶
简介¶
job调度在数据库内部实现了通用的任务调度功能。通过job调度,用户可以定义任务,即设定要调度的命令,任务的运行周期与有效期,并能够通过calendar表达式指定任务运行的时间周期;进入运行周期后,job调度进程能够在指定时间从后台启动,并自动完成任务,同时记录下任务运行状态;并且能够通过提供的接口函数对任务进行管理(创建、更改以及删除)。
元数据管理¶
pg_job共享系统表¶
存储任务调度相关的元数据信息,可以在同一集群下的任一coordinator下查找到所有的任务调度信息;可通过接口函数对pg_job系统表进行操作,接口函数会在后文介绍。
以下为pg_job表结构
Table "pg_catalog.pg_job"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Char Semantics | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+----------------+-------------
job_id | bigint | | not null | | plain | | none |
job_name | name | | not null | | plain | | none |
username | name | | not null | | plain | | none |
dbname | name | | not null | | plain | | none |
start_date | timestamp without time zone | | not null | | plain | | none |
end_date | timestamp without time zone | | not null | | plain | | none |
enable | boolean | | not null | | plain | | none |
run_times | integer | | not null | | plain | | none |
repeat_interval | text | C | | | extended | | none |
command | text | C | | | extended | | none |
comment | text | C | | | extended | | none |
Indexes:
"pg_job_id_index" UNIQUE, btree (job_id), tablespace "pg_global"
"pg_job_job_name_index" UNIQUE, btree (job_name), tablespace "pg_global"
Tablespace: "pg_global"
Access method: heap
- 参数说明
参数名 | 参数作用 | 参数说明 |
---|---|---|
job_id | 任务id,为pg_job表的主键 | 在创建job时须指定,如未指定,会随机生成一个job_id |
job_name | 任务名,为pg_job表的主键 | 在创建job时须指定 |
username | 用户名 | 创建job时,如未指定,则会使用当前用户 |
dbname | 数据库名 | 创建job时,如未指定,则会使用当前数据库 |
start_date | 任务周期的开始时间 | 创建job时,如未指定,默认使用当前时间 |
end_date | 任务执行的有效期 | 超过这个时间后,任务不再调度;创建job时,如未指定,默认使用3999年 |
enable | 任务是否启用 | 设为"t"时,任务启用,为"f"时,任务不执行,默认为"t" |
run_times | 运行次数 | 为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1;取值应大于等于-1 |
repeat_interval | calendar表达式,任务的执行间隔 | 在创建job时须指定 |
command | 需要执行的命令 | 在创建job时须指定,当前仅支持SQL命令 |
comment | 对当前job信息的注释说明 | 可以为空 |
job调度的使用方法¶
创建任务¶
- 语句
- SELECT job_submit
- 功能描述
- 创建一个新的调度任务
-
语法格式
SELECT job_submit( job_id, jobname, command, repeat_interval, start_date, end_date, dbname, username, enable, run_times, comment )
-
参数说明
- job_id
- 任务id
- 取值:整数,pg_job系统表中对应的job_id字段
- job_name
- 任务名
- 取值:字符串,要符合标识符的命名规范,pg_job系统表中对应的job_name字段,需指定
- command
- 执行的命令
- 取值:任意SQL语句,需指定
- repeat_interval
- calendar表达式,任务的执行间隔
- 取值:指定格式的字符串,需指定
- start_date
- 任务周期的开始时间
- 取值:timestamp类型,pg_job系统表中对应的start_date字段
- end_date
- 任务执行的有效期
- 取值:timestamp类型,pg_job系统表中对应的end_date字段
- username
- 用户名
- 取值:已经注册的用户名
- dbname
- 数据库名
- 取值:字符串,pg_job系统表中对应的dbname字段
- enable
- 任务是否有效
- 取值:布尔类型,“t”为有效;“f”为无效,即任务不启用
- run_times
- 任务运行次数
- 取值:整数,为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1
- comment
- 注释
- 取值:字符串
- job_id
- 注意事项
- job_id和job_name均不能和已有的job信息重复
- job_name,repeat_interval和command须用户指定,其余参数可不指定,不指定时默认值如元数据管理下参数说明中所述
- job_id不指定时,将随机生成一个job_id
- 超级用户可以将username指定为其他用户,普通用户只能指定自己
更新任务信息¶
- 语句
- SELECT job_update
- 功能描述
- 更新调度任务的信息,目前支持修改调度的命令、调度的时间间隔、用户名、调度周期的起始时间、任务是否启用、运行次数和对job的注释信息
-
语法格式
SELECT job_update( job_id, command, repeat_interval, username, start_date, enable, run_times, comment )
-
参数说明
- job_id
- 任务id
- 取值:整数,pg_job系统表中已存在的job_id
- command
- 执行的命令
- 取值:任意SQL语句
- repeat_interval
- calendar表达式,任务的执行间隔
- 取值:指定格式的字符串
- username
- 用户名
- 取值:已经注册的用户名
- start_date
- 任务周期的开始时间
- 取值:timestamp类型,pg_job系统表中对应的start_date字段
- enable
- 任务是否有效
- 取值:布尔类型,“t”为有效
- run_times
- 任务运行次数
- 取值:整数,为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1
- comment
- 注释
- 取值:字符串
- job_id
- 注意事项
- 普通用户只可以修改自己名下的job,只有超级用户可以修改其他用户名下的job
- 除job_id外,其余参数均可不指定,但最少指定一个
删除任务¶
- 语句
- SELECT job_delete
- 功能描述
- 删除任务调度的信息
-
语法格式
SELECT job_delete( job_id )
-
参数说明
- job_id
- 需要删除的job的job_id
- 取值:已存在的job
- job_id
- 注意事项
- 普通用户只可以删除自己名下的job,只有超级用户可以删除其他用户名下的job
停止调度任务¶
- 语句
- SELECT job_terminate
- 功能描述
- 发送中止信号,停止调度任务
-
语法格式
SELECT job_terminate( job_id )
-
参数说明
- job_id
- 需要停止的job的job_id
- 取值:已存在的job
- job_id
-
使用示例
- 以任务不运行时为例
select job_terminate(9); job_terminate -------------------- no task is running (1 row)
获取任务id¶
- 语句
- SELECT job_name_id
- 功能描述
- 由job_name获取job_id
-
语法格式
SELECT job_name_id( job_name )
-
参数说明
- job_name
- 任务名称
- 取值:已存在的job
- job_name
-
使用示例
select job_name_id('insert_job9'); job_name_id ------------- 9 (1 row)
示例¶
job调度的信息,存储在 pg_job 这个共享系统表中,在coordinator上查询这个系统表,可以查看当前集群中所有的任务信息。如下的示例,演示创建和查询job的过程:
select job_submit(8,'insert_job8', 'insert into t2 values(2)','FREQ=SECONDLY;INTERVAL=5',now()::timestamp,(now()+interval '2 D')::timestamp, 'seaboxsql', 'job_scheduler_t2',false,5);
job_submit
------------
8
(1 row)
在创建job时可以不指定job_id,此时会自动分配一个job_id
select job_submit('test_job8', 'select pg_sleep(20)','FREQ=SECONDLY;INTERVAL=30',now()::timestamp,(now()+interval '2 D')::timestamp, 'seaboxsql', 'job_scheduler_t2',true,5);
job_submit
------------
6130
(1 row)
查询pg_job系统表,获取任务调度的信息
select job_id, job_name, command, repeat_interval, dbname, username, enable, run_times from pg_job where job_id = 8;
job_id | job_name | command | repeat_interval | dbname | username | enable | run_times
--------+-------------+--------------------------+--------------------------+-----------+------------------+--------+-----------
8 | insert_job8 | insert into t2 values(2) | FREQ=SECONDLY;INTERVAL=5 | seaboxsql | job_scheduler_t2 | f | 5
(1 row)
使用job_update修改某次任务的信息
SELECT job_update(job_id => job_name_id('insert_job8'), command => 'insert into t2 values(5)', run_times => 10);
job_update
------------
8
(1 row)
此时再查询pg_job系统表,会发现job_id为8的job信息已经发生改变
SELECT * FROM pg_job;
job_id | job_name | username | dbname | start_date | end_date | enable | run_times | repeat_interval | command | comment
--------+-------------+------------------+-----------+----------------------------+----------------------------+--------+-----------+--------------------------+--------------------------+---------
8 | insert_job8 | job_scheduler_t2 | seaboxsql | 2022-09-05 14:47:11.323445 | 2022-09-07 14:47:11.323445 | f | 10 | FREQ=SECONDLY;INTERVAL=5 | insert into t2 values(5) |
(1 row)
通过job_delete可以删除任务调度的信息
select job_delete(job_name_id('test_job8'));
job_delete
---------------------
job 6130 is deleted
(1 row)
SELECT job_delete(8);
job_delete
------------------
job 8 is deleted
(1 row)
忽略部分参数时可采用如下方式创建job:
select job_submit(job_name => 'insert_job11', command => 'insert into t2 values(5)', repeat_interval => 'FREQ=MINUTELY;INTERVAL=10', enable => false);
job_submit
------------
17930
(1 row)
select * from pg_job;
job_id | job_name | username | dbname | start_date | end_date | enable | run_times | repeat_interval | command | comment
--------+--------------+----------+-----------+----------------------------+---------------------+--------+-----------+---------------------------+--------------------------+---------
17930 | insert_job11 | test | seaboxsql | 2022-09-06 17:54:11.483921 | 3999-12-31 16:00:00 | f | -1 | FREQ=MINUTELY;INTERVAL=10 | insert into t2 values(5) |
(1 row)
计算时间表达式¶
job调度中,在创建job时,需使用calendar表达式来指定任务的执行间隔,以下详述calendar表达式的使用。
calendar表达式使用¶
- 基本形式为"FREQ= ; INTERVAL= ",字段之间需用“;”分隔;
- "freq"字段代表频率,可选参数有yearly,monthly,daily,hourly,minutely,secondly;
- "interval"字段代表间隔,可缺省,默认为1,参数为整数类型;
- 还可以指定bymonth,bymonthday,byhour,byminute,bysecond关键字,分别表示哪一个月,一月当中的哪一天,哪一小时,哪一分钟,哪一秒。
- job的具体执行时间会根据设定的start_date自动补齐,即当calendar表达式设置为"FREQ=daily; INTERVAL=1"即每天执行一次,但是具体在一天当中的什么时间执行,则要看创建job时的start_date中指定的时分秒,以此为时间点调度任务。
示例1:
‘FREQ=MINUTELY; INTERVAL=1’
解释:任务每分钟执行一次
示例2:
‘FREQ=HOURLY; INTERVAL=1’
解释:任务每小时执行一次
示例3:
‘FREQ=yearly;BYMONTH=MAY;BYMONTHDAY=1,3,5,7’
解释:任务每年的5月1,3,5,7号执行
计算时间表达式的函数¶
- 语句
- SELECT evaluate_calendar_string
- 功能描述
- 根据calendar表达式,计算出设定次数的任务调度时间;此函数主要为验证设定的calendar表达式是否符合预期
-
语法格式
SELECT evaluate_calendar_string( repeat_interval, start_date, show_date, times )
-
参数说明
- repeat_interval
- 任务的执行间隔
- 取值:指定格式的字符串,calendar表达式
- start_date
- 任务周期的开始时间
- 取值:timestamp类型
- show_date
- 显示这个时间后的任务调度时间
- 取值:timestamp类型
- times
- 展示几次任务调度时间
- 取值:整数,大于0
- repeat_interval
-
使用示例
select evaluate_calendar_string('freq=secondly;interval=1',now()::timestamp,now()::timestamp,5); evaluate_calendar_string ----------------------------------------------------------------------------------------------------------------- {"2022-09-05 14:09:01","2022-09-05 14:09:02","2022-09-05 14:09:03","2022-09-05 14:09:04","2022-09-05 14:09:05"} (1 row) select unnest(evaluate_calendar_string('freq=daily;bymonth= 5;bymonthday=3;interval = 1',now()::timestamp,now()::timestamp,5)) as dates; dates --------------------- 2023-05-03 15:04:23 2024-05-03 15:04:23 2025-05-03 15:04:23 2026-05-03 15:04:23 2027-05-03 15:04:23 (5 rows)
获取当前正在调度的任务状态¶
pg_job_remaining_task系统表¶
job调度通过job scheduler进程控制实现,此进程只存在于coordinator下,且同一时间只有一个job scheduler进程处于工作状态,其余scheduler进程处于standby状态。
- 系统表结构
Table "pg_catalog.pg_job_remaining_task"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Char Semantics | Description
---------+---------+-----------+----------+---------+---------+--------------+----------------+-------------
job_id | bigint | | not null | | plain | | none |
proc_id | integer | | not null | | plain | | none |
dbid | integer | | not null | | plain | | none |
Indexes:
"pg_job_remaining_task_id_index" UNIQUE, btree (job_id, proc_id), tablespace "pg_global"
Tablespace: "pg_global"
Access method: heap
- 功能描述
- 查看当前处于活跃状态的任务调度进程,查询正在调度任务的进程号
-
语法格式
SELECT * FROM pg_job_remaining_task;
job_active_scheduler函数¶
- 语句
- SELECT job_active_scheduler()
- 功能描述
- 查询当前活跃的任务调度进程所在coordinator的dbid
-
语法格式
SELECT job_active_scheduler();
-
使用示例
select job_active_scheduler(); job_active_scheduler ---------------------- 1 (1 row)
job_scheduler_get_activity函数¶
- 语句
- SELECT * FROM job_scheduler_get_activity()
- 功能描述
- 查询当前正在调度的任务的执行状态
-
语法格式
SELECT * FROM job_scheduler_get_activity();
-
参数说明
- 无
- 注意事项
- 需在有任务正在调度的情况下使用,否则为空表
-
使用示例(以连接失败时为例)
select * from job_scheduler_get_activity(); job_id | proc_id | next_date | status | err_msg --------+---------+---------------------+--------+------------------- 8 | | 2022-09-05 18:06:19 | 7 | connection failed (1 row)
job_scheduler_activity视图¶
-
视图结构
View "pg_catalog.job_scheduler_activity" Column | Type | Collation | Nullable | Default | Storage | Description -----------------+-----------------------------+-----------+----------+---------+----------+------------- job_id | bigint | | | | plain | proc_id | integer | | | | plain | repeat_interval | text | C | | | extended | command | text | C | | | extended | start_date | timestamp without time zone | | | | plain | next_date | timestamp without time zone | | | | plain | status | text | | | | extended | err_msg | text | | | | extended |
-
语句
- SELECT * FROM job_scheduler_activity
- 功能描述
- 查询当前正在调度的任务的执行状态
-
语法格式
SELECT * FROM job_scheduler_activity;
-
参数说明
- job_id:任务id,与pg_job表中相同
- proc_id:执行command的进程号
- repeat_interval:任务的执行间隔,与pg_job表中相同
- command:任务调度执行的命令,与pg_job表中相同
- start_date:任务执行的起始时间,与pg_job表中相同
- next_date:任务下一次执行的时间
- status:任务执行的状态,以下为可能出现的九种状态
- starting:任务开始
- connecting:连接中
- sending:发送查询
- running:job运行中
- receiving:接收返回
- done:一次任务调度完成
- killed:任务被杀掉
- error:任务失败
- waiting:等待执行中
- err_msg:任务执行中报出的错误信息
-
使用示例(以连接失败时为例)
select * from job_scheduler_activity; job_id | proc_id | repeat_interval | command | start_date | next_date | status | err_msg --------+---------+--------------------------+--------------------------+----------------------------+---------------------+--------+------------------- 8 | | FREQ=SECONDLY;INTERVAL=5 | insert into t2 values(2) | 2022-09-05 18:05:29.231695 | 2022-09-05 18:15:14 | error | connection failed (1 row)
查看job调度的历史状态¶
通过job_history_audit_info和job_history_info两张视图,可以查询审计日志中记录的任务的历史执行状态。
- 注意:两张视图的使用均需在保证审计日志可用的状态下,且需创建sdaudit扩展,创建语句 "create extension sdaudit;"
job_history_audit_info视图¶
- 语句
- SELECT * FROM job_history_audit_info;
- 功能描述
- 从审计日志中提取任务的历史执行状态
-
语法格式
SELECT * FROM job_history_audit_info;
-
参数说明(status列)
- started:任务开始
- connecting:连接中
- sending query:发送查询
- running:job运行中
- completed:一次任务调度完成
- connection failed:连接失败
- connection lost:连接失效
- killed:任务被杀掉
- failed:任务失败
-
使用示例
select * from job_history_audit_info where job_id = 1978; job_id | status | error_message | log_time | db_id --------+---------------+---------------+----------------------------+------- 1978 | completed | | 2022-09-02 09:41:27.265+08 | 1 1978 | running | | 2022-09-02 09:41:27.202+08 | 1 1978 | sending query | | 2022-09-02 09:41:27.201+08 | 1 1978 | started | | 2022-09-02 09:41:27.194+08 | 1 1978 | connecting | | 2022-09-02 09:41:27.194+08 | 1 1978 | completed | | 2022-09-02 09:41:18.161+08 | 1 1978 | running | | 2022-09-02 09:41:18.147+08 | 1 1978 | sending query | | 2022-09-02 09:41:18.147+08 | 1 1978 | started | | 2022-09-02 09:41:18.13+08 | 1 1978 | connecting | | 2022-09-02 09:41:18.13+08 | 1 (10 rows)
job_history_info视图¶
- 语句
- SELECT * FROM job_history_info;
- 功能描述
- 查询job的基本信息和历史执行状态
-
语法格式
SELECT * FROM job_history_info;
-
参数说明(status列)
- started:任务开始
- connecting:连接中
- sending query:发送查询
- running:job运行中
- completed:一次任务调度完成
- connection failed:连接失败
- connection lost:连接失效
- killed:任务被杀掉
- failed:任务失败
-
使用示例
SELECT * FROM job_history_info; job_id | job_name | command | repeat_interval | status | error_message | log_time | db_id --------+-------------+--------------------------+---------------------------+---------------+--------------------------------------+----------------------------+------- 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | started | | 2022-09-02 09:41:18.13+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | connecting | | 2022-09-02 09:41:18.13+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | running | | 2022-09-02 09:41:18.147+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | sending query | | 2022-09-02 09:41:18.147+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | completed | | 2022-09-02 09:41:18.161+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | connecting | | 2022-09-02 09:41:27.194+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | started | | 2022-09-02 09:41:27.194+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | sending query | | 2022-09-02 09:41:27.201+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | running | | 2022-09-02 09:41:27.202+08 | 1 1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | completed | | 2022-09-02 09:41:27.265+08 | 1 (10 rows)
故障切换¶
- 功能描述
- 设定当发生故障切换后,对前调度进程剩余的运行中的job进行kill处理