交易
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库添加了对事务的支持。 Spring for Apache Kafka 通过以下方式添加支持:
-
KafkaTransactionManager
:与常规 Spring 事务支持 (@Transactional
,TransactionTemplate
等) -
事务
KafkaMessageListenerContainer
-
本地事务
KafkaTemplate
-
与其他事务管理器的事务同步
通过提供DefaultKafkaProducerFactory
替换为transactionIdPrefix
.
在这种情况下,而不是管理单个共享的Producer
,工厂维护事务性生产者的缓存。
当用户调用close()
在 producer 上,它被返回到缓存以供重用,而不是实际关闭。
这transactional.id
property 是transactionIdPrefix
+ n
哪里n
开头为0
,并且对于每个新生产者都会递增。
在早期版本的 Spring for Apache Kafka 中,transactional.id
对于由具有基于记录的侦听器的侦听器容器启动的事务,以不同方式生成,以支持隔离僵尸,这不再需要,使用EOSMode.V2
是从 3.0 开始的唯一选项。
对于使用多个实例运行的应用程序,transactionIdPrefix
每个实例必须唯一。
另请参阅 Exactly Once 语义。
另请参阅transactionIdPrefix
.
使用 Spring Boot,只需要将spring.kafka.producer.transaction-id-prefix
属性 - Spring Boot 将自动配置KafkaTransactionManager
bean 并将其连接到侦听器容器中。
从版本 2.5.8 开始,您现在可以配置maxAge 属性。
当使用可能为代理的transactional.id.expiration.ms .
使用电流kafka-clients ,这可能会导致ProducerFencedException 没有再平衡。
通过设置maxAge 更改为小于transactional.id.expiration.ms ,如果生产者超过其最大年龄,工厂将刷新生产者。 |
用KafkaTransactionManager
这KafkaTransactionManager
是 Spring Framework 的PlatformTransactionManager
.
在其构造函数中提供了对 producer 工厂的引用。
如果您提供自定义生产者工厂,则它必须支持事务。
看ProducerFactory.transactionCapable()
.
您可以使用KafkaTransactionManager
使用常规的 Spring 事务支持 (@Transactional
,TransactionTemplate
等)。
如果事务处于活动状态,则任何KafkaTemplate
在事务范围内执行的作使用事务的Producer
.
管理器提交或回滚事务,具体取决于成功或失败。
您必须配置KafkaTemplate
以使用相同的ProducerFactory
作为事务管理器。
事务同步
本节引用仅限生产者事务(不是由侦听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅Using Consumer-Initiated Transaction。
如果要将记录发送到 kafka 并执行一些数据库更新,则可以使用常规的 Spring 事务管理,例如,使用DataSourceTransactionManager
.
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
用于@Transactional
annotation 启动事务,并且KafkaTemplate
将事务与该事务管理器同步;每次发送都将参与该交易。
当方法退出时,数据库事务将提交,然后是 Kafka 事务。
如果你希望以相反的顺序执行提交(Kafka 优先),请使用 nested@Transactional
方法,并将外部方法配置为使用DataSourceTransactionManager
,而 inner 方法配置为使用KafkaTransactionManager
.
有关在 Kafka 优先或 DB 优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参阅与其他事务管理器的 Kafka 事务示例。
从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果同步事务上的提交失败(在主事务提交之后),将向调用方抛出异常。 以前,这会被静默忽略(记录在 debug 级别)。 如有必要,应用程序应采取补救措施来补偿已提交的主事务。 |
使用使用者发起的事务
这ChainedKafkaTransactionManager
从 2.7 版本开始,现已弃用;请参阅 JavaDocs 的 super classChainedTransactionManager
了解更多信息。
相反,请使用KafkaTransactionManager
以启动 Kafka 事务,并使用@Transactional
以启动另一个事务。
有关链接 JDBC 和 Kafka 事务的示例应用程序,请参阅使用其他事务管理器的 Kafka 事务示例。
KafkaTemplate
本地事务
您可以使用KafkaTemplate
在本地事务中执行一系列作。
以下示例显示了如何执行此作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身 (this
).
如果回调正常退出,则提交事务。
如果引发异常,则回滚事务。
如果存在KafkaTransactionManager (或 synchronized) 事务,则不使用它。
相反,使用新的 “嵌套” 事务。 |
TransactionIdPrefix
跟EOSMode.V2
(又名BETA
),则唯一支持的模式,则不再需要使用相同的transactional.id
,即使对于消费者发起的交易;事实上,它在每个实例上都必须是唯一的,这与生产者发起的事务相同。
此属性在每个应用程序实例上必须具有不同的值。
TransactionIdSuffix Fixed
从 3.2 开始,新的TransactionIdSuffixStrategy
引入了 interface 来管理transactional.id
后缀。
默认实现为DefaultTransactionIdSuffixStrategy
当设置maxCache
大于零可以重复使用transactional.id
在特定范围内,否则将通过递增 counter 动态生成后缀。
当请求事务 producer 并且transactional.id
全部使用,抛出一个NoProducerAvailableException
.
然后,用户可以使用RetryTemplate
配置为重试该异常,并适当配置回退。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
设置maxCache
到 5,transactional.id
是my.txid.
+'{0-4}'。
使用KafkaTransactionManager 使用ConcurrentMessageListenerContainer 并启用maxCache ,需要设置maxCache 的值大于或等于concurrency .
如果MessageListenerContainer 无法获取transactional.id 后缀,它将抛出一个NoProducerAvailableException .
在ConcurrentMessageListenerContainer ,则必须调整 maxCache 设置以处理增加的嵌套事务数。 |
KafkaTemplate
事务性和非事务性发布
通常,当KafkaTemplate
是事务性的(配置了支持事务的 producer 工厂),事务是必需的。
该交易可以通过TransactionTemplate
一个@Transactional
方法, 调用executeInTransaction
或由侦听器容器配置时,如果配置了KafkaTransactionManager
.
任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException
.
从版本 2.4.3 开始,你可以将模板的allowNonTransactional
property 设置为true
.
在这种情况下,模板将允许作在没有事务的情况下运行,方法是调用ProducerFactory
的createNonTransactionalProducer()
方法;Producer 将像往常一样被缓存或线程绑定,以便重用。
看用DefaultKafkaProducerFactory
.
使用 Batch 侦听器的事务
当侦听器在使用事务时失败时,AfterRollbackProcessor
调用以在回滚发生后执行一些作。
当使用默认的AfterRollbackProcessor
使用 Record 侦听器时,将执行 seek 以重新传递失败的记录。
但是,使用批处理侦听器时,将重新交付整个批处理,因为框架不知道批处理中的哪条记录失败。
有关更多信息,请参阅 After-rollback Processor 。
使用批处理侦听器时,版本 2.4.2 引入了一种替代机制来处理处理批处理时的故障:BatchToRecordAdapter
.
当具有batchListener
设置为 true 时,将BatchToRecordAdapter
,则一次使用一条记录调用侦听器。
这将启用批处理中的错误处理,同时仍可以停止处理整个批处理,具体取决于异常类型。
默认的BatchToRecordAdapter
,可以使用标准的ConsumerRecordRecoverer
例如DeadLetterPublishingRecoverer
.
以下测试用例配置代码段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}