Spark on K8S (Kubernetes Native)

2021-01-21 06:12


Spark on K8S Deployment Modes

  • Standalone: start a long-running Spark cluster on K8S and submit every job to it via spark-submit
  • Kubernetes Native: spark-submit talks directly to the K8S API Server; once resources are granted, Pods are started as the Driver and Executors to run the job; see http://spark.apache.org/docs/2.4.6/running-on-kubernetes.html
  • Spark Operator: install the Spark Operator, define a spark-app.yaml, then run kubectl apply -f spark-app.yaml; this declarative API style is the idiomatic way of driving K8S (a sketch follows this list); see https://github.com/GoogleCloudPlatform/spark-on-k8s-operator
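
A minimal SparkApplication manifest in the operator style might look like this (a sketch against the operator's v1beta2 API; the image and jar name are placeholders, not values from this walkthrough):

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <spark-image>                  # placeholder: an image built as described below
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/<examples-jar>
  sparkVersion: "2.4.6"
  driver:
    cores: 1
    serviceAccount: spark
  executor:
    cores: 1
    instances: 2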

Start Minikube

sudo minikube start --driver=none --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers

If startup fails, try deleting the cluster first with minikube delete.
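
A typical recovery sequence (same flags as the start command above):

sudo minikube delete
sudo minikube start --driver=none --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers
sudo minikube status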

Spark on K8S Official Docs

https://spark.apache.org/docs/latest/running-on-kubernetes.html

The page above says nothing about compatibility between Spark versions and K8S versions, but it does matter.

Download Spark

https://archive.apache.org/dist/spark/

Spark is closely tied to Hadoop; download a build that bundles Hadoop so its jars are available, otherwise you may hit class-not-found errors even when Hadoop is never actually used.
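
For example, fetching a 2.4.6 build bundled with Hadoop 2.7 (the version matching the docs linked earlier; adjust as needed):

wget https://archive.apache.org/dist/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
tar -xzf spark-2.4.6-bin-hadoop2.7.tgz
cd spark-2.4.6-bin-hadoop2.7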

Build Spark Image

Spark ships with bin/docker-image-tool.sh for building images.

The tool picks up the Dockerfiles under kubernetes/dockerfiles and, following them, builds the required Spark commands, tools, libraries, jars, the Java runtime, the examples, entrypoint.sh, and so on into the image.

2.3 supports only Java/Scala; from 2.4 on, Python and R are supported as well, so there are three Dockerfiles and three resulting images, with the Python and R images layered on top of the Java/Scala one.
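
In a 2.4 binary distribution the three Dockerfiles live here (the image names are what docker-image-tool.sh produces from them):

kubernetes/dockerfiles/spark/Dockerfile                   # Java/Scala base image -> spark
kubernetes/dockerfiles/spark/bindings/python/Dockerfile   # Python, on the base   -> spark-py
kubernetes/dockerfiles/spark/bindings/R/Dockerfile        # R, on the base        -> spark-r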

sudo ./bin/docker-image-tool.sh -t my_spark_2.4_hadoop_2.7 build

You may hit errors like the following:

WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.9/main/x86_64/APKINDEX.tar.gz: temporary error (try again later)
WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.9/community/x86_64/APKINDEX.tar.gz: temporary error (try again later)
ERROR: unsatisfiable constraints:
  bash (missing):
    required by: world[bash]

This is a network problem. You can edit ./bin/docker-image-tool.sh and add --network=host to its docker build commands so the build containers use the host network (make sure the host network itself is fine).
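
One way to apply the patch (a sketch, assuming GNU sed and that the script contains literal docker build invocations; editing the script by hand works just as well), then re-build and verify:

sed -i 's/docker build/docker build --network=host/' ./bin/docker-image-tool.sh
sudo ./bin/docker-image-tool.sh -t my_spark_2.4_hadoop_2.7 build
sudo docker images | grep spark   # expect spark, spark-py and spark-r with the new tag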

Submit a Job

bin/spark-submit \
    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
    --deploy-mode cluster \
    --name spark-pi \
    --class org.apache.spark.examples.SparkPi \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.container.image=<spark-image> \
    local:///path/to/examples.jar

Note that local:///path/to/examples.jar here refers to the container's file system, not the file system of the machine running spark-submit. As the official docs put it: "Note that using application dependencies from the submission client's local file system is currently not yet supported."

If you don't use local, schemes such as HTTP and HDFS also work; when no scheme is given, local is assumed.
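
For example, keeping the other flags as above (hypothetical URLs, purely for illustration):

bin/spark-submit ... https://example.com/jars/my-app.jar     # application jar fetched over HTTP
bin/spark-submit ... hdfs://namenode:9000/jars/my-app.jar    # application jar fetched from HDFS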

Because I initially used a Spark build without the bundled Hadoop jars, spark-submit failed with a ClassNotFound error.
I then tried --jars, and also adding this to conf/spark-env.sh on the host:

export SPARK_DIST_CLASSPATH=$(/home/lin/Hadoop/hadoop-2.8.3/bin/hadoop classpath)

With that, spark-submit got through, but once the container started it still reported ClassNotFound.
It turns out the driver container invokes spark-submit again, with a few parameters changed, e.g. cluster mode switched to client mode.
After I moved to the Hadoop-bundled Spark build, the problem went away,
so presumably any jars passed to spark-submit via --jars also need to be present inside the container.
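
A quick way to see which jars actually made it into the image (tag as built above):

sudo docker run --rm --entrypoint ls spark-py:my_spark_2.4_hadoop_2.7 /opt/spark/jars | grep hadoop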

Get the K8S API Server Address

sudo kubectl cluster-info

Suppose it returns

https://192.168.0.107:8443

Then the spark-submit command is:

# --master points at the k8s api server
# --conf spark.kubernetes.container.image selects the image built with docker-image-tool.sh
# the first wordcount.py is the application to run
# the second wordcount.py is its argument, i.e. count the words of wordcount.py itself
bin/spark-submit \
    --master k8s://https://192.168.0.107:8443 \
    --deploy-mode cluster \
    --name spark-test \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
    /opt/spark/examples/src/main/python/wordcount.py \
    /opt/spark/examples/src/main/python/wordcount.py

This may fail with a certificate error, leaving the Pod unable to start; you may need to configure certificates properly.
In a test environment you can fall back to a proxy:

kubectl proxy
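
kubectl proxy listens on 127.0.0.1:8001 by default; a quick sanity check:

curl http://127.0.0.1:8001/version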

The spark-submit command then becomes:

# the API Server address becomes http://127.0.0.1:8001
bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name spark-test \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
    /opt/spark/examples/src/main/python/wordcount.py \
    /opt/spark/examples/src/main/python/wordcount.py

This still errors out, on the host or in the container, complaining about missing permissions; you need to set up a user with sufficient privileges in K8S.

Prepare a role.yaml file:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["*"]
- apiGroups: [""]
  resources: ["services"]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: spark
  namespace: default
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io

See https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/manifest/spark-rbac.yaml for reference.

Apply it:

sudo kubectl apply -f role.yaml

Inspect the result:

sudo kubectl get role
sudo kubectl get role spark-role -o yaml
sudo kubectl get rolebinding
sudo kubectl get rolebinding spark-role-binding -o yaml
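
The service account itself can be inspected the same way:

sudo kubectl get serviceaccount spark -o yaml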

Resubmit:

# adds --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name spark-test \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
    /opt/spark/examples/src/main/python/wordcount.py \
    /opt/spark/examples/src/main/python/wordcount.py

No more permission errors, but other errors may still appear:
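
The traceback below comes from the driver Pod's log, which can be pulled like this (the Pod name is the one appearing in the error message):

sudo kubectl get pods
sudo kubectl logs spark-test-1594276334218-driver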

20/07/09 06:32:23 INFO SparkContext: Successfully stopped SparkContext
Traceback (most recent call last):
  File "/opt/spark/examples/src/main/python/wordcount.py", line 33, in 
    .appName("PythonWordCount")  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
: org.apache.spark.SparkException: External scheduler cannot be instantiated
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
        at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [spark-test-1594276334218-driver]  in namespace: [default]  failed.
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
        at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:237)
        at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:170)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:57)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:55)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.<init>(ExecutorPodsAllocator.scala:55)
        at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:89)
        at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
        ... 13 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)

This Broken pipe is most likely the kubernetes-client code and jars that Spark ships being incompatible with the K8S version in use.
I tried replacing the k8s client jar under Spark's jars directory:

https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/4.4.2/kubernetes-client-4.4.2.jar
https://gitee.com/everworking/kubernetes-client
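
A sketch of the swap (the Spark path is hypothetical, and the exact jar version shipped with your Spark may differ; keep the original around in case you need to roll back):

cd /path/to/spark/jars
mv kubernetes-client-*.jar /tmp/   # back up whatever version shipped with Spark
wget https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/4.4.2/kubernetes-client-4.4.2.jar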

But other problems kept surfacing; the cleaner fix is to downgrade K8S itself to 1.16 (the official docs don't spell out which Spark versions match which K8S versions).
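
With minikube the K8S version can be pinned when the cluster is created, for example (v1.16.15 is just one 1.16 patch release, chosen for illustration):

sudo minikube delete
sudo minikube start --driver=none --kubernetes-version=v1.16.15 --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers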


Original post: https://www.cnblogs.com/moonlight-lin/p/13296909.html

