跳转至

SeaboxSQL对JDBC API的扩展

对JDBC API的扩展

SeaboxSQL™是可扩展的数据库系统,可以将自己的函数添加到服务器,然后可以从查询中调用它,甚至可以添加自己的数据类型。由于这些是SeaboxSQL™特有的功能,因此我们通过Java和一组扩展API支持它们。 标准驱动程序核心内的某些功能实际上使用这些扩展来实现大对象等。

SeaboxSQL™JDBC驱动程序是非线程安全的。SeaboxSQL服务器未线程化。每个连接都会在服务器上创建一个新进程,因此,对该流程的任何并发请求都必须序列化。驱动程序不保证连接上的方法是同步的,由使用者控制同步的调用驱动程序。

一个明显的例外是org/seaboxsql/jdbc/TimestampUtils.java,它是线程安全的。

访问扩展

要访问某些扩展,需要在com.seaboxsql.SDConnection类中使用一些额外的方法。在这种情况下,将需要强制转换Driver.getConnection()的返回值,例如:

Connection db = Driver.getConnection(url, username, password);
// ...
// later on
Fastpath fp = db.unwrap(com.seaboxsql.SDConnection.class).getFastpathAPI();

几何数据类型

SeaboxSQL™具有一组可以将几何特征存储到表中的数据类型。这些包括单点,直线和多边形。通过com.seaboxsql.geometric软件包在Java中支持这些类型。

例 JDBC中使用CIRCLE类型

import java.sql.*;

import com.seaboxsql.geometric.SDpoint;import com.seaboxsql.geometric.SDcircle;

public class GeometricTest {
    public static void main(String args[]) throws Exception {
        String url = "jdbc:seaboxsql://localhost:5432/test";
        try (Connection conn = DriverManager.getConnection(url, "test", "")) {
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TEMP TABLE geomtest(mycirc circle)");
            }
            insertCircle(conn);
            retrieveCircle(conn);
        }
    }

    private static void insertCircle(Connection conn) throws SQLException {
        SDpoint center = new SDpoint(1, 2.5);
        double radius = 4;
        SDcircle circle = new SDcircle(center, radius);
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO geomtest(mycirc) VALUES (?)")) {
            ps.setObject(1, circle);
            ps.executeUpdate();
        }
    }

    private static void retrieveCircle(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement()) {
            try (ResultSet rs = stmt.executeQuery("SELECT mycirc, area(mycirc) FROM geomtest")) {
                while (rs.next()) {
                    SDcircle circle = (SDcircle)rs.getObject(1);
                    double area = rs.getDouble(2);

                    System.out.println("Center (X, Y) = (" + circle.center.x + ", " + circle.center.y + ")");
                    System.out.println("Radius = " + circle.radius);
                    System.out.println("Area = " + area);
                }
            }
        }
    }
}

大对象

标准JDBC规范支持大对象。 但是,该接口是有限的,而SeaboxSQL™提供的API允许对对象内容的随机访问,就好像它是本地文件一样。

com.seaboxsql.largeobject包向Java提供了libpq C接口的大对象API。它由两个类组成,LargeObjectManager处理大对象的创建,打开和删除;LargeObject处理单个对象。有关此API的用法示例,请参见示例7.1“在JDBC中处理二进制数据”。

侦听/通知

侦听和通知为访问同一SeaboxSQL™数据库的一组进程提供了一种信号或进程间通信机制的简单形式。 有关通知的更多信息,请参阅数据库服务文档,本节仅涉及JDBC侦听的内容。

标准的LISTEN,NOTIFY和UNLISTEN命令是通过标准Statement接口发出的。 要检索和处理检索到的通知,必须将Connection强制转换为SeaboxSQL™特定的扩展接口SDConnection。 从那里可以使用getNotifications()方法检索任何未完成的通知。

例 接收通知

import java.sql.*;

