本节介绍 Spring for Apache Pulsar 如何支持事务。

概述

Spring for Apache Pulsar 事务支持建立在 Spring Framework 提供的事务支持之上。 在高级层次上,事务资源向事务管理器注册,而事务管理器又处理已注册资源的事务状态(提交、回滚等)。

Spring for Apache Pulsar 提供以下功能:

  • PulsarTransactionManager- 与普通的 Spring 事务支持(、等)一起使用@TransactionalTransactionTemplate

  • 事务PulsarTemplate

  • 事务@PulsarListener

  • 与其他事务管理器的事务同步

事务支持尚未添加到反应式组件中

默认情况下,事务支持处于禁用状态。 若要在使用 Spring Boot 时启用支持,只需设置该属性即可。 下面的每个组件部分概述了其他配置选项。spring.pulsar.transaction.enabled

事务支持尚未添加到反应式组件中

事务性发布PulsarTemplate

事务上的所有发送操作都会查找活动事务,并在事务中登记每个发送操作(如果找到)。PulsarTemplate

非交易性使用

默认情况下,事务性操作也可用于非事务性操作。 当找不到现有事务时,它将以非事务方式继续发送操作。 但是,如果模板配置为需要事务,则任何在事务范围之外使用模板的尝试都会导致异常。PulsarTemplate

事务可以由 、方法、调用或事务侦听器容器启动。TransactionTemplate@TransactionalexecuteInTransaction

本地交易

我们使用术语“本地”事务来表示不受 Spring 事务管理工具管理或与 Spring 事务管理工具(即 )关联的 Pulsar 原生事务。 相反,“同步”事务是由 管理或与 关联的事务。PulsarTransactionManagerPulsarTransactionManager

您可以使用 在本地事务中执行一系列操作。 以下示例演示如何执行此操作:PulsarTemplate

var results = pulsarTemplate.executeInTransaction((template) -> {
    var rv = new HashMap<String, MessageId>();
    rv.put("msg1", template.send(topic, "msg1"));
    rv.put("msg2", template.send(topic, "msg2"));
    return rv;
});

回调中的参数是调用该方法的模板实例。 模板上的所有操作都将登记在当前事务中。 如果回调正常退出,则交易已提交。 如果引发异常,则事务将回滚。executeInTransaction

如果正在处理同步事务,则忽略该事务,并使用新的“嵌套”事务。

配置

以下交易设置可直接在(通过字段)上获得:PulsarTemplatetransactions

  • enabled- 模板是否支持交易(默认false)

  • required- 模板是否需要交易(默认false)

  • timeout- 事务超时的持续时间(默认null)

不使用 Spring Boot 时,可以在提供的模板上调整这些设置。 但是,在使用 Spring Boot 时,模板是自动配置的,并且没有影响属性的机制。 在这种情况下,您可以注册一个可用于调整设置的 bean。 以下示例演示如何在自动配置的模板上设置超时:PulsarTemplateCustomizer

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
    return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
事务可以由 、方法、调用或事务侦听器容器启动。TransactionTemplate@TransactionalexecuteInTransaction
如果正在处理同步事务,则忽略该事务,并使用新的“嵌套”事务。

事务接收@PulsarListener

启用侦听器事务后,将在同步事务的作用域中调用带批注的侦听器方法。@PulsarListener

在方法调用之前,使用配置了 a 的 Spring 来启动事务。DefaultPulsarMessageListenerContainerTransactionTemplatePulsarTransactionManager

每封已接收消息的确认都登记在作用域内的事务中。

消费-流程-生产方案

一种常见的事务模式是,消费者从 Pulsar 主题读取消息,转换消息,最后生产者将生成的消息写入另一个 Pulsar 主题。 当启用事务并且侦听器方法使用事务生成转换后的消息时,框架支持此用例。PulsarTemplate

给定以下侦听器方法:

@PulsarListener(topics = "my-input-topic") (1)
void listen(String msg) { (2)
    var transformedMsg = msg.toUpperCase(); (3)
    this.transactionalTemplate.send("my-output-topic", transformedMsg); (4)
} (5) (6)

启用侦听器事务时,将发生以下交互:

1 侦听器容器启动新事务,并在事务范围内调用侦听器方法
2 侦听器方法接收消息
3 侦听器方法转换消息
4 侦听器方法使用事务模板发送转换后的消息,该模板在活动事务中登记发送操作
5 侦听器容器自动确认消息,并在活动事务中登记确认操作
6 侦听器容器(通过 )提交事务TransactionTemplate

如果不使用侦听器容器,而是直接使用侦听器容器,则提供与上述相同的事务支持。 请记住,将 Java 方法注册为侦听器容器消息侦听器只是为了方便起见。@PulsarListener@PulsarListener

与记录侦听器的事务

上面的示例使用记录侦听器。 使用记录侦听器时,会在每次侦听器方法调用时创建一个新事务,这相当于每条消息一个事务。

