JDBC 消息存储
Spring 集成提供了两种特定于 JDBC 的消息存储实现。
这JdbcMessageStore
适合与聚合商和 Claim Check 模式一起使用。
这JdbcChannelMessageStore
implementation 专门为 Message Channel 提供了更具针对性和可扩展性的实现。
请注意,您可以使用JdbcMessageStore
要支持消息通道,JdbcChannelMessageStore
为此目的进行了优化。
从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 已优化。
如果您在此类存储中有大型消息组,则可能需要更改索引。
此外,的PriorityChannel 被注释掉,因为除非您使用的是 JDBC 支持的此类通道,否则不需要它。 |
使用OracleChannelMessageStoreQueryProvider ,则必须添加优先级通道索引,因为它包含在查询的 hint 中。 |
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该使用适当的对象预置目标数据库。
Spring 集成附带了一些可用于初始化数据库的示例脚本。
在spring-integration-jdbc
JAR 文件中,您可以在org.springframework.integration.jdbc
包。
它为一系列常见数据库平台提供了示例 create 和 example drop 脚本。
使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器中引用它们。
请注意,这些脚本作为示例以及所需表和列名称的规范提供。
您可能会发现需要增强它们以供 生产环境 使用(例如,通过添加索引声明)。
从版本 6.2 开始,JdbcMessageStore
,JdbcChannelMessageStore
,JdbcMetadataStore
和DefaultLockRepository
实现SmartLifecycle
并对其各自的表执行 'SELECT COUNT' 查询,在start()
方法确保目标数据库中存在所需的表(根据提供的前缀)。
如果所需的表不存在,则应用程序上下文无法启动。
可以通过以下方式禁用该检查setCheckDatabaseOnStart(false)
.
通用 JDBC 消息存储
JDBC 模块提供了 Spring 集成的实现MessageStore
(在索赔检查模式中很重要)和MessageGroupStore
(在 Aggregator 等有状态模式中很重要)由数据库提供支持。
这两个接口都是由JdbcMessageStore
,并且支持在 XML 中配置存储实例,如下例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定JdbcTemplate
而不是DataSource
.
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource" table-prefix="MY_INT_"/>
在前面的示例中,我们为存储生成的查询中的表名称指定了前缀。
表名前缀默认为INT_
.
后备消息通道
如果您打算使用 JDBC 返回消息通道,我们建议使用JdbcChannelMessageStore
实现。
它只能与 Message Channel 结合使用。
支持的数据库
这JdbcChannelMessageStore
使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,您必须将ChannelMessageStoreQueryProvider
属性JdbcChannelMessageStore
.
这channelMessageStoreQueryProvider
提供您指定的特定数据库的 SQL 查询。
Spring 集成为以下关系数据库提供支持:
-
PostgreSQL 数据库
-
HSQLDB 数据库
-
MySQL (MySQL的
-
神谕
-
德比
-
H2 系列
-
SqlServer 服务器
-
Sybase
-
DB2
如果您的数据库未列出,您可以实施ChannelMessageStoreQueryProvider
接口并提供您自己的自定义查询。
版本 4.0 添加了MESSAGE_SEQUENCE
列添加到表中,以确保先进先出 (FIFO) 排队,即使消息以相同的毫秒数存储也是如此。
从版本 6.2 开始,ChannelMessageStoreQueryProvider
暴露一个isSingleStatementForPoll
标志,其中PostgresChannelMessageStoreQueryProvider
返回true
它的 poll 查询现在基于单个DELETE…RETURNING
陈述。
这JdbcChannelMessageStore
咨询isSingleStatementForPoll
选项并跳过单独的DELETE
语句 (如果仅支持单个轮询语句)。
自定义消息插入
从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter
类中,您可以在JdbcChannelMessageStore
.
您可以使用它来设置不同的列或更改表结构或序列化策略。
例如,而不是默认序列化为byte[]
,您可以将其结构存储为 JSON 字符串。
以下示例使用setValues
来存储公共列,并覆盖将消息负载存储为varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。 |
并发轮询
轮询消息通道时,您可以选择配置关联的Poller
替换为TaskExecutor
参考。
但请记住,如果您使用 JDBC 支持的消息通道,并且您计划轮询该通道,从而使用多个线程轮询消息存储事务性消息存储,则应确保使用支持 Multiversion Concurrency Control (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby 在这方面是有问题的。 实现更好的 JDBC 队列吞吐量并避免不同线程可能轮询相同的
|
优先通道
从版本 4.0 开始,JdbcChannelMessageStore
实现PriorityCapableChannelMessageStore
并提供priorityEnabled
选项,让它用作message-store
参考priority-queue
实例。
为此,INT_CHANNEL_MESSAGE
table 的MESSAGE_PRIORITY
column 来存储PRIORITY
消息标头。
此外,新的MESSAGE_SEQUENCE
column 让我们实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储多条消息也是如此。
消息从数据库中轮询(选择)order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
我们不建议使用相同的JdbcChannelMessageStore bean 用于优先级和非优先级队列通道,因为priorityEnabled 选项应用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。
然而,同样的INT_CHANNEL_MESSAGE 表(甚至region ) 可用于JdbcChannelMessageStore 类型。
要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示: |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
对消息存储进行分区
通常使用JdbcMessageStore
作为一组应用程序或同一应用程序中的节点的全局存储。
为了提供一些防止名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。
一种方法是通过更改前缀(如前所述)来使用单独的表名。
另一种方法是指定一个region
name 用于对单个表中的数据进行分区。
第二种方法的一个重要用例是MessageStore
正在管理支持 Spring 集成消息通道的持久队列。
持久通道的消息数据在存储区中的通道名称上键入。
因此,如果通道名称不是全局唯一的,则通道可以选取并非适用于它们的数据。
为避免此危险,您可以使用消息存储region
为具有相同逻辑名称的不同物理通道保持数据独立。
PostgreSQL:接收推送通知
PostgreSQL 提供了一个侦听和通知框架,用于在数据库表作时接收推送通知。
Spring 集成利用这种机制(从版本 6.0 开始)来允许在将新消息添加到JdbcChannelMessageStore
.
使用此功能时,必须定义一个数据库触发器,该触发器可以作为schema-postgresql.sql
文件,该文件包含在 Spring 集成的 JDBC 模块中。
推送通知通过PostgresChannelMessageTableSubscriber
类,它允许其订阅者在到达任何给定的新消息时接收回调region
和groupId
.
即使消息附加到不同的 JVM 上,但附加到同一个数据库,也会收到这些通知。
这PostgresSubscribableChannel
implementation 使用PostgresChannelMessageTableSubscriber.Subscription
Contract 从 store 中提取消息,作为对来自上述PostgresChannelMessageTableSubscriber
通知。
例如,推送通知some group
可以按以下方式接收:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从版本 6.0.5 开始,指定PlatformTransactionManager
在PostgresSubscribableChannel
将在交易中通知订阅者。
订户中的异常将导致事务回滚,并将消息放回消息存储中。
默认情况下,事务支持未激活。
重试
从版本 6.0.5 开始,可以通过提供RetryTemplate
到PostgresSubscribableChannel
.
默认情况下,不执行重试。
任何活动 对于独占连接的这种需求,还建议 JVM 只运行单个 |