跳转至

Job调度

job调度说明

简介

job调度在数据库内部实现了通用的任务调度功能。通过job调度,用户可以定义任务,即设定要调度的命令,任务的运行周期与有效期,并能够通过calendar表达式指定任务运行的时间周期;进入运行周期后,job调度进程能够在指定时间从后台启动,并自动完成任务,同时记录下任务运行状态;并且能够通过提供的接口函数对任务进行管理(创建、更改以及删除)。

元数据管理

pg_job共享系统表

存储任务调度相关的元数据信息,可以在同一集群下的任一coordinator下查找到所有的任务调度信息;可通过接口函数对pg_job系统表进行操作,接口函数会在后文介绍。

以下为pg_job表结构

                                                        Table "pg_catalog.pg_job"
     Column      |            Type             | Collation | Nullable | Default | Storage  | Stats target | Char Semantics | Description 
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+----------------+-------------
 job_id          | bigint                      |           | not null |         | plain    |              | none           | 
 job_name        | name                        |           | not null |         | plain    |              | none           | 
 username        | name                        |           | not null |         | plain    |              | none           | 
 dbname          | name                        |           | not null |         | plain    |              | none           | 
 start_date      | timestamp without time zone |           | not null |         | plain    |              | none           | 
 end_date        | timestamp without time zone |           | not null |         | plain    |              | none           | 
 enable          | boolean                     |           | not null |         | plain    |              | none           | 
 run_times       | integer                     |           | not null |         | plain    |              | none           | 
 repeat_interval | text                        | C         |          |         | extended |              | none           | 
 command         | text                        | C         |          |         | extended |              | none           | 
 comment         | text                        | C         |          |         | extended |              | none           | 
Indexes:
    "pg_job_id_index" UNIQUE, btree (job_id), tablespace "pg_global"
    "pg_job_job_name_index" UNIQUE, btree (job_name), tablespace "pg_global"
Tablespace: "pg_global"
Access method: heap
  • 参数说明
参数名 参数作用 参数说明
job_id 任务id,为pg_job表的主键 在创建job时须指定,如未指定,会随机生成一个job_id
job_name 任务名,为pg_job表的主键 在创建job时须指定
username 用户名 创建job时,如未指定,则会使用当前用户
dbname 数据库名 创建job时,如未指定,则会使用当前数据库
start_date 任务周期的开始时间 创建job时,如未指定,默认使用当前时间
end_date 任务执行的有效期 超过这个时间后,任务不再调度;创建job时,如未指定,默认使用3999年
enable 任务是否启用 设为"t"时,任务启用,为"f"时,任务不执行,默认为"t"
run_times 运行次数 为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1;取值应大于等于-1
repeat_interval calendar表达式,任务的执行间隔 在创建job时须指定
command 需要执行的命令 在创建job时须指定,当前仅支持SQL命令
comment 对当前job信息的注释说明 可以为空

job调度的使用方法

创建任务

  • 语句
    • SELECT job_submit
  • 功能描述
    • 创建一个新的调度任务
  • 语法格式

    SELECT job_submit(
        job_id,
        jobname,
        command,
        repeat_interval,
        start_date,
        end_date,
        dbname,
        username,
        enable,
        run_times
        comment
    )
    
  • 参数说明

    • job_id
      • 任务id
      • 取值:整数,pg_job系统表中对应的job_id字段
    • job_name
      • 任务名
      • 取值:字符串,要符合标识符的命名规范,pg_job系统表中对应的job_name字段,需指定
    • command
      • 执行的命令
      • 取值:任意SQL语句,需指定
    • repeat_interval
      • calendar表达式,任务的执行间隔
      • 取值:指定格式的字符串,需指定
    • start_date
      • 任务周期的开始时间
      • 取值:timestamp类型,pg_job系统表中对应的start_date字段
    • end_date
      • 任务执行的有效期
      • 取值:timestamp类型,pg_job系统表中对应的end_date字段
    • username
      • 用户名
      • 取值:已经注册的用户名
    • dbname
      • 数据库名
      • 取值:字符串,pg_job系统表中对应的dbname字段
    • enable
      • 任务是否有效
      • 取值:布尔类型,“t”为有效;“f”为无效,即任务不启用
    • run_times
      • 任务运行次数
      • 取值:整数,为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1
    • comment
      • 注释
      • 取值:字符串
  • 注意事项
    • job_id和job_name均不能和已有的job信息重复
    • job_name,repeat_interval和command须用户指定,其余参数可不指定,不指定时默认值如元数据管理下参数说明中所述
    • job_id不指定时,将随机生成一个job_id
    • 超级用户可以将username指定为其他用户,普通用户只能指定自己

