跳转至

数据存储服务

创建数据源

数据库可以通过sql将同一类型的数据源抽象为一个服务(create server),后续的copy加载或创建外部表都可以通过该server名来访问数据。 将一些通用的参数例如format delimiter等定义到server上,后续进行的任务都继承此参数,表定义或COPY语句中不需要再重复指定。

create server scfs_server foreign data wrapper scfs_fdw options (host 'localhost', port '8081', delimiter ',');
create user mapping for public server scfs_server;

-- t1,t2为已经创建好的表
-- delimiter和server定义中的一致,参数从server中继承
copy t1 from 'fdw://scfs_server/t1_data.txt' (format 'text', NULL ' ');

-- delimiter和server定义不同,指定后覆盖server中的同名参数
copy t2 from 'fdw://scfs_server/t2_data.csv' (format 'csv', delimiter '|', NULL ' ');                                 

-- 创建FOREIGN TABLE

-- delimiter和server定义中的一致,参数从server中继承
create foreign table t3(a int, b varchar(20))server scfs_server options(resource 't1_data.csv',  format 'text', NULL ' ');

-- delimiter和server定义不同,指定后覆盖server中的同名参数
create foreign table t4(a int, b varchar(20))server scfs_server options(resource 't2_data.csv',  format 'csv', delimiter '|', NULL ' ');

SeaboxMPP内置了很多扩展,使用这些扩展可以定义多种服务(server),和各种数据源交互。

插件(FDW) 支持连接的数据源 数据源上的数据格式 是否支持写
scfs_fdw 支持SeaboxMPP文件服务器(scfs,scfss) csv, txt
支持压缩(gzip,bzip2)
支持加密传输(ssl)
是(文本,gzip)
http_fdw apache/nginx等http服务器 csv, txt
ftp_fdw vsftp,filezilla等ftp服务器 csv, txt 是(文本)
seabox_fdw seaboxdb ,seaboxmpp 表,query
gp_fdw gpdb 表,query
pg_fdw postgresql 表,query
kafka_fdw kafka csv, txt,json
oracle_fdw oracle
dameng_fdw dameng
pxf_fdw(系列fdw) 连接pxf_server,配置pxf_server连接不同的数据源 text, parquet, orc, hive, hbase
hdfs_fdw hdfs数据源 txt, parquet, orc 是(文本)
hive_fdw hive数据源 hive表
mysql_fdw mysql mysql
odbc_fdw 支持odbc访问的数据库 数据库
s3_fdw 访问s3上的数据 text,parquet,parquettext

数据源使用

本节中提到的t1/t2为SeaboxMpp中已经创建好的表。

scfs

Seabox文件服务器,负责分发数据,支持的数据格式为文本文件和压缩文件。

参数说明
  • host : IP地址/hostname

  • port : scfs使用-p参数指定的端口号

  • resource :文件路径

使用说明
create server scfs_server foreign data wrapper scfs_fdw options (host 'localhost', port '8081');

-- 从数据源中copy到数据库
copy t1 from 'fdw://scfs_server/a.txt' (format 'csv', delimiter '|'); 

-- 外部表 
create foreign table t ( a int, b  varchar(10)) server scfs_server  options(resource 'a.txt', delimiter '|'); 
insert into t  select * from t1;

http

可以使用http服务访问的文本文件。

参数说明
  • host : IP地址/hostname
  • port : HTTP端口号
  • resource : 文件路径
使用说明
create server http_server foreign data wrapper http_fdw options (host 'localhost', port '8080', format 'txt');

-- 从数据源中copy到数据库
copy t1 from 'fdw://http_server/a.txt' (format 'csv', delimiter '|');

-- 外部表 
create foreign table t3 ( a int, b  varchar(10)) server http_server  options(resource 'a.txt', delimiter '|'); 
insert into t1  select * from t3;

ftp

可以使用ftp服务访问的文本文件。

参数说明
  • host : IP地址/hostname
  • port : ftp端口号,默认21
  • username : 用户名,对应操作系统的一个用户名
  • password : 用户名对应的密码
  • resource : 文件路径
