This version is still in development and is not considered stable yet. For the latest stable version, please use Spring Boot 3.4.3.

Apache Pulsar Support

Apache Pulsar is supported by providing auto-configuration of the Spring for Apache Pulsar project.

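Spring Boot will auto-configure and register the classic (imperative) Spring for Apache Pulsar components when org.springframework.pulsar:spring-pulsar is on the classpath. It will do the same for the reactive components when org.springframework.pulsar:spring-pulsar-reactive is on the classpath.

The spring-boot-starter-pulsar and spring-boot-starter-pulsar-reactive starters conveniently collect the dependencies for imperative and reactive use, respectively.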

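Connecting to Pulsar

When you use the Pulsar starter, Spring Boot will auto-configure and register a PulsarClient bean.

By default, the application tries to connect to a local Pulsar instance at pulsar://localhost:6650. This can be adjusted by setting the spring.pulsar.client.service-url property to a different value.

The value must be a valid Pulsar Protocol URL.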
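
As a rough illustration of what such a URL looks like, the following sketch checks a candidate value before it is used for spring.pulsar.client.service-url. The PulsarServiceUrls class and its looksValid method are hypothetical helpers, not part of Spring Boot or Pulsar. The check assumes a single host with an explicit port and one of the pulsar:// or pulsar+ssl:// schemes, so it is stricter than what the client itself may accept:

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Spring Boot or Pulsar.
final class PulsarServiceUrls {

	// Schemes of the Pulsar binary protocol (plain text and TLS).
	private static final Set<String> SCHEMES = Set.of("pulsar", "pulsar+ssl");

	private PulsarServiceUrls() {
	}

	// Returns true when the value has a Pulsar scheme, a host, and an explicit port.
	static boolean looksValid(String url) {
		try {
			URI uri = new URI(url);
			return uri.getScheme() != null && SCHEMES.contains(uri.getScheme())
					&& uri.getHost() != null && uri.getPort() != -1;
		}
		catch (URISyntaxException ex) {
			// Not parseable as a URI at all.
			return false;
		}
	}

}
```

With this sketch, pulsar://localhost:6650 passes, while http://localhost:8080 is rejected because it does not use a Pulsar protocol scheme.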

You can configure the client by specifying any of the spring.pulsar.client.* prefixed application properties.

If you need more control over the configuration, consider registering one or more PulsarClientBuilderCustomizer beans.
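
A customizer bean might look like the following configuration sketch. The class name, bean name, and timeout values are made up for illustration, and the sketch assumes that PulsarClientBuilderCustomizer receives the Pulsar ClientBuilder before the client is built:

```java
import java.util.concurrent.TimeUnit;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarClientBuilderCustomizer;

@Configuration(proxyBeanMethods = false)
public class MyPulsarClientConfiguration {

	// Illustrative values only; tune them for your own cluster.
	@Bean
	PulsarClientBuilderCustomizer clientTimeoutCustomizer() {
		return (clientBuilder) -> clientBuilder
			.connectionTimeout(5, TimeUnit.SECONDS)
			.operationTimeout(15, TimeUnit.SECONDS);
	}

}
```

A customizer is useful mainly for settings that have no matching spring.pulsar.client.* property.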

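Authentication

To connect to a Pulsar cluster that requires authentication, you need to specify which authentication plugin to use by setting the pluginClassName and any parameters required by the plugin. You can set the parameters as a map of parameter names to parameter values. The following example shows how to configure the AuthenticationOAuth2 plugin.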

spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance

spring:
  pulsar:
    client:
      authentication:
        plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
        param:
          issuerUrl: https://auth.server.cloud/
          privateKey: file:///Users/some-key.json
          audience: urn:sn:acme:dev:my-instance

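You need to ensure that the names defined under spring.pulsar.client.authentication.param.* exactly match those expected by your auth plugin (which is typically camel cased). Spring Boot will not attempt any kind of relaxed binding for these entries.

For example, if you want to configure the issuer URL for the AuthenticationOAuth2 auth plugin, you must use spring.pulsar.client.authentication.param.issuerUrl. If you use other forms, such as issuerurl or issuer-url, the setting will not be applied to the plugin.

This lack of relaxed binding also makes using environment variables for authentication parameters problematic, because the case sensitivity is lost during translation. If you use environment variables for the parameters, you need to follow these steps in the Spring for Apache Pulsar reference documentation for it to work properly.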
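
To see why the case is lost, consider the following simplified sketch. It is not Spring Boot's binder: paramKeyFromEnv only approximates the translation (lowercase everything, turn underscores into dots), and the class and method names are hypothetical:

```java
import java.util.Locale;

// Simplified illustration only; this is NOT how Spring Boot's binder is implemented.
final class EnvNameDemo {

	private EnvNameDemo() {
	}

	// Approximates the environment-variable-to-property translation and
	// returns the last segment, which would become the auth parameter name.
	static String paramKeyFromEnv(String envName) {
		String property = envName.toLowerCase(Locale.ROOT).replace('_', '.');
		return property.substring(property.lastIndexOf('.') + 1);
	}

	// The plugin expects an exact, case-sensitive parameter name.
	static boolean pluginWouldSee(String envName, String expectedParam) {
		return paramKeyFromEnv(envName).equals(expectedParam);
	}

}
```

An environment variable named SPRING_PULSAR_CLIENT_AUTHENTICATION_PARAM_ISSUERURL yields issuerurl in this sketch, which does not match the issuerUrl name that the plugin expects.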

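SSL

By default, Pulsar clients communicate with Pulsar services in plain text. You can follow these steps in the Spring for Apache Pulsar reference documentation to enable TLS encryption.

For complete details on the client and authentication, see the Spring for Apache Pulsar reference documentation.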

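Connecting to Pulsar Reactively

When the Reactive auto-configuration is activated, Spring Boot will auto-configure and register a ReactivePulsarClient bean.

The ReactivePulsarClient adapts an instance of the previously described PulsarClient. Therefore, follow the previous section to configure the PulsarClient used by the ReactivePulsarClient.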

Connecting to Pulsar Administration

Spring for Apache Pulsar's PulsarAdministration client is also auto-configured.

By default, the application tries to connect to a local Pulsar instance at http://localhost:8080. This can be adjusted by setting the spring.pulsar.admin.service-url property to a different value in the form (http|https)://<host>:<port>.
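
The documented form can be expressed as a regular expression. The sketch below is one possible reading of it, assuming the port is mandatory and the host is a plain name or IPv4 address (IPv6 literals are not handled). AdminServiceUrls is a hypothetical helper, not an API of Spring Boot or Pulsar:

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only.
final class AdminServiceUrls {

	// (http|https)://<host>:<port>, with a host free of whitespace, '/' and ':'.
	private static final Pattern FORM = Pattern.compile("https?://[^\\s/:]+:\\d{1,5}");

	private AdminServiceUrls() {
	}

	// True when the whole value matches the documented form.
	static boolean matchesDocumentedForm(String url) {
		return FORM.matcher(url).matches();
	}

}
```

Note that the admin URL uses HTTP, so a broker URL such as pulsar://localhost:6650 does not match.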

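If you need more control over the configuration, consider registering one or more PulsarAdminBuilderCustomizer beans.

Authentication

When accessing a Pulsar cluster that requires authentication, the admin client requires the same security configuration as the regular Pulsar client. You can use the aforementioned authentication configuration by replacing spring.pulsar.client.authentication with spring.pulsar.admin.authentication.

To create a topic on startup, add a bean of type PulsarTopic. If the topic already exists, the bean is ignored.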
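
A topic bean could be declared as in the following configuration sketch. The topic name and partition count are placeholders, and the sketch assumes that the static PulsarTopic.builder method from Spring for Apache Pulsar is available in the version you use:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.core.PulsarTopic;

@Configuration(proxyBeanMethods = false)
public class MyPulsarTopicConfiguration {

	// "orders" and the partition count are illustrative values.
	@Bean
	PulsarTopic ordersTopic() {
		return PulsarTopic.builder("orders").numberOfPartitions(3).build();
	}

}
```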

Sending a Message

Spring's PulsarTemplate is auto-configured, and you can use it to send messages, as shown in the following example:

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyBean(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello");
	}

}

import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {

	@Throws(PulsarClientException::class)
	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello")
	}

}

The PulsarTemplate relies on a PulsarProducerFactory to create the underlying Pulsar producer. Spring Boot auto-configuration also provides this producer factory, which, by default, caches the producers that it creates. You can configure the producer factory and cache settings by specifying any of the spring.pulsar.producer.* and spring.pulsar.producer.cache.* prefixed application properties.

If you need more control over the producer factory configuration, consider registering one or more ProducerBuilderCustomizer beans. These customizers are applied to all created producers. You can also pass in a ProducerBuilderCustomizer when sending a message to only affect the current producer.

If you need more control over the message being sent, you can pass in a TypedMessageBuilderCustomizer when sending a message.
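
Both per-send customizers can be passed through the template's fluent newMessage API, as in the following sketch. The class name, message key, and timeout are made up for illustration, and the two lambdas are assumed to be inferred as a TypedMessageBuilderCustomizer and a ProducerBuilderCustomizer respectively:

```java
import java.util.concurrent.TimeUnit;

import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyCustomizingSender {

	private final PulsarTemplate<String> pulsarTemplate;

	public MyCustomizingSender(PulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void sendWithKey() {
		this.pulsarTemplate.newMessage("Hello")
			.withTopic("someTopic")
			// Message customizer: sets a key on this message only.
			.withMessageCustomizer((messageBuilder) -> messageBuilder.key("some-key"))
			// Producer customizer: applies to the producer used for this send only.
			.withProducerCustomizer((producerBuilder) -> producerBuilder.sendTimeout(5, TimeUnit.SECONDS))
			.send();
	}

}
```

Customizers registered as beans still apply to every producer; the ones passed here only add to them for this particular send.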

Sending a Message Reactively

When the Reactive auto-configuration is activated, Spring’s ReactivePulsarTemplate is auto-configured, and you can use it to send messages, as shown in the following example:

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarTemplate<String> pulsarTemplate;

	public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
		this.pulsarTemplate = pulsarTemplate;
	}

	public void someMethod() {
		this.pulsarTemplate.send("someTopic", "Hello").subscribe();
	}

}

import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {

	fun someMethod() {
		pulsarTemplate.send("someTopic", "Hello").subscribe()
	}

}

The ReactivePulsarTemplate relies on a ReactivePulsarSenderFactory to create the underlying sender. Spring Boot auto-configuration also provides this sender factory, which, by default, caches the producers that it creates. You can configure the sender factory and cache settings by specifying any of the spring.pulsar.producer.* and spring.pulsar.producer.cache.* prefixed application properties.

If you need more control over the sender factory configuration, consider registering one or more ReactiveMessageSenderBuilderCustomizer beans. These customizers are applied to all created senders. You can also pass in a ReactiveMessageSenderBuilderCustomizer when sending a message to only affect the current sender.

If you need more control over the message being sent, you can pass in a MessageSpecBuilderCustomizer when sending a message.

Receiving a Message

When the Apache Pulsar infrastructure is present, any bean can be annotated with @PulsarListener to create a listener endpoint. The following component creates a listener endpoint on the someTopic topic:

import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarListener(topics = "someTopic")
	public void processMessage(String content) {
		// ...
	}

}

import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?) {
		// ...
	}

}

Spring Boot auto-configuration provides all the components necessary for @PulsarListener, such as the PulsarListenerContainerFactory and the consumer factory it uses to construct the underlying Pulsar consumers. You can configure these components by specifying any of the spring.pulsar.listener.* and spring.pulsar.consumer.* prefixed application properties.

If you need more control over the configuration of the consumer factory, consider registering one or more ConsumerBuilderCustomizer beans. These customizers are applied to all consumers created by the factory, and therefore all @PulsarListener instances. You can also customize a single listener by setting the consumerCustomizer attribute of the @PulsarListener annotation.
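
Customizing a single listener could look like the following sketch. It assumes that the consumerCustomizer attribute takes the name of a PulsarListenerConsumerBuilderCustomizer bean, a Spring for Apache Pulsar type that the text above does not mention. The class names, bean name, and consumer name are hypothetical:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer;
import org.springframework.stereotype.Component;

@Configuration(proxyBeanMethods = false)
class MyListenerConfiguration {

	// The bean name "namedConsumerCustomizer" is referenced by the listener below.
	@Bean
	PulsarListenerConsumerBuilderCustomizer<String> namedConsumerCustomizer() {
		return (consumerBuilder) -> consumerBuilder.consumerName("my-consumer");
	}

}

@Component
class MyCustomizedListener {

	// Only this listener's consumer is affected by the customizer.
	@PulsarListener(topics = "someTopic", consumerCustomizer = "namedConsumerCustomizer")
	public void processMessage(String content) {
		// ...
	}

}
```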

If you need more control over the actual container factory configuration, consider registering one or more PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> beans.

Receiving a Message Reactively

When the Apache Pulsar infrastructure is present and the Reactive auto-configuration is activated, any bean can be annotated with @ReactivePulsarListener to create a reactive listener endpoint. The following component creates a reactive listener endpoint on the someTopic topic:

import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@ReactivePulsarListener(topics = "someTopic")
	public Mono<Void> processMessage(String content) {
		// ...
		return Mono.empty();
	}

}

import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono

@Component
class MyBean {

	@ReactivePulsarListener(topics = ["someTopic"])
	fun processMessage(content: String?): Mono<Void> {
		// ...
		return Mono.empty()
	}

}

Spring Boot auto-configuration provides all the components necessary for @ReactivePulsarListener, such as the ReactivePulsarListenerContainerFactory and the consumer factory it uses to construct the underlying reactive Pulsar consumers. You can configure these components by specifying any of the spring.pulsar.listener.* and spring.pulsar.consumer.* prefixed application properties.

If you need more control over the configuration of the consumer factory, consider registering one or more ReactiveMessageConsumerBuilderCustomizer beans. These customizers are applied to all consumers created by the factory, and therefore all @ReactivePulsarListener instances. You can also customize a single listener by setting the consumerCustomizer attribute of the @ReactivePulsarListener annotation.

If you need more control over the actual container factory configuration, consider registering one or more PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> beans.

Reading a Message

The Pulsar reader interface enables applications to manually manage cursors. When you use a reader to connect to a topic, you need to specify which message the reader begins reading from when it connects.

When the Apache Pulsar infrastructure is present, any bean can be annotated with @PulsarReader to consume messages using a reader. The following component creates a reader endpoint that starts reading messages from the beginning of the someTopic topic:

import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@PulsarReader(topics = "someTopic", startMessageId = "earliest")
	public void processMessage(String content) {
		// ...
	}

}

import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component

@Component
class MyBean {

	@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
	fun processMessage(content: String?) {
		// ...
	}

}

The @PulsarReader relies on a PulsarReaderFactory to create the underlying Pulsar reader. Spring Boot auto-configuration provides this reader factory, which can be customized by setting any of the spring.pulsar.reader.* prefixed application properties.

If you need more control over the configuration of the reader factory, consider registering one or more ReaderBuilderCustomizer beans. These customizers are applied to all readers created by the factory, and therefore all @PulsarReader instances. You can also customize a single reader by setting the readerCustomizer attribute of the @PulsarReader annotation.

If you need more control over the actual container factory configuration, consider registering one or more PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> beans.

Reading a Message Reactively

When the Apache Pulsar infrastructure is present and the Reactive auto-configuration is activated, Spring’s ReactivePulsarReaderFactory is provided, and you can use it to create a reader in order to read messages in a reactive fashion. The following component creates a reader using the provided factory and reads a single message from 5 seconds ago from the someTopic topic:

import java.time.Instant;
import java.util.List;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;

import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;

	public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
		this.pulsarReaderFactory = pulsarReaderFactory;
	}

	public void someMethod() {
		ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
			.topic("someTopic")
			.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
		Mono<Message<String>> message = this.pulsarReaderFactory
			.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
			.readOne();
		// ...
	}

}

import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant

@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {

	fun someMethod() {
		val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
			readerBuilder: ReactiveMessageReaderBuilder<String> ->
				readerBuilder
					.topic("someTopic")
					.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
		}
		val message = pulsarReaderFactory
				.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
				.readOne()
		// ...
	}

}

Spring Boot auto-configuration provides this reader factory, which can be customized by setting any of the spring.pulsar.reader.* prefixed application properties.

If you need more control over the reader factory configuration, consider registering one or more ReactiveMessageReaderBuilderCustomizer beans. These customizers are applied to all created readers. You can also pass one or more ReactiveMessageReaderBuilderCustomizer instances when using the factory to create a reader, in which case the customizations apply only to that reader.

For more details on any of the above components and to discover other available features, see the Spring for Apache Pulsar reference documentation.

Transaction Support

Spring for Apache Pulsar supports transactions when using PulsarTemplate and @PulsarListener.

Transactions are not currently supported when using the reactive variants.

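Setting the spring.pulsar.transaction.enabled property to true will configure a PulsarTransactionManager bean and enable transaction support for PulsarTemplate and for @PulsarListener methods.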

The transactional attribute of @PulsarListener can be used to fine-tune when transactions should be used with listeners.

For more control of the Spring for Apache Pulsar transaction features, you should define your own PulsarTemplate and/or ConcurrentPulsarListenerContainerFactory beans. You can also define a PulsarAwareTransactionManager bean if the default auto-configured PulsarTransactionManager is not suitable.

Additional Pulsar Properties

The properties supported by auto-configuration are shown in the Integration Properties section of the Appendix. Note that, for the most part, these properties (hyphenated or camelCase) map directly to the Apache Pulsar configuration properties. See the Apache Pulsar documentation for details.

Only a subset of the properties supported by Pulsar are available directly through the PulsarProperties class. If you wish to tune the auto-configured components with additional properties that are not directly supported, you can use the customizer supported by each aforementioned component.