大數據

在Kubernetes中用Alluxio加速Spark數據訪問(一)

1.背景信息

1.1 alluxio

Alluxio是一個開源的基於內存的分佈式存儲系統,適合作為雲上大數據和AI / ML的數據編排方案。Alluxio可以同時管理多個底層文件系統,將不同的文件系統統一在同一個名稱空間下,讓上層客戶端可以自由訪問統一名稱空間內的不同路徑,不同存儲系統的數據。

alluxio的short-circuit功能可以使alluxio客戶端直接訪問alluxio worker所在主機的工作存儲,而不需要通過網絡棧與alluxio worker完成通信,可以提高性能。

1.2 spark operator

Spark-operator用於管理k8s集群中spark job。通過spark-operator可以在k8s集群中創建、查看和刪除spark job。

2.前提條件

本文檔的操作依賴如下的一些條件:

  • kubernetes集群:版本大於1.8,本次實驗的集群通過阿里雲容器服務創建,集群名稱為"ack-create-by-openapi-1"。

image.png

  • 安裝有linux或者mac操作系統的計算機作為我們的實驗環境(本次實驗中,假設該計算機名稱為alluxio-test)。該計算機需要準備如下環境:

    • docker >= 17.06
    • kubectl >= 1.8,能夠連接kubernets集群ack-create-by-openapi-1

3.實驗步驟

實驗步驟主要包括如下幾步:

  • 部署alluxio
  • 部署spark-operator
  • 製作spark docker鏡像
  • 上傳文件到alluxio
  • 提交spark job

下面將對每個步驟進行說明:

3.1 部署alluxio

進入容器服務應用目錄,在右上角的搜索框中搜索"alluxio",然後進入alluxio主界面,如圖:
image.png

選擇“參數”,修改配置中properties部分的"alluxio.user.short.circuit.enabled"值為"false",然後選擇將alluxio安裝到目標集群上(本次實驗的集群為"ack-create-by-openapi-1"),最後點擊創建,如圖
image.png

點擊創建後,使用kubectl給待安裝的alluxio組件的節點打上標籤"alluxio=true",首先查看該集群有哪些節點:

$ kubectl get nodes -o wide
NAME                      STATUS   ROLES    AGE   VERSION            INTERNAL-IP    EXTERNAL-IP   OS-IMAGE                               KERNEL-VERSION            CONTAINER-RUNTIME
cn-beijing.192.168.8.12   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.12   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.13   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.13   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.14   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.14   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.15   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.15   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.16   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.16   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.17   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.17   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5

可以看到有三個worker節點,分別為:

  • cn-beijing.192.168.8.15
  • cn-beijing.192.168.8.16
  • cn-beijing.192.168.8.17

我們給是三個節點都打上標籤"alluxio=true":

$ kubectl label nodes cn-beijing.192.168.8.15 \
  cn-beijing.192.168.8.16 \
  cn-beijing.192.168.8.17 \
  alluxio=true

使用kubectl查看各個pod是否都處於running狀態:

$ kubectl get po -n alluxio
NAME                   READY   STATUS    RESTARTS   AGE
alluxio-master-0       2/2     Running   0          4h1m
alluxio-worker-5zg26   2/2     Running   0          4h1m
alluxio-worker-ckmr9   2/2     Running   0          4h1m
alluxio-worker-dvgvd   2/2     Running   0          4h1m

驗證alluxio是否處於ready:

$ kubectl exec -ti alluxio-master-0 -n alluxio bash

//下面步驟alluxio-master-0 pod中執行
bash-4.4# alluxio fsadmin report capacity

Capacity information for all workers:
    Total Capacity: 3072.00MB
        Tier: MEM  Size: 3072.00MB
    Used Capacity: 0B
        Tier: MEM  Size: 0B
    Used Percentage: 0%
    Free Percentage: 100%

Worker Name      Last Heartbeat   Storage       MEM
192.168.8.15    0                capacity      1024.00MB
                                  used          0B (0%)
192.168.8.16    0                capacity      1024.00MB
                                  used          0B (0%)
192.168.8.17    0                capacity      1024.00MB
                                  used          0B (0%)

3.2 部署spark-operator

進入容器服務應用目錄,在右上角的搜索框中搜索"ack-spark-operator",然後進入ack-spark-operator主界面,如圖:
image.png
選擇將ack-spark-operator安裝到目標集群上(本次實驗的集群為"ack-create-by-openapi-1"),然後點擊創建,如圖:
image.png

本次實驗將會使用sparkctl向k8s集群提交一個spark job,需要將sparkctl安裝到我們在"2.前提條件"中所提到的實驗環境"alluxio-test"中:

$ wget http://spark-on-k8s.oss-cn-beijing.aliyuncs.com/sparkctl/sparkctl-linux-amd64 -O /usr/local/bin/sparkctl
$ chmod +x /usr/local/bin/sparkctl

3.3 製作spark docker鏡像