使用说明
create server ftp_server foreign data wrapper ftp_fdw options (host 'localhost', port '21', username 'seabox', password 'seabox');

-- 从数据源中copy到数据库
copy t1 from 'fdw://ftp_server/a.txt' (format 'csv', delimiter '|');

-- 外部表 
create foreign table t3 ( a int, b  varchar(10)) server ftp_server  options(resource 'a.txt', delimiter '|'); 
insert into t1  select * from t3;

seabox

数据源为Seabox集群或者单机,此类的数据源并行加载的性能和源的executor数有关,目标集群的executor数比源集群多能充分并行。

参数说明
  • host : 源IP地址或hostname
  • port : 源集群coordinator端口号
  • user : 源集群登录用户名
  • user : 源集群password
  • dbname : 源集群要连接的数据库
  • resource:源集群表名
使用说明
create extension seabox_fdw;

create server seaboxmpp_server foreign data wrapper seabox_fdw options (host '127.0.0.1', port '3000', user 'seabox', password '', dbname 'seaboxsql');

-- 将数据源集群的t2表加载到本集群的t1表中
copy t1 from 'fdw://seaboxmpp_server/t2';

-- 源数据库集群的表t2 
create foreign table t (a int, b  varchar(10)) server seaboxmpp_server  options(resource 't2'); 
insert into t  select * from t2;

-- 源数据库上的sql语句,此类外部表没有并行。
create foreign table t (a int, b  varchar(10)) server seaboxmpp_server  options(resource 'select  a,c from t1 join t2 on a = b'); 
insert into t  select * from t2;

gpdb

数据源为gpdb集群,此类的数据源并行加载的性能和源集群的segment数有关,本集群的executor数比源集群多能充分并行。

参数说明
  • host : 源集群IP地址或hostname
  • port : 源集群master端口号
  • user : 源集群登录用户名
  • password : 源集群password
  • dbname : 源集群要连接的数据库
  • resource:源集群表名
使用说明
create extension gp_fdw;

create server gpdb_server foreign data wrapper gp_fdw options (host '127.0.0.1', port '7300', user 'postgres', password '', dbname 'postgres');

-- 将数据源集群的t2表加载到本集群的t1表中
copy t1 from 'fdw://gpdb_server/t2';

-- 源数据库集群的表t2 
create foreign table t (a int, b  varchar(10)) server gpdb_server  options(resource 't2'); 
insert into t  select * from t2;

-- 源数据库上的sql语句,此类外部表没有并行。
create foreign table t (a int, b  varchar(10)) server gpdb_server  options(resource 'select  a,c from t1 join t2 on a = b'); 
insert into t  select * from t2;

postgres

数据源为postgres,由集群中的一个executor负责从postgres中读数据

参数说明
  • host : IP地址或hostname
  • port : 端口号
  • user : 登录用户名
  • user : password
  • dbname : 数据库名
  • resource:源集群表名
使用说明
create extension pg_fdw;

create server postgres_server foreign data wrapper pg_fdw options (host '127.0.0.1', port '7300', user 'postgres', password '', dbname 'postgres');

-- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://postgres_server/t2';

-- 源数据库集群的表t2 
create foreign table t (a int, b  varchar(10)) server postgres_server  options(resource 't2'); 
insert into t  select * from t2;

create foreign table t (a int, b  varchar(10)) server postgres_server  options(resource 'select  a,c from t1 join t2 on a = b'); 
insert into t  select * from t2;

kafka消息中间件

数据源为kafka集群,每个executor从一个或者多个partition读取数据 kafka中存储的数据格式不同,接受的参数会有所不同,文本文件参考COPY语法。

参数说明
  • brokers: broker列表
  • resource:源topic名称
  • topic: 同resource一样,选一种即可
使用说明
-- 创建数据源
create extension kafka_fdw;

-- 创建数据源服务
create server kafka_server foreign data wrapper kafka_fdw options (brokers '172.17.0.7:9092,172.17.0.8:9092');

-- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://kafka_server/topic1' (delimiter '|', format  'csv');

-- 源数据库集群的表t2 
create foreign table t (a int, b  varchar(10)) server kafka_server  options(resource 'topic1'); 
create foreign table t (a int, b  varchar(10)) server kafka_server  options(topic 'topic1'); 
insert into t  select * from t2;