更新任务信息

  • 语句
    • SELECT job_update
  • 功能描述
    • 更新调度任务的信息,目前支持修改调度的命令、调度的时间间隔、用户名、调度周期的起始时间、任务是否启用、运行次数和对job的注释信息
  • 语法格式

    SELECT job_update(
        job_id,
        command,
        repeat_interval,
        username,
        start_date,
        enable,
        run_times,
        comment
    )
    
  • 参数说明

    • job_id
      • 任务id
      • 取值:整数,pg_job系统表中已存在的job_id
    • command
      • 执行的命令
      • 取值:任意SQL语句
    • repeat_interval
      • calendar表达式,任务的执行间隔
      • 取值:指定格式的字符串
    • username
      • 用户名
      • 取值:已经注册的用户名
    • start_date
      • 任务周期的开始时间
      • 取值:timestamp类型,pg_job系统表中对应的start_date字段
    • enable
      • 任务是否有效
      • 取值:布尔类型,“t”为有效
    • run_times
      • 任务运行次数
      • 取值:整数,为正数时,为任务的运行次数;为-1时,则不限次数,默认为-1
    • comment
      • 注释
      • 取值:字符串
  • 注意事项
    • 普通用户只可以修改自己名下的job,只有超级用户可以修改其他用户名下的job
    • 除job_id外,其余参数均可不指定,但最少指定一个

删除任务

  • 语句
    • SELECT job_delete
  • 功能描述
    • 删除任务调度的信息
  • 语法格式

    SELECT job_delete(
        job_id
    )
    
  • 参数说明

    • job_id
      • 需要删除的job的job_id
      • 取值:已存在的job
  • 注意事项
    • 普通用户只可以删除自己名下的job,只有超级用户可以删除其他用户名下的job

停止调度任务

  • 语句
    • SELECT job_terminate
  • 功能描述
    • 发送中止信号,停止调度任务
  • 语法格式

    SELECT job_terminate(
        job_id
    )
    
  • 参数说明

    • job_id
      • 需要停止的job的job_id
      • 取值:已存在的job
  • 使用示例

    • 以任务不运行时为例
    select job_terminate(9);
    job_terminate    
    --------------------
    no task is running
    (1 row)
    

获取任务id

  • 语句
    • SELECT job_name_id
  • 功能描述
    • 由job_name获取job_id
  • 语法格式

    SELECT job_name_id(
        job_name
    )
    
  • 参数说明

    • job_name
      • 任务名称
      • 取值:已存在的job
  • 使用示例

    select job_name_id('insert_job9');
    job_name_id 
    -------------
            9
    (1 row)
    

示例

job调度的信息,存储在 pg_job 这个共享系统表中,在coordinator上查询这个系统表,可以查看当前集群中所有的任务信息。如下的示例,演示创建和查询job的过程:

select job_submit(8,'insert_job8', 'insert into t2 values(2)','FREQ=SECONDLY;INTERVAL=5',now()::timestamp,(now()+interval '2 D')::timestamp, 'seaboxsql', 'job_scheduler_t2',false,5);
 job_submit
------------
          8
(1 row)

在创建job时可以不指定job_id,此时会自动分配一个job_id

select job_submit('test_job8', 'select pg_sleep(20)','FREQ=SECONDLY;INTERVAL=30',now()::timestamp,(now()+interval '2 D')::timestamp, 'seaboxsql', 'job_scheduler_t2',true,5);
 job_submit 
------------
        6130
(1 row)

查询pg_job系统表,获取任务调度的信息

select job_id, job_name, command, repeat_interval, dbname, username, enable, run_times from pg_job where job_id = 8;
 job_id |  job_name   |         command          |     repeat_interval      |  dbname   |     username     | enable | run_times 
--------+-------------+--------------------------+--------------------------+-----------+------------------+--------+-----------
      8 | insert_job8 | insert into t2 values(2) | FREQ=SECONDLY;INTERVAL=5 | seaboxsql | job_scheduler_t2 | f      |         5
