JDBC 支持
JDBC 支持
Spring 集成提供了通道适配器,用于使用数据库查询来接收和发送消息。 通过这些适配器, Spring 集成不仅支持普通的 JDBC SQL 查询,还支持存储过程和存储函数调用。
您需要将此依赖项包含在您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-jdbc:6.0.9"
默认情况下,以下 JDBC 组件可用:
Spring 集成 JDBC 模块还提供了一个 JDBC 消息存储。
入站通道适配器
入站通道适配器的主要功能是执行 SQLSELECT
query 并将结果集转换为 message。
消息有效负载是整个结果集(表示为List
),并且列表中的项的类型取决于行映射策略。
默认策略是一个通用映射器,它返回一个Map
对于查询结果中的每一行。
(可选)您可以通过添加对RowMapper
实例(有关行映射的更多详细信息,请参阅 Spring JDBC 文档)。
如果要转换SELECT query result 添加到单个消息中,您可以使用下游拆分器。 |
入站适配器还需要对JdbcTemplate
实例或DataSource
.
以及SELECT
语句来生成消息,则适配器还具有UPDATE
将记录标记为已处理的语句,以便它们不会显示在下一次轮询中。
更新可以通过原始 select 中的 ID 列表进行参数化。
默认情况下,这是通过命名约定(输入结果集中名为id
将转换为名为id
).
以下示例定义了一个入站通道适配器,其中包含一个 update 查询和一个DataSource
参考。
<int-jdbc:inbound-channel-adapter query="select * from item where status=2"
channel="target" data-source="dataSource"
update="update item set status=10 where id in (:id)" />
更新查询中的参数使用冒号 (: ) 前缀添加到参数名称中(在前面的示例中,该参数是要应用于轮询结果集中的每一行的表达式)。
这是 Spring JDBC 中命名参数 JDBC 支持的标准功能,与 Spring 集成中采用的约定(投影到轮询结果列表)相结合。
Spring 的基础 JDBC 功能限制了可用的表达式(例如,除句点以外的大多数特殊字符都是不允许的),但是由于目标通常是可通过 bean 路径寻址的对象列表(可能是一的列表),因此并没有过度限制。 |
要更改参数生成策略,您可以注入SqlParameterSourceFactory
到适配器中以覆盖默认行为(适配器具有sql-parameter-source-factory
属性)。
Spring 集成提供了ExpressionEvaluatingSqlParameterSourceFactory
,这将创建一个基于 SPEL 的参数源,并将查询结果作为#root
对象。
(如果update-per-row
为 true,则根对象为该行)。
如果相同的参数名称在 update 查询中多次出现,则仅计算一次,并缓存其结果。
您还可以将参数源用于 select 查询。 在这种情况下,由于没有要评估的 “result” 对象,因此每次都使用单个参数源(而不是使用参数源工厂)。 从版本 4.0 开始,你可以使用 Spring 创建基于 SPEL 的参数源,如下例所示:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="status" value="@statusBean.which()" />
</map>
</property>
</bean>
<bean id="statusBean" class="foo.StatusDetermination" />
这value
在每个参数表达式中可以是任何有效的 SpEL 表达式。
这#root
object 是parameterSource
豆。
对于所有评估,它都是静态的(在前面的示例中,一个空的String
).
从版本 5.0 开始,您可以供应ExpressionEvaluatingSqlParameterSourceFactory
跟sqlParameterTypes
为特定参数指定目标 SQL 类型。
以下示例为查询中使用的参数提供了 SQL 类型:
<int-jdbc:inbound-channel-adapter query="select * from item where status=:status"
channel="target" data-source="dataSource"
select-sql-parameter-source="parameterSource" />
<bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<constructor-arg value="" />
</bean>
<bean id="parameterSourceFactory"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="sqlParameterTypes">
<map>
<entry key="status" value="#{ T(java.sql.Types).BINARY}" />
</map>
</property>
</bean>
使用createParameterSourceNoCache Factory 方法。
否则,参数源将缓存计算结果。
另请注意,由于禁用了缓存,因此如果相同的参数名称多次出现在 select 查询中,则会针对每次出现的情况重新评估该名称。 |
轮询和事务
入站适配器接受常规的 Spring 集成 Poller 作为子元素。 因此,可以控制轮询的频率(以及其他用途)。 用于 JDBC 的 Poller 的一个重要功能是可以选择将 poll作包装在事务中,如下例所示:
<int-jdbc:inbound-channel-adapter query="..."
channel="target" data-source="dataSource" update="...">
<int:poller fixed-rate="1000">
<int:transactional/>
</int:poller>
</int-jdbc:inbound-channel-adapter>
如果未显式指定 Poller,则使用默认值。 与 Spring Integration 一样,它可以被定义为顶级 bean)。 |
在前面的示例中,数据库每 1000 毫秒轮询一次(或每秒轮询一次),并且 update 和 select 查询都在同一事务中执行。 未显示事务管理器配置。 但是,只要它知道数据源,轮询就是事务性的。 一个常见的用例是将 downstream channels 作为 direct channels(默认),以便在同一个线程中调用端点,从而在同一个 transaction 中调用。 这样,如果其中任何一个失败,事务将回滚,并且 Importing 数据将恢复到其原始状态。
max-rows
对max-messages-per-poll
JDBC 入站通道适配器定义了一个名为max-rows
.
当你指定适配器的 Poller 时,你还可以定义一个名为max-messages-per-poll
.
虽然这两个属性看起来相似,但它们的含义却大不相同。
max-messages-per-poll
指定每个轮询间隔执行查询的次数,而max-rows
指定每次执行返回的行数。
在正常情况下,您可能不希望将 poller 的max-messages-per-poll
属性。
其默认值为1
,这意味着 JDBC 入站通道适配器的receive()
method 对于每个轮询间隔只执行一次。
设置max-messages-per-poll
属性设置为更大的值意味着查询将连续执行多次。
有关max-messages-per-poll
属性,请参见配置入站通道适配器。
相比之下,max-rows
属性,如果大于0
中,指定要从receive()
方法。
如果该属性设置为0
,则所有行都包含在生成的消息中。
该属性默认为0
.
建议通过特定于供应商的查询选项(例如 MySQL)使用结果集限制LIMIT 或 SQL ServerTOP 或 Oracle 的ROWNUM .
有关更多信息,请参阅特定供应商文档。 |
出站通道适配器
出站通道适配器是入站通道适配器的反面:它的作用是处理消息并使用它来执行 SQL 查询。 默认情况下,消息有效负载和标头可用作查询的输入参数,如下例所示:
<int-jdbc:outbound-channel-adapter
query="insert into foos (id, status, name) values (:headers[id], 0, :payload[something])"
data-source="dataSource"
channel="input"/>
在前面的示例中,到达标记为input
具有 Key 为something
,因此运算符会从 map 中取消引用该值。
标头也作为映射进行访问。[]
上述查询中的参数是传入消息上的 Bean 属性表达式(不是 SPEL 表达式)。
此行为是SqlParameterSource ,这是出站适配器创建的默认源。
您可以注入不同的SqlParameterSourceFactory 以获得不同的行为。 |
出站适配器需要引用DataSource
或JdbcTemplate
.
您还可以注入SqlParameterSourceFactory
控制每个传入消息与查询的绑定。
如果 input 通道是直接通道,则出站适配器在与消息发送方相同的线程中运行其查询,因此,与消息的发送者相同的事务(如果有的话)。
使用 SPEL 表达式传递参数
大多数 JDBC 通道适配器的常见要求是将参数作为 SQL 查询或存储过程或函数的一部分进行传递。
如前所述,默认情况下,这些参数是 Bean 属性表达式,而不是 SPEL 表达式。
但是,如果需要将 SpEL 表达式作为参数传递,则必须显式注入SqlParameterSourceFactory
.
以下示例使用ExpressionEvaluatingSqlParameterSourceFactory
要达到该要求,请执行以下作:
<jdbc:outbound-channel-adapter data-source="dataSource" channel="input"
query="insert into MESSAGES (MESSAGE_ID,PAYLOAD,CREATED_DATE) values (:id, :payload, :createdDate)"
sql-parameter-source-factory="spelSource"/>
<bean id="spelSource"
class="o.s.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<property name="parameterExpressions">
<map>
<entry key="id" value="headers['id'].toString()"/>
<entry key="createdDate" value="new java.util.Date()"/>
<entry key="payload" value="payload"/>
</map>
</property>
</bean>
有关详细信息,请参阅 定义参数源。
使用PreparedStatement
回调
有时,的SqlParameterSourceFactory
没有做我们需要的目标PreparedStatement
或者我们需要做一些低级的 JDBC 工作。
Spring JDBC 模块提供了 API 来配置执行环境(例如ConnectionCallback
或PreparedStatementCreator
) 和作参数值(例如SqlParameterSource
).
它甚至可以访问用于低级作的 API,例如StatementCallback
.
从 Spring Integration 4.2 开始,MessagePreparedStatementSetter
允许在PreparedStatement
manually,在requestMessage
上下文。
该类的作用与PreparedStatementSetter
在标准 Spring JDBC API 中。
实际上,它是直接从内联PreparedStatementSetter
implementation 时,JdbcMessageHandler
调用execute
在JdbcTemplate
.
此功能接口选项与sqlParameterSourceFactory
,并且可以用作更强大的替代方案来填充PreparedStatement
从requestMessage
.
例如,当我们需要存储File
data 到 DataBaseBLOB
列。
以下示例显示了如何执行此作:
@Bean
@ServiceActivator(inputChannel = "storeFileChannel")
public MessageHandler jdbcMessageHandler(DataSource dataSource) {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(dataSource,
"INSERT INTO imagedb (image_name, content, description) VALUES (?, ?, ?)");
jdbcMessageHandler.setPreparedStatementSetter((ps, m) -> {
ps.setString(1, m.getHeaders().get(FileHeaders.FILENAME));
try (FileInputStream inputStream = new FileInputStream((File) m.getPayload()); ) {
ps.setBlob(2, inputStream);
}
catch (Exception e) {
throw new MessageHandlingException(m, e);
}
ps.setClob(3, new StringReader(m.getHeaders().get("description", String.class)));
});
return jdbcMessageHandler;
}
从 XML 配置的角度来看,prepared-statement-setter
属性在<int-jdbc:outbound-channel-adapter>
元件。
它允许您指定一个MessagePreparedStatementSetter
bean 引用。
批量更新
从版本 5.1 开始,JdbcMessageHandler
执行JdbcOperations.batchUpdate()
如果请求消息的有效负载是Iterable
实例。
每个元素的Iterable
包装到Message
替换为请求消息中的标头(如果此类元素不是Message
已经。
在常规SqlParameterSourceFactory
的配置,这些消息用于构建SqlParameterSource[]
对于上述JdbcOperations.batchUpdate()
功能。
当MessagePreparedStatementSetter
配置时,会生成一个BatchPreparedStatementSetter
variant 用于遍历每个项目的这些消息,并且提供的MessagePreparedStatementSetter
被召来反对他们。
在以下情况下不支持批量更新keysGenerated
模式。
出站网关
出站网关就像出站和入站适配器的组合:它的作用是处理消息并使用它来执行 SQL 查询,然后通过将其发送到回复通道来响应结果。 默认情况下,消息有效负载和标头可用作查询的输入参数,如下例所示:
<int-jdbc:outbound-gateway
update="insert into mythings (id, status, name) values (:headers[id], 0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource" />
前面示例的结果是将一条记录插入到mythings
table 并返回一条消息,指示受影响的行数(有效负载是一个 Map:{UPDATED=1}
) 拖动到 output channel 。
如果更新查询是带有自动生成的键的插入,则可以通过在keys-generated="true"
添加到前面的示例(这不是默认值,因为某些数据库平台不支持它)。
以下示例显示了更改后的配置:
<int-jdbc:outbound-gateway
update="insert into mythings (status, name) values (0, :payload[thing])"
request-channel="input" reply-channel="output" data-source="dataSource"
keys-generated="true"/>
除了更新计数或生成的密钥,您还可以提供 select 查询来执行并从结果(例如入站适配器)生成回复消息,如下例所示:
<int-jdbc:outbound-gateway
update="insert into foos (id, status, name) values (:headers[id], 0, :payload[foo])"
query="select * from foos where id=:headers[$id]"
request-channel="input" reply-channel="output" data-source="dataSource"/>
从 Spring Integration 2.2 开始,更新 SQL 查询不再是必需的。
现在,您可以使用query
属性或query
元素。
如果您需要使用通用网关或有效负载扩充器等方式主动检索数据,这将非常有用。
然后从结果生成回复消息(类似于入站适配器的工作方式)并传递给回复通道。
以下示例演示如何使用query
属性:
<int-jdbc:outbound-gateway
query="select * from foos where id=:headers[id]"
request-channel="input"
reply-channel="output"
data-source="dataSource"/>
默认情况下, |
与通道适配器一样,您还可以提供SqlParameterSourceFactory
请求和回复的实例。
默认值与出站适配器相同,因此请求消息可用作表达式的根。
如果keys-generated="true"
,表达式的根是生成的键(如果只有一个 map,则为 map,如果为多值,则为 map 列表)。
出站网关需要引用DataSource
或JdbcTemplate
.
它还可以具有SqlParameterSourceFactory
injected 来控制传入消息与查询的绑定。
从版本 4.2 开始,request-prepared-statement-setter
属性在<int-jdbc:outbound-gateway>
作为request-sql-parameter-source-factory
.
它允许您指定一个MessagePreparedStatementSetter
Bean 引用,它实现了更复杂的PreparedStatement
执行前的准备工作。
从版本 6.0 开始,JdbcOutboundGateway
按原样返回空列表结果,而不是将其转换为null
和以前一样,意思是 “没有回答”。
这会导致在处理空列表是 downstream logic一部分的应用程序中进行额外的配置。
请参阅 Splitter Discard Channel 了解可能的空列表处理选项。
请参阅 出站通道适配器 以了解有关MessagePreparedStatementSetter
.
JDBC 消息存储
Spring 集成提供了两种特定于 JDBC 的消息存储实现。
这JdbcMessageStore
适合与聚合商和 Claim Check 模式一起使用。
这JdbcChannelMessageStore
implementation 专门为 Message Channel 提供了更具针对性和可扩展性的实现。
请注意,您可以使用JdbcMessageStore
要支持消息通道,JdbcChannelMessageStore
为此目的进行了优化。
从版本 5.0.11、5.1.2 开始,JdbcChannelMessageStore 已优化。
如果您在此类存储中有大型消息组,则可能需要更改索引。
此外,的PriorityChannel 被注释掉,因为除非您使用的是 JDBC 支持的此类通道,否则不需要它。 |
使用OracleChannelMessageStoreQueryProvider ,则必须添加优先级通道索引,因为它包含在查询的 hint 中。 |
初始化数据库
在开始使用 JDBC 消息存储组件之前,您应该为目标数据库配置适当的对象。
Spring 集成附带了一些可用于初始化数据库的示例脚本。
在spring-integration-jdbc
JAR 文件中,您可以在org.springframework.integration.jdbc
包。
它为一系列常见数据库平台提供了示例 create 和 example drop 脚本。
使用这些脚本的常见方法是在 Spring JDBC 数据源初始化器中引用它们。
请注意,这些脚本作为示例以及所需表和列名称的规范提供。
您可能会发现需要增强它们以供 生产环境 使用(例如,通过添加索引声明)。
通用 JDBC 消息存储
JDBC 模块提供了 Spring 集成的实现MessageStore
(在索赔检查模式中很重要)和MessageGroupStore
(在 Aggregator 等有状态模式中很重要)由数据库提供支持。
这两个接口都是由JdbcMessageStore
,并且支持在 XML 中配置存储实例,如下例所示:
<int-jdbc:message-store id="messageStore" data-source="dataSource"/>
您可以指定JdbcTemplate
而不是DataSource
.
以下示例显示了一些其他可选属性:
<int-jdbc:message-store id="messageStore" data-source="dataSource"
lob-handler="lobHandler" table-prefix="MY_INT_"/>
在前面的示例中,我们指定了LobHandler
用于将消息作为大型对象处理(这通常对 Oracle 是必需的)和存储生成的查询中的表名的前缀。
表名前缀默认为INT_
.
后备消息通道
如果您打算使用 JDBC 返回消息通道,我们建议使用JdbcChannelMessageStore
实现。
它只能与 Message Channel 结合使用。
支持的数据库
这JdbcChannelMessageStore
使用特定于数据库的 SQL 查询从数据库中检索消息。
因此,您必须将ChannelMessageStoreQueryProvider
属性JdbcChannelMessageStore
.
这channelMessageStoreQueryProvider
提供您指定的特定数据库的 SQL 查询。
Spring 集成为以下关系数据库提供支持:
-
PostgreSQL 数据库
-
HSQLDB 数据库
-
MySQL (MySQL的
-
神谕
-
德比
-
H2 系列
-
SqlServer 服务器
-
Sybase
-
DB2
如果您的数据库未列出,则可以扩展AbstractChannelMessageStoreQueryProvider
类并提供您自己的自定义查询。
版本 4.0 添加了MESSAGE_SEQUENCE
列添加到表中,以确保先进先出 (FIFO) 排队,即使消息以相同的毫秒数存储也是如此。
自定义消息插入
从 5.0 版本开始,通过重载ChannelMessageStorePreparedStatementSetter
类中,您可以在JdbcChannelMessageStore
.
您可以使用它来设置不同的列或更改表结构或序列化策略。
例如,而不是默认序列化为byte[]
,您可以将其结构存储为 JSON 字符串。
以下示例使用setValues
来存储公共列,并覆盖将消息负载存储为varchar
:
public class JsonPreparedStatementSetter extends ChannelMessageStorePreparedStatementSetter {
@Override
public void setValues(PreparedStatement preparedStatement, Message<?> requestMessage,
Object groupId, String region, boolean priorityEnabled) throws SQLException {
// Populate common columns
super.setValues(preparedStatement, requestMessage, groupId, region, priorityEnabled);
// Store message payload as varchar
preparedStatement.setString(6, requestMessage.getPayload().toString());
}
}
通常,我们不建议使用关系数据库进行排队。 相反,如果可能,请考虑改用 JMS 或 AMQP 支持的通道。 有关进一步参考,请参阅以下资源: 如果您仍计划将数据库用作队列,请考虑使用 PostgreSQL 及其通知机制,这将在后续部分中介绍。 |
并发轮询
轮询消息通道时,您可以选择配置关联的Poller
替换为TaskExecutor
参考。
但请记住,如果您使用 JDBC 支持的消息通道,并且计划使用多个线程对通道进行事务性轮询,从而轮询消息存储,则应确保使用支持多版本并发控制 (MVCC) 的关系数据库。 否则,锁定可能是一个问题,并且在使用多个线程时,性能可能无法按预期实现。 例如,Apache Derby 在这方面是有问题的。 实现更好的 JDBC 队列吞吐量并避免不同线程可能轮询相同的
|
优先通道
从版本 4.0 开始,JdbcChannelMessageStore
实现PriorityCapableChannelMessageStore
并提供priorityEnabled
选项,让它用作message-store
参考priority-queue
实例。
为此,INT_CHANNEL_MESSAGE
table 的MESSAGE_PRIORITY
column 来存储PRIORITY
消息标头。
此外,新的MESSAGE_SEQUENCE
column 让我们实现强大的先进先出 (FIFO) 轮询机制,即使在同一毫秒内以相同的优先级存储多条消息也是如此。
消息从数据库中轮询(选择)order by MESSAGE_PRIORITY DESC NULLS LAST, CREATED_DATE, MESSAGE_SEQUENCE
.
我们不建议使用相同的JdbcChannelMessageStore bean 用于优先级和非优先级队列通道,因为priorityEnabled 选项应用于整个存储,并且不会为队列通道保留正确的 FIFO 队列语义。
然而,同样的INT_CHANNEL_MESSAGE 表(甚至region ) 可用于JdbcChannelMessageStore 类型。
要配置该场景,可以从一个消息存储 Bean 扩展另一个消息存储 Bean,如下例所示: |
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
对消息存储进行分区
通常使用JdbcMessageStore
作为一组应用程序或同一应用程序中的节点的全局存储。
为了提供一些防止名称冲突的保护并控制数据库元数据配置,消息存储允许以两种方式对表进行分区。
一种方法是通过更改前缀(如前所述)来使用单独的表名。
另一种方法是指定一个region
name 用于对单个表中的数据进行分区。
第二种方法的一个重要用例是MessageStore
正在管理支持 Spring 集成消息通道的持久队列。
持久通道的消息数据在存储区中的通道名称上键入。
因此,如果通道名称不是全局唯一的,则通道可以选取并非适用于它们的数据。
为避免此危险,您可以使用消息存储region
为具有相同逻辑名称的不同物理通道保持数据独立。
PostgreSQL:接收推送通知
PostgreSQL 提供了一个侦听和通知框架,用于在数据库表作时接收推送通知。
Spring 集成利用这种机制(从版本 6.0 开始)来允许在将新消息添加到JdbcChannelMessageStore
.
使用此功能时,必须定义一个数据库触发器,该触发器可以作为schema-postgresql.sql
文件,该文件包含在 Spring 集成的 JDBC 模块中。
推送通知通过PostgresChannelMessageTableSubscriber
类,它允许其订阅者在到达任何给定的新消息时接收回调region
和groupId
.
即使消息附加到不同的 JVM 上,但附加到同一个数据库,也会收到这些通知。
这PostgresSubscribableChannel
implementation 使用PostgresChannelMessageTableSubscriber.Subscription
Contract 从 store 中提取消息,作为对来自上述PostgresChannelMessageTableSubscriber
通知。
例如,推送通知some group
可以按以下方式接收:
@Bean
public JdbcChannelMessageStore messageStore(DataSource dataSource) {
JdbcChannelMessageStore messageStore = new JdbcChannelMessageStore(dataSource);
messageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
return messageStore;
}
@Bean
public PostgresChannelMessageTableSubscriber subscriber(
@Value("${spring.datasource.url}") String url,
@Value("${spring.datasource.username}") String username,
@Value("${spring.datasource.password}") String password) {
return new PostgresChannelMessageTableSubscriber(() ->
DriverManager.getConnection(url, username, password).unwrap(PgConnection.class));
}
@Bean
public PostgresSubscribableChannel channel(
PostgresChannelMessageTableSubscriber subscriber,
JdbcChannelMessageStore messageStore) {
return new PostgresSubscribableChannel(messageStore, "some group", subscriber);
}
事务支持
从版本 6.0.5 开始,指定PlatformTransactionManager
在PostgresSubscribableChannel
将在交易中通知订阅者。
订户中的异常将导致事务回滚,并将消息放回消息存储中。
默认情况下,事务支持未激活。
重试
从版本 6.0.5 开始,可以通过提供RetryTemplate
到PostgresSubscribableChannel
.
默认情况下,不执行重试。
任何活动 对于独占连接的这种需求,还建议 JVM 只运行单个 |
存储过程
在某些情况下,普通的 JDBC 支持是不够的。 也许您处理的是旧式关系数据库架构,或者您有复杂的数据处理需求,但最终您必须使用存储过程或存储函数。 从 Spring Integration 2.1 开始,我们提供了三个组件来执行存储过程或存储函数:
-
存储过程入站通道适配器
-
存储过程出站通道适配器
-
存储过程出站网关
支持的数据库
为了启用对存储过程和存储函数的调用,存储过程组件使用org.springframework.jdbc.core.simple.SimpleJdbcCall
类。
因此,完全支持以下数据库执行存储过程:
-
阿帕奇德比
-
DB2
-
MySQL (MySQL的
-
Microsoft SQL 服务器
-
神谕
-
PostgreSQL 数据库
-
Sybase
如果您想改为执行存储函数,则完全支持以下数据库:
-
MySQL (MySQL的
-
Microsoft SQL 服务器
-
神谕
-
PostgreSQL 数据库
即使您的特定数据库可能没有得到完全支持,只要您的 RDBMS 支持存储过程或存储函数,您很可能可以非常成功地使用存储过程 Spring Integration 组件。 事实上,一些提供的集成测试使用 H2 数据库。 尽管如此,全面测试这些使用场景非常重要。 |
常见配置属性
所有存储过程组件共享某些配置参数:
-
auto-startup
:生命周期属性,指示是否应在应用程序上下文启动期间启动此组件。 它默认为true
. 自选。 -
data-source
:对javax.sql.DataSource
,用于访问数据库。 必填。 -
id
:标识底层 Spring Bean 定义,它是EventDrivenConsumer
或PollingConsumer
,具体取决于出站通道适配器的channel
属性引用SubscribableChannel
或PollableChannel
. 自选。 -
ignore-column-meta-data
:对于完全支持的数据库,底层SimpleJdbcCall
class 可以从 JDBC 元数据中自动检索存储过程或存储函数的参数信息。但是,如果数据库不支持元数据查找,或者您需要提供自定义参数定义,则可以将此标志设置为
true
. 它默认为false
. 自选。 -
is-function
:如果true
时,将调用 SQL 函数。 在这种情况下,stored-procedure-name
或stored-procedure-name-expression
attributes 定义被调用函数的名称。 它默认为false
. 自选。 -
stored-procedure-name
:此属性指定存储过程的名称。 如果is-function
属性设置为true
,则此属性指定函数名称。 此属性或stored-procedure-name-expression
必须指定。 -
stored-procedure-name-expression
:此属性使用 SpEL 表达式指定存储过程的名称。 通过使用 SPEL,您可以访问完整的消息(如果可用),包括其 Headers 和有效负载。 您可以使用此属性在运行时调用不同的存储过程。 例如,您可以提供要作为消息标头执行的存储过程名称。 表达式必须解析为String
.如果
is-function
属性设置为true
,则此属性指定存储函数。 此属性或stored-procedure-name
必须指定。 -
jdbc-call-operations-cache-size
:定义缓存的最大数量SimpleJdbcCallOperations
实例。 基本上,对于每个存储过程名称,一个新的SimpleJdbcCallOperations
实例,作为回报,该实例将被缓存。Spring Integration 2.2 添加了 stored-procedure-name-expression
属性和jdbc-call-operations-cache-size
属性。默认缓存大小为
10
. 值0
禁用缓存。 不允许使用负值。如果启用 JMX,则有关
jdbc-call-operations-cache
作为 MBean 公开。 有关更多信息,请参阅 MBean Exporter 。 -
sql-parameter-source-factory
:(不适用于存储过程入站通道适配器。 对SqlParameterSourceFactory
. 默认情况下,传入的Message
payload 用作存储过程输入参数的源,方法是使用BeanPropertySqlParameterSourceFactory
.对于基本用例,这可能就足够了。 对于更复杂的选项,请考虑传入一个或多个
ProcedureParameter
值。 请参阅定义参数源。 自选。 -
use-payload-as-parameter-source
:(不适用于存储过程入站通道适配器。 如果设置为true
,则Message
用作提供参数的源。 如果设置为false
,但是,整个Message
可用作参数的源。如果未传入过程参数,则此属性默认为
true
. 这意味着,通过使用默认的BeanPropertySqlParameterSourceFactory
中,有效负载的 Bean 属性将用作存储过程或存储函数的参数值的源。但是,如果传入了过程参数,则此属性(默认情况下)的计算结果为
false
.ProcedureParameter
允许提供 SpEL 表达式。 因此,访问整个Message
. 该属性在底层StoredProcExecutor
. 自选。
常见配置子元素
存储过程组件共享一组通用的子元素,您可以使用这些子元素来定义参数并将其传递给存储过程或存储函数。 以下元素可用:
-
parameter
-
returning-resultset
-
sql-parameter-definition
-
poller
-
parameter
:提供存储过程参数的机制。 参数可以是静态的,也可以是使用 SPEL 表达式提供的。<int-jdbc:parameter name="" (1) type="" (2) value=""/> (3) <int-jdbc:parameter name="" expression=""/> (4)
+ <1> 要传递到存储过程或存储函数中的参数的名称。 必填。 <2> 此属性指定值的类型。 如果未提供任何内容,则此属性默认为
java.lang.String
. 仅当value
属性。 自选。 <3> 参数的值。 您必须提供此属性或expression
属性。 自选。 <4>而不是value
属性,您可以指定一个 SPEL 表达式来传递参数的值。 如果指定expression
这value
属性。 自选。自选。
-
returning-resultset
:存储过程可能会返回多个结果集。 通过设置一个或多个returning-resultset
元素中,您可以指定RowMappers
以转换每个返回的ResultSet
转换为有意义的对象。 自选。<int-jdbc:returning-resultset name="" row-mapper="" />
-
sql-parameter-definition
:如果使用完全支持的数据库,则通常不必指定存储过程参数定义。 相反,这些参数可以自动从 JDBC 元数据派生。 但是,如果您使用的数据库不完全受支持,则必须使用sql-parameter-definition
元素。您还可以选择关闭对通过 JDBC 获取的参数元数据信息的任何处理,方法是使用
ignore-column-meta-data
属性。<int-jdbc:sql-parameter-definition name="" (1) direction="IN" (2) type="STRING" (3) scale="5" (4) type-name="FOO_STRUCT" (5) return-type="fooSqlReturnType"/> (6)
1 指定 SQL 参数的名称。 必填。 2 指定 SQL 参数定义的方向。 默认为 IN
. 有效值为:IN
,OUT
和INOUT
. 如果您的过程返回结果集,请使用returning-resultset
元素。 自选。3 用于此 SQL 参数定义的 SQL 类型。 转换为整数值,定义如下 java.sql.Types
. 或者,您也可以提供整数值。 如果未明确设置此属性,则默认为 'VARCHAR'。 自选。4 SQL 参数的比例。 仅用于数字和十进制参数。 自选。 5 这 typeName
对于用户命名的类型,例如:STRUCT
,DISTINCT
,JAVA_OBJECT
和命名数组类型。 此属性与scale
属性。 自选。6 对复杂类型的自定义值处理程序的引用。 的实现 SqlReturnType
. 此属性与scale
属性,仅适用于 OUT 和 INOUT 参数。 自选。 -
poller
:允许您配置消息轮询器(如果此端点是PollingConsumer
. 自选。
定义参数源
参数源控制检索 Spring 集成消息属性并将其映射到相关存储过程输入参数的技术。
存储过程组件遵循某些规则。
默认情况下,Message
payload 用作存储过程的输入参数的源。
在这种情况下,一个BeanPropertySqlParameterSourceFactory
被使用。
对于基本用例,这可能就足够了。
下一个示例说明了该默认行为。
要使用BeanPropertySqlParameterSourceFactory 要工作,您的 Bean 属性必须以小写形式定义。
这是因为在org.springframework.jdbc.core.metadata.CallMetaDataContext (Java 方法是matchInParameterValuesWithCallParameters() ),则检索到的存储过程参数声明将转换为小写。
因此,如果您有驼峰式 bean 属性(例如lastName ),则查找失败。
在这种情况下,请提供显式的ProcedureParameter . |
假设我们有一个有效负载,它由一个具有以下三个属性的简单 bean 组成:id
,name
和description
.
此外,我们有一个名为INSERT_COFFEE
接受三个 input 参数:id
,name
和description
.
我们还使用完全支持的数据库。
在这种情况下,存储过程出站适配器的以下配置就足够了:
<int-jdbc:stored-proc-outbound-channel-adapter data-source="dataSource"
channel="insertCoffeeProcedureRequestChannel"
stored-procedure-name="INSERT_COFFEE"/>
对于更复杂的选项,请考虑传入一个或多个ProcedureParameter
值。
如果您提供ProcedureParameter
值显式地显示,默认情况下,ExpressionEvaluatingSqlParameterSourceFactory
用于参数处理,以启用 SpEL 表达式的全部功能。
如果您需要对参数的检索方式进行更多控制,请考虑传入SqlParameterSourceFactory
通过使用sql-parameter-source-factory
属性。
存储过程入站通道适配器
下面的清单列出了对存储过程入站通道适配器很重要的属性:
<int-jdbc:stored-proc-inbound-channel-adapter
channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
skip-undeclared-results="" (2)
return-value-required="false" (3)
<int:poller/>
<int-jdbc:sql-parameter-definition name="" direction="IN"
type="STRING"
scale=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
</int-jdbc:stored-proc-inbound-channel-adapter>
1 | 轮询消息发送到的通道。
如果存储过程或函数未返回任何数据,则Message 为 null。
必填。 |
2 | 如果此属性设置为true ,则存储过程调用的所有结果都没有相应的SqlOutParameter 声明。
例如,存储过程可以返回更新计数值,即使存储过程仅声明一个 result 参数。
确切的行为取决于数据库实现。
该值在底层JdbcTemplate .
该值默认为true .
自选。 |
3 | 指示是否应包含此过程的返回值。 从 Spring Integration 3.0 开始。 自选。 |
存储过程出站通道适配器
下面的清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-channel-adapter channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
order="" (2)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int:poller fixed-rate=""/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name=""/>
</int-jdbc:stored-proc-outbound-channel-adapter>
1 | 此终端节点的接收消息通道。 必填。 |
2 | 指定此终端节点作为订阅者连接到通道时的调用顺序。
当该通道使用failover 调度策略。
当此终端节点本身是具有队列的通道的轮询使用者时,它不起作用。
自选。 |
存储过程出站网关
下面的清单列出了对存储过程出站通道适配器很重要的属性:
<int-jdbc:stored-proc-outbound-gateway request-channel="" (1)
stored-procedure-name=""
data-source=""
auto-startup="true"
id=""
ignore-column-meta-data="false"
is-function="false"
order=""
reply-channel="" (2)
reply-timeout="" (3)
return-value-required="false" (4)
skip-undeclared-results="" (5)
sql-parameter-source-factory=""
use-payload-as-parameter-source="">
<int-jdbc:sql-parameter-definition name="" direction="IN"
type=""
scale="10"/>
<int-jdbc:sql-parameter-definition name=""/>
<int-jdbc:parameter name="" type="" value=""/>
<int-jdbc:parameter name="" expression=""/>
<int-jdbc:returning-resultset name="" row-mapper="" />
1 | 此终端节点的接收消息通道。 必填。 |
2 | 收到数据库响应后应将回复发送到的消息通道。 自选。 |
3 | 允许您指定此网关在引发异常之前等待成功发送回复消息的时间。
请记住,当发送到DirectChannel ,则调用发生在发送方的线程中。
因此,send作的失败可能是由下游的其他组件引起的。
默认情况下,网关无限期等待。
该值以毫秒为单位指定。
自选。 |
4 | 指示是否应包含此过程的返回值。 自选。 |
5 | 如果skip-undeclared-results 属性设置为true ,则存储过程调用的所有结果都没有相应的SqlOutParameter 声明。
例如,存储过程可能会返回更新计数值,即使存储过程只声明了一个 result 参数。
确切的行为取决于数据库。
该值在底层JdbcTemplate .
该值默认为true .
自选。 |
例子
本节包含两个调用 Apache Derby 存储过程的示例。
第一个过程调用一个存储过程,该存储过程返回一个ResultSet
.
通过使用RowMapper
,数据将转换为域对象,然后成为 Spring Integration 消息有效负载。
在第二个示例中,我们调用了一个存储过程,该过程使用输出参数来返回数据。
该项目包含此处引用的 Apache Derby 示例,以及有关如何运行它的说明。 Spring Integration Samples 项目还提供了一个使用 Oracle 存储过程的示例。 |
在第一个示例中,我们将名为FIND_ALL_COFFEE_BEVERAGES
,它不定义任何输入参数,但返回一个ResultSet
.
在 Apache Derby 中,存储过程是用 Java 实现的。 下面的清单显示了方法签名:
public static void findAllCoffeeBeverages(ResultSet[] coffeeBeverages)
throws SQLException {
...
}
下面的清单显示了相应的 SQL:
CREATE PROCEDURE FIND_ALL_COFFEE_BEVERAGES() \
PARAMETER STYLE JAVA LANGUAGE JAVA MODIFIES SQL DATA DYNAMIC RESULT SETS 1 \
EXTERNAL NAME 'o.s.i.jdbc.storedproc.derby.DerbyStoredProcedures.findAllCoffeeBeverages';
在 Spring Integration 中,您现在可以通过使用stored-proc-outbound-gateway
,如下例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-all"
data-source="dataSource"
request-channel="findAllProcedureRequestChannel"
expect-single-result="true"
stored-procedure-name="FIND_ALL_COFFEE_BEVERAGES">
<int-jdbc:returning-resultset name="coffeeBeverages"
row-mapper="org.springframework.integration.support.CoffeBeverageMapper"/>
</int-jdbc:stored-proc-outbound-gateway>
在第二个示例中,我们将名为FIND_COFFEE
,该参数具有一个 input 参数。
而不是返回ResultSet
,则它使用 output 参数。
以下示例显示了方法签名:
public static void findCoffee(int coffeeId, String[] coffeeDescription)
throws SQLException {
...
}
下面的清单显示了相应的 SQL:
CREATE PROCEDURE FIND_COFFEE(IN ID INTEGER, OUT COFFEE_DESCRIPTION VARCHAR(200)) \
PARAMETER STYLE JAVA LANGUAGE JAVA EXTERNAL NAME \
'org.springframework.integration.jdbc.storedproc.derby.DerbyStoredProcedures.findCoffee';
在 Spring 集成中,您现在可以通过使用stored-proc-outbound-gateway
,如下例所示:
<int-jdbc:stored-proc-outbound-gateway id="outbound-gateway-storedproc-find-coffee"
data-source="dataSource"
request-channel="findCoffeeProcedureRequestChannel"
skip-undeclared-results="true"
stored-procedure-name="FIND_COFFEE"
expect-single-result="true">
<int-jdbc:parameter name="ID" expression="payload" />
</int-jdbc:stored-proc-outbound-gateway>
JDBC Lock Registry
版本 4.3 引入了JdbcLockRegistry
.
某些组件(例如,aggregator 和 resequencer)使用从LockRegistry
实例来确保一次只有一个线程作一个组。
这DefaultLockRegistry
在单个组件中执行此功能。
现在,您可以在这些组件上配置外部锁注册表。
当与共享的MessageGroupStore
中,您可以使用JdbcLockRegistry
跨多个应用程序实例提供此功能,以便一次只有一个实例可以作组。
当本地线程释放锁时,另一个本地线程通常可以立即获取该锁。 如果锁由使用其他注册表实例的线程释放,则获取锁可能需要长达 100 毫秒的时间。
这JdbcLockRegistry
基于LockRepository
abstraction 的 API 中,它有一个DefaultLockRepository
实现。
数据库架构脚本位于org.springframework.integration.jdbc
软件包,该软件包针对特定的 RDBMS 供应商进行划分。
例如,下面的清单显示了 lock table 的 H2 DDL:
CREATE TABLE INT_LOCK (
LOCK_KEY CHAR(36),
REGION VARCHAR(100),
CLIENT_ID CHAR(36),
CREATED_DATE TIMESTAMP NOT NULL,
constraint INT_LOCK_PK primary key (LOCK_KEY, REGION)
);
这INT_
可以根据目标数据库设计要求进行更改。
因此,您必须使用prefix
属性DefaultLockRepository
bean 定义。
有时,一个应用程序已移动到无法释放分布式锁并删除数据库中的特定记录的状态。
为此,此类死锁可以在下一次锁定调用时由其他应用程序过期。
这timeToLive
(TTL) 选项DefaultLockRepository
为此目的而提供。
您可能还需要指定CLIENT_ID
对于为给定DefaultLockRepository
实例。
如果是这样,您可以指定id
以与DefaultLockRepository
作为构造函数参数。
从版本 5.1.8 开始,JdbcLockRegistry
可以使用idleBetweenTries
-一个Duration
在锁定记录插入/更新执行之间休眠。
默认情况下,它是100
毫秒,并且在某些环境中,非领导者会过于频繁地污染与数据源的连接。
从版本 5.4 开始,RenewableLockRegistry
interface 已引入并添加到JdbcLockRegistry
.
这renewLock()
方法必须在锁定的进程期间调用,以防锁定的进程比锁定的生存时间长。
因此,生存时间可以大大缩短,部署可以快速重新获得丢失的锁。
仅当锁由当前线程持有时,才能进行锁续订。 |
String 中,使用JdbcLockRegistry
支持自动清理 JdbcLock 的缓存JdbcLockRegistry.locks
通过JdbcLockRegistry.setCacheCapacity()
.
有关更多信息,请参阅其 JavaDocs。
String 的 6.0 版本,则DefaultLockRepository
可随附PlatformTransactionManager
而不是依赖于应用程序上下文中的主 bean。
JDBC 元数据存储
版本 5.0 引入了 JDBCMetadataStore
(请参阅 元数据存储) 实现。
您可以使用JdbcMetadataStore
在应用程序重启时保持元数据状态。
这MetadataStore
implementation 可以与适配器一起使用,例如:
要将这些适配器配置为使用JdbcMetadataStore
中,使用 Bean 名称metadataStore
.
Feed 入站通道适配器和 Feed 入站通道适配器都会自动获取并使用声明的JdbcMetadataStore
,如下例所示:
@Bean
public MetadataStore metadataStore(DataSource dataSource) {
return new JdbcMetadataStore(dataSource);
}
这org.springframework.integration.jdbc
包包含多个 RDMBS 供应商的数据库架构脚本。
例如,下面的清单显示了元数据表的 H2 DDL:
CREATE TABLE INT_METADATA_STORE (
METADATA_KEY VARCHAR(255) NOT NULL,
METADATA_VALUE VARCHAR(4000),
REGION VARCHAR(100) NOT NULL,
constraint INT_METADATA_STORE_PK primary key (METADATA_KEY, REGION)
);
您可以将INT_
前缀以匹配目标数据库设计要求。
您还可以配置JdbcMetadataStore
以使用自定义前缀。
这JdbcMetadataStore
实现ConcurrentMetadataStore
,使其在多个应用程序实例之间可靠地共享,其中只有一个实例可以存储或修改键的值。
由于交易保证,所有这些作都是原子的。
事务管理必须使用JdbcMetadataStore
.
入站通道适配器可以提供对TransactionManager
在 Poller 配置中。
与非事务性MetadataStore
implementations, 使用JdbcMetadataStore
,则只有在事务提交后,该条目才会显示在目标表中。
发生回滚时,不会向INT_METADATA_STORE
桌子。
从版本 5.0.7 开始,您可以配置JdbcMetadataStore
使用 RDBMS 供应商特定的lockHint
对元数据存储条目进行基于锁的查询的选项。
默认情况下,它是FOR UPDATE
如果目标数据库不支持行锁定功能,则可以使用空字符串进行配置。
请咨询您的供应商,以获取SELECT
expression 在更新之前锁定行。