David Prat

I am a Cloud Architect and Big Data Expert.


Spark cluster with Airflow on Kubernetes

Architecture diagram

In this tutorial Kubernetes is used to create a Spark cluster from which parallel jobs are launched. The jobs are not launched directly through the master node of the Spark cluster but from another node running an instance of Airflow. This provides more control over the executed jobs as well as useful features such as backfill execution, that is, running the executions that correspond to past schedule dates, between the start date of the schedule and the current time.
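The backfill idea can be illustrated with a few lines of Python. This is only a simplified sketch and not Airflow's own scheduling code: the function name `missed_runs` and the dates are made up for the example, and it assumes that a run for a given schedule date becomes due once its whole interval has elapsed.

```python
from datetime import datetime, timedelta


def missed_runs(start, interval, now):
    """Return the schedule dates between start and now that a backfill would cover."""
    runs = []
    current = start
    # A run for a schedule date covers [date, date + interval),
    # so it is only due once that whole interval lies in the past.
    while current + interval <= now:
        runs.append(current)
        current += interval
    return runs


if __name__ == "__main__":
    # Hypothetical values: a 10 minute schedule that started 35 minutes ago.
    start = datetime(2020, 1, 1, 12, 0)
    now = datetime(2020, 1, 1, 12, 35)
    for run in missed_runs(start, timedelta(minutes=10), now):
        print(run.isoformat())
```

With these values three past runs are listed (12:00, 12:10 and 12:20); the 12:30 run is not due yet because its interval has not finished.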

The picture below shows roughly how the components are interconnected. For this example, a Pod is defined for each service. Inside the Spark cluster there is one Pod for the master node and one Pod for a worker node. However, the yaml configures the workers with a DaemonSet instead of a Deployment. This tells Kubernetes to allocate exactly one worker Pod on each node of the cluster. A NodePort service is defined to give access to the Spark cluster from the outside. Additionally, a second NodePort service is defined to give access to the Airflow Pod. The Pods created by a Deployment are controlled by a ReplicaSet, which is responsible for keeping them up and running; the DaemonSet does the same for the worker Pods.

Introducing Airflow

Airflow was developed internally at Airbnb as a tool to programmatically schedule and monitor workflows. Its success comes from the fact that more and more technologies are appearing in the area of Big Data, and the old way of having everything inside a single application to perform an ETL, such as Pentaho, or executing some batch jobs through cron is becoming obsolete. To meet these needs, Airflow consists of a very powerful server and scheduler that offers a Python API to define what are called operators, through which the programmer can specify tasks and how they will be executed in the form of a DAG (directed acyclic graph).

The Airflow web UI looks like the picture below. In the picture, each row represents a DAG, that is to say, a workflow where each node of the graph is a task that will be executed in the order given by the edges of the graph. The scheduler does what the cron tool does in Linux. It is very easy to enable and disable each DAG by clicking the matching switch. Each row also has a series of circles that act as a traffic light, with a colour code indicating the state of each phase of the tasks of the DAG. For example, if the running phase went well, one of the circles will be coloured green.
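To make the notion of a DAG of tasks more concrete, the sketch below computes an execution order for a tiny workflow using only the Python standard library (`graphlib`, Python 3.9 or later). It does not use Airflow and is not how Airflow is implemented; the task names and the helper `execution_order` are hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def execution_order(dependencies):
    """Return one valid task order for a mapping of task -> set of upstream tasks."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    # Hypothetical ETL workflow: each key lists the tasks it depends on.
    workflow = {
        "extract": set(),
        "transform": {"extract"},
        "load": {"transform"},
    }
    print(execution_order(workflow))
```

A task only appears in the order after all the tasks it depends on. If the dependencies contained a cycle, `TopologicalSorter` would raise `graphlib.CycleError`, which is the reason why the graph has to be acyclic.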

Infrastructure definition

Having looked at the architecture diagram and briefly introduced Airflow, let’s write the yaml file that defines the whole infrastructure. In this case, all the infrastructure is defined in a single file. This can be done by using a line containing “---” to separate each of the Kubernetes components. To begin with, the namespace for the whole infrastructure is set to “spark-clu”.
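As a rough illustration of how a single file can hold several components, the following Python sketch splits a manifest on the separator lines and lists the kind and name of each component. It is deliberately naive (plain string handling that assumes the two-space indentation used in this tutorial); real tooling would use a proper YAML parser. The function name `list_components` and the sample text are only for the example.

```python
def list_components(manifest):
    """Return (kind, name) for every document of a multi-document manifest."""
    components = []
    kind, name = None, None
    # A final separator is appended so that the last document is flushed too.
    for line in manifest.splitlines() + ["---"]:
        if line.strip() == "---":
            if kind and name:
                components.append((kind, name))
            kind, name = None, None
        elif line.startswith("kind:"):
            kind = line.split(":", 1)[1].strip()
        elif line.startswith("  name:") and name is None:
            # First "name:" indented by two spaces, i.e. the one under metadata.
            name = line.split(":", 1)[1].strip()
    return components


if __name__ == "__main__":
    sample = """apiVersion: v1
kind: Namespace
metadata:
  name: spark-clu
---
apiVersion: v1
kind: Service
metadata:
  name: spark-cluster
  namespace: spark-clu
"""
    print(list_components(sample))
```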

Next, the service for the Spark master Pod is defined as a NodePort. This makes it possible to connect to the master, for example to follow the execution of the Spark jobs through its web UI. Then a Deployment is defined for the Spark master node and a DaemonSet is defined to deploy the worker nodes. Remember that the DaemonSet will create exactly one Pod on each node of the cluster. This is an easy way to create clusters in Kubernetes that balance the workload across the available hardware resources.

apiVersion: v1
kind: Namespace
metadata:
  name: spark-clu
--- 
apiVersion: v1
kind: Service
metadata:
  name: spark-cluster
  namespace: spark-clu
spec:
  selector:
    app: spark-master
  ports:
  - name: webui
    protocol: TCP
    port: 8080
    targetPort: 8080
    nodePort: 30000
  - name: master
    protocol: TCP
    port: 7077
    targetPort: 7077
  - name: master-rest
    protocol: TCP
    port: 6066
    targetPort: 6066
  type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-master
  namespace: spark-clu
  labels:
    app: spark-master
spec:
  selector:
    matchLabels:
      app: spark-master
  template:
    metadata:
      labels:
        app: spark-master
    spec:
      containers:
      - name: spark-master
        image: bde2020/spark-master:2.4.4-hadoop2.7
        imagePullPolicy: Always
        ports:
        - containerPort: 8080
        - containerPort: 7077
        - containerPort: 6066
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: spark-worker
  namespace: spark-clu
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      name: spark-worker
  template:
    metadata:
      labels:
        name: spark-worker
    spec:
      containers:
      - name: spark-worker
        image: bde2020/spark-worker:2.4.4-hadoop2.7
        imagePullPolicy: Always
        ports:
        - containerPort: 8081       
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: spark-clu
  labels:
    app: airflow
spec:
  selector:
    matchLabels:
      app: airflow
  template:
    metadata:
      labels:
        app: airflow
    spec:
      containers:
      - name: airflow-c
        image: greattenchu/centos-ariflowforcluster:1.0
        imagePullPolicy: Always
        ports:
        - containerPort: 8080
        volumeMounts:
        - mountPath: /root/airflow/dags
          name: volume
      volumes:
        - name: volume
          hostPath:
            path: /.../.../airflow-spark/dags
--- 
apiVersion: v1
kind: Service
metadata:
  name: airflow-instance
  namespace: spark-clu
spec:
  selector:
    app: airflow
  ports:
  - name: webui
    protocol: TCP
    port: 8080
    targetPort: 8080
    nodePort: 30001
  type: NodePort

Now it is the turn of the Airflow instance, which is also defined through a Deployment. Notice that the namespace is the same as for the Spark cluster. This makes it possible to connect from Airflow to the Spark master node using machine names, for example. A volume is mounted in the Airflow container. This way, DAGs can be written locally in the shared folder and, after some seconds, they will appear in the Airflow web UI. Finally, the NodePort service for the Airflow Pod is defined. Take notice of the selector of this service; it points to the airflow app instead of spark-master.
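The role of the selector can be sketched in Python. A Service with an equality-based selector targets the Pods whose labels contain every key/value pair of the selector. The labels below are the ones used in the yaml of this tutorial; the function `selector_matches` is just an illustration, not part of any Kubernetes client.

```python
def selector_matches(selector, pod_labels):
    """True when every key/value pair of the selector is present in the Pod labels."""
    return all(pod_labels.get(key) == value for key, value in selector.items())


if __name__ == "__main__":
    # Labels taken from the Pod templates of the manifest above.
    pods = {
        "spark-master": {"app": "spark-master"},
        "spark-worker": {"name": "spark-worker"},
        "airflow": {"app": "airflow"},
    }
    airflow_service_selector = {"app": "airflow"}
    for pod, labels in pods.items():
        print(pod, selector_matches(airflow_service_selector, labels))
```

Only the airflow Pod matches, which is why the second NodePort reaches the Airflow web UI and not the Spark master.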

Airflow custom Docker image

For this tutorial a custom Docker image of Airflow has been developed. There are some ready-made images on the Internet that contain an Airflow installation inside a very lightweight Linux distribution, for example the Puckel build. However, executing Scala jobs requires the SSH operator, and for this reason a custom build is done. In this case, we start from a CentOS Linux image and make a basic installation of Apache Airflow. The Dockerfile is included below:

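FROM centos:8

# AIRFLOW_HOME has to be set with ENV: a variable exported in a RUN step
# is lost when the next layer is built
ENV AIRFLOW_HOME=/root/airflow

# Build dependencies of Airflow (-y answers the yum prompts during the build)
RUN yum -y update
RUN yum -y install gcc
RUN yum -y install platform-python-devel.x86_64
RUN yum -y install python3-pip

# Airflow plus the ssh extra, which provides the SSH operator
RUN pip3 install apache-airflow
RUN pip3 install 'apache-airflow[ssh]'

# Auxiliary tools and supervisor (installed after enabling epel-release)
RUN yum -y install tree vim
RUN yum install -y epel-release
RUN yum -y update
RUN yum install -y supervisor

# Initialise the Airflow metadata database
# (initdb is the Airflow 1.10 command line syntax)
RUN airflow initdb

COPY supervisord.conf /etc/supervisord.conf
COPY airflow_wrapper_script.sh airflow_wrapper_script.sh
RUN chmod 777 airflow_wrapper_script.sh

EXPOSE 8080
CMD ["./airflow_wrapper_script.sh"]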

The image is basically an Airflow installation inside CentOS 8 Linux. First the dependencies of Airflow are installed, then the ssh module for Airflow is installed. This makes the SSH operator available in Airflow, which in turn makes it possible to launch any command on the Spark master, in this case the spark-submit command. Vim and tree are also included as auxiliary tools, but they are not strictly needed.

Airflow has two processes that have to run in order to use all its functionality: the web server UI and the scheduler. The former provides the user interface shown previously, whereas the latter schedules the executions of the DAGs. Because Docker best practices recommend running only one process per container, running two processes can be achieved in different ways. The first attempt was to use supervisor, which is a process manager. This gives the programmer a lot of power to launch multiple processes. However, it turned out to be CPU intensive for the container. For this reason, this tutorial uses a script that runs the two processes. The code is shown below:

#!/bin/bash

# turn on bash's job control
set -m

# Start the web server and put it in the background
airflow webserver -p 8080 &

# Start the scheduler in the foreground; this call blocks
# for as long as the scheduler is running
airflow scheduler

# If the scheduler ever exits, bring the web server back into
# the foreground so that the container keeps running
fg %1

Configuring Spark nodes

Up to this point the tutorial has explained a basic configuration to have the cluster set up in Kubernetes. However, some tuning is needed to actually be able to execute code from Airflow. Let’s first connect to the Spark master container and install the ssh server. Otherwise, Airflow’s SSH operator won’t be able to connect to the Spark master node.

apk add openssh
apk add openrc # for next commands
rc-update add sshd
rc-status
touch /run/openrc/softlevel
/etc/init.d/sshd start
apk add vim
# Edit the ssh server configuration and set the line: PermitRootLogin yes
vim /etc/ssh/sshd_config
/etc/init.d/sshd restart

Now let’s set up the environment to be able to compile Scala code. In this case, it is assumed that the container has enough resources to compile. However, the code can also be compiled on another machine and the resulting .jar uploaded to the master node. The commands below install the SBT build tool in the Alpine Linux distribution of the master node. Remember that Alpine Linux is a very lightweight distribution, which is why some additional steps and packages are needed.

apk add gzip
# Download the sbt archive and extract it straight from standard input (tar xzf -)
export PATH="/usr/local/sbt/bin:$PATH" && apk update && apk add ca-certificates wget tar && mkdir -p "/usr/local/sbt" && wget -qO - --no-check-certificate "https://piccolo.link/sbt-0.13.16.tgz" | tar xzf - && mv sbt /usr/local/ && sbt sbtVersion
/usr/local/sbt/bin/sbt

Building the Scala project

It’s time to build the Scala project and run it in the cluster. This tutorial does not aim to explain Scala in detail or how to build a project. In any case, the following folder structure can be taken as an example for your tests.

/test
	/project
	/src
		/main
			/scala
				hiworld.scala
	/target

The file that specifies the dependencies can be defined like the code below. Spark core and Spark SQL are included for the test. Create a file called helloworld.sbt and put it in the root directory of the project.

name := "helloworld"
version := "1.0"
scalaVersion := "2.11.12"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0"

The code below can be taken as an example. A Spark session is created and then a word count is run over a text file. It is very important to notice that the master address is provided while setting the Spark configuration of the Spark session. The cluster runs as a standalone cluster. This is how the Docker images are configured, and this tutorial doesn’t go into configuring a Yarn or Mesos cluster.

import org.apache.spark._
import org.apache.spark.sql.SparkSession

object helloworld {
  def main(args: Array[String]): Unit = {
    val appID: String = "spark1"
    // The master address points to the standalone cluster
    val conf: SparkConf = new SparkConf()
      .setMaster("spark://spark-master-5fbb9c7b66-dn7cj:7077")
      .setAppName("hiworld")
      .set("spark.ui.enabled", "false")
      .set("spark.driver.host", "localhost")
      .set("spark.app.id", appID)
    val spark = SparkSession.builder().config(conf).getOrCreate()
    println("Hello, world")

    import spark.implicits._
    // Read the text file as a Dataset with one String per line
    val wc = spark.read.text("/helloworld/shakespeare.txt").as[String]
    // Split every line into words
    val wordmap = wc.flatMap(value => value.split("\\s+"))
    // Group the words ignoring case
    val WordGroup = wordmap.groupByKey(_.toLowerCase())
    // show() prints the table itself and returns Unit, so it is called on its own
    println("Word Count-->")
    WordGroup.count().show()
  }
}
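
For readers less familiar with the Dataset API, the same word count logic can be sketched in plain Python, without Spark. This is only an approximation of the Scala job above: it runs on a list of lines in memory, and, unlike the Scala `split`, `str.split()` drops empty tokens. The function name `word_count` and the sample lines are made up.

```python
from collections import Counter


def word_count(lines):
    """Count words ignoring case, splitting each line on whitespace."""
    counts = Counter()
    for line in lines:
        # flatMap + groupByKey(_.toLowerCase) + count collapsed into one step
        counts.update(word.lower() for word in line.split())
    return counts


if __name__ == "__main__":
    # Hypothetical input standing in for the text file
    for word, total in word_count(["To be or", "not to be"]).most_common():
        print(word, total)
```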

To compile the code, execute the package command. This command not only compiles the code but also generates the .jar that can be submitted to the Spark cluster, in this case using the spark-submit command.

/usr/local/sbt/bin/sbt package

/spark/bin/spark-submit --class "helloworld" --master=spark://spark-master:7077 ./target/scala-2.11/helloworld_2.11-1.0.jar
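
The path of the .jar passed to spark-submit follows from the values in helloworld.sbt. The small Python sketch below reproduces the naming convention for a Scala 2.x project as I understand it (project name, Scala binary version and project version); the helper names are hypothetical and the sketch ignores any custom sbt settings.

```python
def scala_binary_version(scala_version):
    """Return the binary version of a Scala 2.x release, e.g. '2.11.12' -> '2.11'."""
    return ".".join(scala_version.split(".")[:2])


def packaged_jar_path(name, version, scala_version):
    """Path of the .jar produced by 'sbt package', relative to the project root."""
    binary = scala_binary_version(scala_version)
    return f"target/scala-{binary}/{name}_{binary}-{version}.jar"


def cross_versioned(artifact, scala_version):
    """Artifact name resolved by '%%' in a dependency, e.g. spark-core_2.11."""
    return f"{artifact}_{scala_binary_version(scala_version)}"


if __name__ == "__main__":
    print(packaged_jar_path("helloworld", "1.0", "2.11.12"))
    print(cross_versioned("spark-core", "2.11.12"))
```

This also shows why scalaVersion has to be compatible with the Scala version the Spark distribution was built with: the `%%` operator selects the Spark artifacts published for that binary version.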

Before executing spark-submit the Spark cluster has to be initialised. The Spark Docker images from bde2020 are not built to configure themselves once they are running. For this reason they have to be configured manually with the commands below. Bear in mind that the important parameter is the IP of the master node.

#Inside of the Workernode
apk add procps coreutils rsync && /spark/sbin/start-slave.sh spark://10.1.0.49:7077

Now, in the Spark web UI, the alive workers section should show 1 worker alive. If some applications were running in the cluster, the running applications section would show their application IDs. Notice that it is also possible to see the workers of the cluster and get information about them, but only if they have been registered with the previous command.
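The same check can be scripted. The sketch below assumes that the standalone master serves a JSON version of its status page under /json on the web UI port, and that this document contains a workers list whose entries carry id and state fields; these field names are assumptions that should be verified against your Spark version. The function names and the sample payload are hypothetical.

```python
import json
from urllib.request import urlopen


def fetch_master_state(url):
    """Download and decode the JSON status document of the Spark master."""
    with urlopen(url, timeout=5) as response:
        return json.load(response)


def alive_workers(state):
    """Return the ids of the workers reported with state ALIVE (assumed field names)."""
    return [
        worker["id"]
        for worker in state.get("workers", [])
        if worker.get("state") == "ALIVE"
    ]


if __name__ == "__main__":
    # Shortened, made-up payload used instead of a live cluster
    sample = json.loads(
        '{"workers": [{"id": "worker-1", "state": "ALIVE"},'
        ' {"id": "worker-2", "state": "DEAD"}]}'
    )
    print(alive_workers(sample))
```

Against the cluster of this tutorial, `fetch_master_state` would be called with the address of a Kubernetes node and the NodePort of the web UI (30000), followed by /json.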

Deploying the DAG in Airflow

Setting up the DAG so that Airflow executes the command on our behalf is done with the SSH operator, as it can execute any command that can be launched over ssh. This is what we need in this case: to execute the spark-submit command for a Scala Spark program.

from datetime import timedelta, datetime
import airflow
from airflow import DAG
from airflow.contrib.operators.ssh_operator import SSHOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'start_date': datetime.now() - timedelta(minutes=20),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(dag_id='sparkscalasshoperatordag',
          default_args=default_args,
          schedule_interval='0,10,20,30,40,50 * * * *',
          dagrun_timeout=timedelta(seconds=120))

t1_bash = """
/spark/bin/spark-submit --class "helloworld" --master=spark://spark-master-5fbb9c7b66-dn7cj:7077 --deploy-mode client --conf spark.driver.host=spark-client /helloworld/target/scala-2.11/helloworld_2.11-1.0.jar
"""
t1 = SSHOperator(
    ssh_conn_id='ssh_default',
    task_id='sparkscala_ssh_operator',
    command=t1_bash,
    dag=dag)

Once the DAG is uploaded to the folder shared with the Airflow container, it is only necessary to click on the corresponding DAG switch and Airflow’s scheduler will execute the DAG according to the schedule interval, in this case every 10 minutes.
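The schedule interval of the DAG is a cron expression whose first field lists the minutes. The following sketch expands that field to show that the explicit list used in the DAG is equivalent to the shorter step syntax `*/10`. It handles only comma lists, `*` and `*/n`, not the full cron grammar, and the function name `expand_minutes` is made up.

```python
def expand_minutes(field):
    """Expand the minute field of a cron expression into a sorted list of minutes."""
    minutes = set()
    for part in field.split(","):
        if part == "*":
            minutes.update(range(60))
        elif part.startswith("*/"):
            # Step syntax: every n-th minute starting at 0
            minutes.update(range(0, 60, int(part[2:])))
        else:
            minutes.add(int(part))
    return sorted(minutes)


if __name__ == "__main__":
    print(expand_minutes("0,10,20,30,40,50"))
    print(expand_minutes("*/10") == expand_minutes("0,10,20,30,40,50"))
```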

Troubleshooting

When running the Spark cluster, depending on the RAM capacity of the underlying hardware, it can happen that the containers are assigned little RAM by default in comparison to what the Spark processes need. In this case, the containers can be shut down by Kubernetes. This shows up as the error “command terminated with exit code 137”, which indicates an OOM (out of memory) problem. To solve this issue, this post can be checked to tune the Java settings of the node. Alternatively, more memory can be allocated to the containers. If running Docker on Mac, more resources for the cluster can be requested from the control panel. In any case, the resources for the containers can be set within the yaml file. For more information on this matter, the following link contains examples.
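The number 137 is not arbitrary: exit codes above 128 conventionally mean that the process was terminated by a signal, and 137 - 128 = 9, which is SIGKILL, the signal used when a container is killed for exceeding its memory. A minimal Python sketch (POSIX systems; the function name is made up) decodes such codes:

```python
import signal


def describe_exit_code(code):
    """Describe an exit code, decoding the 128 + signal number convention."""
    if code > 128:
        try:
            return f"terminated by {signal.Signals(code - 128).name}"
        except ValueError:
            # The number does not correspond to a known signal on this system
            return f"terminated by signal {code - 128}"
    return f"exited with status {code}"


if __name__ == "__main__":
    print(describe_exit_code(137))
```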

Conclusions

The tutorial has explained how to set up the Spark cluster and how to run jobs from Airflow on a schedule. It is clear that Kubernetes makes it possible to test a system without having to reserve or dedicate hardware to the purpose, which makes it much more comfortable to try new things, ways of working, new features, different configurations, etc. On the other hand, some drawbacks appear when using Kubernetes. For example, images are mostly based on lightweight versions of an OS, which forces additional configuration when something a bit different needs to be done. Networking configuration files also need attention from the programmer, because the cluster to be configured is different in each case. However, the overall impression when using Kubernetes is very positive. In this case it has made setting up the system much faster, and in a corporate environment Kubernetes provides a tidy way of making deployments and a lot of control over the deployed infrastructure.