public class NotificationTest{
    public static void main(String args[]) throws Exception
    {
        Class.forName("com.seaboxsql.Driver");
        String url = "jdbc:seaboxsql://localhost:5432/test";


        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.

        Connection lConn = DriverManager.getConnection(url,"test","");
        Connection nConn = DriverManager.getConnection(url,"test","");

        // Create two threads, one to issue notifications and
        // the other to receive them.

        Listener listener = new Listener(lConn);
        Notifier notifier = new Notifier(nConn);
        listener.start();
        notifier.start();
    }
}

class Listener extends Thread
{
    private Connection conn;
    private com.seaboxsql.SDConnection sdconn;

    Listener(Connection conn) throws SQLException
    {
        this.conn = conn;
        this.sdconn = conn.unwrap(com.seaboxsql.SDConnection.class);
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN mymessage");
        stmt.close();
    }

    public void run()
    {
        try
        {
            while (true)
            {
                com.seaboxsql.SDNotification notifications[] = sdconn.getNotifications();

                // If this thread is the only one that uses the connection, a timeout can be used to
                // receive notifications immediately:
                // com.seaboxsql.SDNotification notifications[] = sdconn.getNotifications(10000);

                if (notifications != null)
                {
                    for (int i=0; i < notifications.length; i++)
                        System.out.println("Got notification: " + notifications[i].getName());
                }

                // wait a while before checking again for new
                // notifications

                Thread.sleep(500);
            }
        }
        catch (SQLException sqle)
        {
            sqle.printStackTrace();
        }
        catch (InterruptedException ie)
        {
            ie.printStackTrace();
        }
    }
}

class Notifier extends Thread
{
    private Connection conn;

    public Notifier(Connection conn)
    {
        this.conn = conn;
    }

    public void run()
    {
        while (true)
        {
            try
            {
                Statement stmt = conn.createStatement();
                stmt.execute("NOTIFY mymessage");
                stmt.close();
                Thread.sleep(2000);
            }
            catch (SQLException sqle)
            {
                sqle.printStackTrace();
            }
            catch (InterruptedException ie)
            {
                ie.printStackTrace();
            }
        }
    }
}

服务器的预编译语句

目的

SeaboxSQL™服务器允许客户端编译预期将被重用的sql语句,以避免为每次执行语句分析和计划的开销。在SQL级别通过PREPARE和EXECUTE和协议级别都可以使用此功能,但是作为Java开发人员,我们真正想用的只是标准PreparedStatement接口。

服务器端预编译语句可以提升执行速度,因为

  1. 它只发送语句句柄(例如S_1),而不发送完整的SQL文本

  2. 它允许使用二进制传输(例如,二进制int4,二进制时间戳等);参数和结果解析起来要快得多

  3. 它启用了重用服务器端执行计划

  4. 客户端可以重用结果集列定义,因此不必在每次执行时都接收和解析元数据

执行

早期版本的驱动程序使用PREPARE和EXECUTE来实现服务器预编译语句。从7.3开始的所有服务器版本都支持此功能,但查询结果产生了应用程序可见的更改,例如缺少ResultSet元数据和行更新计数。当前驱动程序使用与V3协议等效的级别,以避免查询结果中的这些更改。

当使用PreparedStatement API时,驱动程序默认使用服务器端的预处理语句。为了进行服务器端的准备,您需要执行5次查询(可以通过prepareThreshold连接属性进行配置)。内部计数器跟踪该语句已执行了多少次,以及何时达到阈值,它将开始使用服务器端预编译的语句。

出于性能原因,重复使用相同的PreparedStatement对象通常是一个好主意,但是驱动程序能够自动跨connection.prepareStatement(…)调用使用服务器预编译语句。

例如

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 42);
ps.executeQuery().close();
ps.close();

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 43);
ps.executeQuery().close();
ps.close();

效率不如

PreparedStatement ps = con.prepareStatement("select /*test*/ ?::int4");
ps.setInt(1, 42);
ps.executeQuery().close();

