Apache Cassandra 支持

Apache Cassandra 支持

Spring 集成提供了通道适配器(从版本 6.0 开始),用于对 Apache Cassandra 集群执行数据库作。 它完全基于 Spring Data for Apache Cassandra 项目。spring-doc.cadn.net.cn

您需要将此依赖项包含在您的项目中:spring-doc.cadn.net.cn

Maven 系列
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-cassandra</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-cassandra:6.0.9"

Cassandra 出站组件

CassandraMessageHandler是一个AbstractReplyProducingMessageHandler实现,并且可以在单向(默认)和请求-回复模式(一个producesReply选项)。 默认情况下,它是异步的 (setAsync(false)重置)并执行反应式INSERT,UPDATE,DELETESTATEMENT对提供的ReactiveCassandraOperations. 作类型可以通过CassandraMessageHandler.Type选择。 这ingestQuery将模式设置为INSERT;这querystatementExpressionstatementProcessor将模式设置为STATEMENT.spring-doc.cadn.net.cn

以下代码片段演示了此通道适配器或网关的各种配置:spring-doc.cadn.net.cn

Java DSL
@Bean
IntegrationFlow cassandraSelectFlow(ReactiveCassandraOperations cassandraOperations) {
    return flow -> flow
            .handle(Cassandra.outboundGateway(cassandraOperations)
                    .query("SELECT * FROM book WHERE author = :author limit :size")
                    .parameter("author", "payload")
                    .parameter("size", m -> m.getHeaders().get("limit")))
            .channel(c -> c.flux("resultChannel"));
}
Kotlin DSL
@Bean
fun outboundReactive(cassandraOperations: ReactiveCassandraOperations) =
    integrationFlow {
        handle(
            Cassandra.outboundChannelAdapter(cassandraOperations)
                              .statementExpression("T(QueryBuilder).truncate('book').build()")
        ) { async(false) }
    }
Java
@ServiceActivator(inputChannel = "cassandraSelectChannel")
@Bean
public MessageHandler cassandraMessageHandler() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    cassandraMessageHandler.setQuery("SELECT * FROM book WHERE author = :author limit :size");

    Map<String, Expression> params = new HashMap<>();
    params.put("author", PARSER.parseExpression("payload"));
    params.put("size", PARSER.parseExpression("headers.limit"));

    cassandraMessageHandler.setParameterExpressions(params);

    cassandraMessageHandler.setOutputChannel(resultChannel());
    cassandraMessageHandler.setProducesReply(true);
    return cassandraMessageHandler;
}
XML 格式
<int-cassandra:outbound-channel-adapter id="outboundAdapter"
                                        cassandra-template="cassandraTemplate"
                                        write-options="writeOptions"
                                        auto-startup="false"
                                        async="false"/>

<int-cassandra:outbound-gateway id="outgateway"
                                request-channel="input"
                                cassandra-template="cassandraTemplate"
                                mode="STATEMENT"
                                write-options="writeOptions"
                                query="SELECT * FROM book limit :size"
                                reply-channel="resultChannel"
                                auto-startup="true">
    <int-cassandra:parameter-expression name="author" expression="payload"/>
    <int-cassandra:parameter-expression name="size" expression="headers.limit"/>
</int-cassandra:outbound-gateway>

如果CassandraMessageHandler在默认异步模式下用作网关,则Mono<WriteResult>生成,根据提供的MessageChannel实现。 对于真正的响应式处理,一个FluxMessageChannel建议用于 Output Channel 配置。 在同步模式下Mono.block()获取 Reply 值。spring-doc.cadn.net.cn

如果INSERT,UPDATEDELETE作时,一个实体(标记为org.springframework.data.cassandra.core.mapping.Table) 在请求消息负载中。 如果有效负载是实体列表,则执行相应的批处理作。spring-doc.cadn.net.cn

ingestQuerymode 期望有效负载以要插入的值矩阵的形式存在 -List<List<?>>. 例如,如果实体如下所示:spring-doc.cadn.net.cn

@Table("book")
public record Book(@PrimaryKey String isbn,
                   String title,
                   @Indexed String author,
                   int pages,
                   LocalDate saleDate,
                   boolean isInStock) {

}

channel adapter 有这样的配置:spring-doc.cadn.net.cn

@Bean
public MessageHandler cassandraMessageHandler3() {
    CassandraMessageHandler cassandraMessageHandler = new CassandraMessageHandler(this.template);
    String cqlIngest = "insert into book (isbn, title, author, pages, saleDate, isInStock) values (?, ?, ?, ?, ?, ?)";
    cassandraMessageHandler.setIngestQuery(cqlIngest);
    cassandraMessageHandler.setAsync(false);
    return cassandraMessageHandler;
}

请求消息有效负载必须按如下方式转换:spring-doc.cadn.net.cn

List<List<Object>> ingestBooks =
    payload.stream()
            .map(book ->
                    List.<Object>of(
                            book.isbn(),
                            book.title(),
                            book.author(),
                            book.pages(),
                            book.saleDate(),
                            book.isInStock()))
            .toList();

对于更复杂的用例,payload 可以作为com.datastax.oss.driver.api.core.cql.Statement. 这com.datastax.oss.driver.api.querybuilder.QueryBuilder建议使用 API 来构建各种语句以针对 Apache Cassandra 执行。 例如,要从Booktable 中,可以向CassandraMessageHandler:QueryBuilder.truncate("book").build(). 或者,对于基于请求消息的逻辑,使用statementExpressionstatementProcessor可以为CassandraMessageHandler要构建Statement基于该消息。 为方便起见,一个com.datastax.oss.driver.api.querybuilder已注册为import到 SPEL 评估上下文中,因此目标表达式可以像这样简单:spring-doc.cadn.net.cn

statement-expression="T(QueryBuilder).selectFrom("book").all()"

setParameterExpressions(Map<String, Expression> parameterExpressions)表示可绑定的命名查询参数,并且仅与setQuery(String query)选择。 请参阅上面提到的 Java 和 XML 示例。spring-doc.cadn.net.cn