长沙分类信息网-长沙新闻网

Spring Cloud数据流中的组合函数支持

2024-4-14 18:36:27发布次查看发布人:
spring cloud stream最近添加了一项function,可将函数定义组合到现有的spring cloud stream应用程序中。在本博客中,我们将看到spring cloud data flow如何利用此功能在streaming管道中组合函数。
它有什么不同?在spring cloud data flow中,流数据管道由spring cloud stream应用程序组成,开发人员可以选择开箱即用的流应用程序,其中包含许多常见用例。开发人员还可以使用spring cloud stream框架扩展这些开箱即用的应用程序或创建自定义应用程序。
spring cloud stream 2.1.0 ga 已经集成了一个基于spring cloud function-based编程模型,可以使用java.util.function,一个java.util.consumer,和一个java.util.supplier表示业务逻辑,其相应的对应的角色是spring cloud stream中的processor,sink和source。
鉴于这种两者结合映射的灵活性,spring cloud stream框架现在支持一种简单但功能强大的函数组合方法。这种函数组合可以是源source和处理器processor组合成一个单个应用程序:“新源source”;或者,它可能是处理器processor+接收器sink组合到一个新的应用程序中:“新的sink”。这种灵活性为流应用程序开发人员开辟了有趣的新方式。
让我们看看如何通过三个应用程序创建管道来执行简单转换,然后使用两个使用函数组合的应用程序来了解如何将其实现为管道。
streaming pipeline有三个应用程序
对于第一个流,我们将使用开箱即用的.7.3.release.jar
然后,启动spring cloud数据流shell:
java -jar spring-cloud-dataflow-shell-1.7.3.release.jar
现在让我们使用rabbitmq绑定器(或kafka绑定器)分别注册.1.0.m2/.1.0.m2.jar
注册处理器:
dataflow:>app register --name transformer --type processor --uri .1.0.m2/transform-processor-rabbit-2.1.0.m2.jar
注册接收器sink:
dataflow:>app register --name log --type sink --uri .1.0.m2/log-sink-rabbit-2.1.0.m2.jar
现在我们可以创建一个没有函数组合的简单流:
dataflow:>stream create hello --definition | transformer --expression=(\hello \+payload.tostring().touppercase()) | log
然后我们可以部署流:
dataflow:>stream deploy hello --properties deployer.*.local.inheritlogging=true
dataflow:>
post (text/plain) friend
202 accepted
您可以在log应用程序中看到以下日志消息:
[sformer.hello-1] log-sink : hello friend
在此流中,我们将.1.0.build-snapshot.jar
现在让我们创建一个流:
dataflow:>stream create hellocomposed --definition | log
在部署流时,我们传递spring.cloud.stream.function.definition属性以定义组合函数dsl(在spring cloud function中定义)。在这种情况下,它是:
dataflow:>stream deploy hellocomposed --properties app.
> post (text/plain) friend
> 202 accepted
然后你可以在log应用程序中看到输出:
[hellocomposed-1] log-sink : hello friend
请注意,函数组合支持不适用于开箱即用的spring cloud stream processor应用程序,因为在现有处理器的应用程序逻辑之前或之后是否需要应用该功能存在不确定性。
但是,您可以使用标准java.util.function api创建自己的处理器应用程序,使用函数组合,如以下示例所示:
@configuration
public static class functionprocessorconfiguration {
@bean
public function upperandconcat() {
return upper().andthen(concat());
}
@bean
public function upper() {
return value -> value.touppercase();
}
@bean
public function concat() {
return value -> hello + value;
}
}
然后,您需要使用以下属性进行部署:
spring.cloud.stream.function.definition=upperandconcat
kotlin支持另一个有趣的特性是spring cloud function支持kotlin函数的功能组合。这允许我们将任何kotlin函数bean添加到组合函数source或sink应用程序中。
要看到这个工作,让我们使用.0.0.release
cd function-composition/.1.0.build-snapshot.jar
要使用此应用程序创建流,请执行以下操作source:
dataflow:>stream create hellocomposedkotlin --definition | log
正如我们在
> post (text/plain) friend
> 202 accepted
并且,您可以在log应用程序中看到输出为:
[omposedkotlin-1] log-sink : hello how are you friend
在此示例中,http-transformer还包含函数的源代码。但是,您可以通过在单独的工件中定义函数bean来使应用程序更加模块化。然后,您可以通过仅向项目添加maven依赖项并设置spring.cloud.stream.function.definition属性来构建应用程序。通过这种方式,您可以将大部分业务逻辑编码为函数,并且如果需要,可以使用source或sink组合它。
该用户其它信息

推荐信息

长沙分类信息网-长沙新闻网
关于本站