(1 row)

使用job_update修改某次任务的信息

SELECT job_update(job_id => job_name_id('insert_job8'), command => 'insert into t2 values(5)', run_times => 10);
 job_update
------------
          8
(1 row)

此时再查询pg_job系统表,会发现job_id为8的job信息已经发生改变

SELECT * FROM pg_job;
 job_id |  job_name   |     username     |  dbname   |         start_date         |          end_date          | enable | run_times |     repeat_interval      |         command          | comment 
--------+-------------+------------------+-----------+----------------------------+----------------------------+--------+-----------+--------------------------+--------------------------+---------
      8 | insert_job8 | job_scheduler_t2 | seaboxsql | 2022-09-05 14:47:11.323445 | 2022-09-07 14:47:11.323445 | f      |        10 | FREQ=SECONDLY;INTERVAL=5 | insert into t2 values(5) |
(1 row)

通过job_delete可以删除任务调度的信息

select job_delete(job_name_id('test_job8'));
     job_delete
---------------------
 job 6130 is deleted
(1 row)

SELECT job_delete(8);
    job_delete
------------------
 job 8 is deleted
(1 row)

忽略部分参数时可采用如下方式创建job:

 select job_submit(job_name => 'insert_job11', command => 'insert into t2 values(5)', repeat_interval => 'FREQ=MINUTELY;INTERVAL=10', enable => false);
 job_submit
------------
      17930
(1 row)

select * from pg_job;
 job_id |   job_name   | username |  dbname   |         start_date         |      end_date       | enable | run_times |      repeat_interval      |         command          | comment
--------+--------------+----------+-----------+----------------------------+---------------------+--------+-----------+---------------------------+--------------------------+---------
  17930 | insert_job11 | test     | seaboxsql | 2022-09-06 17:54:11.483921 | 3999-12-31 16:00:00 | f      |        -1 | FREQ=MINUTELY;INTERVAL=10 | insert into t2 values(5) |
(1 row)

计算时间表达式

job调度中,在创建job时,需使用calendar表达式来指定任务的执行间隔,以下详述calendar表达式的使用。

calendar表达式使用

  • 基本形式为"FREQ= ; INTERVAL= ",字段之间需用“;”分隔;
  • "freq"字段代表频率,可选参数有yearly,monthly,daily,hourly,minutely,secondly;
  • "interval"字段代表间隔,可缺省,默认为1,参数为整数类型;
  • 还可以指定bymonth,bymonthday,byhour,byminute,bysecond关键字,分别表示哪一个月,一月当中的哪一天,哪一小时,哪一分钟,哪一秒。
  • job的具体执行时间会根据设定的start_date自动补齐,即当calendar表达式设置为"FREQ=daily; INTERVAL=1"即每天执行一次,但是具体在一天当中的什么时间执行,则要看创建job时的start_date中指定的时分秒,以此为时间点调度任务。
示例1:

‘FREQ=MINUTELY; INTERVAL=1’

解释:任务每分钟执行一次

示例2:

‘FREQ=HOURLY; INTERVAL=1’

解释:任务每小时执行一次

示例3:

‘FREQ=yearly;BYMONTH=MAY;BYMONTHDAY=1,3,5,7’

解释:任务每年的5月1,3,5,7号执行

计算时间表达式的函数

  • 语句
    • SELECT evaluate_calendar_string
  • 功能描述
    • 根据calendar表达式,计算出设定次数的任务调度时间;此函数主要为验证设定的calendar表达式是否符合预期
  • 语法格式

    SELECT evaluate_calendar_string(
        repeat_interval,
        start_date,
        show_date,
        times
    )
    
  • 参数说明

    • repeat_interval
      • 任务的执行间隔
      • 取值:指定格式的字符串,calendar表达式
    • start_date
      • 任务周期的开始时间
      • 取值:timestamp类型
    • show_date
      • 显示这个时间后的任务调度时间
      • 取值:timestamp类型
    • times
      • 展示几次任务调度时间
      • 取值:整数,大于0
  • 使用示例

    select evaluate_calendar_string('freq=secondly;interval=1',now()::timestamp,now()::timestamp,5);
                                                evaluate_calendar_string                                             
    -----------------------------------------------------------------------------------------------------------------
    {"2022-09-05 14:09:01","2022-09-05 14:09:02","2022-09-05 14:09:03","2022-09-05 14:09:04","2022-09-05 14:09:05"}
    (1 row)
    
    select unnest(evaluate_calendar_string('freq=daily;bymonth= 5;bymonthday=3;interval = 1',now()::timestamp,now()::timestamp,5)) as dates;
            dates        
    ---------------------
    2023-05-03 15:04:23
    2024-05-03 15:04:23
    2025-05-03 15:04:23
    2026-05-03 15:04:23
    2027-05-03 15:04:23
    (5 rows)
    