由于事务边界是每条消息,并且每个消息确认都在每个事务中登记,因此批处理确认模式不能用于事务记录侦听器。

与批处理侦听器的事务

使用批处理侦听器时,会在每次侦听器方法调用时创建一个新事务,这相当于每批消息一个事务。

事务批处理侦听器当前不支持自定义错误处理程序。

配置

侦听器容器工厂

以下事务设置可直接在创建侦听器容器时使用。 这些设置会影响所有侦听器容器,包括 使用的侦听器容器。PulsarContainerPropertiesConcurrentPulsarListenerContainerFactory@PulsarListener

  • enabled- 容器是否支持事务(默认false)

  • required- 容器是否需要事务(默认false)

  • timeout- 事务超时的持续时间(默认null)

  • transactionDefinition- 具有属性的蓝图事务定义,这些属性将被复制到容器的事务模板(默认null)

  • transactionManager- 用于启动事务的事务管理器

不使用 Spring Boot 时,可以在提供的容器工厂上调整这些设置。 但是,在使用 Spring Boot 时,容器工厂是自动配置的。 在这种情况下,您可以注册一个 Bean 来访问和定制容器属性。 以下示例演示如何在容器工厂上设置超时:ConcurrentPulsarListenerContainerFactoryCustomizer

@Bean
ConcurrentPulsarListenerContainerFactoryCustomizer<?> containerCustomizer() {
    return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}

@PulsarListener

默认情况下,每个侦听器都遵循其相应侦听器容器工厂的事务设置。 但是,用户可以设置每个属性以覆盖容器工厂设置,如下所示:transactional@PulsarListener

  • 如果容器工厂启用了事务,则将禁用单个侦听器的事务。transactional = false

  • 如果容器工厂启用了事务并且是必需的,则尝试设置将导致引发异常,指出需要事务。transactional = false

  • 如果容器工厂禁用了事务,则将忽略设置尝试,并记录警告。transactional = true

1 侦听器容器启动新事务,并在事务范围内调用侦听器方法
2 侦听器方法接收消息
3 侦听器方法转换消息
4 侦听器方法使用事务模板发送转换后的消息,该模板在活动事务中登记发送操作
5 侦听器容器自动确认消息,并在活动事务中登记确认操作
6 侦听器容器(通过 )提交事务TransactionTemplate
由于事务边界是每条消息,并且每个消息确认都在每个事务中登记,因此批处理确认模式不能用于事务记录侦听器。
事务批处理侦听器当前不支持自定义错误处理程序。

PulsarTransactionManager

是 Spring Framework 的 . 您可以将 与普通的 Spring 事务支持(、 等)一起使用。PulsarTransactionManagerPlatformTransactionManagerPulsarTransactionManager@TransactionalTransactionTemplate

如果事务处于活动状态,则在事务范围内执行的任何操作都会登记并参与正在进行的事务。 经理提交或回滚事务,具体取决于成功或失败。PulsarTemplate

您可能不需要直接使用,因为大多数事务用例都包含在 和 中。PulsarTransactionManagerPulsarTemplate@PulsarListener
您可能不需要直接使用,因为大多数事务用例都包含在 和 中。PulsarTransactionManagerPulsarTemplate@PulsarListener

Pulsar 与其他事务管理器的交易

仅限生产者事务

如果你想向 Pulsar 发送记录并在单个事务中执行一些数据库更新,你可以使用普通的 Spring 事务管理和 .DataSourceTransactionManager

以下示例假定存在一个名为“dataSourceTransactionManager”的注册 BeanDataSourceTransactionManager
@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.pulsarTemplate.send("my-topic", msg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}

注解的拦截器启动数据库事务,并将事务与数据库事务管理器同步;每次发送都将参与该交易。 当该方法退出时,数据库事务将提交,然后是 Pulsar 事务。@TransactionalPulsarTemplate

如果您希望先提交 Pulsar 事务,并且仅在 Pulsar 事务成功时才提交 DB 事务,请使用嵌套方法,将外部方法配置为使用 ,将内部方法配置为使用 。@TransactionalDataSourceTransactionManagerPulsarTransactionManager

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
    var msg = calculateMessage();
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
    this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
    this.pulsarTemplate.send("my-topic", msg);
}

消费者 + 生产者事务

如果你想使用来自 Pulsar 的记录,将记录发送到 Pulsar,并在事务中执行一些数据库更新,你可以将普通的 Spring 事务管理(使用 )与容器发起的事务结合起来。DataSourceTransactionManager

在下面的例子中,监听器容器启动 Pulsar 事务,注解启动 DB 事务。 首先提交数据库事务;如果 Pulsar 事务提交失败,则将重新传送记录,因此数据库更新应该是幂等的。@Transactional

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
    var transformedMsg = msg.toUpperCase();
    this.pulsarTemplate.send("my-output-topic", transformedMsg);
    this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}
以下示例假定存在一个名为“dataSourceTransactionManager”的注册 BeanDataSourceTransactionManager