数据存储服务
创建数据源¶
数据库可以通过sql将同一类型的数据源抽象为一个服务(create server),后续的copy加载或创建外部表都可以通过该server名来访问数据。
将一些通用的参数例如format
delimiter
等定义到server上,后续进行的任务都继承此参数,表定义或COPY语句中不需要再重复指定。
create server scfs_server foreign data wrapper scfs_fdw options (host 'localhost', port '8081', delimiter ',');
create user mapping for public server scfs_server;
-- t1,t2为已经创建好的表
-- delimiter和server定义中的一致,参数从server中继承
copy t1 from 'fdw://scfs_server/t1_data.txt' (format 'text', NULL ' ');
-- delimiter和server定义不同,指定后覆盖server中的同名参数
copy t2 from 'fdw://scfs_server/t2_data.csv' (format 'csv', delimiter '|', NULL ' ');
-- 创建FOREIGN TABLE
-- delimiter和server定义中的一致,参数从server中继承
create foreign table t3(a int, b varchar(20))server scfs_server options(resource 't1_data.csv', format 'text', NULL ' ');
-- delimiter和server定义不同,指定后覆盖server中的同名参数
create foreign table t4(a int, b varchar(20))server scfs_server options(resource 't2_data.csv', format 'csv', delimiter '|', NULL ' ');
SeaboxMPP内置了很多扩展,使用这些扩展可以定义多种服务(server),和各种数据源交互。
插件(FDW) | 支持连接的数据源 | 数据源上的数据格式 | 是否支持写 |
---|---|---|---|
scfs_fdw | 支持SeaboxMPP文件服务器(scfs,scfss) | csv, txt 支持压缩(gzip,bzip2) 支持加密传输(ssl) |
是(文本,gzip) |
http_fdw | apache/nginx等http服务器 | csv, txt | 否 |
ftp_fdw | vsftp,filezilla等ftp服务器 | csv, txt | 是(文本) |
seabox_fdw | seaboxdb ,seaboxmpp | 表,query | 否 |
gp_fdw | gpdb | 表,query | 否 |
pg_fdw | postgresql | 表,query | 否 |
kafka_fdw | kafka | csv, txt,json | 否 |
oracle_fdw | oracle | 表 | 否 |
dameng_fdw | dameng | 表 | 否 |
pxf_fdw(系列fdw) | 连接pxf_server,配置pxf_server连接不同的数据源 | text, parquet, orc, hive, hbase | 是 |
hdfs_fdw | hdfs数据源 | txt, parquet, orc | 是(文本) |
hive_fdw | hive数据源 | hive表 | 否 |
mysql_fdw | mysql | mysql | 是 |
odbc_fdw | 支持odbc访问的数据库 | 数据库 | 否 |
s3_fdw | 访问s3上的数据 | text,parquet,parquettext | 否 |
数据源使用¶
本节中提到的t1/t2为SeaboxMpp中已经创建好的表。
scfs¶
Seabox文件服务器,负责分发数据,支持的数据格式为文本文件和压缩文件。
参数说明¶
-
host : IP地址/hostname
-
port : scfs使用-p参数指定的端口号
-
resource :文件路径
使用说明¶
create server scfs_server foreign data wrapper scfs_fdw options (host 'localhost', port '8081');
-- 从数据源中copy到数据库
copy t1 from 'fdw://scfs_server/a.txt' (format 'csv', delimiter '|');
-- 外部表
create foreign table t ( a int, b varchar(10)) server scfs_server options(resource 'a.txt', delimiter '|');
insert into t select * from t1;
http¶
可以使用http服务访问的文本文件。
参数说明¶
- host : IP地址/hostname
- port : HTTP端口号
- resource : 文件路径
使用说明¶
create server http_server foreign data wrapper http_fdw options (host 'localhost', port '8080', format 'txt');
-- 从数据源中copy到数据库
copy t1 from 'fdw://http_server/a.txt' (format 'csv', delimiter '|');
-- 外部表
create foreign table t3 ( a int, b varchar(10)) server http_server options(resource 'a.txt', delimiter '|');
insert into t1 select * from t3;
ftp¶
可以使用ftp服务访问的文本文件。
参数说明¶
- host : IP地址/hostname
- port : ftp端口号,默认21
- username : 用户名,对应操作系统的一个用户名
- password : 用户名对应的密码
- resource : 文件路径
使用说明¶
create server ftp_server foreign data wrapper ftp_fdw options (host 'localhost', port '21', username 'seabox', password 'seabox');
-- 从数据源中copy到数据库
copy t1 from 'fdw://ftp_server/a.txt' (format 'csv', delimiter '|');
-- 外部表
create foreign table t3 ( a int, b varchar(10)) server ftp_server options(resource 'a.txt', delimiter '|');
insert into t1 select * from t3;
seabox¶
数据源为Seabox集群或者单机,此类的数据源并行加载的性能和源的executor数有关,目标集群的executor数比源集群多能充分并行。
参数说明¶
- host : 源IP地址或hostname
- port : 源集群coordinator端口号
- user : 源集群登录用户名
- user : 源集群password
- dbname : 源集群要连接的数据库
- resource:源集群表名
使用说明¶
create extension seabox_fdw;
create server seaboxmpp_server foreign data wrapper seabox_fdw options (host '127.0.0.1', port '3000', user 'seabox', password '', dbname 'seaboxsql');
-- 将数据源集群的t2表加载到本集群的t1表中
copy t1 from 'fdw://seaboxmpp_server/t2';
-- 源数据库集群的表t2
create foreign table t (a int, b varchar(10)) server seaboxmpp_server options(resource 't2');
insert into t select * from t2;
-- 源数据库上的sql语句,此类外部表没有并行。
create foreign table t (a int, b varchar(10)) server seaboxmpp_server options(resource 'select a,c from t1 join t2 on a = b');
insert into t select * from t2;
gpdb¶
数据源为gpdb集群,此类的数据源并行加载的性能和源集群的segment数有关,本集群的executor数比源集群多能充分并行。
参数说明¶
- host : 源集群IP地址或hostname
- port : 源集群master端口号
- user : 源集群登录用户名
- password : 源集群password
- dbname : 源集群要连接的数据库
- resource:源集群表名
使用说明¶
create extension gp_fdw;
create server gpdb_server foreign data wrapper gp_fdw options (host '127.0.0.1', port '7300', user 'postgres', password '', dbname 'postgres');
-- 将数据源集群的t2表加载到本集群的t1表中
copy t1 from 'fdw://gpdb_server/t2';
-- 源数据库集群的表t2
create foreign table t (a int, b varchar(10)) server gpdb_server options(resource 't2');
insert into t select * from t2;
-- 源数据库上的sql语句,此类外部表没有并行。
create foreign table t (a int, b varchar(10)) server gpdb_server options(resource 'select a,c from t1 join t2 on a = b');
insert into t select * from t2;
postgres¶
数据源为postgres,由集群中的一个executor负责从postgres中读数据
参数说明¶
- host : IP地址或hostname
- port : 端口号
- user : 登录用户名
- user : password
- dbname : 数据库名
- resource:源集群表名
使用说明¶
create extension pg_fdw;
create server postgres_server foreign data wrapper pg_fdw options (host '127.0.0.1', port '7300', user 'postgres', password '', dbname 'postgres');
-- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://postgres_server/t2';
-- 源数据库集群的表t2
create foreign table t (a int, b varchar(10)) server postgres_server options(resource 't2');
insert into t select * from t2;
create foreign table t (a int, b varchar(10)) server postgres_server options(resource 'select a,c from t1 join t2 on a = b');
insert into t select * from t2;
kafka消息中间件¶
数据源为kafka集群,每个executor从一个或者多个partition读取数据
kafka中存储的数据格式不同,接受的参数会有所不同,文本文件参考COPY
语法。
参数说明¶
- brokers: broker列表
- resource:源topic名称
- topic: 同resource一样,选一种即可
使用说明¶
-- 创建数据源
create extension kafka_fdw;
-- 创建数据源服务
create server kafka_server foreign data wrapper kafka_fdw options (brokers '172.17.0.7:9092,172.17.0.8:9092');
-- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://kafka_server/topic1' (delimiter '|', format 'csv');
-- 源数据库集群的表t2
create foreign table t (a int, b varchar(10)) server kafka_server options(resource 'topic1');
create foreign table t (a int, b varchar(10)) server kafka_server options(topic 'topic1');
insert into t select * from t2;
oracle数据源¶
数据源为oracle数据库,coordinator将表进行分割,每个executor处理表的一部分,以达到并行加速的目的。
参数说明¶
- host: 域名
- port: 端口
- service: 服务名
- server: 服务器名
- user: 用户名
- password: 用户密码
- table/resource: oracle表名
- encoding: Oracle数据库编码
使用说明¶
--- 创建数据源
create extension oracle_fdw;
--- 创建数据源服务
create server oracle_server foreign data wrapper oracle_fdw options(host '192.168.0.28', port '1521', service 'orcl', user 'test', password 'test');
--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://oracle_server/t2';
--- 导入gbk编码的表
copy t3 from 'fdw://oracle_server/t4' (encoding 'gbk');
--- 创建外部表
create foreign table ftest(a int) server oracle_server options(table 't2');
insert into t1 select * from ftest;
dameng数据源¶
数据源为dameng数据库,coordinator将表进行分割,每个executor处理表的一部分,以达到并行加速的目的。
参数说明¶
- host: 域名
- port: 端口
- user: 用户名
- password: 用户密码
- table/resource: dameng表名
- encoding: dameng数据库编码
使用说明¶
--- 创建数据源
create extension dameng_fdw;
--- 创建数据源服务
CREATE SERVER dameng_server FOREIGN DATA WRAPPER dameng_fdw OPTIONS(host '192.168.0.28', port '5236', user 'test', password 'damengtest');
--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://dameng_server/t2';
--- 导入gbk编码的表
copy t3 from 'fdw://dameng_server/t4' (encoding 'gbk');
--- 创建外部表
create foreign table ftest(a int) server dameng_server options(table 't2');
insert into t1 select * from ftest;
PXF服务¶
pxf支持的数据源都支持抽象为server,详细的描述参见使用PXF访问外部数据
create server hive_test foreign data wrapper hive_pxf_fdw options (host 'test-3', port '10000'); --HIVE source
create server hdfs_test foreign data wrapper hdfs_pxf_fdw options (host 'test-3', port '8020'); --HDFS source
hdfs数据源¶
- 导入: 支持直接获取hdfs上的格式为text/parquet/orc的数据,mpp的executors并行拉取hdfs上的数据来提升速度.
- 导出: 同时支持seaboxmpp上的数据导出到hdfs上的指定目录中,支持导出为csv,text格式。
hdfs导入¶
hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明
- kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server
-- host:namenode接入点hostname,当启用高可用namenode时,配置为可用nn 列表,自动选择active的nn进行连接
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证,
-- principal: kerberbos中的用户身份
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp04.seabox.com',port '8020',mechanism 'GSSAPI',principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'parquet');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'orc');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'text');
-- 本地数据表入库,也可以直接查询外部表
create foreign table t_input (a int, b varchar(10)) using scolumn;
insert into t_input select * from t;
- 非kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server
-- host:namenode接入点hostname,当启用高可用namenode时,配置为多nn点,自动选择active的nn进行连接
-- port:namenode服务端口号
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp01.seabox.com,hdp02.seabox.com', port '8020');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'parquet');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'orc');
create foreign table t (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/lineitem_small_parquet',format 'text');
-- 本地数据表入库,也可以直接查询外部表
create foreign table t_input (a int, b varchar(10)) using scolumn;
insert into t_input select * from t;
hdfs导出¶
hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明
- kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server
-- host:namenode接入点hostname,当启用高可用namenode时,配置为可用nn 列表,自动选择active的nn进行连接
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证,
-- principal: kerberbos中的用户身份
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp04.seabox.com',port '8020',mechanism 'GSSAPI',principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');
-- 创建外部表写入对象
-- resource: 必须是一个目录,否则报错,如果目录不是空的,默认行为是先清空目录下所有的内容,然后再写入。注意此hdfs目录要赋予seabox用户写权限。
-- format: 可配置为csv,text。
-- clean_dir: 可忽略不设置,可配置参数false,true,如果配置false时,目标目录非空,则会报错退出, 如果配置为true,清空目标目录内容,默认为true
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证。
-- principal: kerberbos中的用户身份
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
create foreign table t_csv (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/test_write/',format 'csv' , clean_dir 'true');
create foreign table t_csv (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/test_write/',format 'text' , clean_dir 'true');
-- 导出seaboxmpp的内部表到hdfs上。
insert into t_csv select * from t_source;
select * from t_csv;
- 非kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server
-- host:namenode接入点hostname,当启用高可用namenode时,配置为多nn点,自动选择active的nn进行连接
-- port:namenode服务端口号
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp01.seabox.com,hdp02.seabox.com', port '8020');
-- 创建外部表写入对象
-- resource: 必须是一个目录,否则报错,如果目录不是空的,默认行为是先清空目录下所有的内容,然后再写入。注意此hdfs目录要赋予seabox用户写权限。
-- is_writable: 是否为可写外部表,可写外部表也可以直接查询数据。
-- format: 可配置为csv,text。
-- clean_dir: 可忽略不设置,可配置参数false,true,如果配置false时,目标目录非空,则会报错退出, 如果配置为true,清空目标目录内容,默认为true
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证。
-- principal: kerberbos中的用户身份
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
create foreign table t_csv (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/test_write/',format 'csv' , clean_dir 'true');
create foreign table t_csv (a int, b varchar(10)) server hdfs_server options(resource '/hive/warehouse/test_write/',format 'text' , clean_dir 'true');
-- 导出seaboxmpp的内部表到hdfs上。
insert into t_csv select * from t_source;
select * from t_csv;
hive数据源¶
直接获取hive数据源, 格式为text/parquet/orc的数据,通过目标mpp的executors并行拉取hdfs上的数据来提升速度。其他格式数据直接走hive的客户端链接获取数据。
hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明。
- kerberos
-- 创建 hive_server
-- host: hiveServer2的服务的host,
-- port: hive服务端口号
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 'PLAIN'为用户名密码认证
-- principal: 用户在kerberbos中的身份
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
-- service,sfqdn:用户要访问的kerberos内的服务, 这两部分组成其在kerberos中的principal:service/sfqdn
create extension hive_fdw;
create server hive_server foreign data wrapper hive_fdw options (host 'hdp04.seabox.com', port '10000', mechanism 'GSSAPI', service 'hive', sfqdn 'hdp04.seabox.com', principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');
create foreign table tf (a int, b varchar(10)) server hive_server options(resource 'lineitem_small_parquet');
insert into t select * from tf;
- 非kerberos
-- 创建 hive_server
-- host: hiveServer2的服务的host,
-- port: hive服务端口号
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 'PLAIN'为用户名密码认证
-- namenodes: hdfs的namenodes的可选list,从中选择active的namenode进行连接
-- user,password: 用户名,密码,
create extension hive_fdw;
create server hive_server foreign data wrapper hive_fdw options (host 'hdp01.seabox.com', port '10000', mechanism 'PLAIN', namenodes 'hdp01.seabox.com,hdp02.seabox.com')
create foreign table tf (a int, b varchar(10)) server hive_server options(resource 'lineitem_small_parquet');
insert into t select * from tf;
mysql数据源¶
数据源为mysql数据库, 如果是partition表会根据partition进行分割,每个executor处理部分分区,普通表只有一个executor参与抽取。
参数说明¶
- host: 域名
- port: 端口
- username: 用户名
- password: 用户密码
- dbname: 库名
- table_name: 要访问的表名
- encoding: mysql数据库编码
使用说明¶
create extension if not exists mysql_fdw;
create server mysql_svr foreign data wrapper mysql_fdw options (host '192.168.0.28', port '3306');
create user mapping for public server mysql_svr options (username 'test', password 'test');
create foreign table tf(a int, b varchar(20)) server mysql_svr options (dbname 'mysql', table_name 'mysql_test');
--- 导入,t为seaboxmpp的表
insert into t select * from tf;
--- 导出到MySQL数据库
insert into tf values(1,'test');
使用odbc访问数据库¶
大多数的数据库都提供了odbc驱动,正确的配置后提供了一种一致的访问数据库的方法。
要使用odbc_fdw需要宿主机安装unixODBC,并且安装了要访问数据库对应的驱动。
在centos
上需要配置/etc/odbcinst.ini和/etc/odbc.ini。
odbc_fdw扩展只支持导入。
odbcinst¶
配置驱动的位置
[mysql]
Description = ODBC for MySQL
Driver = /usr/local/lib/libmyodbc8a.so
FileUsage = 1
odbc¶
配置数据源的访问方法
[mysql]
Description= mysql odbc driver setup
Driver=mysql
Database=citest
Server=192.168.0.28
参数说明¶
- dsn: 在odbc一节中配置的数据源名称
- user: 用户名
- password: 用户密码
使用说明¶
--- 创建数据源
create extension if not exists odbc_fdw;
--- 创建数据源服务
create server mysql foreign data wrapper odbc_fdw options(dsn 'mysql', user 'citest', password '111111');
--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://mysql/t2';
--- 使用外部表
create foreign table ftest(a int) server mysql options(table 't2');
insert into t1 select * from ftest;
select * from ftest;
s3数据源¶
S3提供了对象(object)存储服务,数据文件可以通过s3的resetful接口进行访问。 支持文本和parquet格式的文件数据。
参数说明¶
- host: s3存储的访问地址和端口号
- bucket: 存放数据的存储桶
- prefix: 数据的存储路径,如果路径类似一个文件夹,其下面的所有文件都会被访问
- resource: 和prefix意义相同
- accessid:用户的访问ID
- secret: 用户的访问密钥
- partkeys: 此参数中的列的值是从数据存储的路径中获取
- format: 支持的文件数据格式
csv/text/parquet/parquettext
(parquettext格式指的是所有列都是字符串类型的parquet文件)
全局参数说明¶
- foreign_use_file_column_name : 设置此参数可以用控制是否使用parquet文件中的列名字来对应数据库表中的列。
parquet文件中的列顺序:
a |b |c
stringa|stringb|stringc
建表:
create table (a text, c text, b text);
set foreign_use_file_column_name = off;
加载后查询的结果为:
a |c |b
stringa|stringb|stringc
set foreign_use_file_column_name = on;
加载后查询的结果为:
a |c |b
stringa|stringc|stringb
使用说明¶
--- 创建数据源
create extension s3_fdw;
--- 创建数据源服务
create server s3 foreign data wrapper s3_fdw options (host 'ss3:9000', bucket 'citest', accessid 'admin', secret 'adminadmin');
--- 将s3上citest存储桶下面的data/a.txt对象加载到本集群的t1表中
copy t1 from 'fdw://s3/data/a.txt';
--- 创建外部表访问s3上的文件
create foreign table test(a int, b varchar(20)) server s3 options(prefix 'parquet/test', format 'parquet');
create foreign table testtext(a int, b varchar(20)) server s3 options(prefix 'csv/a.csv', format 'csv');
--- 创建外部表访问s3上的文件,此表中的c列的值是路径上的`20230210`,代表一个时间分区
create foreign table part(a int, b varchar(20), c date) server s3 options(prefix 'parquet/part/20230210/100.parquet', format 'parquet', partkeys 'd');