ps.setInt(1, 43);
ps.executeQuery().close();

sdjdbc在两种情况下都可以使用服务器端预编译语句。

注意

Statement对象绑定到一个Connection,并且从多个并发线程访问相同的Statement和/或Connection并不是一个好主意(cancel(),close()和类似情况除外)。Close()操作语句而不是尝试以某种方式对其进行缓存可能更安全。

服务器预编译语句将消耗客户端和服务器上的内存,因此sdjdbc限制了每个连接中服务器准备的语句的数量。可以通过prepareStatementCacheQueries(默认256,sdjdbc知道的查询数)和prepareStatementCacheSizeMiB(默认5,即每个连接的客户端缓存大小,以兆字节为单位)进行配置。服务器仅准备了语句缓存的一个子集,因为某些语句可能无法达到prepareThreshold。

消除

在某些情况下,你可能想禁用服务器编译预处理。例如,应用程序通过负载均衡器与服务器连接,这种情况别无选择。

可以通过设置prepareThreshold = 0来禁用服务器端预处理的使用

核心案例

DDL

V3协议可以避免在每次执行时发送列元数据,BIND消息定义了输出列格式。这带来了带来了问题,诸如

SELECT * FROM mytable;
ALTER mytable ADD column ...;
SELECT * FROM mytable;

导致缓存的计划一定不能更改结果类型的错误,并且会引起事务失败。

推荐的方式是:

1. 使用SELECT列表中使用明确的列名;

2. 避免列的类型发生变化

DEALLOCATE ALLDISCARD ALL

有明确的命令可以释放所有服务器端预编译的语句。这将导致服务器端产生以下错误消息:预编译语句名称无效。当然,它可能会使sdjdbc执行失败,然而在某些情况下,需要丢弃语句(例如,执行大量DDL之后)

我们推荐的是:

  1. 使用简单的DEALLOCATE ALL和/或DISCARD ALL命令,避免将命令嵌套到pl/sdsql等中。驱动程序确实了解上层DEALLOCATE/DISCARD命令,并且让客户端缓存无效

  2. 重新连接。缓存是在连接范畴内的,因此如果您重新连接,它将失效

设置search_path = …

SeaboxSQL允许自定义search_path,它为开发人员提供了强大的功能。在它的帮助下,可以实现:

set search_path='app_v1';
SELECT * FROM mytable;
set search_path='app_v2';
SELECT * FROM mytable; -- Does mytable mean app_v1.mytable or app_v2.mytable here?

服务器端预编译语句与数据库对象ID相关联,因此它可以从"原有" app_v1.mytable表中获取数据。很难说出预期的行为,但是sdjdbc试图跟踪search_path的变化,并使相应的缓存无效。

推荐的方式是:

1. 避免经常更改search_path,因为它会使服务器端预编译语句无效

2. 使用简单的set search_path …命令,避免将命令嵌套到pl/pgsql等中,否则sdjdbc将无法识别search_path的更改

重新执行失败的语句

遗憾的是,缓存的计划不能更改结果类型这样简单的问题可能会导致整个事务失败。在某些情况下,驱动程序可以自动重新执行该语句。

  1. 如果事务没有失败(例如,在执行导致缓存计划…错误的语句之前该事务不存在),则sdjdbc会自动重新执行该语句。这将满足应用程序的执行需求,并避免了不必要的错误。

  2. 如果事务处于失败状态,则除了回滚外别无其他。 sdjdbc具有“自动保存”功能,它可以自动回滚并重试该语句。该行为通过autosave属性控制(默认为never)。conservative设定将自动回滚无效的服务器预编译语句有关的错误。

    注意: autosave可能会导致长时间事务进而出现严重性能问题,因为SeaboxSQL后端并未针对长时间事务和大量保存点进行优化。

复制连接

