Google Cloud Composer: Overcoming The Short-living Tasks Problem

Originally posted on Huq Industries tech blog.

Introduction

Google Cloud Composer is a fully managed workflow-orchestration service built on top of Apache Airflow. It leverages other GCP products such as Cloud SQL, GCS, Kubernetes Engine, Stackdriver, and Identity-Aware Proxy.

You don’t have to worry about provisioning and DevOps, so you can focus on your core business logic and let Google take care of the rest. With Airflow it’s easy to create, schedule, and monitor pipelines that span both cloud and on-premises data centres. Thanks to a vast selection of “Operators”, you can define pipelines natively in Python with just a few lines of code and connect to several cloud providers or on-premises instances. If you don’t find an operator that fits your needs, it’s easy to create your own and import it. If you want to learn more about Apache Airflow, refer to the official docs.
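
For example, here is a minimal sketch of a DAG with a single BashOperator running the echo command we time in the next section. The dag_id, schedule and default arguments are illustrative, not taken from our codebase:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# A toy DAG with a single short-lived task.
with DAG(
    dag_id="echo_example",
    default_args=default_args,
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    BashOperator(
        task_id="echo_my_awesome_string",
        bash_command="echo my-awesome-string",
    )

Once a file like this lands in the environment’s DAGs bucket, Composer picks it up and Airflow takes care of scheduling, retries, logging and monitoring.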

Running short-living tasks

echo task information from Airflow UI
# time echo my-awesome-string

real 0m0.000s
user 0m0.000s
sys 0m0.000s

As expected, echo is blazing fast - so fast that trying to measure its execution time with the standard implementation of time does not cut it. We could recompile the kernel with CONFIG_HIGH_RES_TIMERS=y - but who has time for that! So to find out, let's use date with nanosecond precision instead (keep in mind that measured this way the figure will be slightly overestimated, since it also includes the shell overhead between the two date calls).

# date +"%9N"; echo my-awesome-string ; date +"%9N"
571368370
my-awesome-string
573152179

This output is the sub-second part of the timestamp expressed in nanoseconds, so the elapsed time is simply the difference between the two readings:

573152179 ns - 571368370 ns = 1783809 ns ≈ 1.8 ms

In other words, echo completes in under two milliseconds - a tiny fraction of the duration reported for the same task in the Airflow UI above, where scheduling, queuing and logging overhead dominate.

A more realistic, yet simple pipeline

Let’s apply these findings to a more realistic example. Imagine we have a pipeline made of 4 tasks (a sketch in code follows the figure below):

  1. Run a query on BigQuery using the BigQuery operator.
  2. Export the data to GCS.
  3. Compose the exported files into a single file.
  4. Copy the composed file to S3.
Sample Pipeline
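
Below is a sketch of what this four-step pipeline could look like as an Airflow DAG for a single country. It is an illustration rather than our production code: the project, dataset and bucket names, the SQL, and the compose-via-gsutil step are all placeholders, and the operators shown are the Airflow 1.x contrib ones shipped with Composer at the time:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.gcs_to_s3 import GoogleCloudStorageToS3Operator

with DAG(
    dag_id="country_export_gb",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
) as dag:
    # 1. Run the query and materialise the result into a staging table.
    run_query = BigQueryOperator(
        task_id="run_query",
        sql="SELECT * FROM `my_project.my_dataset.events` WHERE country = 'GB'",
        destination_dataset_table="my_project.staging.events_gb",
        write_disposition="WRITE_TRUNCATE",
        use_legacy_sql=False,
    )

    # 2. Export the staging table to GCS (large tables are sharded into parts).
    export_to_gcs = BigQueryToCloudStorageOperator(
        task_id="export_to_gcs",
        source_project_dataset_table="my_project.staging.events_gb",
        destination_cloud_storage_uris=["gs://my-staging-bucket/gb/part-*.csv"],
        export_format="CSV",
    )

    # 3. Compose the exported shards into a single object
    #    (gsutil compose accepts at most 32 source components per call).
    compose_files = BashOperator(
        task_id="compose_files",
        bash_command=(
            "gsutil compose gs://my-staging-bucket/gb/part-*.csv "
            "gs://my-staging-bucket/gb/events_gb.csv"
        ),
    )

    # 4. Copy the composed file from GCS to S3.
    copy_to_s3 = GoogleCloudStorageToS3Operator(
        task_id="copy_to_s3",
        bucket="my-staging-bucket",
        prefix="gb/events_gb.csv",
        dest_s3_key="s3://my-destination-bucket/gb/",
    )

    run_query >> export_to_gcs >> compose_files >> copy_to_s3

This single-country version already uses four tasks; scheduling it once per country is what drives the task counts discussed next.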

In our world, this pipeline runs daily for each country (of which we cover almost 250), which results in about 1,000 tasks per daily run. Executing the same work with a simple bash script takes just a few minutes, because each operation runs for just a few seconds. Using Airflow, however, this pipeline takes about an hour to complete. When running a backfill job over 3 years of data across 3 different instances of this pipeline, we end up with more than 3 million tasks to execute, and having so many tasks to manage at the same time creates an avalanche effect: the system is overwhelmed and struggles to schedule, queue, and run tasks.
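
For clarity, those numbers follow from simple arithmetic (assuming one run per day over the whole backfill window):

# Back-of-the-envelope task counts for the pipeline above.
countries = 250          # countries covered (approximately)
tasks_per_country = 4    # query, export, compose, copy

tasks_per_daily_run = countries * tasks_per_country
print(tasks_per_daily_run)        # 1000 tasks per daily run

backfill_days = 3 * 365           # three years of daily runs
pipeline_instances = 3            # three instances of the pipeline
backfill_tasks = tasks_per_daily_run * backfill_days * pipeline_instances
print(backfill_tasks)             # 3285000 -> more than 3 million tasks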

To achieve our aim while invoking fewer tasks, we accepted a reduction in the granularity of execution and in parallelism, but maintained the automation capabilities offered by Airflow. We tuned this trade-off by developing an Airflow plugin called Chains, which can be found in the airflow-plugins repository. NB: if you have built custom operators or plugins that you want to share, you can do so by submitting a PR to that repository.

The Chain Airflow plugin

This solution trades parallelism and granularity in favour of reducing the number of tasks, by bundling multiple operations together into a single Airflow task. Using these operators in our example pipeline, we were able to bundle the countries into groups and adjust the trade-off between parallelism and the total number of Airflow tasks. In our case, by using Chains and partitioning our countries into 7 groups, we reduced the number of Airflow tasks by a factor of 35.
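
To make the bundling idea concrete, here is a hypothetical sketch of the pattern; it is not the Chains plugin API, and the group names and country partitions are placeholders. Each of the four steps remains a separate Airflow task, but it now iterates over a whole group of countries instead of a single one:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Placeholder partitioning of the ~250 countries into 7 groups.
COUNTRY_GROUPS = {
    "group_1": ["GB", "FR", "DE"],  # in practice ~36 countries per group
    "group_2": ["US", "CA", "MX"],
    # ... up to group_7
}

def run_step(step, countries):
    """Run one pipeline step (query/export/compose/copy) for every country in
    the group. A real implementation would call the relevant BigQuery, GCS and
    S3 hooks here; this stub only logs what it would do."""
    for country in countries:
        print("{} for {}".format(step, country))

with DAG(
    dag_id="country_export_chained",
    start_date=datetime(2019, 1, 1),
    schedule_interval="@daily",
) as dag:
    for group_name, countries in COUNTRY_GROUPS.items():
        steps = []
        for step in ["query", "export", "compose", "copy"]:
            steps.append(PythonOperator(
                task_id="{}_{}".format(step, group_name),
                python_callable=run_step,
                op_kwargs={"step": step, "countries": countries},
            ))
        # Chain the four bundled steps for this group: query >> export >> ...
        for upstream, downstream in zip(steps, steps[1:]):
            upstream >> downstream

With 7 groups, a daily run needs 7 × 4 = 28 tasks instead of roughly 1,000, which is where the ~35× reduction comes from.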

Conclusions

Of course, this is somewhat expected: Airflow is not designed to be a real-time system, but rather for managing larger and more complex pipelines.
