对于最新的稳定版本,请使用 Spring Data MongoDB 4.4.0! |
会议和交易
从版本 3.6 开始,MongoDB 支持会话的概念。
使用会话可以启用 MongoDB 的因果一致性模型,该模型保证按照尊重其因果关系的顺序运行作。
这些被分为ServerSession
instances 和ClientSession
实例。
在本节中,当我们谈到会话时,我们指的是ClientSession
.
客户端会话中的作不会与会话外的作隔离。 |
双MongoOperations
和ReactiveMongoOperations
提供用于绑定ClientSession
到作。MongoCollection
和MongoDatabase
使用实现 MongoDB 的集合和数据库接口的会话代理对象,因此您无需在每次调用时添加会话。
这意味着对MongoCollection#find()
被委托给MongoCollection#find(ClientSession)
.
诸如(Reactive)MongoOperations#getCollection 返回本机 MongoDB Java Driver 网关对象(例如MongoCollection ),它们本身为ClientSession .
这些方法不是 session 代理的。
您应该提供ClientSession 在直接与MongoCollection 或MongoDatabase 而不是通过其中一个#execute 回调开启MongoOperations . |
ClientSession 支持
以下示例显示了会话的使用情况:
-
Imperative
-
Reactive
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | 从服务器获取新会话。 |
2 | 用MongoOperation 方法。
这ClientSession 将自动应用。 |
3 | 确保关闭ClientSession . |
4 | 关闭会话。 |
在处理DBRef 实例,尤其是延迟加载的实例,则不要关闭ClientSession 在加载所有数据之前。
否则,延迟获取将失败。 |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 获取Publisher 以获取新会话检索。 |
2 | 用ReactiveMongoOperation 方法。
这ClientSession 将自动获取并应用。 |
3 | 确保关闭ClientSession . |
4 | 在您订阅之前,什么都不会发生。 有关详细信息,请参阅 Project Reactor Reference Guide。 |
通过使用Publisher
这提供了实际的会话,你可以将会话获取推迟到实际订阅的时间点。
尽管如此,您仍需要在完成后关闭会话,以免过时的会话污染服务器。
使用doFinally
挂接execute
调用ClientSession#close()
当您不再需要该会话时。
如果您希望对会话本身有更多的控制权,则可以获取ClientSession
通过驱动程序提供,并通过Supplier
.
反应性使用ClientSession 仅限于 Template API 使用。
目前没有与反应式存储库的会话集成。 |
MongoDB 事务
从版本 4 开始,MongoDB 支持 Transactions。
事务构建在 Session 之上,因此需要一个活跃的ClientSession
.
除非指定MongoTransactionManager 在您的应用程序上下文中,事务支持被禁用。
您可以使用setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。 |
要对事务进行完全的编程控制,您可能需要在MongoOperations
.
以下示例显示了编程事务控制:
-
Imperative
-
Reactive
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 获取新的ClientSession . |
2 | 启动事务。 |
3 | 如果一切按预期进行,请提交更改。 |
4 | 有些东西坏了,所以回滚所有内容。 |
5 | 完成后不要忘记关闭会话。 |
前面的示例允许您在使用会话范围的MongoOperations
实例,以确保将会话传递给每个服务器调用。
为了避免此方法带来的一些开销,您可以使用TransactionTemplate
以消除一些手动交易流的噪音。
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | 首先,我们显然需要启动会话。 |
2 | 一旦我们有了ClientSession 在手头,开始交易。 |
3 | 通过在事务中传递ClientSession 到作。 |
4 | 如果作异常完成,我们需要停止事务并保留错误。 |
5 | 或者当然,如果成功,请提交更改。 仍然保留作结果。 |
6 | 最后,我们需要确保关闭会话。 |
上述作的罪魁祸首是保持主流DeleteResult
而不是通过commitTransaction()
或abortTransaction()
,这会导致设置相当复杂。
除非指定ReactiveMongoTransactionManager 在您的应用程序上下文中,事务支持被禁用。
您可以使用setSessionSynchronization(ALWAYS) 参与正在进行的非原生 MongoDB 事务。 |
使用 TransactionTemplate / TransactionalOperator 的事务
Spring Data MongoDB 事务同时支持TransactionTemplate
和TransactionalOperator
.
TransactionTemplate
/ TransactionalOperator
-
Imperative
-
Reactive
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在 Template API 配置期间启用事务同步。 |
2 | 创建TransactionTemplate 使用提供的PlatformTransactionManager . |
3 | 在回调中,ClientSession 和 transaction 都已注册。 |
更改 的状态MongoTemplate 在运行时(您可能认为在前面的清单的第 1 项中是可能的)可能会导致线程和可见性问题。 |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | 为 Transactional participation 启用事务同步。 |
2 | 创建TransactionalOperator 使用提供的ReactiveTransactionManager . |
3 | TransactionalOperator.transactional(…) 为所有上游作提供事务管理。 |
使用MongoTransactionManager和ReactiveMongoTransactionManager进行交易
MongoTransactionManager
/ ReactiveMongoTransactionManager
是众所周知的 Spring 事务支持的网关。
它允许应用程序使用 Spring 的 managed transaction 功能。
这MongoTransactionManager
将ClientSession
添加到线程中,而ReactiveMongoTransactionManager
正在使用ReactorContext
为了这个。MongoTemplate
检测会话并相应地对与事务关联的这些资源进行作。MongoTemplate
还可以参与其他正在进行的交易。
以下示例显示了如何使用MongoTransactionManager
:
MongoTransactionManager
/ ReactiveMongoTransactionManager
-
Imperative
-
Reactive
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 注册MongoTransactionManager 在应用程序上下文中。 |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议MongoTransactionManager 要同时启动一个事务,该事务会将ClientSession 到传出请求。 |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 注册ReactiveMongoTransactionManager 在应用程序上下文中。 |
2 | 将方法标记为事务性方法。 |
@Transactional(readOnly = true) 建议ReactiveMongoTransactionManager 要同时启动一个事务,该事务会将ClientSession 到传出请求。 |
控制特定于 MongoDB 的事务选项
事务性服务方法可以要求特定的事务选项来运行事务。
Spring Data MongoDB 的事务管理器支持对事务标签进行评估,例如@Transactional(label = { "mongo:readConcern=available" })
.
默认情况下,使用mongo:
prefix 的计算方式为MongoTransactionOptionsResolver
这是默认配置的。
交易标签由TransactionAttribute
并可用于编程事务控制TransactionTemplate
和TransactionalOperator
.
由于它们的声明性质,@Transactional(label = …)
提供了一个很好的起点,也可以用作文档。
目前支持以下选项:
- 最大提交时间
-
控制 commitTransaction作在服务器上的最大执行时间。 该值的格式与 ISO-8601 持续时间格式对应,与
Duration.parse(…)
.用法:
mongo:maxCommitTime=PT1S
- 阅读关注
-
设置事务的读取关注点。
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 读取首选项
-
设置事务的读取首选项。
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 写入关注
-
设置事务的写关注点。
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
加入外部事务的嵌套事务不会影响初始事务选项,因为事务已启动。 仅当启动新事务时,才会应用事务选项。 |
事务中的特殊行为
在事务内部,MongoDB 服务器的行为略有不同。
连接设置
MongoDB 驱动程序提供了一个专用的副本集名称配置选项,用于将驱动程序引入自动检测模式。 此选项有助于在事务期间识别主副本集节点和命令路由。
请务必添加replicaSet 添加到 MongoDB URI。
有关更多详细信息,请参阅连接字符串选项。 |
集合作
MongoDB 不支持在事务中执行集合作,例如创建集合。 这也会影响首次使用时发生的动态集合创建。 因此,请确保所有必需的结构都已到位。
暂时性错误
MongoDB 可以为事务作期间引发的错误添加特殊标签。
这些可能表示暂时性故障,仅重试作即可消失。
我们强烈推荐 Spring Retry 来实现这些目的。
尽管如此,人们可以覆盖MongoTransactionManager#doCommit(MongoTransactionObject)
实施 Retry Commit Operation 行为,如 MongoDB 参考手册中所述。
计数
MongoDB 数据库count
根据可能无法反映交易中实际情况的集合统计信息进行作。
服务器在发出count
命令。
一次MongoTemplate
检测到一个活动事务,所有事务都已暴露count()
方法使用$match
和$count
运算符, 保留Query
设置,例如collation
.
在 aggregation count 帮助程序中使用 geo 命令时,限制适用。 以下运算符不能使用,必须替换为其他运算符:
-
$where
→$expr
-
$near
→$geoWithin
跟$center
-
$nearSphere
→$geoWithin
跟$centerSphere
使用Criteria.near(…)
和Criteria.nearSphere(…)
必须重写为Criteria.within(…)
各自Criteria.withinSphere(…)
.
这同样适用于near
存储库查询方法中的 query 关键字,必须更改为within
.
另请参阅 MongoDB JIRA 票证 DRIVERS-518 以进一步参考。
以下代码片段显示了count
session-bound 闭包内的用法:
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的代码段在以下命令中具体化:
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是:
db.collection.find( { state: "active" } ).count()