SeaboxSQL复制连接不允许使用服务器端预编译的语句,因此在激活复制连接属性的情况下sdjdbc使用简单查询。

服务器端预编译语句用于con.createStatement().

默认情况下,sdjdbc仅将服务器预编译语句用于PreparedStatement,但是您可能还希望为常规Statement使用服务器端预编译语句。例如,如果通过con.createStatement().executeQuery(...)执行同一条语句,则可以通过该缓存的语句提高性能。当然,最好显式地使用PreparedStatements,但是驱动程序也可以选择缓存简单的语句。

您可以通过将preferredQueryMode设置为extendedCacheEverything来实现。

注意: 该选项更像是诊断/调试功能类别,因此请谨慎使用。

绑定占位符数据类型

数据库针对给定的参数类型优化执行计划。考虑以下情况:

-- create table rooms (id int4, name varchar);
-- create index name__rooms on rooms(name);
PreparedStatement ps = con.prepareStatement("select id from rooms where name=?");
ps.setString(1, "42");

看上去按照预期工作了,但是如果换成setInt会怎么样?

ps.setInt(1, 42);

即使结果相同,但第一个变体(使用setString的情况)使数据库能够使用索引name__rooms,而后者却不能。如果数据库以整数形式获取42,则它使用的计划类似于where cast(name int4) = ?

该计划必须特定于(SQL text;参数类型)组合,因此,驱动程序必须使服务器端预编译语句无效,以免不同的参数类型作用于该语句。

这对于批处理操作特别痛苦,因为您不想因为使用不同的数据类型而中断批处理。

最典型的场景如下(永远不要在生产中使用):

PreparedStatement ps = con.prepareStatement("select id from rooms where ...");
if (param instanceof String) {
    ps.setString(1, param);
} else if (param instanceof Integer) {
    ps.setInt(1, ((Integer) param).intValue());
} else {
    // Does it really matter which type of NULL to use?
    // In fact, it does since data types specify which server-procedure to call
    ps.setNull(1, Types.INTEGER);
}

您可能会猜到,setString与setNull(…,Types.INTEGER)会导致数据类型交替出现,这会迫使驱动程序使服务器端已有预编译语句无效并重新预编译。

建议对每个绑定占位符使用一致的数据类型,对setNull使用相同的类型。 查看com.seaboxsql.test.jdbc2.PreparedStatementTest.testAlternatingBindType示例以获取更多详细信息。

调试

一旦遇到缓存的计划不能更改结果类型或预编译语句“ S_2 ”不存在,则以下内容可能有助于调试。

  1. 客户端日志记录。 如果设定loggerLevel = TRACE&loggerFile = sdjdbc-trace.log,你将可以跟踪驱动程序和后端之间的消息传送

  2. 可以参考com.seaboxsql.test.jdbc2.AutoRollbackTestSuite,因为它检查了很多组合情况

例 使用服务器端的预编译语句

import java.sql.*;

public class ServerSidePreparedStatement{

    public static void main(String args[]) throws Exception
    {
        Class.forName("com.seaboxsql.Driver");
        String url = "jdbc:seaboxsql://localhost:5432/test";
        Connection conn = DriverManager.getConnection(url,"test","");

        PreparedStatement pstmt = conn.prepareStatement("SELECT ?");

        // cast to the sd extension interface
        com.seaboxsql.SDStatement sdstmt = pstmt.unwrap(com.seaboxsql.SDStatement.class);

        // on the third execution start using server side statements
        sdstmt.setPrepareThreshold(3);

        for (int i=1; i<=5; i++)
        {
            pstmt.setInt(1,i);
            boolean usingServerPrepare = sdstmt.isUseServerPrepare();
            ResultSet rs = pstmt.executeQuery();
            rs.next();
            System.out.println("Execution: "+i+", Used server side: " + usingServerPrepare + ", Result: "+rs.getInt(1));
            rs.close();
        }

        pstmt.close();
        conn.close();
    }
}
将在第三次执行时产生使用服务器端预编译语句产生预期的结果。

