此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4spring-doc.cadn.net.cn

事务性活页夹

通过设置spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix设置为非空值,例如tx-. 在处理器应用程序中使用时,使用者启动事务;在 Consumer 线程上发送的任何记录都参与同一个事务。 当监听器正常退出时,监听器容器会将偏移量发送给事务并提交。 通用的 producer 工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*性能;将忽略单个绑定 Kafka 生产者属性。spring-doc.cadn.net.cn

事务不支持正常的 Binder 重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将被回滚。 启用重试后(通用属性maxAttempts大于零),retry 属性用于配置DefaultAfterRollbackProcessor在容器级别启用重试。 同样,此功能不是在事务中发布死信记录,而是再次通过DefaultAfterRollbackProcessor它在主事务回滚后运行。

如果您希望在源应用程序中使用事务,或者从某个任意线程中为仅限生产者事务(例如@Scheduled方法),您必须获取对事务性生产者工厂的引用,并定义一个KafkaTransactionManagerbean 使用它。spring-doc.cadn.net.cn

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用BinderFactory;用null当只配置了一个 Binder 时。 如果配置了多个 Binder,请使用 Binder 名称获取引用。 一旦我们有了对 binder 的引用,我们就可以获得对ProducerFactory并创建一个事务管理器。spring-doc.cadn.net.cn

然后你会使用普通的 Spring 事务支持,例如TransactionTemplate@Transactional例如:spring-doc.cadn.net.cn

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与来自其他事务管理器的事务同步,请使用ChainedTransactionManager.spring-doc.cadn.net.cn

如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix.