此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Data MongoDB 4.4.0! |
更改流
从 MongoDB 3.6 开始,Change Streams 允许应用程序获得有关更改的通知,而无需跟踪 oplog。
Change Stream 支持仅适用于副本集或分片集群。 |
Change Streams 可以与命令式和反应式 MongoDB Java 驱动程序一起使用。强烈建议使用 reactive 变体,因为它的资源密集度较低。但是,如果您无法使用反应式 API,您仍然可以通过使用 Spring 生态系统中已经流行的消息传递概念来获取更改事件。
可以在集合和数据库级别上同时进行监视,而数据库级别的变体发布
数据库中所有集合的更改。订阅数据库更改流时,请确保使用
适合事件类型的类型,因为 conversion 可能无法正确应用于不同的实体类型。
如有疑问,请使用Document
.
更改 Streams withMessageListener
使用 Sync Driver 侦听 Change Stream 会创建一个长时间运行的阻塞任务,该任务需要委派给单独的组件。
在这种情况下,我们需要首先创建一个MessageListenerContainer
这将是运行特定SubscriptionRequest
任务。
Spring Data MongoDB 已经附带了一个默认实现,该实现在MongoTemplate
并且能够创建和运行Task
实例的ChangeStreamRequest
.
以下示例显示了如何将 Change Streams 与MessageListener
实例:
MessageListener
实例MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println; (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)
Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class); (4)
// ...
container.stop(); (5)
1 | 启动容器会初始化资源并启动Task 已注册的实例SubscriptionRequest 实例。启动后添加的请求将立即运行。 |
2 | 定义在Message 已收到。这Message#getBody() 将转换为请求的域类型。用Document 接收 Raw 结果而不进行转换。 |
3 | 设置要侦听的集合,并通过ChangeStreamOptions . |
4 | 注册请求。返回的Subscription 可用于检查当前Task state 并取消它以释放资源。 |
5 | 一旦您确定不再需要容器,请不要忘记停止容器。这样做会停止所有运行Task 实例。 |
处理过程中的错误会传递到 |
反应式变更流
使用反应式 API 订阅 Change Streams 是一种更自然的流处理方法。尽管如此,基本的构建块(例如ChangeStreamOptions
保持不变。以下示例演示如何使用 Change Streams 发出ChangeStreamEvent
s:
ChangeStreamEvent
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
.watchCollection("people")
.filter(where("age").gte(38)) (2)
.listen(); (3)
1 | 基础文档应转换为的事件目标类型。省略此项以接收不进行转换的原始结果。 |
2 | 使用聚合管道或仅使用查询Criteria 以筛选事件。 |
3 | 获取Flux 的 change stream 事件。这ChangeStreamEvent#getBody() 将从 (2) 转换为请求的域类型。 |
恢复更改流
可以恢复 Change Streams 并从您离开的位置继续发出事件。要恢复流,您需要提供 resume
token 或最后已知的服务器时间 (UTC) 。用ChangeStreamOptions
以相应地设置该值。
以下示例显示如何使用服务器时间设置恢复偏移量:
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
.watchCollection("people")
.resumeAt(Instant.now().minusSeconds(1)) (1)
.listen();
1 | 您可以获取ChangeStreamEvent 通过getTimestamp 方法或使用resumeToken 暴露getResumeToken . |
在某些情况下,Instant 在恢复 Change Stream 时,可能不是足够精确的度量。为此,请使用 MongoDB 原生 BsonTimestamp。 |