Execution: 1, Used server side: false, Result: 1
Execution: 2, Used server side: false, Result: 2
Execution: 3, Used server side: true, Result: 3
Execution: 4, Used server side: true, Result: 4
Execution: 5, Used server side: true, Result: 5

上面的示例要求程序员在所谓的可移植API中使用SeaboxSQL™特定代码,这并不理想。 此外,它仅为该特定语句设置阈值,如果我们想为每个语句使用该阈值,则需要额外输入一些内容。 让我们看一下设置阈值以启用服务器预编译语句的其他方法。 在PreparedStatement上方已有一个层次结构,从它创建连接,在该连接上方可以是数据源或URL。 可以这些层次级别的任何位置设定服务器预编译语句阈值,以便所有子等级默认使用该设定。

// sd extension interfacescom.seaboxsql.SDConnection sdconn;com.seaboxsql.SDStatement sdstmt;

// set a prepared statement threshold for connections created from this url
String url = "jdbc:seaboxsql://localhost:5432/test?prepareThreshold=3";

// see that the connection has picked up the correct threshold from the url
Connection conn = DriverManager.getConnection(url,"test","");
sdconn = conn.unwrap(com.seaboxsql.SDConnection.class);
System.out.println(sdconn.getPrepareThreshold()); // Should be 3

// see that the statement has picked up the correct threshold from the connection
PreparedStatement pstmt = conn.prepareStatement("SELECT ?");
sdstmt = pstmt.unwrap(com.seaboxsql.SDStatement.class);
System.out.println(sdstmt.getPrepareThreshold()); // Should be 3

// change the connection's threshold and ensure that new statements pick it up
sdconn.setPrepareThreshold(5);
PreparedStatement pstmt = conn.prepareStatement("SELECT ?");
sdstmt = pstmt.unwrap(com.seaboxsql.SDStatement.class);
System.out.println(sdstmt.getPrepareThreshold()); // Should be 5

物理和逻辑复制API

参数状态消息

SeaboxSQL支持服务器参数,也称为服务器变量,或者在内部称为Grand Unified Configuration(GUC)变量。这些变量可以由SET命令seaboxsql.confALTER SYSTEM SETALTER USER SETALTER DATABASE SETset_config(...)SQL调用函数等方式操作。请参见SeaboxSQL手册。

对于这些变量的子集,服务器将自动将该值的更改报告给客户端驱动程序和应用程序。在启用该功能之后,这些变量在内部称为GUC_REPORT变量。

服务器会跟踪所有变量作用域,并在变量恢复原有数值时进行报告,因此客户端不必猜测当前值是什么,以及服务器端某些函数是否已经将其更改。每当值更改时,无论为什么更改或如何更改,服务器都会在“参数状态”协议消息中向客户端报告新的有效值。 SdJDBC在内部使用了许多此类报告。

SdJDBC还通过SDConnection扩展接口向用户应用程序提供参数状态信息。

方法

com.seaboxsql.SDConnection上的两个方法提供了报告参数的客户端接口。参数名称不区分大小写且保留大小写。

Map SDConnection.getParameterStatuses()
返回所有报告的参数及其值的映射。
String SDConnection.getParameterStatus()
按名称检索一个值的简写;如果未报告任何值,则为null。

有关详细信息,请参见SDConnection JavaDoc。

例如果您直接使用java.sql.Connection,则可以

import com.seaboxsql.SDConnection;

void my_function(Connection conn) {

    System.out.println("My application name is " +
        ((SDConnection)conn).getParameterStatus("application_name"));

}
其他客户端驱动

与libpq等效的是PQparameterStatus(…)API函数。

总览

