Friday, April 21, 2023

Airflow

 Apache Airflow is an open-source platform for running almost any kind of workflow; pipelines are defined in the Python programming language.


When to use Apache Airflow?

- When the process is stable and, once deployed, is expected to change only occasionally (over weeks rather than hours or minutes)

- When the workflow is tied to a time interval

- When the workflow needs to run on a schedule


Apache Airflow can be used to schedule:

- ETL pipelines that extract data from multiple sources

- Training machine learning models

- Report generation

- Backups and similar DevOps operations


Airflow DAG

Workflows are defined in Airflow as DAGs (Directed Acyclic Graphs), and each DAG is nothing more than a Python file.

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.operators.dummy_operator import DummyOperator
with DAG(
    "etl_sales_daily",
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    task_a = DummyOperator(task_id="task_a")
    task_b = DummyOperator(task_id="task_b")
    task_c = DummyOperator(task_id="task_c")
    task_d = DummyOperator(task_id="task_d")

    # task_a runs first, then task_b and task_c in parallel; task_d follows task_c
    task_a >> [task_b, task_c]
    task_c >> task_d





Every task in an Airflow DAG has its own task_id, which has to be unique within the DAG. Each task also has a set of dependencies that define its relationships to other tasks. These include:


Upstream tasks — a set of tasks that will be executed before this particular task.

Downstream tasks — a set of tasks that will be executed after this task.

In our example, task_b and task_c are downstream of task_a; correspondingly, task_a is upstream of both task_b and task_c.

 A common way of specifying a relation between tasks is the >> operator, which works for both single tasks and collections of tasks.
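For instance, the dependencies from the etl_sales_daily example above can be declared in either of the following equivalent ways (a minimal sketch; set_downstream and set_upstream are the method-style counterparts of >> and <<):

# inside the same "with DAG(...)" block as above
task_a >> [task_b, task_c]                 # bitshift style: task_a runs before task_b and task_c
task_c >> task_d

task_a.set_downstream([task_b, task_c])    # equivalent method style
task_d.set_upstream(task_c)                # reads in the opposite direction, like <<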



Trigger Rule

Each task can specify a trigger_rule, which allows users to make the relations between tasks even more complex. Examples of trigger rules are (see the sketch after this list):


all_success — all tasks upstream of a task have to succeed before Airflow attempts to execute this task

one_success — one succeeded upstream task is enough to trigger a task with this rule

none_failed — each upstream task has to either succeed or be skipped; no failed upstream tasks are allowed for this task to trigger
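
As a sketch, a trigger rule is set directly on the operator through its trigger_rule argument; the constants live in airflow.utils.trigger_rule.TriggerRule. The notify task below is illustrative and reuses tasks from the etl_sales_daily example:

from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# inside the etl_sales_daily DAG: notify runs as soon as any single upstream task succeeds
notify = DummyOperator(
    task_id="notify",
    trigger_rule=TriggerRule.ONE_SUCCESS,
)
[task_b, task_d] >> notify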


Complete DAG sample

from PythonProcessVideo.StartProcess import StartProcess
from PythonProcessVideo.StartProcess import StartSecondProcess
from PythonProcessVideo.StartProcess import Start3rdProcess

import airflow
from airflow import DAG
from airflow.operators import bash_operator
from airflow.operators import python_operator
from datetime import timedelta


# default arguments applied to every task in this DAG
default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 1,                        # retry each failed task once
    'retry_delay': timedelta(minutes=5)  # wait 5 minutes between retries
}

dag = DAG(
    'PushVideosDAG',
    default_args=default_args,
    description='Push videos from firebase to Ziggeo',
    schedule_interval=timedelta(minutes=5),
    dagrun_timeout=timedelta(minutes=15),
    )

start_bash_notification = bash_operator.BashOperator(
    task_id='echo',
    bash_command='echo Process Started',
    dag=dag,
    depends_on_past=False)

start_python_task1 = python_operator.PythonOperator(
    task_id='pushVideos',
    dag=dag,
    depends_on_past=False,
    python_callable=StartProcess)

start_python_task2 = python_operator.PythonOperator(
    task_id='DetectPersonFace',
    dag=dag,
    depends_on_past=False,
    python_callable=StartSecondProcess)

start_python_task3 = python_operator.PythonOperator(
    task_id='pushPersonInfo',
    dag=dag,
    depends_on_past=False,
    python_callable=Start3rdProcess)

# echo notification first, then the three Python steps run in sequence
start_bash_notification >> start_python_task2 >> start_python_task3 >> start_python_task1
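
The three python_callable targets (StartProcess, StartSecondProcess, Start3rdProcess) come from the author's PythonProcessVideo package and are not shown here. For reference, PythonOperator just calls a plain Python function, so a hypothetical stand-in for one of them could look like the sketch below (illustrative only; the real functions do the Firebase/Ziggeo work):

# hypothetical placeholder for PythonProcessVideo.StartProcess, only to show the shape PythonOperator expects
def StartProcess():
    # ...fetch pending videos from Firebase and push them to Ziggeo...
    print("Pushing videos")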


