此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.1! |
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Boot 3.3.1! |
Apache Pulsar 通过为 Apache Pulsar 项目提供 Spring 的自动配置来支持。
Spring Boot 将在类路径上自动配置和注册经典(命令式)Spring for Apache Pulsar 组件。
当位于类路径上时,它将对反应式组件执行相同的操作。org.springframework.pulsar:spring-pulsar
org.springframework.pulsar:spring-pulsar-reactive
有 和 “启动器”,分别用于方便地收集命令式和被动使用的依赖项。spring-boot-starter-pulsar
spring-boot-starter-pulsar-reactive
连接到 Pulsar
当您使用 Pulsar 启动器时,Spring Boot 将自动配置并注册一个 bean。PulsarClient
默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。
这可以通过将属性设置为不同的值来调整。pulsar://localhost:6650
spring.pulsar.client.service-url
该值必须是有效的 Pulsar 协议 URL |
您可以通过指定任何带前缀的应用程序属性来配置客户端。spring.pulsar.client.*
如果您需要对配置进行更多控制,请考虑注册一个或多个 Bean。PulsarClientBuilderCustomizer
认证
要连接到需要身份验证的 Pulsar 集群,您需要通过设置插件所需的 和 任何参数来指定要使用的身份验证插件。
您可以将参数设置为参数名称与参数值的映射。
以下示例演示如何配置插件。pluginClassName
AuthenticationOAuth2
-
Properties
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。
Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。 例如,如果要配置身份验证插件的颁发者 URL,则必须使用 .
如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。 这种缺乏宽松的绑定也使得使用环境变量作为身份验证参数成为问题,因为在转换过程中会丢失区分大小写。 如果对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤操作,才能使其正常工作。 |
该值必须是有效的 Pulsar 协议 URL |
您需要确保 下定义的名称与您的身份验证插件(通常是驼峰大小写)所期望的名称完全匹配。
Spring Boot 不会尝试对这些条目进行任何形式的宽松绑定。 例如,如果要配置身份验证插件的颁发者 URL,则必须使用 .
如果您使用其他形式,例如 或 ,则该设置将不会应用于插件。 这种缺乏宽松的绑定也使得使用环境变量作为身份验证参数成为问题,因为在转换过程中会丢失区分大小写。 如果对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤操作,才能使其正常工作。 |
被动地连接到 Pulsar
当反应式自动配置被激活时,Spring Boot 将自动配置并注册一个 bean。ReactivePulsarClient
将调整前面描述的 .
因此,请按照上一节配置 .ReactivePulsarClient
PulsarClient
PulsarClient
ReactivePulsarClient
连接到 Pulsar 管理
Apache Pulsar 客户端的 Spring 也是自动配置的。PulsarAdministration
默认情况下,应用程序会尝试连接到位于 的本地 Pulsar 实例。
这可以通过在 形式 中将属性设置为不同的值来调整。http://localhost:8080
spring.pulsar.admin.service-url
(http|https)://<host>:<port>
如果您需要对配置进行更多控制,请考虑注册一个或多个 Bean。PulsarAdminBuilderCustomizer
认证
当访问需要认证的 Pulsar 集群时,管理客户端需要与常规 Pulsar 客户端相同的安全配置。
您可以通过将 替换为 来使用上述身份验证配置。spring.pulsar.client.authentication
spring.pulsar.admin.authentication
要在启动时创建主题,请添加类型为 的 Bean 。
如果该主题已存在,则忽略该 bean。PulsarTopic |
要在启动时创建主题,请添加类型为 的 Bean 。
如果该主题已存在,则忽略该 bean。PulsarTopic |
发送消息
Spring's 是自动配置的,您可以使用它来发送消息,如以下示例所示:PulsarTemplate
-
Java
-
Kotlin
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() throws PulsarClientException {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
它依赖于 a 来创建底层的 Pulsar 生产者。
Spring Boot 自动配置还提供了此生产者工厂,默认情况下,它缓存它创建的生产者。
您可以通过指定任何带有前缀的应用程序属性来配置生产者工厂和缓存设置。PulsarTemplate
PulsarProducerFactory
spring.pulsar.producer.*
spring.pulsar.producer.cache.*
如果您需要对生产者工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器将应用于所有创建的生产者。
您还可以在发送消息时传入 仅影响当前生产者。ProducerBuilderCustomizer
ProducerBuilderCustomizer
如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入。TypedMessageBuilderCustomizer
被动发送消息
当 Reactive 自动配置被激活时,Spring 的会自动配置,你可以用它来发送消息,如以下示例所示:ReactivePulsarTemplate
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
它依赖于 a 来实际创建基础发送方。
Spring Boot 自动配置还提供了此发送方工厂,默认情况下,该工厂会缓存它创建的生产者。
您可以通过指定任何带前缀的应用程序属性来配置发件人工厂和缓存设置。ReactivePulsarTemplate
ReactivePulsarSenderFactory
spring.pulsar.producer.*
spring.pulsar.producer.cache.*
如果您需要对发送方工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器将应用于所有创建的发件人。
您还可以在发送消息时传入 仅影响当前发件人。ReactiveMessageSenderBuilderCustomizer
ReactiveMessageSenderBuilderCustomizer
如果您需要对正在发送的消息进行更多控制,则可以在发送消息时传入。MessageSpecBuilderCustomizer
接收消息
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以被注释以创建侦听器端点。
以下组件在主题上创建侦听器终结点:@PulsarListener
someTopic
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了 所需的所有组件,例如它用来构建底层 Pulsar 消费者的 和 消费者工厂。
您可以通过指定任何带有前缀的应用程序属性来配置这些组件。PulsarListener
PulsarListenerContainerFactory
spring.pulsar.listener.*
spring.pulsar.consumer.*
如果您需要对使用者工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器适用于工厂创建的所有使用者,因此也适用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ConsumerBuilderCustomizer
@PulsarListener
consumerCustomizer
@PulsarListener
被动接收消息
当 Apache Pulsar 基础设施存在并激活了反应式自动配置时,可以对任何 Bean 进行注释以创建响应式侦听器端点。
以下组件在主题上创建反应式侦听器终结点:@ReactivePulsarListener
someTopic
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自动配置提供了 所需的所有组件,例如它用来构建底层反应式 Pulsar 消费者的 和 消费者工厂。
您可以通过指定任何 和 spring.pulsar.consumer.
前缀的应用程序属性来配置这些组件。ReactivePulsarListener
ReactivePulsarListenerContainerFactory
spring.pulsar.listener.
如果您需要对使用者工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器适用于工厂创建的所有使用者,因此也适用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ReactiveMessageConsumerBuilderCustomizer
@ReactivePulsarListener
consumerCustomizer
@ReactivePulsarListener
阅读消息
Pulsar 读取器界面使应用程序能够手动管理光标。 使用阅读器连接到主题时,需要指定阅读器在连接到主题时从哪条消息开始阅读。
当 Apache Pulsar 基础设施存在时,任何 Bean 都可以使用读取器进行注释以使用消息。
以下组件创建一个读取器终结点,该终结点从主题的开头开始读取消息:@PulsarReader
someTopic
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
它依赖于 a 来创建底层的 Pulsar 读取器。
Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何前缀应用程序属性来自定义。@PulsarReader
PulsarReaderFactory
spring.pulsar.reader.*
如果您需要对读卡器工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器适用于工厂创建的所有读取器,因此也适用于所有实例。
您还可以通过设置注释的属性来自定义单个侦听器。ReaderBuilderCustomizer
@PulsarReader
readerCustomizer
@PulsarReader
被动阅读消息
当 Apache Pulsar 基础设施存在并且激活了反应式自动配置时,将提供 Spring 的,您可以使用它来创建一个阅读器,以便以响应式方式读取消息。
以下组件使用提供的工厂创建读取器,并从主题中读取 5 分钟前的一条消息:ReactivePulsarReaderFactory
someTopic
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自动配置提供了此读取器工厂,可以通过设置任何前缀应用程序属性来自定义。spring.pulsar.reader.*
如果需要对读取器出厂配置进行更多控制,请考虑在使用出厂创建读取器时传入一个或多个实例。ReactiveMessageReaderBuilderCustomizer
如果您需要对读卡器工厂配置进行更多控制,请考虑注册一个或多个 Bean。
这些定制器将应用于所有创建的读取器。
您还可以在创建读取器时传递一个或多个,以仅将自定义项应用于创建的读取器。ReactiveMessageReaderBuilderCustomizer
ReactiveMessageReaderBuilderCustomizer
有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
交易支持
Spring for Apache Pulsar 在使用 和 时支持事务。PulsarTemplate
@PulsarListener
使用反应式变体时,当前不支持事务。 |
将属性设置为:spring.pulsar.transaction.enabled
true
-
配置 Bean
PulsarTransactionManager
-
启用事务支持
PulsarTemplate
-
启用对方法的事务支持
@PulsarListener
的属性可用于微调何时应将事务用于侦听器。transactional
@PulsarListener
为了更好地控制 Spring for Apache Pulsar 事务功能,您应该定义自己的和/或 bean。
如果默认的自动配置不合适,也可以定义 Bean。PulsarTemplate
ConcurrentPulsarListenerContainerFactory
PulsarAwareTransactionManager
PulsarTransactionManager
使用反应式变体时,当前不支持事务。 |
Pulsar 的其他属性
自动配置支持的属性显示在附录的“集成属性”部分。 请注意,在大多数情况下,这些属性(连字符或 camelCase)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。
只有 Pulsar 支持的属性子集可以直接通过该类获得。
如果希望使用不直接支持的其他属性来调整自动配置的组件,则可以使用上述每个组件支持的定制器。PulsarProperties