spark下載頁面下載所需的spark版本,本次實驗選擇的saprk版本為2.4.6。運行如下命令下載spark:

$ cd /root
$ wget https://mirror.bit.edu.cn/apache/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
#

下載完成後,執行解壓操作:

$ tar -xf spark-2.4.6-bin-hadoop2.7.tgz
$ export SPARK_HOME=/root/spark-2.4.6-bin-hadoop2.7

spark docker鏡像是我們提交spark任務時使用到的鏡像,這個鏡像中需要包含alluxio client jar包。使用如下的命令獲取alluxio client jar包:

$ id=$(docker create alluxio/alluxio-enterprise:2.2.1-1.4)
$ docker cp $id:/opt/alluxio/client/alluxio-enterprise-2.2.1-1.4-client.jar \
    $SPARK_HOME/jars/alluxio-enterprise-2.2.1-1.4-client.jar
$ docker rm -v $id 1>/dev/null

alluxio client jar包準備好以後,開始構建鏡像:

$ docker build -t spark-alluxio:2.4.6 -f kubernetes/dockerfiles/spark/Dockerfile $SPARK_HOME

請記住鏡像名稱“spark-alluxio:2.4.6”,在向k8s提交spark job中會用到這個信息。

鏡像構建完成以後,對鏡像的處理有兩種方式:

  • 如果有私有鏡像倉庫,將該鏡像推送到私有鏡像倉庫中,同時保證k8s集群節點能夠pull該鏡像
  • 如果沒有私有鏡像倉庫,那麼需要使用docker save命令將該鏡像導出,然後scp到k8s集群的各個節點,在每個節點上使用docker load命令將鏡像導入,這樣就能保證每個節點上都存在該鏡像。

3.4 上傳文件到alluxio

文章開頭提到過:本次實驗是提交一個spark job到k8s中,該spark job的目標是對某一個文件統計每一個單詞出現的次數。現在需要把這個文件傳到alluxio存儲上,這裡為了方便,直接把alluxio master中/opt/alluxio-2.3.0-SNAPSHOT/LICENSE(文件路徑可能因alluxio版本有點差異)這個文件傳到alluxio上。

使用"kubectl exec"進入alluxio master pod,並拷貝當前目錄下的LICENSE文件到alluxio的根目錄中:

$ kubectl exec -ti alluxio-master-0  -n alluxio bash
//下面步驟alluxio-master-0 pod中執行
bash-4.4# alluxio fs copyFromLocal LICENSE /

接著查看一下LICENSE這個文件分成的block被alluxio放到哪些worker上了。

$ kubectl exec -ti alluxio-master-0 -n alluxio bash
//下面步驟alluxio-master-0 pod中執行

bash-4.4# alluxio fs stat /LICENSE
/LICENSE is a file path.
FileInfo{fileId=33554431, fileIdentifier=null, name=LICENSE, path=/LICENSE, ufsPath=/opt/alluxio-2.3.0-SNAPSHOT/underFSStorage/LICENSE, length=27040, blockSizeBytes=67108864, creationTimeMs=1592381889733, completed=true, folder=false, pinned=false, pinnedlocation=[], cacheable=true, persisted=false, blockIds=[16777216], inMemoryPercentage=100, lastModificationTimesMs=1592381890390, ttl=-1, lastAccessTimesMs=1592381890390, ttlAction=DELETE, owner=root, group=root, mode=420, persistenceState=TO_BE_PERSISTED, mountPoint=false, replicationMax=-1, replicationMin=0, fileBlockInfos=[FileBlockInfo{blockInfo=BlockInfo{id=16777216, length=27040, locations=[BlockLocation{workerId=8217561227881498090, address=WorkerNetAddress{host=192.168.8.17, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=192.168.8.17, rack=null)}, tierAlias=MEM, mediumType=MEM}]}, offset=0, ufsLocations=[]}], mountId=1, inAlluxioPercentage=100, ufsFingerprint=, acl=user::rw-,group::r--,other::r--, defaultAcl=}
Containing the following blocks:
BlockInfo{id=16777216, length=27040, locations=[BlockLocation{workerId=8217561227881498090, address=WorkerNetAddress{host=192.168.8.17, containerHost=, rpcPort=29999, dataPort=29999, webPort=30000, domainSocketPath=, tieredIdentity=TieredIdentity(node=192.168.8.17, rack=null)}, tierAlias=MEM, mediumType=MEM}]}

可以看到LICENSE這個文件只有一個block(id為16777216),被放在了ip為192.168.8.17的k8s節點上。我們使用kubectl查看該節點名稱為cn-beijing.192.168.8.17