SeaboxSQL JDBC引入了一项称为逻辑复制的新功能。逻辑复制允许将数据库中的更改实时流式传输到外部系统。物理复制和逻辑复制之间的区别在于,后者以逻辑格式发送数据,而物理复制以二进制格式发送数据。另外,逻辑复制可以通过单个表或数据库发送。二进制复制以全有或全无的方式复制整个集群。也就是说,无法使用二进制复制来获取特定的表或数据库

在进行逻辑复制之前,保持外部系统实时同步是有问题的。应用程序将必须更新/释放的缓存条目,在搜索引擎中为数据重新编制索引,将其发送到分析系统等等。这回造成竞争和可靠性方面的困扰。例如,如果将略有不同的数据写入两个不同的数据存储区(可能是由于错误或竞争访问状况所致),则随着时间的流逝数据存储区的内容会渐行渐远,它们将变得越来越不一致。从这种逐渐演变的数据损坏中恢复是非常困难的。

逻辑解码采用数据库的预写日志(WAL),并允许我们访问行级变更事件:每次插入,更新或删除表中的一行时,这都是事件。这些事件按事务分组,并按照它们提交到数据库的顺序显示。中止/回滚的事务不会出现在流中。因此,如果以相同顺序应用更改事件,最终将获得数据库的精确且事务一致的副本。它看起来像前面应用程序中实现的事件源模式,但现在可以从SeaboxSQL数据库中直接使用。

为了访问实时变更,SeaboxSQL提供了流复制协议。复制协议可以是物理的或逻辑的。物理复制协议用于主/从复制。逻辑复制协议可用于将更改流式传输到外部系统。

由于JDBC API不包括复制,因此SDConnection实现了SeaboxSQL API。

配置数据库

您的数据库应配置为启用逻辑或物理复制

seaboxsql.conf配置文件说明

  • 属性max_wal_senders应该至少等于复制cunsumer的数量

  • 属性wal_keep_segments应该包含WAL Segment的数量,它是无法从数据库中删除的。

  • 用于逻辑复制的属性wal_level应该等于逻辑。

  • 对于逻辑复制,属性max_replication_slots应该大于零,因为没有复制slot就无法进行逻辑复制。

sd_hba.conf为具有复制权限的用户启用复制流。

local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5

配置实例

seaboxsql.conf
max_wal_senders = 4             # max number of walsender processes
wal_keep_segments = 4           # in logfile segments, 16MB each; 0 disables
wal_level = logical             # minimal, replica, or logical
max_replication_slots = 4       # max number of replication slots
sd_hba.conf
### Allow replication connections from localhost, by a user with the
### replication privilege.
local   replication   all                   trust
host    replication   all   127.0.0.1/32    md5
host    replication   all   ::1/128         md5
逻辑复制

逻辑复制用复制slot以在服务器上保留WAL日志,并且还定义了将WAL日志解码为所需格式的解码插件。例如,您可以将其解码为json,protobuf等。为了演示如何使用 sdjdbc复制API,我们将使用seaboxsql-contrib中包含的test_decoding插件,而且你也可以使用自己的解码插件。 github上有一些示例。

为了使用复制API,必须以复制模式创建连接。在这种模式下,该连接不可用于执行SQL命令,只能是复制API。 这是SeaboxSQL施加的限制。

示例 创建复制连接。

    String url = "jdbc:seaboxsql://localhost:5432/seaboxs";
    Properties props = new Properties();
    SDProperty.USER.set(props, "seabox");
    SDProperty.PASSWORD.set(props, "seabox");
    SDProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    SDProperty.REPLICATION.set(props, "database");
    SDProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection con = DriverManager.getConnection(url, props);
    SDConnection replConnection = con.unwrap(SDConnection.class);

整个复制API分组在com.seaboxsql.replication.SDReplicationConnection中,可通过com.seaboxsql.SDConnection#getReplicationAPI获得。

开始复制协议之前,需要具有复制slot,该slot也可以通过sdjdbc API创建。

