跳转至

数据库文件服务(scfs)

数据库文件服务(scfs)

这一节介绍了数据库文件服务scfs及其数据同步插件dataflow。

scfs全称seabox cluster file server,它在数据集成中充当文件分发服务器的作用。当mpp集群的主节点发送获取文件数据的请求时,scfs会打开该文件的文件数据流,mpp集群的主节点并把请求信息和scfs相关信息发送给各执行节点(executor),各执行节点向scfs获取数据。scfs分发的原理是,将文件进行分块,并以完整行为结束,将文件块发送给请求的各执行节点。

scfs协议用于CREATE FOREIGN TABLESQL命令,以访问SeaboxMPP Database scfs文件服务器实用程序提供的外部数据。

当外部数据由scfs提供时,SeaboxMPP数据库系统中的所有节点都可以并行读取或写入外部表数据。

scfs 安装

安装 scfs

scfs放在SeaboxMPP数据库Coordinator主机安装的$SDHOME/bin中。

建议在一个不同于SeaboxMPP数据库Coordinator或者Executor的机器上运行scfs,例如在专用于ETL处理的机器。

在集群Coordinator或者Executor节点上运行scfs可能会对查询执行造成性能影响。

要在用户的ETL服务器上安装scfs,请从SeaboxMPP Load Tools包中得到它并且按照它的安装指导。

启动和停止 scfs

从用户的当前目录中运行,输入:

scfs &

要从一个不同的目录启动,应指定提供文件的目录,还可以有选择地指定运行的HTTP端口。

要在后台启动scfs并且把输出的消息和错误记录在一个日志文件中:

$ scfs -d /var/load_files -p 8081 -l /home/seabox/log &

对于同一个ETL主机上的多个scfs实例,为每一个实例使用一个不同的基础目录和端口。例如:

$ scfs -d /var/load_files1 -p 8081 -l /home/seabox/log1 &
$ scfs -d /var/load_files2 -p 8082 -l /home/seabox/log2 &

在scfs运行在后台时停止它:

首先找到它的进程ID:

$ ps -ef | grep scfs

然后杀死进程,例如(这个例子中的进程ID是3456):

$ kill 3456

scfs 配置

scfs 参数

参数 参数作用
-h, –help 查看帮助
-v 详细模式
-V 查看scfs测试的端口的信息
-s 简化日志最小值
-B 二进制自动转码
-p port 服务端口,默认8080
-d dir 指定文件输出目录,默认为 '.'
-l logfn 日志文件名
-t tm scfs超时时间,单位:秒
-m maxlen 最大行长度,单位:字节,默认32768
–ssl dir 启动HTTPS服务,使用指定路径的证书
–dataflow 使用dataflow服务
–version 查看版本
-w timeout 关闭目标文件前超时,单位:秒
–parallel num scfs并行度

测试连接

executor在运行时访问scfs。 确保SeaboxMPP的executor主机具有到scfs的网络访问。 scfs是一个Web服务器:可以通过从SeaboxMPP阵列的每一个主机(executor和Coordinator)运行下列命令来测试连接:

$ wget http://scfs_hostname:port/filename           

运行多个 scfs 实例

您可以在多个主机上运行scfs实例,也可以在每个主机上运行多个scfs实例。

这允许您策略性地部署scfs服务器,以便通过利用所有可用的网络带宽和SeaboxMPP数据库的并行性来获得快速的数据加载和导出速率。

  • 允许网络流量同时使用所有的ETL主机网络接口卡(NIC)。
  • 在ETL主机上的多个scfs实例之间平均划分外部表数据。
  • 例如,在一个有两个NIC的ETL系统上,运行两个scfs实例(每个NIC上一个)来优化数据导入性能并且在这两个scfs之间平均划分外部表数据文件。

控制 executor 并行

sc_external_max_execs服务器配置参数控制能同时访问单一scfs实例的executor实例数量。默认值为64。用户可以设置executor的数目为一些executor处理外部数据文件并且一些执行其他数据库处理。

可在Coordinator实例的seaboxsql.conf文件中设置这个参数。

scfs 挂起,超时中止

如果scfs实用程序挂起而没有发生读取或写入活动,则可以在下次发生挂起时生成core文件以帮助调试问题。 将环境变量SCFS_WATCHDOG_TIMER设置为scfs无活动等待多少秒后强制退出。 设置环境变量并且scfs挂起后,实用程序将在指定的秒数后中止,创建core文件,并将中止信息发送到日志文件。

此示例在Linux系统上设置环境变量,以便scfs在300秒(5分钟)无活动后退出。

export SCFS_WATCHDOG_TIMER=300

使用 scfs 导入导出文件

scfs文件服务器实用程序位于SeaboxMPP数据库主节点和每个节点上的$SDHOME/bin目录中。

启动scfs实例时,您指定一个侦听端口以及一个包含要读取的文件或要写入文件的目录的路径。

例如,此命令在后台运行scfs,侦听端口8801,并在/home/scadmin/external_files目录中提供文件:

$ scfs -p 8801 -d /home/scadmin/external_files &

CREATE FOREIGN TABLE命令resource子句将外部表定义连接到一个scfs实例。 如果外部表是可读的,则scfs服务器从指定目录中的文件读取数据记录,将它们打包到块中,并在响应SeaboxMPP数据库节点的请求时发送该块。这些节点解压缩它们接收的行并根据外部表的分发策略分发它们。如果外部表是可写表,则节点将请求中的行块发送到scfs,scfs将它们写入外部文件。

外部数据文件可以包含CSV格式的行或CREATE FOREIGN TABLE命令的FORMAT子句支持的任何分隔文本格式。

CREATE FOREIGN TABLE定义必须有用于scfs的正确的主机名、端口以及文件名、以及相对于scfs提供文件的目录(scfs启动时指定的目录路径)的方式指定文件和路径。

如果用户在其系统上启动scfs且IPv6网络被禁用,测试一个IPv6端口时,scfs会显示下列警告消息。

[WRN scfs.c:2050] Creating the socket failed

如果对应的IPv4端口可用,scfs会使用该端口并且忽略对于IPv6端口的警告。

有关IPv6和IPv4网络的信息,请见操作系统的文档。

当用scfs或者scfss协议读写数据时, scfs工具拒绝头部不包括X-SC-PROTO的HTTP请求。 如果在头部没有检测到XSC-PROTOscfs会在HTTP响应头部的状态行中返回一个400错误: 400 invalid request (no sc-proto)

SeaboxMPP数据库会在HTTP请求头部包括X-SC-PROTO以表示该请求是来自于SeaboxMPP数据库。

此外,scfs可以配置YAML格式的文件,以在支持的文本格式和另一种格式(例如XML或JSON)之间转换外部数据文件。