$ kubectl get nodes -o wide
NAME                      STATUS   ROLES    AGE   VERSION            INTERNAL-IP    EXTERNAL-IP   OS-IMAGE                               KERNEL-VERSION            CONTAINER-RUNTIME
cn-beijing.192.168.8.12   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.12   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.13   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.13   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.14   Ready    master   21d   v1.16.6-aliyun.1   192.168.8.14   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.15   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.15   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.16   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.16   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5
cn-beijing.192.168.8.17   Ready    <none>   21d   v1.16.6-aliyun.1   192.168.8.17   <none>        Aliyun Linux 2.1903 (Hunting Beagle)   4.19.57-15.1.al7.x86_64   docker://19.3.5

3.5 提交spark job

下面的步驟將提交一個spark job到k8s集群中,該job主要是計算alluxio中/LICENSE文件的每個單詞出現的次數。

在步驟3.4中我們獲取到LICENSE這個文件所包含的block都在節點cn-beijing.192.168.8.17上,此次實驗中,我們通過指定node selector讓spark driver和spark executor都運行在節點cn-beijing.192.168.8.17,驗證在關閉alluxio的short-circuit功能的情況下,spark executor和alluxio worker之間的通信是否通過網絡棧完成。

  • 說明:如果在開啟alluxio的short-circuit功能的情況下,並且spark executor與其所要訪問的文件(本次實驗為/LICENSE這個文件)的block在同一個k8s節點上,那麼spark executor中的alluxio client與該k8s節點上的alluxio worker之間的通信通過domain socket方式完成。

首先生成提交spark job的yaml文件:

$ export SPARK_ALLUXIO_IMAGE=<步驟3.3中製作的image,即spark-alluxio:2.4.6>
$ export ALLUXIO_MASTER="alluxio-master-0"
$ export TARGET_NODE=<步驟3.4獲取到的LICENSE文件的block存儲的節點,即cn-beijing.192.168.8.17>
$ cat > /tmp/spark-example.yaml <<- EOF
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-count-words
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "$SPARK_ALLUXIO_IMAGE"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.JavaWordCount
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.6.jar"
  arguments:
    - alluxio://${ALLUXIO_MASTER}.alluxio:19998/LICENSE
  sparkVersion: "2.4.6"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 2.4.6
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
    nodeSelector:
      kubernetes.io/hostname: "$TARGET_NODE"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 2.4.6
    nodeSelector:
      kubernetes.io/hostname: "$TARGET_NODE"
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
EOF

然後,使用sparkctl提交spark job:

$ sparkctl create /tmp/spark-example.yaml

4.實驗結果

當提交任務後,使用kubectl查看spark driver的日誌:

$ kubectl get po -l spark-role=driver
NAME                                 READY   STATUS      RESTARTS   AGE
spark-alluxio-1592296972094-driver   0/1     Completed   0          4h33m

$ kubectl logs spark-alluxio-1592296972094-driver --tail 20

USE,: 3
Patents: 2
d): 1
comment: 1
executed: 1
replaced: 1
mechanical: 1
20/06/16 13:14:28 INFO SparkUI: Stopped Spark web UI at http://spark-alluxio-1592313250782-driver-svc.default.svc:4040
20/06/16 13:14:28 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
20/06/16 13:14:28 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
20/06/16 13:14:28 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed (this is expected if the application is shutting down.)
20/06/16 13:14:28 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
20/06/16 13:14:28 INFO MemoryStore: MemoryStore cleared
20/06/16 13:14:28 INFO BlockManager: BlockManager stopped
20/06/16 13:14:28 INFO BlockManagerMaster: BlockManagerMaster stopped
20/06/16 13:14:28 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
20/06/16 13:14:28 INFO SparkContext: Successfully stopped SparkContext
20/06/16 13:14:28 INFO ShutdownHookManager: Shutdown hook called
20/06/16 13:14:28 INFO ShutdownHookManager: Deleting directory /var/data/spark-2f619243-59b2-4258-ba5e-69b8491123a6/spark-3d70294a-291a-423a-b034-8fc779244f40
20/06/16 13:14:28 INFO ShutdownHookManager: Deleting directory /tmp/spark-054883b4-15d3-43ee-94c3-5810a8a6cdc7

最後我們登陸到alluxio master上,查看相關指標統計到的值:

$ kubectl exec -ti alluxio-master-0 -n alluxio bash
//下面步驟alluxio-master-0 pod中執行
bash-4.4# alluxio fsadmin report metrics
Cluster.BytesReadAlluxio  (Type: COUNTER, Value: 290.47KB)
Cluster.BytesReadAlluxioThroughput  (Type: GAUGE, Value: 22.34KB/MIN)
Cluster.BytesReadDomain  (Type: COUNTER, Value: 0B)
Cluster.BytesReadDomainThroughput  (Type: GAUGE, Value: 0B/MIN)

BytesReadAlluxio和BytesReadAlluxioThroughput代表數據從網絡棧傳輸;BytesReadDomain和BytesReadDomainThroughput代表數據從domain socket傳輸。可以看到所有數據都是從網絡棧傳輸的(即使spark executor和LICENSE文件的block在同一k8s節點上)。

5.參考文檔

Leave a Reply

Your email address will not be published. Required fields are marked *