oracle数据源

数据源为oracle数据库,coordinator将表进行分割,每个executor处理表的一部分,以达到并行加速的目的。

参数说明
  • host: 域名
  • port: 端口
  • service: 服务名
  • server: 服务器名
  • user: 用户名
  • password: 用户密码
  • table/resource: oracle表名
  • encoding: Oracle数据库编码
使用说明
--- 创建数据源
create extension oracle_fdw;

--- 创建数据源服务
create server oracle_server foreign data wrapper oracle_fdw options(host '192.168.0.28', port '1521', service 'orcl', user 'test', password 'test');

--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://oracle_server/t2';

--- 导入gbk编码的表
copy t3 from 'fdw://oracle_server/t4' (encoding 'gbk');

--- 创建外部表
create foreign table ftest(a int) server oracle_server options(table 't2');
insert into t1 select * from ftest;

dameng数据源

数据源为dameng数据库,coordinator将表进行分割,每个executor处理表的一部分,以达到并行加速的目的。

参数说明
  • host: 域名
  • port: 端口
  • user: 用户名
  • password: 用户密码
  • table/resource: dameng表名
  • encoding: dameng数据库编码
使用说明
--- 创建数据源
create extension dameng_fdw;

--- 创建数据源服务
CREATE SERVER dameng_server FOREIGN DATA WRAPPER dameng_fdw OPTIONS(host '192.168.0.28', port '5236', user 'test', password 'damengtest');

--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://dameng_server/t2';

--- 导入gbk编码的表
copy t3 from 'fdw://dameng_server/t4' (encoding 'gbk');

--- 创建外部表
create foreign table ftest(a int) server dameng_server options(table 't2');
insert into t1 select * from ftest;

PXF服务

pxf支持的数据源都支持抽象为server,详细的描述参见使用PXF访问外部数据

create server hive_test foreign data wrapper hive_pxf_fdw options (host 'test-3', port '10000');  --HIVE  source
create server hdfs_test foreign data wrapper hdfs_pxf_fdw options (host 'test-3', port '8020') --HDFS source

hdfs数据源

  • 导入: 支持直接获取hdfs上的格式为text/parquet/orc的数据,mpp的executors并行拉取hdfs上的数据来提升速度.
  • 导出: 同时支持seaboxmpp上的数据导出到hdfs上的指定目录中,支持导出为csv,text格式。
hdfs导入

hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明

  • kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server 
-- host:namenode接入点hostname,当启用高可用namenode时,配置为可用nn 列表,自动选择active的nn进行连接
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 
-- principal: kerberbos中的用户身份 
-- keytab: kerberbos中的principal和加密密钥组合的存储文件 

create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp04.seabox.com',port '8020',mechanism 'GSSAPI',principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');

create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'parquet'); 

create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'orc'); 

create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'text'); 

-- 本地数据表入库,也可以直接查询外部表
create foreign table t_input (a int, b  varchar(10)) using scolumn;
insert into t_input  select * from t;
  • 非kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server 
-- host:namenode接入点hostname,当启用高可用namenode时,配置为多nn点,自动选择active的nn进行连接
-- port:namenode服务端口号
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp01.seabox.com,hdp02.seabox.com', port '8020');
create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'parquet'); 

create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'orc'); 

create foreign table t (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/lineitem_small_parquet',format 'text'); 

-- 本地数据表入库,也可以直接查询外部表
create foreign table t_input (a int, b  varchar(10)) using scolumn;
insert into t_input  select * from t;
hdfs导出

hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明

  • kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server 
-- host:namenode接入点hostname,当启用高可用namenode时,配置为可用nn 列表,自动选择active的nn进行连接
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 
-- principal: kerberbos中的用户身份 
-- keytab: kerberbos中的principal和加密密钥组合的存储文件 

create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp04.seabox.com',port '8020',mechanism 'GSSAPI',principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');