对于可读的外部表,scfs会自动解压缩gzip.gz)和bzip2.bz2)文件。其中可以使用parallelgz的方式对gzip文件进行解压加速,使用方法为配置环境变量SD_GZMETHOD=parallelgz,能够减少大约25%的解压时间。

您可以使用通配符(*)或其他C样式模式匹配来表示要读取的多个文件。

外部文件路径都假定相对于启动scfs实例时指定的路径。

使用 scfs 进行数据同步(dataflow)

dataflow是scfs的一个数据同步插件,用来将oracle或kafka(OGG格式)中的数据加载到mpp中,包括全量同步oracle-flow和增量同步kafka-flow,如下图所示。

oracle-flow作用是做数据的全量同步。其主要实现原理是从oracle数据库多线程并行获取数据,并写入文件中,最后将写入的文件数据加载到mpp中。

kafka-flow作用是做数据的增量同步。其主要实现原理是从kafka集群消费数据,该数据是由ogg等工具导出的指定格式的数据,并对消费的数据进行解析,将解析的数据写入文件,然后加载到mpp中。

dataflow 使用说明

  • 安装 scfs 包,并在启动前,确定已 source scfs 包下的seaboxmpp_loaders_path.sh 文件, 该文件中包含 $SDHOME_LOADERS 环境变量
  • 启动 scfs 时需要配上 "–dataflow" 参数
## 可以先使用scfs -h 查看各参数的作用,使用dataflow功能启动参数如下
scfs -p 19588 -d /home/seabox/kafka/tmp/ -l logfile.txt -m 10240000 --dataflow &
dataflow 功能函数汇总
接口及调用 功能
CALL dataflow.dataflow_create() 创建dataflow
CALL dataflow.dataflow_alter() 修改dataflow配置信息
CALL dataflow.dataflow_start() 启动dataflow
CALL dataflow.dataflow_stop() 停止dataflow
CALL dataflow.dataflow_drop() 删除dataflow
SELECT * FROM dataflow.dataflow_show() 查看dataflow配置信息
SELECT * FROM dataflow.dataflow_status() 查看dataflow状态
SELECT * FROM dataflow.dataflow_all() 查看所有存在的dataflow
SELECT * FROM dataflow.dataflow_log() 查看dataflow日志
SELECT * FROM dataflow.ddlx_create_dataflow() 查看创建dataflow的create语句
创建 dataflow
  • 函数接口
call dataflow.dataflow_create(flow_name text, flow_type text, options text default ‘’, flow_default text DEFAULT default);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
flow_type dataflow类型,incr增量(kafka),oracle全量(oracle)
options oracle-flow和kafka-flow配置参数(形式key=value) ''
flow_default 在指定dataflow配置信息上更改 'default'
  • 使用示例及结果
    --- 不使用指定任务配置创建新dataflow
    seaboxsql=# call dataflow.dataflow_create('kafka_test1', 'incr', 'kafka_broker=127.0.0.1:9092, kafka_topic=OGG_GIS_GZ98_01, scfs_host=127.0.0.1:8282');
     dataflow_create
    -----------------
     CREATE SUCCESS
    (1 row)
    
    --- 使用指定任务配置创建新dataflow, 在kafka_test1配置上更改,则复用kafka_test1的kafka_topic_partition=-1
    seaboxsql=# call dataflow.dataflow_create('kafka_test2', 'incr', 'kafka_topic=OGG_GIS_GZ98_02', 'kafka_test1');
                   dataflow_create               
    ---------------------------------------------
     CREATE SUCCESS: USE CONFIG OF "kafka_test1"
    (1 row)
    
    
    --- 创建失败(以参数不存在为例)
    seaboxsql=# call dataflow.dataflow_create('kafka_test3', 'incr', 'aaa=bbb');
    ERROR:  column does not exist: "aaa"
    CONTEXT:  PL/Python function "dataflow_create"
    
修改 dataflow
  • 函数接口
