Spring Cloud Flow与Apache Spark集成

2021-03-31 00:25

阅读:388

标签:collect   too   clu   poi   slice   name   交互   shel   spark   

Spring Cloud Flow与Apache Spark集成

点击左上角,关注:“锅外的大佬”

专注分享国外最新技术内容
帮助每位开发者更优秀地成长

1.简介

SpringCloudDataFlow是用于构建数据集成和实时数据处理管道的工具包。 在这种情况下,管道(Pipelines)是使用 SpringCloudStream或 SpringCloudTask框架构建的 SpringBoot应用程序。

在本教程中,我们将展示如何将 SpringCloudDataFlow与 ApacheSpark一起使用。

2.本地数据流服务

首先,我们需要运行数据流服务器(Data Flow Server)才能部署我们的作业( jobs)。要在本地运行数据流服务器,需要使用 spring-cloud-starter-dataflow-server-local依赖创建一个新项目:


org.springframework.cloud



spring-cloud-starter-dataflow-server-local



1.7.4.RELEASE

之后,使用 @EnableDataFlowServer来注解服务中的主类(main class):

@EnableDataFlowServer
@SpringBootApplication
public

class

SpringDataFlowServerApplication

{

public

static

void
 main
(
String
[]
 args
)

{

SpringApplication
.
run
(

SpringDataFlowServerApplication
.
class
,
 args
);

}
}

运行此应用程序后,本地数据流服务运行在端口 9393。

3.新建工程

我们将 SparkJob作为本地单体应用程序创建,这样我们就不需要任何集群来运行它。

3.1依赖

首先,添加 Spark依赖


org.apache.spark



spark-core_2.10



2.4.0

3.2 创建job

对 job来说,就是为了求 pi的近似值:

public

class

PiApproximation

{

public

static

void
 main
(
String
[]
 args
)

{

SparkConf
 conf 
=

new

SparkConf
().
setAppName
(
"BaeldungPIApproximation"
);

JavaSparkContext
 context 
=

new

JavaSparkContext
(
conf
);

int
 slices 
=
 args
.
length 
>=

1

?

Integer
.
valueOf
(
args
[
0
])

:

2
;

int
 n 
=

(
100000L

*
 slices
)

>

Integer
.
MAX_VALUE 
?

Integer
.
MAX_VALUE 
:

100000

*
 slices
;

List

 xs 
=

IntStream
.
rangeClosed
(
0
,
 n
)

.
mapToObj
(
element 
->

Integer
.
valueOf
(
element
))

.
collect
(
Collectors
.
toList
());

JavaRDD

 dataSet 
=
 context
.
parallelize
(
xs
,
 slices
);

JavaRDD

 pointsInsideTheCircle 
=
 dataSet
.
map
(
integer 
->

{

double
 x 
=

Math
.
random
()

*

2

-

1
;

double
 y 
=

Math
.
random
()

*

2

-

1
;

return

(
x 
*
 x 
+
 y 
*
 y 
)


 integer 
+
 integer2
);

System
.
out
.
println
(
"The pi was estimated as:"

+
 count 
/
 n
);
        context
.
stop
();

}
}

4. Data Flow Shell

DataFlowShell是一个 允许我们与服务器交互的应用程序。 Shell使用 DSL命令来描述数据流。

要使用 DataFlowShell,我们要创建一个运行它的项目。 首先,需要 spring-cloud-dataflow-shell依赖:


org.springframework.cloud



spring-cloud-dataflow-shell



1.7.4.RELEASE

添加依赖项后,我们可以创建主类来运行 DataFlowShell:

@EnableDataFlowShell
@SpringBootApplication
public

class

SpringDataFlowShellApplication

{

public

static

void
 main
(
String
[]
 args
)

{

SpringApplication
.
run
(
SpringDataFlowShellApplication
.
class
,
 args
);

}
}

5.部署项目

为了部署我们的项目,可在三个版本( cluster, yarn和 client)中使用 ApacheSpark所谓 任务运行器(task runner)—— 我们将使用 client版本。 任务运行器(task runner)是真正运行 Sparkjob的实例。为此,我们首先需要使用 DataFlowShell注册 task:

app 
register

--
type task 
--
name spark
-
client 
--
uri 
maven
:
//org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

task允许我们指定多个不同的参数,其中一些参数是可选的,但是一些参数是正确部署 Sparkjob所必需的:

  • spark.app-class,已提交 job的主类
  • spark.app-jar,包含 job的 fat-jar路径
  • spark.app-name, job的名称
  • spark.app-args,将传递给 job的参数 我们可以使用注册的任务 spark-client提交我们的工作,记住提供所需的参数:
    task create spark1 
    definition 
    "spark-client spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

    请注意, spark.app-jar是我们 job中 fat-jar的路径。成功创建任务后,我们可以使用以下命令继续运行它:

    task launch spark1

    这将调用 task的执行。

6.总结

在本教程中,我们展示了如何使用 SpringCloudDataFlow框架来处理 ApacheSpark数据。 有关 SpringCloudDataFlow框架的更多信息,请参阅文档。

所有代码示例都可以在 GitHub上找到。

原文链接:https://www.baeldung.com/spring-cloud-data-flow-spark

作者:baeldung

译者:Leesen

推荐阅读:快速掌握FileChannel
上篇好文:Spring Boot中使用RSocket

点击在看,和我一起帮助更多开发者!技术图片

Spring Cloud Flow与Apache Spark集成

标签:collect   too   clu   poi   slice   name   交互   shel   spark   

原文地址:https://blog.51cto.com/14901350/2524842


评论


亲,登录后才可以留言!