-- 创建外部表写入对象 
-- resource: 必须是一个目录,否则报错,如果目录不是空的,默认行为是先清空目录下所有的内容,然后再写入。注意此hdfs目录要赋予seabox用户写权限。
-- format: 可配置为csv,text。
-- clean_dir: 可忽略不设置,可配置参数false,true,如果配置false时,目标目录非空,则会报错退出, 如果配置为true,清空目标目录内容,默认为true
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证。
-- principal: kerberbos中的用户身份 
-- keytab: kerberbos中的principal和加密密钥组合的存储文件

create foreign table t_csv (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/test_write/',format 'csv' , clean_dir 'true'); 

create foreign table t_csv (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/test_write/',format 'text' , clean_dir 'true'); 

-- 导出seaboxmpp的内部表到hdfs上。
insert into t_csv  select *  from  t_source;

select * from t_csv;
  • 非kerberos
create extension hdfs_fdw;
-- 创建 hdfs_server 
-- host:namenode接入点hostname,当启用高可用namenode时,配置为多nn点,自动选择active的nn进行连接
-- port:namenode服务端口号
create server hdfs_server foreign data wrapper hdfs_fdw options(host 'hdp01.seabox.com,hdp02.seabox.com', port '8020');
-- 创建外部表写入对象 
-- resource: 必须是一个目录,否则报错,如果目录不是空的,默认行为是先清空目录下所有的内容,然后再写入。注意此hdfs目录要赋予seabox用户写权限。
-- is_writable: 是否为可写外部表,可写外部表也可以直接查询数据。
-- format: 可配置为csv,text。
-- clean_dir: 可忽略不设置,可配置参数false,true,如果配置false时,目标目录非空,则会报错退出, 如果配置为true,清空目标目录内容,默认为true
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证。
-- principal: kerberbos中的用户身份 
-- keytab: kerberbos中的principal和加密密钥组合的存储文件

create foreign table t_csv (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/test_write/',format 'csv' , clean_dir 'true'); 

create foreign table t_csv (a int, b  varchar(10)) server hdfs_server  options(resource '/hive/warehouse/test_write/',format 'text' , clean_dir 'true'); 


-- 导出seaboxmpp的内部表到hdfs上。
insert into t_csv  select *  from  t_source;
select * from t_csv;

hive数据源

直接获取hive数据源, 格式为text/parquet/orc的数据,通过目标mpp的executors并行拉取hdfs上的数据来提升速度。其他格式数据直接走hive的客户端链接获取数据。

hadoop一般会采用kerberos访问控制机制,这里分成kerberos和非keberos两个类别进行举例说明。

  • kerberos
-- 创建 hive_server 
-- host: hiveServer2的服务的host,
-- port: hive服务端口号
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 'PLAIN'为用户名密码认证
-- principal: 用户在kerberbos中的身份 
-- keytab: kerberbos中的principal和加密密钥组合的存储文件
-- service,sfqdn:用户要访问的kerberos内的服务, 这两部分组成其在kerberos中的principal:service/sfqdn
create extension hive_fdw;
create server hive_server foreign data wrapper hive_fdw options (host 'hdp04.seabox.com', port '10000', mechanism 'GSSAPI', service 'hive', sfqdn 'hdp04.seabox.com', principal 'ci-test@SEABOX.COM', keytab '/home/seabox/ci-test.keytab');
create foreign table tf (a int, b  varchar(10)) server hive_server  options(resource 'lineitem_small_parquet'); 
insert into t  select * from tf;
  • 非kerberos
-- 创建 hive_server 
-- host: hiveServer2的服务的host,
-- port: hive服务端口号
-- mechanism 参数表示认证机制 :'GSSAPI'为kerberos认证, 'PLAIN'为用户名密码认证
-- namenodes: hdfs的namenodes的可选list,从中选择active的namenode进行连接
-- user,password: 用户名,密码,

create extension hive_fdw;
create server hive_server foreign data wrapper hive_fdw options (host 'hdp01.seabox.com', port '10000', mechanism 'PLAIN', namenodes 'hdp01.seabox.com,hdp02.seabox.com')

create foreign table tf (a int, b  varchar(10)) server hive_server  options(resource 'lineitem_small_parquet'); 
insert into t  select * from tf;

mysql数据源

数据源为mysql数据库, 如果是partition表会根据partition进行分割,每个executor处理部分分区,普通表只有一个executor参与抽取。

参数说明
  • host: 域名
  • port: 端口
  • username: 用户名
  • password: 用户密码
  • dbname: 库名
  • table_name: 要访问的表名
  • encoding: mysql数据库编码
使用说明
create extension if not exists mysql_fdw;
create server mysql_svr foreign data wrapper mysql_fdw options (host '192.168.0.28', port '3306');
create user mapping for public server mysql_svr options (username 'test', password 'test');
create foreign table tf(a int, b varchar(20)) server mysql_svr options (dbname 'mysql', table_name 'mysql_test');

--- 导入,t为seaboxmpp的表
insert into t  select * from tf;
--- 导出到MySQL数据库
insert into tf values(1,'test');

使用odbc访问数据库

大多数的数据库都提供了odbc驱动,正确的配置后提供了一种一致的访问数据库的方法。 要使用odbc_fdw需要宿主机安装unixODBC,并且安装了要访问数据库对应的驱动。 在centos上需要配置/etc/odbcinst.ini和/etc/odbc.ini。 odbc_fdw扩展只支持导入。

odbcinst

配置驱动的位置

[mysql]
Description = ODBC for MySQL
Driver = /usr/local/lib/libmyodbc8a.so
FileUsage = 1

odbc

配置数据源的访问方法

[mysql]
Description= mysql odbc driver setup
Driver=mysql
Database=citest
Server=192.168.0.28

参数说明
  • dsn: 在odbc一节中配置的数据源名称
  • user: 用户名
  • password: 用户密码
使用说明
--- 创建数据源
create extension if not exists odbc_fdw;

--- 创建数据源服务
create server mysql foreign data wrapper odbc_fdw options(dsn 'mysql', user 'citest', password '111111');

--- 将数据源的t2表加载到本集群的t1表中
copy t1 from 'fdw://mysql/t2';

--- 使用外部表
create foreign table ftest(a int) server mysql options(table 't2');
insert into t1 select * from ftest;
select * from ftest;

s3数据源

S3提供了对象(object)存储服务,数据文件可以通过s3的resetful接口进行访问。 支持文本和parquet格式的文件数据。

参数说明
  • host: s3存储的访问地址和端口号
  • bucket: 存放数据的存储桶
  • prefix: 数据的存储路径,如果路径类似一个文件夹,其下面的所有文件都会被访问
  • resource: 和prefix意义相同
  • accessid:用户的访问ID
  • secret: 用户的访问密钥
  • partkeys: 此参数中的列的值是从数据存储的路径中获取
  • format: 支持的文件数据格式csv/text/parquet/parquettext(parquettext格式指的是所有列都是字符串类型的parquet文件)
全局参数说明
  • foreign_use_file_column_name : 设置此参数可以用控制是否使用parquet文件中的列名字来对应数据库表中的列。
parquet文件中的列顺序:
a      |b      |c
stringa|stringb|stringc

建表:
create table (a text, c text, b text);

set foreign_use_file_column_name = off;

加载后查询的结果为:
a      |c     |b
stringa|stringb|stringc

set foreign_use_file_column_name = on;

加载后查询的结果为:
a      |c     |b
stringa|stringc|stringb
使用说明
--- 创建数据源
create extension s3_fdw;

--- 创建数据源服务
create server s3 foreign data wrapper s3_fdw options (host 'ss3:9000', bucket 'citest', accessid 'admin', secret 'adminadmin');

--- 将s3上citest存储桶下面的data/a.txt对象加载到本集群的t1表中
copy t1 from 'fdw://s3/data/a.txt';

--- 创建外部表访问s3上的文件
create foreign table test(a int, b varchar(20)) server s3 options(prefix 'parquet/test', format 'parquet');
create foreign table testtext(a int, b varchar(20)) server s3 options(prefix 'csv/a.csv', format 'csv');

--- 创建外部表访问s3上的文件,此表中的c列的值是路径上的`20230210`,代表一个时间分区
create foreign table part(a int, b varchar(20), c date) server s3 options(prefix 'parquet/part/20230210/100.parquet', format 'parquet', partkeys 'd');