--- 修改参数前需要保证任务已经被手动停止
call dataflow.dataflow_alter(flow_name text, options text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
options 待修改的配置参数
  • 使用示例
    --- 修改kafka_test1的enable_use_2phase参数
    call dataflow.dataflow_alter('kafka_test', 'enable_use_2phase=0');
     dataflow_alter
    ---------------
     ALTER SUCCESS
    (1 row)
    
    --- 修改失败(以参数不存在为例)
    seaboxsql=# call dataflow.dataflow_alter('kafka_test', 'aaa=0');
    ERROR:  column does not exist: "aaa"
    CONTEXT:  PL/Python function "dataflow_alter"
    
启动 dataflow
  • 函数接口
call dataflow.dataflow_start(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
seaboxsql=# call dataflow.dataflow_start('kafka_test');
 dataflow_start 
----------------
 START SUCCESS
(1 row)
停止 dataflow
  • 函数接口
call dataflow.dataflow_stop(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
    seaboxsql=# call dataflow.dataflow_stop('kafka_test');
     dataflow_stop
    ---------------
     STOP SUCCESS
    (1 row)
    
删除 dataflow
  • 函数接口
call dataflow.dataflow_drop(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
    --- 删除任务前需要保证任务已经被手动停止
     seaboxsql=# call dataflow.dataflow_drop('task3')
     dataflow_drop
    ---------------
     DROP SUCCESS
    (1 row)
    
查看 dataflow 配置信息
  • 函数接口
select dataflow.dataflow_show(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
    --- 查看kafka_test的参数(结果经截取)
    seaboxsql=# select dataflow.dataflow_show('kafka_test');
                  dataflow_show
    ---------------------------------------------------
     (flow_name,kafka_test)
     (flow_type,incr)
     (use_raw,0)
     (kafka_broker,127.0.0.1:9092)
     (kafka_topic,OGG_GIS_GZ98_01)
     (kafka_topic_partition,-1)
     (kafka_topic_partition_offset,0)
     (kafka_consume_exit,never)
     (kafka_consume_timeout,0)
     (kafka_consume_group_id,"")
     (kafka_consume_sasl_user,"")
     (kafka_consume_sasl_passwd,"")
     (kafka_consume_keytab,"")
     (kafka_consume_principal,"")
    
    
     --- 使用select *可以查看格式化后的结果(结果经截取)
     seaboxsql=# select * from dataflow.dataflow_show('kafka_test');
                 config_name            |  config_value
    
    ------------------------------------+---------------------------
     flow_name                          | kafka_test
     flow_type                          | incr
     use_raw                            | 0
     origin_mode                        | 0
     kafka_broker                       | 127.0.0.1:9092
     kafka_topic                        | OGG_GIS_GZ98_01
     kafka_topic_partition              | -1
     kafka_topic_partition_offset       | 0
     kafka_consume_exit                 | never
     kafka_consume_timeout              | 0
     kafka_consume_group_id             |
     kafka_consume_sasl_user            |
     kafka_consume_sasl_passwd          |
     kafka_consume_keytab               |
     kafka_consume_principal            |
    
    
     --- 格式化后可以查询指定的参数名
     seaboxsql=# select * from dataflow.dataflow_show('kafka_test') where config_name='kafka_topic';
     config_name |  config_value
    -------------+-----------------
     kafka_topic | OGG_GIS_GZ98_01
    (1 row)
    
查看 dataflow 状态
  • 函数接口
select dataflow.dataflow_status(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 任务状态说明
状态 格式 说明
已创建 created 当任务被成功创建后的状态
已启动 started 当任务被成功启动后的状态
进行中 running 任务处于进行中,没有发生错误,进行中的任务不能被重启
用户退出 exited by user 任务被用户暂停
任务成功 exited with info: task success 任务执行成功,并已经退出
任务失败 exited with error: task failed 任务执行失败,异常退出,返回失败信息
  • 增量加载实时状态显示
状态名称 作用 说明
source_queue 存储消息的队列信息 格式为 "i, r, B", source 阶段队列中元素个数(i),保存的行数(r),总的字节数(B)
sink_queue 存储解析后消息的队列信息 格式为 "i, r, B", sink 阶段队列中元素个数(i),保存的行数(r),总的字节数(B),
shared_status 共享内存的信息 格式为 "i, B, files B", 共享内存中元素个数(i),共享内存中已占用字节数(B), 总的生成文件字节数(B)
batch_status 写出条数的状态信息 格式为 "r, ms, msg/s", 生成一批的条数®, 时间(ms), 速率(msg/s)
load_table_num 上一批加载表的个数
batch_load_time 上一批加载时间
load_long_time 上一批加载时间最长的sql的运行时间
load_long_time_sql 上一批加载时间最长的sql
- 使用示例
-- oracle任务状态
-- oracle结果信息表是results_table显示的 dataflow.oracle_flow_results
seaboxsql=# select * from dataflow.dataflow_status('ora');
   status_name    |          status_value
------------------+--------------------------------
 flow_name        | ora
 flow_type        | full
 oracle_tables    | split_test
 mpp_tables       | sp
 mpp_schema       | 
 results_table    | dataflow.oracle_flow_results
 task_create_time | 2022-06-16 10:03:45
 task_begin_time  | 2022-06-17 05:53:06
 task_end_time    | 2022-06-17 05:53:10
 task_status      | exited with info: task success
 task_job_id      | 2
(11 rows)


-- kafka普通任务状态
seaboxsql=# select * from dataflow.dataflow_status('kafka_test');
   status_name      |          status_value          
--------------------+--------------------------------
 flow_name          | great_test
 flow_type          | incr
 kafka_topic        | great_test
 offsets            | 0:2298322, 1:2274545, 2:1498648, 3:1881545, 4:2166939
 end_offsets        | 0:4562393, 1:4512734, 2:2965291, 3:3676615, 4:4282965
 skiptables         | 
 insert_counts      | 120000
 delete_counts      | 0
 source_queue       | 60 i, 480000 r, 131.5 MB
 sink_queue         | 0 i, 0 r, 0.0 B
 shared_status      | 1 i, 954 r, files 2.2 MB
 batch_status       | 16000 r, 1216 ms, 13147 msg/s
 load_table_num     | 1
 batch_load_time    | 0.478
 load_long_time     | 0.465
 load_long_time_sql | insert into test1.csg_test0("f_a", "f_c", "f_d", "F_B", "f_e", "f_f") select "f_a", "f_c", "f_d", "F_B", "f_e", "f_f" from seabox_dataflo
w."great_test_test1_csg_test0_ix"
 task_create_time   | 2023-01-12 10:59:32
 task_begin_time    | 2023-01-12 10:59:32
 task_end_time      | 
 task_status        | running
 task_job_id        | 369
(21 rows)


-- kafka带有翻牌机制的任务状态(比普通任务多了账期等信息)
seaboxsql=# select * from dataflow.dataflow_status('kafka_test');
      status_name       |          status_value          
------------------------+--------------------------------
 flow_name              | great_test
 flow_type              | incr
 kafka_topic            | great_test
 offsets                | 0:2549579, 1:2518103, 2:1650207, 3:2075904, 4:2390207
 turnover_offsets       | 0:2505974, 1:2477590, 2:1624179, 3:2040369, 4:2351888
 end_offsets            | 0:4562393, 1:4512734, 2:2965291, 3:3676615, 4:4282965
 skiptables             | 
 insert_counts          | 1184001
 delete_counts          | 0
 kafka_turnover_status  | not started
 accounting_period_time | 2020-05-04 00:00:00
 source_queue           | 13 i, 101334 r, 28.0 MB, 
 sink_queue             | 0 i, 0 r, 0.0 B
 shared_status          | 0 i, 0 r, files 2.1 MB
 batch_status           | 16000 r, 1241 ms, 12882 msg/s
 load_table_num         | 1
 batch_load_time        | 0.509
 load_long_time         | 0.496
 load_long_time_sql     | insert into test1.csg_test0("f_a", "f_c", "f_d", "F_B", "f_e", "f_f", "account_period_time") select "f_a", "f_c", "f_d", "F_B", "f_e", "f_f", "account_period_time" from dataflow."great_test_test1_csg_test0_ix"
 task_create_time       | 2023-01-12 10:52:06
 task_begin_time        | 2023-01-12 10:53:57
 task_end_time          | 2023-01-12 10:56:50
 task_status            | running
 task_job_id            | 8238
(24 rows)
查看所有存在的 dataflow
  • 函数接口
select dataflow.dataflow_all();
  • 使用示例
seaboxsql=# select * from dataflow.dataflow_all();
   flow_name    | flow_type |             status             |     create_time     
----------------+-----------+--------------------------------+---------------------
 task1          | incr      | exited with info: task success | 2022-06-23 14:46:20
 default        | incr      | created                        | 2008-08-08 08:08:08
 common_default | incr      | created                        | 2022-06-23 14:46:01
(3 rows)
查看 dataflow 日志
  • 函数接口
select dataflow.dataflow_log(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
seaboxsql=# select * from dataflow.dataflow_log('ora_digit_test');
        log_time         |  level   |                                                              logs                                                       
       
-------------------------+----------+-------------------------------------------------------------------------------------------------------------------------
-------
 2022-07-05 10:21:10.864 | info     | Log init!
 2022-07-05 10:21:10.864 | info     | Begin new oracle_flow for task 'ora_digit_test'
 2022-07-05 10:21:10.864 | info     | table 1 of 2 tables
 2022-07-05 10:21:10.864 | info     | Loading task for table scott.digit_test start ...
 2022-07-05 10:21:11.359 | info     | Table scott.digit_test will be split into 1 slices.
 2022-07-05 10:21:11.359 | info     | Table scott.digit_test start flowing ...
 2022-07-05 10:21:11.359 | info     | Target table for loading: oracle_test.digit_test
 2022-07-05 10:21:11.396 | info     | Create file: oracle_test.digit_test --- ora_digit_test_digit_test0.dat
 2022-07-05 10:21:11.396 | info     | task ora_digit_test: Oracle table scott.digit_test finished, time elapsed: 37ms, processed rows: 1, loading rate: 27reco
rds/s.
 2022-07-05 10:21:11.396 | critical | Oracle table scott.digit_test flow success! 
 2022-07-05 10:21:11.397 | info     | table 2 of 2 tables
 2022-07-05 10:21:11.397 | info     | Loading task for table scott.digit_test start ...
 2022-07-05 10:21:11.862 | info     | Table scott.digit_test will be split into 1 slices.
 2022-07-05 10:21:11.862 | info     | Table scott.digit_test start flowing ...
 2022-07-05 10:21:11.862 | info     | Target table for loading: oracle_test.digit_test_col
 2022-07-05 10:21:11.9   | info     | Create file: oracle_test.digit_test_col --- ora_digit_test_digit_test_col0.dat
 2022-07-05 10:21:11.9   | info     | task ora_digit_test: Oracle table scott.digit_test finished, time elapsed: 38ms, processed rows: 1, loading rate: 26reco
rds/s.
 2022-07-05 10:21:11.9   | critical | Oracle table scott.digit_test flow success! 
(20 rows)
  • scfs端产生的dataflow日志等级分为7级,默认为info
日志等级 作用
off 不启用日志
critial 执行中的严重错误
err 执行中的错误
warn 执行中的警告
info 执行中普通信息
debug 执行中详细信息
trace 执行中包括 SQL 的详细信息
  • 日志排错查看顺序
  • 首先根据dataflow_status()函数查看任务执行结果
  • 若是显示dataflow端错误,则可使用dataflow_log()函数来查看dataflow日志
  • 若是显示加载错误,则可去mpp日志的startup.log中查看
查看创建 dataflow 的 create 语句
  • 函数接口
select dataflow.ddlx_create_dataflow(flow_name text);
  • 参数说明
参数名 参数作用 是否必须 默认值
flow_name dataflow名称
  • 使用示例
    --- 如果存在,则可以生成创建语句
    seaboxsql=# select dataflow.ddlx_create_dataflow('kafka_test2');
             ddlx_create_dataflow          
    ---------------------------------------
     call dataflow_create(              +
             'kafka_test2',               +
             'incr',                      +
             'kafka_topic=OGG_GIS_GZ98_02'+
             'kafka_test1'                +
     );
    (1 row)
    
    --- 如果不存在,返回未定义对象(同ddlx_create函数结果)
    seaboxsql=# select dataflow.ddlx_create_dataflow('kafka_test3');
                ddlx_create_dataflow            
    --------------------------------------------
     -- CREATE UNIDENTIFIED OBJECT: kafka_test3
    (1 row)
    

dataflow 状态显示参数说明

  • flow_name
  • 说明:dataflow任务名称
  • 限制:无

  • flow_type

  • 说明:dataflow任务类型
  • 限制:无

  • kafka_topic

  • 说明:kafka加载的topic名称
  • 限制:kafka 显示

  • offsets

  • 说明:当前任务offsets简要信息(详细信息见 dataflow.kafka_offset 表)
  • 限制:kafka 显示

  • turnover_offsets

  • 说明:当前任务turnover_offsets简要信息(详细信息见 dataflow.kafka_offsets 表)
  • 限制:kafka配置的enable_flow_turn_batch = 1 或 enable_use_2phase = 1

  • end_offsets

  • 说明:当前任务end_offsets简要信息(详细信息见 dataflow.kafka_offsets 表)
  • 限制:kafka显示

  • skiptables

  • 说明:当前任务skiptables简要信息(详细信息见 dataflow.kafka_skip_tables 表)
  • 限制:kafka显示

  • insert_counts

  • 说明:当前任务插入数据条数统计(详细信息见 dataflow.kafka_statistics 表)
  • 限制:kafka显示

  • delete_counts

  • 说明:当前任务删除数据条数统计(详细信息见 dataflow.kafka_statistics 表)
  • 限制:kafka显示

  • kafka_turnover_status

  • 说明:kafka翻牌状态
  • 限制:kafka配置的enable_flow_turn_batch = 1 或 enable_use_2phase = 1

  • accounting_period_time

  • 说明:kafka账期列
  • 限制:kafka配置的enable_flow_turn_batch = 1 或 enable_use_2phase = 1

  • oracle_tables

  • 说明:加载的oracle表
  • 限制:oracle显示

  • mpp_tables

  • 说明:oracle表对应的mpp表
  • 限制:oracle显示

  • task_job_id

  • 说明:调用job的id号
  • 限制:无

  • task_create_time

  • 说明:任务创建时间(UTC)
  • 限制:无

  • task_begin_time

  • 说明:任务启动时间(UTC)
  • 限制:无

  • task_end_time

  • 说明:任务结束时间(UTC)
  • 限制:无

  • task_status

  • 说明:任务执行状态
  • 限制:无

普通用户使用 dataflow

  • 赋予权限
seaboxsql=# create extension plpython3u ;
CREATE EXTENSION
seaboxsql=# create extension dataflow ;
CREATE EXTENSION
seaboxsql=# create user testu1 with password '111';
CREATE ROLE
-- 数据库权限
seaboxsql=# grant ALL ON DATABASE seaboxsql TO testu1 ;
GRANT
-- 使用外部表权限
seaboxsql=# grant usage on foreign server scfs to testu1;
GRANT
  • 删除用户
-- 更改普通用户资源的拥有者
seaboxsql=# REASSIGN OWNED BY testu1 TO seabox;
seaboxsql=# DROP OWNED BY testu1;
-- 删除普通用户创建的 schema 和 table
seaboxsql=# DROP SCHEMA IF EXISTS testu1_permission cascade;
DROP SCHEMA
-- 回收数据库权限
seaboxsql=# revoke all on database regression from testu1;
REVOKE
-- 回收外部表权限
seaboxsql=# revoke usage on foreign server scfs from testu1;
REVOKE
seaboxsql=# drop user testu1;
DROP ROLE

全量同步 oracle-flow

用于同步 Oracle 数据库中的数据。

  • 使用 occi,效率高
  • 数据切分
  • 支持多表
  • 支持加载至文件
schema 与 table

schema: dataflow

任务需要的表和临时外部表保存在 dataflow 模式中,其中 dataflow_config 表保存任务的配置信息。

seaboxsql=# \dt dataflow.*
                        List of relations
 Schema  |        Name         | Type  | Owner  | Storage 
---------+---------------------+-------+--------+---------
dataflow | dataflow_config     | table | seabox | heap
dataflow | oracle_flow_results | table | seabox | heap
(2 rows)

table: oracle_flow_results

seaboxsql=# \d dataflow.oracle_flow_results 
               Table "dataflow.oracle_flow_results"
  Column   |            Type             | Collation | Nullable | Default 
-----------+-----------------------------+-----------+----------+---------
 time      | timestamp without time zone |           |          | 
 flow_name | text                        |           |          | 
 mpp_table | text                        |           |          | 
 message   | text                        |           |          | 
Distributed by: (mpp_table)

external table

每个 task 中的每个表都有对应的 external 表,自动创建,不需要人工干预,external 表与相应的目标表结构一致。

oracle-flow 参数说明
  • oracle_user
  • 说明:oracle 登录用户
  • 默认值:无

  • oracle_password

  • 说明:oracle 登录密码
  • 默认值:无

  • oracle_host

  • 说明:oracle host 地址, scfs服务使用该参数与oracle连接
  • 默认值:无

  • oracle_port

  • 说明:oracle 端口
  • 默认值:无

  • oracle_service

  • 说明:oracle 服务名
  • 默认值:无

  • oracle_tables

  • 说明:导入 oracle 的表名,可配置多张表,用分号分隔
  • 默认值:无

  • oracle_file_size

  • 说明:临时文件大小,单位MB
  • 默认值:100

  • oracle_output_file

  • 说明:只导出文件,不加载到 mpp 中
  • 默认值:0

  • oracle_truncate

  • 说明:导入前需要清空之前的表
  • 默认值:1

  • oracle_gbk_mode

  • 说明:oracle 端是 gbk 编码(-1:不是, 0:是,采用 oracle 内部转码, 1: 是,导入时转码)
  • 默认值:-1

  • oracle_clob_tochar

  • 说明:将 clob 字段转为 char 字段,会对源数据截断,上限 2000 字节,用于 oracle 内部转码后依旧有问题字符的情况
  • 默认值:0

  • oracle_reject_limit_count

  • 说明:加载错误条数阈值
  • 默认值:10

  • oracle_reject_limit_type

  • 说明:指定错误条数的方式, 可选 rows 或 percent
  • 默认值:rows

  • scfs_host

  • 说明:scfs 信息(格式host:port)
  • 默认值:无

  • mpp_tables

  • 说明:导入到mpp中的表名,个数需要和oracle_tables相同,以分号分隔
  • 默认值:无

  • mpp_schema

  • 说明:导入到mpp中的表的模式名,只能设置一个
  • 默认值:无

  • mpp_fetch_num

  • 说明:向scfs索要返回信息的最大值
  • 默认值:1

  • parser_delimiter

  • 说明:建立外部表的delimiter参数
  • 默认值:|

  • parser_escape

  • 说明:建立外部表的escape参数
  • 默认值:\

  • parser_enclosed

  • 说明:建立外部表的enclosed参数
  • 默认值:"

  • parser_null_value

  • 说明:建立外部表的null参数
  • 默认值:NULL

  • task_thread

  • 说明:oracle-flow 中表示并行向oracle端获取数据的线程数,可用于将大表切分并行加载; kafka-flow 中表示解析消息的线程数
  • 默认值:3

  • task_log_level

  • 说明:dataflow日志等级
  • 默认值:info

  • task_log_delimiter

  • 说明:dataflow 日志分隔符
  • 默认值:|
oracle-flow 使用
  • oracle 多表入库

oracle_tables和mpp_tables可以配置多张表,使用分号";"分隔,两端表的个数要保持一致;支持设置重复的表,如一张oracle表导入多张mpp表,或多张oracle表导入一张mpp表;使用oracle_truncate参数来指定导入前是否清空原mpp表,若有多张oracle表导入一张mpp表的需求,oracle_truncate应设置为0。

例:

call dataflow.dataflow_create('ora_default_test', 'full', 'oracle_user=test, oracle_password=test, oracle_host=soracle, oracle_port=1521, oracle_service=orcl, scfs_host=127.0.0.1:19588, oracle_tables=ora1;ora2, mpp_tables=mtable1;mtable2');
异常处理

对于数据源中的错误数据(格式不符、类型不符等),用户可以设定阈值参数,如果错误数据不超过设定阈值,数据可以正常加载,否则任务中断。相关配置参数为: - oracle_reject_limit_count - oracle_reject_limit_type

常见异常

  • null value in column "object_id" violates not-null constraint

object_id 约束为 not null, 数据该字段为 null, 无法插入,需要去掉该约束。

  • character set conversion to or from UCS2 failed

oracle 表里存在 clob、blob 类型的字段,在 mpp 里建成 text,如果建成 varchar 有可能超长。

  • scfs error - line too long in file ...

启动 scfs 时指定 -m 参数。

增量同步 kafka-flow

用于同步 kafka 中的流式数据。

  • 支持 json, csv
  • IDU 操作型数据处理
  • 黑/白名单过滤
  • 异常数据处理

支持流批转换,可按设置的账期频率自动生成增量 Insert 和 Delete 数据。

kafka-flow 工具说明

kafka-flow 支持同一 kafka topic中包含多个表的日志数据,此时不需要指定表名。通常情况,同一数据源建议将日志数据抽取到同一 topic 中。

kafka-flow 支持的日志数据格式如下(OGG 生成的格式):

{"table":"TEST.CSG_TEST0","op_type":"I","op_ts":"2020-05-27 17:25:16.000000","current_ts":"2020-05-27T17:25:15.721000","pos":"0000000000000000001","primary_keys":["F_A","F_B"],"after":{"F_A":"A001","F_B":111,"F_C":"C111","F_D":111,"F_E":10.20,"F_F":"2020-05-27 15:00:00"}}
{"table":"TEST.CSG_TEST0","op_type":"U","op_ts":"2020-05-27 17:25:16.000000","current_ts":"2020-05-27T17:25:15.721000","pos":"0000000000000000002","primary_keys":["F_A","F_B"],"before":{"F_A":"A001","F_B":111,"F_C":"C111","F_D":111,"F_E":10.20,"F_F":"2020-05-27 15:00:00"},"after":{"F_A":"A001","F_B":111,"F_C":"C1121","F_D":1121,"F_E":10.20,"F_F":"2020-05-27 15:00:00"}}
{"table":"TEST.CSG_TEST0","op_type":"D","op_ts":"2020-05-27 17:25:16.000000","current_ts":"2020-05-27T17:25:15.721000","pos":"0000000000000000003","primary_keys":["F_A","F_B"],"before":{"F_A":"A001","F_B":111,"F_C":"C1121","F_D":1121,"F_E":10.20,"F_F":"2020-05-27 15:00:00"}}
  • "table"

表明该条数据所属的 Oracle 表名,要求 json 数据中必须有该字段,配合参数 mpp_schema, 决定了该条数据被加载至的目标表。

  • "op_type"

该条数据的操作类型, 要求 json 数据中必须有该字段,操作类型分为 insert, delete 和 update, 通过 parser_insert_str, parser_delete_str, parser_update_str 三个参数配置。

  • "pos"

该条数据在 OGG 抽取数据文件中的偏移量,要求 json 数据中必须有该字段。主要用于在消费多分区数据时,对数据进行排序,保证数据有序。

  • "primary_keys"

主键。要求 json 数据中必须有该字段。kafka-flow 根据该字段进行数据删除和更新(先删除再插入)操作。

  • "before"/"after"

真实数据。该字段与操作类型有关,insert 只有 after, delete 只有 before, update 有 before 和 after.

schema 与 table

schema: dataflow

任务需要的表和临时外部表保存在 dataflow 模式中,其中 dataflow_config 表保存任务的配置信息, dataflow_task_status 表记录任务状态信息。

seaboxsql=# \dt dataflow.*
                     List of relations
  Schema  |         Name         | Type  | Owner  | Storage 
----------+----------------------+-------+--------+---------
 dataflow | dataflow_config      | table | seabox | heap
 dataflow | dataflow_task_status | table | seabox | heap
 dataflow | kafka_map_tables     | table | seabox | heap
 dataflow | kafka_offsets        | table | seabox | heap
 dataflow | kafka_skip_tables    | table | seabox | heap
 dataflow | kafka_statistics     | table | seabox | heap
(6 rows)

table: kafka_map_table

记录消息中的表 (msg_schema, msg_table) 加载到 mpp 中的哪张表 (map_schema, map_table) 中,和使用的外部表名称 (aux_table_name)

        Table "dataflow.kafka_map_tables"
     Column     | Type | Collation | Nullable | Default 
----------------+------+-----------+----------+---------
 flow_name      | text |           | not null | 
 topic_name     | text |           | not null | 
 msg_schema     | text |           | not null | 
 map_schema     | text |           |          | 
 msg_table      | text |           | not null | 
 map_table      | text |           |          | 
 aux_table_name | text |           |          | 
Indexes:
    "kafka_map_tables_pkey" PRIMARY KEY, btree (flow_name, topic_name, msg_schema, msg_table)

table: kafka_offsets

记录每个 task 所对应的 topic 信息,是监控、分析 kafka 实时消费进度的表。

seaboxsql=# \d dataflow.kafka_offsets
                      Table "dataflow.kafka_offsets"
        Column         |          Type          | Collation | Nullable | Default 
-----------------------+------------------------+-----------+----------+---------
 flow_name             | character varying(128) |           | not null | 
 topic_name            | character varying(128) |           | not null | 
 kafka_partition       | integer                |           | not null | 
 kafka_offset          | bigint                 |           |          | 
 kafka_turnover_offset | bigint                 |           |          | 
 kafka_end_offset      | bigint                 |           |          | 
Indexes:
    "kafka_offsets_pkey" PRIMARY KEY, btree (flow_name, topic_name, kafka_partition)
Distributed by: (topic_name)

table: kafka_skip_tables

记录每个 task 是否有 skip 的表,如果此表有记录需要人工介入处理,此表记录的 table 会停止 kafka 消费,因此要经常监控此表,避免有问题的表长时间无法接入数据。

seaboxsql=# \d dataflow.kafka_skip_tables 
              Table "dataflow.kafka_skip_tables"
   Column   |          Type          | Collation | Nullable | Default 
------------+------------------------+-----------+----------+---------
 table_name | character varying(128) |           |          | 
 flow_name  | character varying(128) |           |          | 
 topic_name | character varying(128) |           |          | 
 offsets    | character varying      |           |          | 
 error_msg  | text                   |           |          | 
Distributed by: (topic_name)

table: kafka_statistics

记录各个表每次更新数据的记录数,该表的数据可用于对外平台对接,反应数据实时变化情况。

seaboxsql=# \d dataflow.kafka_statistics 
                      Table "dataflow.kafka_statistics"
       Column        |            Type             | Collation | Nullable | Default 
---------------------+-----------------------------+-----------+----------+---------
 update_time         | character varying(128)      |           |          | 
 domain              | character varying(128)      |           |          | 
 sfdm                | character varying(128)      |           |          | 
 flow_name           | character varying(128)      |           |          | 
 topic               | character varying(128)      |           |          | 
 updatenum           | bigint                      |           |          | 
 table_name          | character varying(128)      |           |          | 
 account_period_time | timestamp without time zone |           |          | 
Distributed by: (topic)

foreign table

每个 task 中的每个表都有对应的 foreign 表,自动创建,不需要人工干预,foreign 表与相应的目标表结构一致。

seaboxsql=# \dE dataflow.*
                         List of relations
 Schema   |          Name           |     Type      | Owner  
----------+-------------------------+---------------+--------
 dataflow | dataflow_task_log       | foreign table | seabox
 dataflow | task_test_csg_test0_ix  | foreign table | seabox
 dataflow | task_test_csg_test0_dx  | foreign table | seabox
 dataflow | task_test_csg_test0_dcx | foreign table | seabox
(4 rows)
kafka-flow 参数说明
  • kafka_use_raw
  • 说明:默认 0 消费 json 格式数据,配置 1 消费 csv 格式数
  • 默认值:0

  • kafka_broker

  • 说明:Kafka broker 地址, scfs服务使用该参数连接kafka
  • 默认值:无

  • kafka_check_topic

  • 说明:是否检查 topic 的格式,配置 1 为检查,默认 0 不检查
  • 默认值:0

  • kafka_topic

  • 说明:Kafka 消费的 topic 名称(若配置 kafka_check_topic = 1 则检查 topic 的格式,格式为 OGG_XX_XX_XX 或 OGG_XX_XX_XX_XX; 否则不检查,格式无特殊要求)
  • 默认值:无

  • kafka_topic_partition

  • 说明:默认 -1 同时消费 kafka 的所有分区, 配置分区名则消费指定的一个分区(只能配置一个分区,如 0 )
  • 默认值:-1

  • kafka_topic_partition_offset

  • 说明:kafka 分区初始偏移量,若配置则从 kafka 的指定位置开始消费数据,否则从 0 开始
  • 默认值:无

  • kafka_partition_queue_size

  • 说明: 控制 rdkafka 队列大小, 简介能够减少内存,单分区需要调大
  • 默认值: 3000

  • kafka_consume_wait_time

  • 说明:等待 Kafka 准备好数据的时间,如果时间较短可能消费不到数据,单位ms
  • 默认值:100

  • kafka_consume_exit

  • 说明:等待消费数据超时后动作,配置 timeout 则超时退出,配置 never 则一直等待
  • 默认值:timeout

  • kafka_consume_timeout

  • 说明:超时时间,若 kafka-flow 长时间没有消费到 kafka 中数据且 kafka_consume_exit 配置为 timeout 则超过该超时时间退出,单位 ms
  • 默认值:6000

  • kafka_consume_group_id

  • 说明:kafka登录认证信息
  • 默认值:无

  • kafka_consume_sasl_user

  • 说明:kafka登录认证信息
  • 默认值:无

  • kafka_consume_sasl_passwd

  • 说明:kafka登录认证信息
  • 默认值:无

  • kafka_consume_sasl_mechanism

  • 说明:kafka支持sasl/scram认证的使用算法机制
  • 默认值:无

  • kafka_consume_keytab

  • 说明:kafka登录认证信息
  • 默认值:无

  • kafka_consume_principal

  • 说明:kafka登录认证信息
  • 默认值:无

  • kafka_batch_size

  • 说明:kafka-flow 消费时每批信息的数据量大小(条数)
  • 默认值:8000

  • kafka_sink_duration

  • 说明:kafka-flow sink 阶段写出文件的间隔时间,单位ms
  • 默认值:1000

  • kafka_mini_duration

  • 说明:kafka-flow souce 阶段数据攒批最长时间,没有达到 kafka_batch_size 的条数但到达该时间算一批,单位ms
  • 默认值:500

  • mpp_tables

  • 说明:在 kafka-flow csv 格式入库(kafka_use_raw = 1)时需要配置, kafka-flow json 格式入库不需要配置。多个表以分号分隔,如 "table1;table2:table3", 若有多个表则以第一个表为准
  • 默认值:无

  • mpp_schema

  • 说明:kafka 消息中的 schema 和 mpp 中 schema 的映射,例: ""、"*:*"、"schema"、"*:schema"、"a:b;*:c"、"a:b;*:*",若由多条必须有一条默认 "*:" 映射
  • 默认值:无

  • mpp_table_map

  • 说明:kafka 消息中表名和 mpp 中表名的映射,格式为 "table1:table2", 以分号分隔
  • 默认值:无

  • mpp_timestamp_column_name

  • 说明:辅助表入库时间列名,开启账期时 (enable_flow_turn_batch = 1) 需要配置(可以使用默认值),不开启账期则不需要配置(可以为空)
  • 默认值:indb_timestamp

  • mpp_delete_table_suffix

  • 说明:删除数据的辅助表,开启账期时 (enable_flow_turn_batch = 1) 需要配置(可以使用默认值),不开启账期则不需要配置(可以为空)
  • 默认值:_delete

  • mpp_insert_table_suffix

  • 说明:插入数据的辅助表,开启账期时 (enable_flow_turn_batch = 1) 需要配置(可以使用默认值),不开启账期则不需要配置(可以为空)
  • 默认值:_insert

  • mpp_missing_cols_value

  • 说明:缺列值,格式为 "table_name.col_name:value", 以分号分隔
  • 默认值:无

  • mpp_fetch_num

  • 说明:mpp 每次向 scfs 索要返回信息的数量上限
  • 默认值:1

  • mpp_skip_error_format

  • 说明:配置加载时需要跳过的错误信息,出错表加入 skip 表中,分号分隔
  • 默认值:无

  • mpp_exit_error_format

  • 说明:配置加载时需要退出的错误信息,分号分隔
  • 默认值:无

  • mpp_default_error_process

  • 说明:若加载出错信息未在以上两个参数中列出,则采用该默认处理方式 (skip: 跳过,exit: 退出)
  • 默认值:skip

  • mpp_prepare_sqls

  • 说明: 在加载前执行 session 级的 SQL 语句,一般用于修改参数来优化执行计划。例如: set enable_hashjoin=off;set enable_seqscan=off;
  • 默认值: 无

  • scfs_host

  • 说明:scfs 信息,格式为 host:port
  • 默认值:无

  • parser_delimiter

  • 说明:建立外部表的 delimiter 参数
  • 默认值:|

  • parser_escape

  • 说明:建立外部表的 escape 参数
  • 默认值:\

  • parser_enclosed

  • 说明:建立外部表的 enclosed 参数
  • 默认值:"

  • parser_null_value

  • 说明:建立外部表的 null 参数
  • 默认值:NULL

  • parser_insert_str

  • 说明:insert 消息的 op_type 类型
  • 默认值:I

  • parser_delete_str

  • 说明:delete 消息的 op_type 类型
  • 默认值:D

  • parser_update_str

  • 说明:update 消息的 op_type 类型
  • 默认值:U

  • parser_white_list

  • 说明:kakfa-flow 解析阶段的白名单(表名),若配置则只解析白名单中的表,以分号分隔
  • 默认值:无

  • parser_black_list

  • 说明:kakfa-flow 解析阶段的黑名单(表名),若配置则过滤黑名单中的表,以分号分隔
  • 默认值:无

  • parser_ignore_encoding_error

  • 说明:解析时忽略json的字符错误
  • 默认值:0

  • parser_error_skip

  • 说明:解析忽略 json 格式错误
  • 默认值:0

  • parser_check_row_format

  • 说明: 是否按行检查数据是否符合 JSON 格式
  • 默认值: 1

  • enable_check_extra_cols

  • 说明:检查消息中列多于mpp
  • 默认值:1

  • enable_statistics

  • 说明:是否开启统计(0 关闭,1 开启),若开启账期(enable_flow_turn_batch = 1)或开启两阶段入库(enable_use_2phase = 1)则需要开启 enable_statistics
  • 默认值:1

  • enable_flow_turn_batch

  • 说明:是否开启账期(0 关闭,1 开启,不能与两阶段同时开启)
  • 默认值:0

  • enable_use_2phase

  • 说明:是否开启两阶段入库(0 关闭,1 开启,不能与账期同时开启)
  • 默认值:0

  • accounting_period_time

  • 说明:当前账期,开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(格式为 YYYY-MM-DD hh🇲🇲ss),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:无

  • accounting_period

  • 说明:账期周期,开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(格式为 1Y,1M,1D,1H…),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:无

  • accounting_period_forward_time

  • 说明:账期时间延时时长,单位 ms, 开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(可以使用默认值),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:0

  • accounting_period_time_column_name

  • 说明:新增账期字段名,开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(可以使用默认值),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:account_period_time

  • turnover_delay_time

  • 说明:翻牌时间限制,kafka 中无数据时,当前时间超过账期时间一个周期后该时间触发翻牌,单位ms, 开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(可以使用默认值),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:600

  • turnover_function_name

  • 说明:翻牌时调用的存储过程名,参数为(start_time TEXT, end_time TEXT, state integer), 开启账期或两阶段时 (enable_flow_turn_batch = 1 or enable_use_2phase = 1) 需要配置(可以使用默认值),不开启账期和两阶段则不需要配置(可以为空)
  • 默认值:seabox_dataflow_turnover_function

  • task_thread

  • 说明:并行解析的线程数
  • 默认值:3

  • task_log_level

  • 说明:日志等级
  • 默认值:info

  • task_log_delimiter

  • 说明:scfs日志分隔符
  • 默认值:|

  • schedule_interval

  • 说明:表示调度周期,对应sdSync中的interval参数,传入job的repeat_interval中,可在修改任务参数时更改,修改后立即生效,格式为1Y/y, 1M, 1 D/d, 1H/h, 1m, 1S/s, 且不超过99
  • 默认值:2m

  • debug_options

  • 说明: 用于调试, 可以覆盖或临时添加参数, 格式为 "opt1=value1;opt2=value2"
  • 默认值:无

  • debug_log_plan

  • 说明:用于调试,启动后日志中打印 DELETE 语句的执行计划
  • 默认值: 0

  • log_reorder_pos

  • 说明:用于调试,记录排序后的 POS 值
  • 默认值: 0

  • log_filter_skip_message

  • 说明:用于调试, 记录解析过程中跳过的数据
  • 默认值: 0

  • enable_source_trace

  • 说明: 是否启动 source 阶段保存数据
  • 默认值: 0

  • enable_parser_trace

  • 说明: 是否启动 parser 阶段保存数据
  • 默认值: 0

  • enable_sink_trace

  • 说明: 是否启动 sink 阶段保存数据
  • 默认值: 0

  • trace_time

  • 说明: 历史数据保存时间,单位 'h'(小时) 或 'd'(天)
  • 默认值: 1h

  • trace_file_path

  • 说明: 历史数据保存路径
  • 默认值: 空,表示使用 scfs 启动路径

  • trace_file_size

  • 说明: 历史数据文件存储上限,用于 source 和 parser 阶段,单位 'm'(MB) 或 'g'(GB)
  • 默认值: 1g

  • message_details_period

  • 说明: 报文处理记录时间周期, 可以配置为 '1m', '5m', '10m', '20m', '30m', '60m', '1h', '2h', '3h', '6h', '12h', '24h'
  • 默认值: 1h

  • kafka_hash_mode

  • 说明: kafka 端的数据哈希类型,可以配置为 'key'(按主键哈希) 或 'table'(按表哈希)
  • 默认值: key
kafka-flow 使用
  • kafka-flow 一般入库

一般模式下,kafka-flow 由 scfs 端连接 kafka 消费其中的数据并生成多个数据文件,同时 mpp 端消费文件,消费完之后删除文件。

例:

call dataflow.dataflow_create('task','incr','kafka_broker=127.0.0.1:9092,kafka_topic=OGG_GIS_GZ98_01,scfs_host=127.0.0.1:8282');
  • kafka-flow 开账期入库

用户想要每隔一段时间进行一次入库并做统计操作,比如下例为从某天某时开始,间隔为一天为一个账期,则加载完一天的数据后就会执行翻牌函数(默认为seabox_dataflow_turnover_function)进行统计。(注:账期和两阶段不能同时开启) 开启账期时,入库数据加上了时间列,同时加上了翻牌机制,超过当前账期一个账期周期后作一次统计。

例:

call dataflow.dataflow_create('task','incr','kafka_broker=127.0.0.1:9092,kafka_topic=OGG_GIS_GZ98_01,scfs_host=127.0.0.1:8282,enable_flow_turn_batch=1,accounting_period_time=2022-07-06 00:00:00,accounting_period=1D');
  • kafka-flow 两阶段入库

用户想要将指定账期的数据进行入库,但不想立即入库,而是等待账期内的数据加载消费完一起入库。(注:账期和两阶段不能同时开启) 开启两阶段入库时,scfs 端先生成文件,完成后再由 mpp 端消费文件,也有翻牌机制,与开启账期不同的是消费文件在翻牌时完成。

例:

call dataflow.dataflow_create('task','incr','kafka_broker=127.0.0.1:9092,kafka_topic=OGG_GIS_GZ98_01,scfs_host=127.0.0.1:8282,enable_use_2phase=1,accounting_period_time=2022-07-06 00:00:00,accounting_period=1D');

  • kafka-flow csv格式数据入库

kafka-flow 默认消费 json 格式数据,开启 kafka_use_raw 后,可以直接消费 csv 格式的数据。

例:

call dataflow.dataflow_create('task','incr','kafka_broker=127.0.0.1:9092,kafka_topic=OGG_GIS_GZ98_01,scfs_host=127.0.0.1:8282,kafka_use_raw=1,mpp_tables=test');
异常处理

常见异常

  • kinit: Internal ... credentials

kafka kerberos 异常。

  • Sink error Can't find table *.*

MPP 数据库中未创建目标表。

  • ERROR: missing data for column

目标表字段与 json 数据中字段不匹配。

  • Parse JSON error Invalid value. Offset: 111

获取到 kafka 上的数据, json 解析失败。

  • error is ERROR: invalid input syntax for type numeric

字段类型与数据文件不匹配,或表结构与数据文件不匹配。

注意事项
  • kafka-flow 实时同步 update, delete 数据使用主键字段将目标表与临时表进行关联,因此要求同步的表必须具备主键约束。目标表端不需要创建主键,但必须创建使用主键字段的索引(支持复合主键)。

  • kafka-flow 工具不具备 DDL 同步能力,因此使用 kafka-flow 需提前在目标端创建与源端相匹配的表,确保表名一致,字段类型匹配;同步进行时,源端表结构发生变化后,如增减字段或字段类型变化都会导致 kafka-flow 任务失败,需要手工处理。

  • 如果使用 set search_path 切换到其它 schema 下,调用时请加上 public 模式,如 call public.dataflow_create() 创建任务

字段类型对应

Oracle 类型 SeaboxMPP 类型 说明
char char 默认长度单位为字节,SeaboxMPP 长度单位为字符
varchar2 varchar 默认长度单位为字节,SeaboxMPP 长度单位为字符
number int/bigint/numeric 需按照不同长度、精度考虑
float double
BINARY_float real
BINARY_double double
date Timestamp without time zone
Timestamp Timestamp