在以下示例中,我们发布到一个名为 的主题。 这是一个分区的主题,对于此示例,我们假定该主题已使用三个分区创建。hello-pulsar-partitioned

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的示例中,我们发布到一个分区主题,并希望将某个数据段发布到特定分区。 如果你把它留给 Pulsar 的默认值,它会遵循分区分配的循环模式,我们想覆盖它。 为此,我们提供了一个带有该方法的消息路由器对象。 考虑实现的三个消息路由器。 总是将数据发送到分区,发送到分区,并发送到分区。 另请注意,我们现在使用 that 返回 . 在运行应用程序时,我们还需要将 on the producer 设置为 ()。sendFooRouter0BarRouter1BuzzRouter2sendAsyncPulsarTemplateCompletableFuturemessageRoutingModeCustomPartitionspring.pulsar.producer.message-routing-mode

在消费者方面,我们使用具有独家订阅类型的订阅。 这意味着来自所有分区的数据最终都属于同一个使用者,并且没有排序保证。PulsarListener

如果我们希望每个分区都由一个不同的使用者使用,我们该怎么办? 我们可以切换到订阅模式,并添加三个单独的消费者:failover

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

当您遵循此方法时,单个分区始终由专用使用者使用。

同样,如果你想使用 Pulsar 的共享消费者类型,你可以使用订阅类型。 但是,当您使用该模式时,您将失去任何排序保证,因为单个使用者可能会在另一个使用者获得机会之前收到来自所有分区的消息。sharedshared

请看以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}