获取当前正在调度的任务状态

pg_job_remaining_task系统表

job调度通过job scheduler进程控制实现,此进程只存在于coordinator下,且同一时间只有一个job scheduler进程处于工作状态,其余scheduler进程处于standby状态。

  • 系统表结构
                                  Table "pg_catalog.pg_job_remaining_task"
 Column  |  Type   | Collation | Nullable | Default | Storage | Stats target | Char Semantics | Description
---------+---------+-----------+----------+---------+---------+--------------+----------------+-------------
 job_id  | bigint  |           | not null |         | plain   |              | none           |
 proc_id | integer |           | not null |         | plain   |              | none           |
 dbid    | integer |           | not null |         | plain   |              | none           |
Indexes:
    "pg_job_remaining_task_id_index" UNIQUE, btree (job_id, proc_id), tablespace "pg_global"
Tablespace: "pg_global"
Access method: heap
  • 功能描述
    • 查看当前处于活跃状态的任务调度进程,查询正在调度任务的进程号
  • 语法格式

    SELECT * FROM pg_job_remaining_task;
    

job_active_scheduler函数

  • 语句
    • SELECT job_active_scheduler()
  • 功能描述
    • 查询当前活跃的任务调度进程所在coordinator的dbid
  • 语法格式

    SELECT job_active_scheduler();
    
  • 使用示例

    select job_active_scheduler();
    job_active_scheduler 
    ----------------------
                        1
    (1 row)
    

job_scheduler_get_activity函数

  • 语句
    • SELECT * FROM job_scheduler_get_activity()
  • 功能描述
    • 查询当前正在调度的任务的执行状态
  • 语法格式

    SELECT * FROM job_scheduler_get_activity();
    
  • 参数说明

  • 注意事项
    • 需在有任务正在调度的情况下使用,否则为空表
  • 使用示例(以连接失败时为例)

    select * from job_scheduler_get_activity();
    job_id | proc_id |      next_date      | status |      err_msg      
    --------+---------+---------------------+--------+-------------------
        8 |         | 2022-09-05 18:06:19 |      7 | connection failed
    (1 row)
    

job_scheduler_activity视图

  • 视图结构

                                    View "pg_catalog.job_scheduler_activity"
         Column      |            Type             | Collation | Nullable | Default | Storage  | Description 
    -----------------+-----------------------------+-----------+----------+---------+----------+-------------
     job_id          | bigint                      |           |          |         | plain    | 
     proc_id         | integer                     |           |          |         | plain    | 
     repeat_interval | text                        | C         |          |         | extended | 
     command         | text                        | C         |          |         | extended | 
     start_date      | timestamp without time zone |           |          |         | plain    | 
     next_date       | timestamp without time zone |           |          |         | plain    | 
     status          | text                        |           |          |         | extended | 
     err_msg         | text                        |           |          |         | extended | 
    

  • 语句

    • SELECT * FROM job_scheduler_activity
  • 功能描述
    • 查询当前正在调度的任务的执行状态
  • 语法格式

    SELECT * FROM job_scheduler_activity;
    
  • 参数说明

    • job_id:任务id,与pg_job表中相同
    • proc_id:执行command的进程号
    • repeat_interval:任务的执行间隔,与pg_job表中相同
    • command:任务调度执行的命令,与pg_job表中相同
    • start_date:任务执行的起始时间,与pg_job表中相同
    • next_date:任务下一次执行的时间
    • status:任务执行的状态,以下为可能出现的九种状态
      • starting:任务开始
      • connecting:连接中
      • sending:发送查询
      • running:job运行中
      • receiving:接收返回
      • done:一次任务调度完成
      • killed:任务被杀掉
      • error:任务失败
      • waiting:等待执行中
    • err_msg:任务执行中报出的错误信息
  • 使用示例(以连接失败时为例)

    select * from job_scheduler_activity;
    job_id | proc_id |     repeat_interval      |         command          |         start_date         |      next_date      | status |      err_msg      
    --------+---------+--------------------------+--------------------------+----------------------------+---------------------+--------+-------------------
        8 |         | FREQ=SECONDLY;INTERVAL=5 | insert into t2 values(2) | 2022-09-05 18:05:29.231695 | 2022-09-05 18:15:14 | error  | connection failed
    (1 row)
    