示例 通过sdjdbc API创建复制插槽

    replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("demo_logical_slot")
        .withOutputPlugin("test_decoding")
        .make();
一旦具有了复制slot,就可以创建ReplicationStream。

示例 创建逻辑复制流

    SDReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

复制流将发送从创建复制slot之后的所有更改,或者从复制slot重新启动LSN(如果该slot已用于复制)发送所有更改。 你也可以从特定的LSN位置开始流式传输变更,在这种情况下,创建复制流时应指定LSN位置。

示例 从特定位置创建逻辑复制流。

    LogSequenceNumber waitLSN = LogSequenceNumber.valueOf("6F/E3C53568");

    SDReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .withStartPosition(waitLSN)
            .start();

通过withSlotOption,我们还可以指定发送到输出插件的选项,这将允许自定义解码。 例如,我有自己的输出插件,该插件具有属性sensitive = true,它将包含敏感列的更改以更改事件。

示例 包含include-xids = true的示例输出

BEGIN 105779
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'previous value'
COMMIT 105779

复制操作期间,数据库和使用者定期交换ping消息。 如果数据库或客户端在配置的超时时间内未收到ping消息,则复制被视为已停止,并且将抛出异常,数据库将释放资源。在SeaboxSQL中,ping超时由属性wal_sender_timeout配置(默认= 60秒)。可以将sdjdc中的复制流配置为在需要时或按时间间隔发送反馈(ping)。建议比wal_sender_timeout配置的间隔更频繁地向数据库发送反馈(ping)。 在生产环境中,我使用等于wal_sender_timeout / 3的值。它避免了网络潜在的问题,并且可以变更为不会超时断开连接的流式传输。 要指定反馈间隔,请使用withStatusInterval方法。

示例 将反馈间隔设定为20秒的复制流

    SDReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .withStatusInterval(20, TimeUnit.SECONDS)
            .start();

创建SDReplicationStream之后,该开始实时接收更改了。更改可以是阻塞(com.seaboxsql.replication.SDReplicationStream#read)或非阻塞(com.seaboxsql.replication.SDReplicationStream#readPending)从流中接收。两种方法都接收java.nio.ByteBuffer,它是来自发送输出插件的有效载荷。 我们无法接收消息的一部分,只能接收输出插件发送的完整消息。 ByteBuffer包含解码输出插件定义的格式的消息,它可以是简单的String,json或插件确定的任何内容。 这就是为什么sdjdbc返回原始ByteBuffer而不是进行格式假设的原因。

示例 从输出插件发送消息

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);

示例 通过复制流接收变更

    while (true) {
      //non blocking receive message
      ByteBuffer msg = stream.readPending();

      if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
      }

      int offset = msg.arrayOffset();
      byte[] source = msg.array();
      int length = source.length - offset;
      System.out.println(new String(source, offset, length));
    }

如前所述,复制流应定期向数据库发送反馈,以防止由于超时而断开连接。如果需要发送反馈,则在调用read或readPending时会自动发送反馈。不管超时如何,也可以通过com.seaboxsql.replication.SDReplicationStream#forceUpdateStatus()发送反馈。反馈的另一个重要职责是为服务器提供已经成功接收并应用于消费者的逻辑序列号(LSN),这对于监控和截断/存档不再需要的WAL是必要的。如果复制已重新启动,它将从上一个反馈给数据库并成功处理的LSN开始。

API提供以下反馈机制,以指示当前使用者成功应用LSN。可以截断或存档之前的LSN。 com.seaboxsql.replication.SDReplicationStream#setFlushedLSNcom.seaboxsql.replication.SDReplicationStream#setAppliedLSN。始终可以通过com.seaboxsql.replication.SDReplicationStream#getLastReceiveLSN获得最后一个接收LSN。

