Spring Integration 提供了两个特定于 JDBC 的消息存储库实现。
适用于聚合器和声明检查模式。
该实现专门针对消息通道提供了更有针对性和可伸缩性的实现。JdbcMessageStore
JdbcChannelMessageStore
请注意,您可以使用 a 来支持消息通道,该通道已针对此目的进行了优化。JdbcMessageStore
JdbcChannelMessageStore
从 5.0.11、5.1.2 版本开始,优化了 的索引。
如果此类存储中有大型消息组,则可能希望更改索引。
此外,索引被注释掉,因为除非您使用 JDBC 支持的此类通道,否则不需要它。JdbcChannelMessageStore PriorityChannel |
使用 时,必须添加优先级通道索引,因为它包含在查询的提示中。OracleChannelMessageStoreQueryProvider |
从 5.0.11、5.1.2 版本开始,优化了 的索引。
如果此类存储中有大型消息组,则可能希望更改索引。
此外,索引被注释掉,因为除非您使用 JDBC 支持的此类通道,否则不需要它。JdbcChannelMessageStore PriorityChannel |
使用 时,必须添加优先级通道索引,因为它包含在查询的提示中。OracleChannelMessageStoreQueryProvider |
初始化数据库
在开始使用 JDBC 消息存储库组件之前,应使用适当的对象置备目标数据库。
Spring Integration 附带了一些可用于初始化数据库的示例脚本。
在 JAR 文件中,您可以在包中找到脚本。
它为一系列常见的数据库平台提供了一个示例创建和一个示例删除脚本。
使用这些脚本的常用方法是在 Spring JDBC 数据源初始值设定项中引用它们。
请注意,这些脚本是作为示例和所需表和列名称的规范提供的。
您可能会发现需要增强它们以供生产使用(例如,通过添加索引声明)。spring-integration-jdbc
org.springframework.integration.jdbc
从版本 6.2 开始,在方法中对各自的表实现并执行“SELECT COUNT”查询,以确保目标数据库中存在所需的表(根据提供的前缀)。
如果所需的表不存在,则应用程序上下文无法启动。
可以通过 禁用检查。JdbcMessageStore
JdbcChannelMessageStore
JdbcMetadataStore
DefaultLockRepository
SmartLifecycle
start()
setCheckDatabaseOnStart(false)
通用 JDBC 消息存储库
JDBC 模块提供了由数据库支持的 Spring 集成(在声明检查模式中很重要)和(在有状态模式(如聚合器)中很重要)的实现。
这两个接口都由 实现,并且支持在 XML 中配置存储实例,如以下示例所示:MessageStore
MessageGroupStore
JdbcMessageStore
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定 a 而不是 .JdbcTemplate
DataSource
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的示例中,我们指定了用于将消息作为大型对象处理(这对于 Oracle 来说通常是必需的)和存储生成的查询中的表名的前缀。
表名前缀默认为 。LobHandler
INT_
后备消息通道
如果您打算使用 JDBC 支持消息通道,我们建议使用该实现。
它仅与消息通道结合使用。JdbcChannelMessageStore
支持的数据库
它使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,必须在 上设置属性。
这将提供您指定的特定数据库的 SQL 查询。
Spring Integration 提供对以下关系数据库的支持:JdbcChannelMessageStore
ChannelMessageStoreQueryProvider
JdbcChannelMessageStore
channelMessageStoreQueryProvider
-
PostgreSQL的
-
HSQLDB数据库
-
MySQL的
-
神谕
-
德比
-
H2型
-
SqlServer
-
Sybase
-
数据库2
如果未列出数据库,则可以实现该接口并提供自己的自定义查询。ChannelMessageStoreQueryProvider
版本 4.0 将列添加到表中,以确保先进先出 (FIFO) 排队,即使消息存储在同一毫秒内也是如此。MESSAGE_SEQUENCE
从版本 6.2 开始,公开一个标志,其中返回及其对轮询的查询现在基于单个语句。
如果仅支持单个轮询语句,则会咨询该选项并跳过单独的语句。ChannelMessageStoreQueryProvider
isSingleStatementForPoll
PostgresChannelMessageStoreQueryProvider
true
DELETE…RETURNING
JdbcChannelMessageStore
isSingleStatementForPoll
DELETE
自定义消息插入
从 5.0 版开始,通过重载类,可以在 .
您可以使用它来设置不同的列或更改表结构或序列化策略。
例如,可以将其结构存储为 JSON 字符串,而不是默认序列化为 。ChannelMessageStorePreparedStatementSetter
JdbcChannelMessageStore
byte[]
以下示例使用 to 存储公共列的默认实现,并重写将消息负载存储为 :setValues
varchar
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。 |
并发轮询
轮询消息通道时,可以选择配置与引用关联的通道。Poller
TaskExecutor
但请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询该通道,从而轮询具有多个线程的消息存储区事务,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby 在这方面是有问题的。 为了实现更好的 JDBC 队列吞吐量,并避免不同线程可能从队列轮询相同的线程时出现的问题,在使用不支持 MVCC 的数据库时,将 的属性设置为 非常重要。
以下示例演示如何执行此操作:
|
优先通道
从版本 4.0 开始,实现并提供该选项,使其用作实例的参考。
为此,该表具有一列来存储消息标头的值。
此外,新列使我们能够实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储多条消息也是如此。
从数据库中轮询(选择)消息。JdbcChannelMessageStore
PriorityCapableChannelMessageStore
priorityEnabled
message-store
priority-queue
INT_CHANNEL_MESSAGE
MESSAGE_PRIORITY
PRIORITY
MESSAGE_SEQUENCE
order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
我们不建议对优先级和非优先级队列通道使用相同的 Bean,因为该选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。
但是,同一表(甚至)可用于两种类型。
要配置该方案,可以从另一个消息存储库 Bean 扩展另一个消息存储 Bean,如以下示例所示:JdbcChannelMessageStore priorityEnabled INT_CHANNEL_MESSAGE region JdbcChannelMessageStore |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。 |
但请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询该通道,从而轮询具有多个线程的消息存储区事务,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby 在这方面是有问题的。 为了实现更好的 JDBC 队列吞吐量,并避免不同线程可能从队列轮询相同的线程时出现的问题,在使用不支持 MVCC 的数据库时,将 的属性设置为 非常重要。
以下示例演示如何执行此操作:
|
我们不建议对优先级和非优先级队列通道使用相同的 Bean,因为该选项适用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。
但是,同一表(甚至)可用于两种类型。
要配置该方案,可以从另一个消息存储库 Bean 扩展另一个消息存储 Bean,如以下示例所示:JdbcChannelMessageStore priorityEnabled INT_CHANNEL_MESSAGE region JdbcChannelMessageStore |
对消息存储区进行分区
通常将 a 用作同一应用程序中的一组应用程序或节点的全局存储。
为了提供一些防止名称冲突的保护,并控制数据库元数据配置,消息存储库允许以两种方式对表进行分区。
一种方法是通过更改前缀(如前所述)来使用单独的表名。
另一种方法是指定用于在单个表中对数据进行分区的名称。
第二种方法的一个重要用例是管理支持Spring Integration Message Channel的持久队列。
持久通道的消息数据在通道名称的存储中键入。
因此,如果通道名称不是全局唯一的,则通道可以选取不适合它们的数据。
若要避免这种危险,可以使用消息存储库将具有相同逻辑名称的不同物理通道的数据分开。JdbcMessageStore
region
MessageStore
region
PostgreSQL:接收推送通知
PostgreSQL 提供了一个侦听和通知框架,用于在数据库表操作时接收推送通知。
Spring Integration 利用此机制(从 6.0 版开始)允许在将新消息添加到 .
使用此功能时,必须定义一个数据库触发器,该触发器可以作为文件注释的一部分找到,该文件包含在 Spring Integration 的 JDBC 模块中。JdbcChannelMessageStore
schema-postgresql.sql
推送通知通过类接收,该类允许其订阅者在收到任何给定 和 的新消息时收到回调。
即使消息附加到不同的 JVM 上,但附加到同一数据库,也会收到这些通知。
该实现使用协定从存储中拉取消息,作为对上述通知的通知的反应。PostgresChannelMessageTableSubscriber
region
groupId
PostgresSubscribableChannel
PostgresChannelMessageTableSubscriber.Subscription
PostgresChannelMessageTableSubscriber
例如,可以按如下方式接收推送通知:some group
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
交易支持
从版本 6.0.5 开始,在 a 上指定 a 将在事务中通知订阅者。
订阅服务器中的异常将导致事务回滚,并将消息放回邮件存储中。
默认情况下,事务支持未激活。PlatformTransactionManager
PostgresSubscribableChannel
重试
从版本 6.0.5 开始,可以通过向 .
默认情况下,不执行重试。RetryTemplate
PostgresSubscribableChannel
任何活动在其活动生命周期期间都占用独占 JDBC。
因此,重要的是此连接不是源自池化。
此类连接池通常期望已发出的连接在预定义的超时窗口内关闭。 对于这种独占连接的需求,还建议 JVM 只运行一个可用于注册任意数量的订阅的单个订阅。 |
任何活动在其活动生命周期期间都占用独占 JDBC。
因此,重要的是此连接不是源自池化。
此类连接池通常期望已发出的连接在预定义的超时窗口内关闭。 对于这种独占连接的需求,还建议 JVM 只运行一个可用于注册任意数量的订阅的单个订阅。 |