查看job调度的历史状态

通过job_history_audit_info和job_history_info两张视图,可以查询审计日志中记录的任务的历史执行状态。

  • 注意:两张视图的使用均需在保证审计日志可用的状态下,且需创建sdaudit扩展,创建语句 "create extension sdaudit;"

job_history_audit_info视图

  • 语句
    • SELECT * FROM job_history_audit_info;
  • 功能描述
    • 从审计日志中提取任务的历史执行状态
  • 语法格式

    SELECT * FROM job_history_audit_info;
    
  • 参数说明(status列)

    • started:任务开始
    • connecting:连接中
    • sending query:发送查询
    • running:job运行中
    • completed:一次任务调度完成
    • connection failed:连接失败
    • connection lost:连接失效
    • killed:任务被杀掉
    • failed:任务失败
  • 使用示例

    select * from job_history_audit_info where job_id = 1978;
    job_id |    status     | error_message |          log_time          | db_id 
    --------+---------------+---------------+----------------------------+-------
    1978 | completed     |               | 2022-09-02 09:41:27.265+08 |     1
    1978 | running       |               | 2022-09-02 09:41:27.202+08 |     1
    1978 | sending query |               | 2022-09-02 09:41:27.201+08 |     1
    1978 | started       |               | 2022-09-02 09:41:27.194+08 |     1
    1978 | connecting    |               | 2022-09-02 09:41:27.194+08 |     1
    1978 | completed     |               | 2022-09-02 09:41:18.161+08 |     1
    1978 | running       |               | 2022-09-02 09:41:18.147+08 |     1
    1978 | sending query |               | 2022-09-02 09:41:18.147+08 |     1
    1978 | started       |               | 2022-09-02 09:41:18.13+08  |     1
    1978 | connecting    |               | 2022-09-02 09:41:18.13+08  |     1
    (10 rows)
    

job_history_info视图

  • 语句
    • SELECT * FROM job_history_info;
  • 功能描述
    • 查询job的基本信息和历史执行状态
  • 语法格式

    SELECT * FROM job_history_info;
    
  • 参数说明(status列)

    • started:任务开始
    • connecting:连接中
    • sending query:发送查询
    • running:job运行中
    • completed:一次任务调度完成
    • connection failed:连接失败
    • connection lost:连接失效
    • killed:任务被杀掉
    • failed:任务失败
  • 使用示例

    SELECT * FROM job_history_info;
    job_id |  job_name   |         command          |      repeat_interval      |    status     |            error_message             |          log_time          | db_id 
    --------+-------------+--------------------------+---------------------------+---------------+--------------------------------------+----------------------------+-------
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | started       |                                      | 2022-09-02 09:41:18.13+08  |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | connecting    |                                      | 2022-09-02 09:41:18.13+08  |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | running       |                                      | 2022-09-02 09:41:18.147+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | sending query |                                      | 2022-09-02 09:41:18.147+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | completed     |                                      | 2022-09-02 09:41:18.161+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | connecting    |                                      | 2022-09-02 09:41:27.194+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | started       |                                      | 2022-09-02 09:41:27.194+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | sending query |                                      | 2022-09-02 09:41:27.201+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | running       |                                      | 2022-09-02 09:41:27.202+08 |     1
    1978 | insert_job2 | insert into t2 values(5) | FREQ=SECONDLY;INTERVAL=10 | completed     |                                      | 2022-09-02 09:41:27.265+08 |     1
    (10 rows)
    

故障切换

  • 功能描述
    • 设定当发生故障切换后,对前调度进程剩余的运行中的job进行kill处理