I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:
- Task 1 executes
- If Task 1 succeed, then execute Task 2a
- Else If Task 1 fails, then execute Task 2b
- Finally execute Task 3

All tasks above are SSHExecuteOperator.
I’m guessing I should be using the ShortCircuitOperator and / or XCom to manage the condition but I am not clear on how to implement that. Could you please describe the solution?
Answers:
Thank you for visiting the Q&A section on Magenaut. Please note that all the answers may not help you solve the issue immediately. So please treat them as advisements. If you found the post helpful (or not), leave a comment & I’ll get back to you as soon as possible.
Method 1
Airflow has a BranchPythonOperator that can be used to express the branching dependency more directly.
The docs describe its use:
The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id. The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.
…
If you want to skip some tasks, keep in mind that you can’t have an empty path, if so make a dummy task.

Code Example
def dummy_test():
return 'branch_a'
A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=dummy_test,
dag=dag,
)
branch_task >> A_task
branch_task >> B_task
EDIT:
If you’re installing an Airflow version >=1.10.3, you can also return a list of task ids, allowing you to skip multiple downstream paths in a single Operator and don’t use a dummy task before joining.

Method 2
You have to use airflow trigger rules
All operators have a trigger_rule argument which defines the rule by which the generated task get triggered.
The trigger rule possibilities:
ALL_SUCCESS = 'all_success' ALL_FAILED = 'all_failed' ALL_DONE = 'all_done' ONE_SUCCESS = 'one_success' ONE_FAILED = 'one_failed' DUMMY = 'dummy'
Here is the idea to solve your problem:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
All methods was sourced from stackoverflow.com or stackexchange.com, is licensed under cc by-sa 2.5, cc by-sa 3.0 and cc by-sa 4.0