此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 spring-cloud-stream 4.1.4! |
事件路由
在 Spring Cloud Stream 的上下文中,事件路由是 a) 将事件路由到特定事件订阅者或 b) 将事件订阅者生成的事件路由到特定目标的能力。 在这里,我们将其称为路由 'TO' 和路由 'FROM'。
路由到使用者
路由可以通过依赖来实现RoutingFunction
在 Spring Cloud Function 3.0 中可用。您需要做的就是通过以下方式启用它--spring.cloud.stream.function.routing.enabled=true
application 属性或提供spring.cloud.function.routing-expression
财产。
启用后RoutingFunction
将绑定到输入目标
接收所有消息,并根据提供的指令将它们路由到其他函数。
为了绑定,路由目标的名称为functionRouter-in-0 (请参阅 RoutingFunction.FUNCTION_NAME 和绑定命名约定 [功能绑定名称])。 |
指令可以通过单个消息以及应用程序属性来提供。
以下是几个示例:
使用消息标头
@SpringBootApplication
public class SampleApplication {
public static void main(String[] args) {
SpringApplication.run(SampleApplication.class,
"--spring.cloud.stream.function.routing.enabled=true");
}
@Bean
public Consumer<String> even() {
return value -> {
System.out.println("EVEN: " + value);
};
}
@Bean
public Consumer<String> odd() {
return value -> {
System.out.println("ODD: " + value);
};
}
}
通过向functionRouter-in-0
Binder 公开的目标(即 rabbit、Kafka ),
此类消息将被路由到适当的 ('even' 或 'odd') Consumer 。
默认情况下RoutingFunction
将查找spring.cloud.function.definition
或spring.cloud.function.routing-expression
(适用于 SPEL 的更多动态场景)
header 的 Header,如果找到,它的值将被视为路由指令。
例如
设置spring.cloud.function.routing-expression
header 到 valueT(java.lang.System).currentTimeMillis() % 2 == 0 ? 'even' : 'odd'
最终将请求半随机路由到odd
或even
功能。
此外,对于 SPEL,求值上下文的根对象是Message
因此,您也可以对单个 Headers(或 Message)进行评估….routing-expression=headers['type']
使用应用程序属性
这spring.cloud.function.routing-expression
和/或spring.cloud.function.definition
可以作为应用程序属性传递(例如spring.cloud.function.routing-expression=headers['type']
.
@SpringBootApplication
public class RoutingStreamApplication {
public static void main(String[] args) {
SpringApplication.run(RoutingStreamApplication.class,
"--spring.cloud.function.routing-expression="
+ "T(java.lang.System).nanoTime() % 2 == 0 ? 'even' : 'odd'");
}
@Bean
public Consumer<Integer> even() {
return value -> System.out.println("EVEN: " + value);
}
@Bean
public Consumer<Integer> odd() {
return value -> System.out.println("ODD: " + value);
}
}
通过应用程序属性传递指令对于响应式函数尤其重要,因为响应式 函数仅调用一次以传递 Publisher,因此对各个项目的访问是有限的。 |
路由功能和输出绑定
RoutingFunction
是一个Function
因此,对待其他任何功能的行为都无异。井。。。几乎。
什么时候RoutingFunction
路由到另一个Function
,则其输出将发送到RoutingFunction
哪
是functionRouter-in-0
不出所料。但是,如果RoutingFunction
路由到Consumer
?换句话说,调用
的RoutingFunction
可能不会产生任何要发送到 output binding 的内容,因此甚至需要一个。
因此,我们确实治疗RoutingFunction
当我们创建 Binding 时,情况略有不同。即使它对作为用户的您是透明的
(你真的没什么可做的),了解一些机制会帮助你理解它的内部运作。
所以,规则是;
我们从不为RoutingFunction
,仅输入因此,当您路由到Consumer
这RoutingFunction
有效
变为Consumer
没有任何 output bindings。但是,如果RoutingFunction
碰巧路由到另一个Function
它产生
output,即RoutingFunction
将在此时动态创建RoutingFunction
将充当常规Function
关于绑定(同时具有输入和输出绑定)。
路由 FROM Consumer
除了静态目标之外,Spring Cloud Stream 还允许应用程序将消息发送到动态绑定的目标。 例如,当需要在运行时确定目标时,这非常有用。 应用程序可以通过以下两种方式之一来实现此目的。
spring.cloud.stream.sendto.destination
您还可以委托给框架,通过指定spring.cloud.stream.sendto.destination
页眉
设置为 要解析的目标的名称。
请考虑以下示例:
@SpringBootApplication
@Controller
public class SourceWithDynamicDestination {
@Bean
public Function<String, Message<String>> destinationAsPayload() {
return value -> {
return MessageBuilder.withPayload(value)
.setHeader("spring.cloud.stream.sendto.destination", value).build();};
}
}
尽管在这个例子中你可以清楚地看到,但我们的输出是一个带有spring.cloud.stream.sendto.destination
页眉
设置为 He input 参数的值。框架将查阅此标头,并尝试创建或发现
具有该名称的目标,并将输出发送到该目标。
如果事先知道目标名称,则可以像配置任何其他目标一样配置创建者属性。
或者,如果您注册了NewDestinationBindingCallback<>
bean,则会在创建绑定之前调用它。
回调采用 Binder 使用的扩展生产者属性的泛型类型。
它有一个方法:
void configure(String destinationName, MessageChannel channel, ProducerProperties producerProperties,
T extendedProducerProperties);
以下示例显示了如何使用 RabbitMQ Binder:
@Bean
public NewDestinationBindingCallback<RabbitProducerProperties> dynamicConfigurer() {
return (name, channel, props, extended) -> {
props.setRequiredGroups("bindThisQueue");
extended.setQueueNameGroupOnly(true);
extended.setAutoBindDlq(true);
extended.setDeadLetterQueueName("myDLQ");
};
}
如果您需要支持具有多个 Binder 类型的动态目标,请使用Object 对于泛型类型,并强制转换extended 参数。 |
此外,请参阅 [使用 StreamBridge] 部分,了解如何将另一个选项 (StreamBridge) 用于类似情况。