
Creating a master orchestrator to handle complex Google Cloud Composer jobs

Apache Airflow is a great way to orchestrate jobs of various kinds on Google Cloud. One can interact with BigQuery, start Apache Beam jobs and move files around in Google Cloud Storage, to name just a few. Generally, a single Airflow job (a DAG) is written in a single Python file. This is convenient because all orchestration logic is in one place, and the purpose of the file stays clear because it caters to only one specific sequence of events. This approach works well when tasks are simple and independent of each other. However, we need a different approach when Airflow sequences become more complex and depend on each other’s success.


The challenge: The interdependence of Beam jobs

Suppose we want to write data to a database using Apache Beam. This task can easily be orchestrated using Airflow with the Beam operator. But in some cases the data from one Beam job must be available in the database before another Beam job starts. For instance, if one table looks up values in another table, the referenced table must be populated before the Beam job that writes the dependent table starts.

In this case Beam jobs are dependent on each other. One Beam job can only run after another has completely finished, and some Beam jobs can’t run at all if one fails. Apart from Airflow DAGs triggering the individual Beam jobs, we need a separate DAG to govern the sequence in which these individual triggers are going to be run. This is where the master controller comes in.

General outline: Making a master controller

This post will outline how to make a master controller in the situation of using Airflow to trigger Beam jobs for ETL tasks. For illustration purposes we will use Airflow DAGs A and B which are used to trigger individual Beam jobs. Furthermore, we will use a master controller DAG that will govern the sequence of events. From now on, the master_controller will be referred to as M.

When working with only one independent Airflow job we are only interacting with Apache Beam. However, when using a master controller we need to interact between different Airflow DAG files. One way of doing this is to use the TriggerDagRunOperator.

Triggering external DAGs

As the name suggests, the TriggerDagRunOperator is used to trigger other DAGs. Defining the trigger is pretty straightforward. The snippets below show how to trigger an external DAG, starting with the callable that decides whether or not to trigger it.

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to trigger the remote DAG."""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if c_p:
        # Attach a payload and return the run object to request the trigger.
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj

The code snippet above is a function that can be used for fine-grained control over whether or not to trigger an external DAG. For this example the condition is set to always be True.
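To see what the callable does in isolation, it can be exercised outside Airflow with stand-ins. The sketch below is only an illustration: `FakeDagRun` and the hand-built context dictionaries are hypothetical test doubles, not Airflow objects, and the function body is a copy of the snippet above without the logging.

```python
class FakeDagRun(object):
    """Hypothetical stand-in for the object Airflow passes as dag_run_obj."""
    payload = None


def conditionally_trigger(context, dag_run_obj):
    """Return dag_run_obj to request the trigger, or None to skip it."""
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        return dag_run_obj
    return None


# Hand-built context dictionaries; Airflow would normally supply these.
go = {'params': {'condition_param': True, 'message': 'Running GCS Sensor'}}
skip = {'params': {'condition_param': False, 'message': 'Running GCS Sensor'}}

triggered = conditionally_trigger(go, FakeDagRun())
skipped = conditionally_trigger(skip, FakeDagRun())

print(triggered.payload)
print(skipped)
```

When the condition is false the function returns nothing, which in Airflow 1.x should cause the operator to skip the trigger rather than start the external DAG.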

# Airflow 1.x import path
from airflow.operators.dagrun_operator import TriggerDagRunOperator

trigger_external_dag = TriggerDagRunOperator(
    task_id='trigger-external-dag',
    trigger_dag_id='external-dag-id',
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Running GCS Sensor'},
    dag=dag,
)

As can be seen in the snippet above, the two things of importance are the trigger_dag_id and the python_callable (the conditional trigger). As mentioned earlier, the condition is set to True, as can be seen from params={'condition_param': True…}. The trigger_dag_id is simply the id of the external DAG you want to trigger.

As a side note: values such as 'condition_param' are read from the Airflow context, a Python dictionary that holds information about the current task run, including the params passed to the operator.

Now that we know how to trigger external DAGs, it is tempting to just place all the external DAGs you want to run in a sequence of TriggerDagRunOperator definitions in the M DAG, and orchestrate them like:

trigger_external_dag_A >> trigger_external_dag_B >> etc.

However, there is a problem with this: although trigger_external_dag_B depends on trigger_external_dag_A in this example, trigger_external_dag_A gets a ‘success’ status as soon as it has triggered dag_A, not when dag_A has finished. This means that the master_controller will not wait for dag_A to finish before starting trigger_external_dag_B.

Our scenario requires DAGs to run in sequence, B starting only after full completion of A. Thus, we need a way to make trigger_external_dag_A wait for dag_A to finish the entire Beam job.
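The effect can be modelled without Airflow. The toy simulation below is only an illustration under simplifying assumptions: `trigger`, `run_pending` and the `pending` queue are hypothetical names, and the queue is a crude stand-in for the scheduler picking up triggered runs at some later point.

```python
from collections import deque

events = []        # ordered log of what happened
pending = deque()  # triggered DAG runs that have not executed yet


def trigger(dag_id):
    """Queue an external DAG run and report success immediately."""
    pending.append(dag_id)
    events.append("trigger_{} succeeded".format(dag_id))


def run_pending():
    """Execute the queued DAG runs later, as a scheduler would."""
    while pending:
        events.append("{} finished".format(pending.popleft()))


# M runs: trigger_external_dag_A >> trigger_external_dag_B
trigger("dag_A")
trigger("dag_B")
run_pending()

for event in events:
    print(event)
```

In the resulting log, the trigger for dag_B succeeds before dag_A has finished, which is exactly the ordering problem described above.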

Waiting for external tasks to complete

One way of signalling task completion between DAGs can be to use sensors. For instance, the GoogleCloudStorageObjectSensor can be used to sense whether a specific object is present in a specific folder in GCS. We can use this approach to signal task completion between DAGs in the following way.

The master controller (M) triggers dag_A, which in turn runs its Beam_trigger task. Once the Beam job is done, dag_A runs a GoogleCloudStorageToGoogleCloudStorageOperator, which can move a file from one folder to another. In GCS we set up two folders for our sensor files: an inbox and an outbox. The inbox holds one sensor text file for each DAG to run. When all tasks in dag_A have completed, dag_A moves its sensor text file from the inbox to the outbox. Meanwhile, M has started a GoogleCloudStorageObjectSensor that waits for the sensor file of dag_A to appear in the outbox folder.
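The handshake itself is simple enough to sketch locally. In the example below, temporary directories stand in for the GCS inbox and outbox folders, and the helper names `signal_completion`, `is_complete` and `reset_marker` are hypothetical; the real DAGs would use the GCS operator and sensor instead.

```python
import shutil
import tempfile
from pathlib import Path

# Local directories stand in for the GCS inbox and outbox folders.
root = Path(tempfile.mkdtemp())
inbox = root / "inbox"
outbox = root / "outbox"
inbox.mkdir()
outbox.mkdir()

marker = "dag_A_sensor.txt"
(inbox / marker).write_text("dag_A marker")


def signal_completion(name):
    """Last task of dag_A: move the marker from the inbox to the outbox."""
    shutil.move(str(inbox / name), str(outbox / name))


def is_complete(name):
    """What the sensor in M checks: is the marker in the outbox yet?"""
    return (outbox / name).exists()


def reset_marker(name):
    """Optional final task of M: put the marker back for the next run."""
    shutil.move(str(outbox / name), str(inbox / name))


before = is_complete(marker)       # dag_A is still running
signal_completion(marker)          # dag_A has finished its Beam job
after = is_complete(marker)        # M may now trigger dag_B
reset_marker(marker)
after_reset = is_complete(marker)  # ready for the next run

shutil.rmtree(str(root))
```

Moving the file, rather than copying it, means the outbox only ever contains markers for DAGs that finished in the current run, provided the marker is returned to the inbox afterwards.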

Now we can orchestrate the sequence of events in M like this:

trigger_external_dag_A >> sense_dag_A_completion >>  trigger_external_dag_B

The GoogleCloudStorageObjectSensor code (in M) might look something like this:

# Airflow 1.x import path
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor

sense_dag_A_completion = GoogleCloudStorageObjectSensor(
    task_id='sense-dag-a-completion',
    bucket=bucket_containing_outbox,
    object='outbox/dag_A_sensor.txt',
    google_cloud_storage_conn_id='google_cloud_default',
)
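Conceptually, a sensor repeats a check until it passes or a time limit is reached. The sketch below illustrates that polling pattern in plain Python; it is not Airflow's implementation, and `wait_for` and `marker_exists` are hypothetical names. Airflow sensors expose comparable `poke_interval` and `timeout` settings.

```python
import time


def wait_for(poke, poke_interval=0.01, timeout=1.0):
    """Call poke() until it returns True; raise if the timeout passes first."""
    deadline = time.monotonic() + timeout
    while not poke():
        if time.monotonic() >= deadline:
            raise RuntimeError("sensor timed out")
        time.sleep(poke_interval)
    return True


# Hypothetical stand-in for "does outbox/dag_A_sensor.txt exist yet?":
# the marker becomes visible on the third check.
checks = {"count": 0}


def marker_exists():
    checks["count"] += 1
    return checks["count"] >= 3


done = wait_for(marker_exists)
```

Only once the check passes does the waiting task finish, which is what lets M hold back trigger_external_dag_B until dag_A has delivered its sensor file.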

The GoogleCloudStorageToGoogleCloudStorageOperator code (in dag_A) might look like this:

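# Airflow 1.x import path
from airflow.contrib.operators.gcs_to_gcs import (
    GoogleCloudStorageToGoogleCloudStorageOperator,
)

send_dag_A_sensor_file = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id='send-dag-a-sensor-file',
    source_bucket=bucket_containing_inbox_folder,
    source_object='inbox/dag_A_sensor.txt',
    destination_bucket=bucket_containing_outbox,
    destination_object='outbox/dag_A_sensor.txt',
    move_object=True,  # move instead of copy, so the inbox copy is removed
    google_cloud_storage_conn_id='google_cloud_default',
)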

For automation purposes one might decide to add a 'return_dag_A_sensor_file' GoogleCloudStorageToGoogleCloudStorageOperator at the end of M to make sure the sensor file is back in the inbox for the next Airflow run.

Our entire design will look like this:

[Figure: master controller orchestration on Google Cloud Platform]

In conclusion, for complex orchestration jobs in Google Cloud Composer a master_controller can be used to govern tasks that are dependent on each other. The master_controller will contain logic regarding the sequence of running separate Beam trigger DAGs, and will decide which tasks should be completed before starting the next DAG.

Next: Creating complex orchestrations with Google Cloud Composer using sub_controllers