示例 增加成功处理LSN的反馈

    while (true) {
      //Receive last successfully send to queue message. LSN ordered.
      LogSequenceNumber successfullySendToQueue = getQueueFeedback();
      if (successfullySendToQueue != null) {
        stream.setAppliedLSN(successfullySendToQueue);
        stream.setFlushedLSN(successfullySendToQueue);
      }

      //non blocking receive message
      ByteBuffer msg = stream.readPending();

      if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
      }

      asyncSendToQueue(msg, stream.getLastReceiveLSN());
    }

示例 逻辑复制的完整示例

    String url = "jdbc:seaboxsql://localhost:5432/test";
    Properties props = new Properties();
    SDProperty.USER.set(props, "seabox");
    SDProperty.PASSWORD.set(props, "seabox");
    SDProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    SDProperty.REPLICATION.set(props, "database");
    SDProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection con = DriverManager.getConnection(url, props);
    SDConnection replConnection = con.unwrap(SDConnection.class);

    replConnection.getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("demo_logical_slot")
        .withOutputPlugin("test_decoding")
        .make();

    //some changes after create replication slot to demonstrate receive it
    sqlConnection.setAutoCommit(true);
    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table(name) values('first tx changes')");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("update test_logic_table set name = 'second tx change' where pk = 1");
    st.close();

    st = sqlConnection.createStatement();
    st.execute("delete from test_logic_table where pk = 1");
    st.close();

    SDReplicationStream stream =
        replConnection.getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("demo_logical_slot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .withStatusInterval(20, TimeUnit.SECONDS)
            .start();

    while (true) {
      //non blocking receive message
      ByteBuffer msg = stream.readPending();

      if (msg == null) {
        TimeUnit.MILLISECONDS.sleep(10L);
        continue;
      }

      int offset = msg.arrayOffset();
      byte[] source = msg.array();
      int length = source.length - offset;
      System.out.println(new String(source, offset, length));

      //feedback
      stream.setAppliedLSN(stream.getLastReceiveLSN());
      stream.setFlushedLSN(stream.getLastReceiveLSN());
    }

输出结果如下,每一行是分开的消息

BEGIN
table public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'first tx changes'
COMMIT
BEGIN
table public.test_logic_table: UPDATE: pk[integer]:1 name[character varying]:'second tx change'
COMMIT
BEGIN
table public.test_logic_table: DELETE: pk[integer]:1
COMMIT
物理复制

用于物理复制的API看起来像用于逻辑复制的API。 物理复制不需要复制slot。ByteBuffer将包含WAL日志的二进制形式。二进制WAL格式是一种非常基础的API,并且可以随版本而变化。因此无法在SeaboxSQL主版本之间进行复制。但是物理复制可以包含许多重要数据,而这些数据无法通过逻辑复制获得。 这就是为什么sdjdc包含两个实现的原因。

示例 使用物理复制

    LogSequenceNumber lsn = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_physic_table(name) values('previous value')");
    st.close();

    SDReplicationStream stream =
        sdConnection
            .getReplicationAPI()
            .replicationStream()
            .physical()
            .withStartPosition(lsn)
            .start();

    ByteBuffer read = stream.read();

数组

SeaboxSQL™提供了数组类型对where子句中的列类型,函数参数和复数进行支持。有几种使用sdjdbc创建数组的方法。

java.sql.Connection.createArrayOf(String,Object [])可用于从Object[]实例创建java.sql.Array(注意:这包括基本多维数组和对象多维数组)。类似的方法com.seaboxsql.SDConnection.createArrayOf(String,Object)提供对原始数组类型的支持。 从这些方法返回的java.sql.Array对象可以在其他方法中使用,例如PreparedStatement.setArray(int,Array)

此外,可以在PreparedStatement.setObject方法中使用以下类型的数组,并将使用定义的类型映射:

Java Type Default SeaboxSQL™ Type

short[]      int2[]
int[]        int4[]
long[]       int8[]
float[]      float4[]
double[]     float8[]
boolean[]    bool[]
String[]     varchar[]