Spark ETL – Airflow on Kubernetes, Part 2
Introduction
Let’s talk about the pitfalls and inconveniences of Airflow! This is the follow-up to the first part of this article.
Pitfalls and inconveniences
Keep your Airflow DAG simple!
The following is based on my own experience and applies to our Airflow setup:
- Airflow on Kubernetes with the LocalExecutor
- Tasks run with KubernetesPodOperator
Below are the problems I have encountered. There are many more, but these are the ones that marked me the most:
- The DAG’s dagrun_timeout and the operator’s execution_timeout parameters don’t apply, because KubernetesPodOperator does not implement the on_kill method.
- Backfill disregards the DAG’s max_active_runs parameter on retrigger; it is still not fixed at the time this article was written, check here.
- A DAG can be triggered with parameters this way:
airflow trigger_dag 'example_dag' -r 'run_id' --conf '{"key":"value"}'
but this does not work when you retrigger a backfill. E.g., backfill on day x with --conf '{"key":"value1"}', then rerun the backfill on day x with --conf '{"key":"value2"}': the key keeps value1. (See the sketch after this list for how conf is normally read inside a DAG.)
- Kubernetes ephemeral-storage resources can’t be configured with KubernetesPodOperator.
- DAG run history is not cleaned up properly. During DAG development I adjust DAG or task parameters frequently, which messes up the scheduler database: some DAG runs can’t be marked as failed or cleared, and even after deleting and redeploying the DAG the problems persist. I need to run SQL clean-up queries against the Airflow instance database to solve them.
- The Kubernetes attached volume name is not a templated field. Spark jobs use a lot of disk space (when memory is not enough), hence the need to attach a volume to the Kubernetes pod, and this is problematic for parallel backfilling. E.g., to reprocess (backfill) in parallel from day 01-01-2020 to 04-01-2020, I create volumes named "spark-pv-claim-{{ ds_nodash }}", which gives one volume per day: "spark-pv-claim-20200101", "spark-pv-claim-20200102", and so on. But since the persistentVolumeClaim name can’t be configured as a templated field with KubernetesPodOperator, you cannot attach these volumes dynamically.
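As a side note, here is a minimal sketch, not taken from the real pipeline, of how such a --conf payload is normally read inside a DAG through the dag_run.conf template variable; only the dag id and the "key" entry mirror the commands above, everything else is illustrative. With the backfill behaviour described above, a retriggered backfill keeps rendering the old value here.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical DAG matching the trigger_dag example above; only the dag_id
# and the "key" conf entry come from the commands in this post.
dag = DAG(
    dag_id="example_dag",
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,
)

# dag_run.conf is rendered at runtime; the "or {}" guard covers runs
# triggered without any --conf payload.
echo_conf = BashOperator(
    task_id="echo_conf",
    bash_command="echo key={{ (dag_run.conf or {}).get('key', 'not-set') }}",
    dag=dag,
)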
Solutions and workarounds for the problems above:
- Use the bash timeout command: configuring KubernetesPodOperator with
"cmds": ["timeout", "45m", "/bin/sh", "-c"],
gives the Spark task a 45-minute timeout (see the sketch after this list).
- Make DAG executions independent, with no dependencies on the past. Since the max_active_runs problem only happens for backfilling, clear the DAG history before retriggering.
- Use Airflow Variables instead of --conf. E.g., we have a force_reprocess flag in the DAG; set the variable before the backfill with
airflow variables --set force_reprocess false;
run the backfill with
airflow backfill "historical_reprocess" --start_date "y" --end_date "y" --reset_dagruns -y;
and in the DAG read the value with
force_reprocess = Variable.get("force_reprocess", default_var=True)
Remember to delete the variable after the backfill (see the sketch after this list).
- Airflow versions from 1.10.11 will have ephemeral-storage support.
- Run SQL clean-up queries against the Airflow database; you can find the queries in this post.
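Putting the timeout wrapper and the Variable-based flag together, here is a minimal, hypothetical sketch; the dag id, image, and spark-submit call are placeholders and not the real pipeline:

import os
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.models import Variable

dag = DAG(
    dag_id="historical_reprocess_sketch",  # placeholder name
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
)

# Airflow Variables are stored as strings, so compare explicitly instead of
# relying on truthiness; the default covers runs outside a backfill.
force_reprocess = Variable.get("force_reprocess", default_var="false") == "true"

process = KubernetesPodOperator(
    namespace=os.environ["AIRFLOW__KUBERNETES__NAMESPACE"],
    name="spark-process",
    image="my-registry/spark-job:latest",  # placeholder image
    # Wrap the shell in the coreutils timeout command so the pod is killed
    # after 45 minutes, since execution_timeout is not enforced here.
    cmds=["timeout", "45m", "/bin/sh", "-c"],
    arguments=["spark-submit my_job.py"],  # placeholder spark-submit call
    env_vars={"FORCE_REPROCESS": str(force_reprocess)},
    task_id="spark-process",
    is_delete_operator_pod=True,
    in_cluster=True,
    dag=dag,
)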
This last feature is crucial for us, so I extended KubernetesPodOperator to manage the volume dynamically by adding a new templated field, "pvc_name":
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.utils.decorators import apply_defaults


class KubernetesPodWithVolumeOperator(KubernetesPodOperator):
    # Expose pvc_name as an additional templated field so values like
    # "spark-pv-claim-{{ ds_nodash }}" are rendered per run.
    template_fields = (*KubernetesPodOperator.template_fields, "pvc_name")

    @apply_defaults
    def __init__(self, pvc_name=None, *args, **kwargs):
        super(KubernetesPodWithVolumeOperator, self).__init__(*args, **kwargs)
        self.pvc_name = pvc_name

    def execute(self, context):
        # If a claim name was provided, mount the corresponding PVC at /tmp
        # before delegating to the stock KubernetesPodOperator.
        if self.pvc_name:
            volume_mount = VolumeMount(
                "spark-volume", mount_path="/tmp", sub_path=None, read_only=False
            )
            volume_config = {"persistentVolumeClaim": {"claimName": f"{self.pvc_name}"}}
            volume = Volume(name="spark-volume", configs=volume_config)
            self.volumes.append(volume)
            self.volume_mounts.append(volume_mount)
        super(KubernetesPodWithVolumeOperator, self).execute(context)
Usage of the custom operator:
historical_process = KubernetesPodWithVolumeOperator(
    namespace=os.environ['AIRFLOW__KUBERNETES__NAMESPACE'],
    name="historical-process",
    image=historical_process_image,
    image_pull_policy="IfNotPresent",
    cmds=["/bin/sh", "-c"],
    arguments=[spark_submit_sh],
    env_vars=envs,
    task_id="historical-process-1",
    is_delete_operator_pod=True,
    in_cluster=True,
    hostnetwork=False,
    # important env vars to run spark-submit
    pod_runtime_info_envs=pod_runtime_info_envs,
    pvc_name="spark-pv-claim-{{ ds_nodash }}",
)
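The per-day PVC itself still has to exist before the pod starts. One option, shown purely as a hypothetical sketch with the kubernetes Python client (this helper, its task id, and the storage size are not part of the original setup), is a task placed next to historical_process in the same DAG definition that creates the claim for the current ds_nodash if it is missing:

import os

from airflow.operators.python_operator import PythonOperator
from kubernetes import client, config
from kubernetes.client.rest import ApiException


def create_spark_pvc(**context):
    """Create spark-pv-claim-<ds_nodash> in the Airflow namespace if absent."""
    config.load_incluster_config()
    v1 = client.CoreV1Api()
    pvc_name = "spark-pv-claim-{}".format(context["ds_nodash"])
    body = client.V1PersistentVolumeClaim(
        metadata=client.V1ObjectMeta(name=pvc_name),
        spec=client.V1PersistentVolumeClaimSpec(
            access_modes=["ReadWriteOnce"],
            resources=client.V1ResourceRequirements(requests={"storage": "100Gi"}),  # placeholder size
        ),
    )
    try:
        v1.create_namespaced_persistent_volume_claim(
            namespace=os.environ["AIRFLOW__KUBERNETES__NAMESPACE"], body=body
        )
    except ApiException as exc:
        if exc.status != 409:  # 409 means the claim already exists
            raise


create_pvc = PythonOperator(
    task_id="create-spark-pvc",
    python_callable=create_spark_pvc,
    provide_context=True,  # Airflow 1.10: pass ds_nodash and friends to the callable
)

create_pvc >> historical_process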
Conclusion
Despite the many inconveniences and pitfalls, our pipelines have run fine so far on the Airflow scheduler. We achieved a huge cost reduction by moving away from AWS Glue. Historical data reprocessing can be done in a single click with backfilling. Real-time and historical data processing share the same code base, and data pipeline steps are now transparent thanks to the DAG definitions in Python.
In the future I will study other alternatives such as Prefect Core, Kedro, or Dagster. Maybe there are better tools out there.