Apache Airflow is an open-source platform to run any type of workflow, it using the Python programming language to define the pipelines.
When to use Apache Airflow?
- When process is stable and once deployed, is expected to differ from time to time (weeks rather than hours or minutes)
- related to the time interval
- scheduled on time
Apache Airflow can be used to schedule:
- ETL pipelines that extract data from multiple sources
- Training machine learning models
- Report generation
- Backups and similar DevOps operations
Airflow DAG
Workflows are defined in Airflow by DAGs (Directed Acyclic Graphs) and are nothing more than a python file.
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperatorwith DAG(
"etl_sales_daily",
start_date=days_ago(1),
schedule_interval=None,
) as dag:
task_a = DummyOperator(task_id="task_a")
task_b = DummyOperator(task_id="task_b")
task_c = DummyOperator(task_id="task_c")
task_d = DummyOperator(task_id="task_d") task_a >> [task_b, task_c]
task_c >> task_d
Every task in a Airflow DAG has its own task_id that has to be unique within a DAG. Each task has a set of dependencies that define its relationships to other tasks. These include:
Upstream tasks — a set of tasks that will be executed before this particular task.
Downstream tasks — set of tasks that will be executed after this task.
In our example task_b and task_c are downstream of task_a. And respectively task_a is in upstream of both task_b and task_c.
A common way of specifying a relation between tasks is using the >> operator which works for tasks and collection of tasks
Trigger Rule
each task can specify trigger_rule which allows users to make the relations between tasks even more complex. Examples of trigger rules are:
all_success—meaning that all tasks in upstream of a task have to succeed before Airflow attempts to execute this task
one_success— one succeeded task in upstream is enough to trigger a task with this rule
none_failed— each task in upstream has to either succeed or be skipped, no failed tasks are allowed to trigger this task
Complete DAG sample
from PythonProcessVideo.StartProcess import StartProcess from PythonProcessVideo.StartProcess import StartSecondProcess from PythonProcessVideo.StartProcess import Start3rdProcess import airflow from airflow import DAG from airflow.operators import bash_operator from airflow.operators import python_operator from datetime import timedelta default_args = { 'start_date': airflow.utils.dates.days_ago(0), 'retries': 1, 'retry_delay': timedelta(minutes=5) } dag = DAG( 'PushVideosDAG', default_args=default_args, description='Push videos from firebase to Ziggeo', schedule_interval= timedelta(minutes=5), dagrun_timeout= timedelta(minutes=15) ) start_bash_notofication = bash_operator.BashOperator( task_id='echo', bash_command='echo Process Started', dag=dag, depends_on_past=False) start_python_task1 = python_operator.PythonOperator( task_id='pushVideos', dag=dag, depends_on_past=False, python_callable=StartProcess) start_python_task2 = python_operator.PythonOperator( task_id='DetectPersonFace', dag=dag, depends_on_past=False, python_callable=StartSecondProcess) start_python_task3 = python_operator.PythonOperator( task_id='pushPersonInfo', dag=dag, depends_on_past=False, python_callable=Start3rdProcess) start_bash_notofication >> start_python_task2 >> start_python_task3 >> start_python_task1
No comments:
Post a Comment