```python
import os
from datetime import datetime

import pandas as pd

from airflow.models import DAG, Variable
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
```

We've made a lot of imports, and these are the modules and operators we'll use throughout the file.

Every Airflow DAG is defined with Python's context manager syntax (`with`):

```python
with DAG(
    dag_id='first_airflow_dag',
    schedule_interval='* * * * *',
    start_date=datetime(year=2022, month=2, day=1),
    catchup=False
) as dag:
    ...
```

Here's what each parameter means:

- `dag_id` - A unique ID that will represent the DAG inside the Airflow web application.
- `schedule_interval` - Specifies the interval at which your DAG should run. You can pass preset strings (such as `@daily`) or a cron-like expression. For example, `* * * * *` means the DAG will run every minute.
- `start_date` - The date at which your DAG will first run. I've set it in the past.
- `catchup` - A boolean that tells Airflow whether it should catch up for every scheduled interval between `start_date` and now.

And with that out of the way, we can proceed with writing our first task.

We'll use Airflow's `BashOperator` to execute a shell command. Each task in Airflow needs an ID, which must be unique across the DAG. The `bash_command` argument allows you to specify the shell command that'll be executed:

```python
# 1. Get current datetime
task_get_datetime = BashOperator(
    task_id='get_datetime',
    bash_command='date'
)
```

You can test any task from the terminal with the `airflow tasks test` command. Here's how to test the second task, `process_datetime`:

```
airflow tasks test first_airflow_dag process_datetime
```

Image 3 - Testing the second Airflow task (image by author)

Open the Airflow home page and see if your DAG appears in the list:

Image 4 - Airflow web application home page (image by author)

Click on it and go to the Graph view - you'll see our two tasks listed:

Image 5 - Airflow DAG graph view (image by author)

The tasks are listed here but aren't connected in any way. You'll see how connections and dependencies work later, but first, let's write the code for the third task.

Airflow Task #3 - Save Processed Datetime

While on the Airflow home page, go under Admin - Variables. You should see a blank list like the one below:

Image 6 - Airflow variable list (image by author)

Click on the plus sign to add a new variable. We're declaring it to hold the path to the location where we'll save the CSV file. I've named mine first_dag_csv_path and entered /Users/dradecic/Desktop/datetimes.csv as the value:

Image 7 - Creating a new Airflow variable (image by author)

Of course, you should specify a path on your own machine, but that goes without saying. You'll see our variable added to the list:

Image 8 - Creating a new Airflow variable (2) (image by author)

The third task will run a Python function called save_datetime(). It uses XComs to pull the processed datetime from the previous task, and then creates a Pandas DataFrame based on it. We can then use the Variable class from Airflow to get the path for the CSV file. If that file exists, we'll set the parameters df_header and df_mode to False and 'a', respectively. In plain English: the file exists, so we want to append new rows to it without adding the header row every time. If the file doesn't exist, we'll create it using the write mode and we'll also include the header:

```python
def save_datetime(ti) -> None:
    dt = ti.xcom_pull(task_ids=['process_datetime'])
    if not dt:
        raise Exception('No processed datetime value.')

    df = pd.DataFrame(dt)

    csv_path = Variable.get('first_dag_csv_path')
    if os.path.exists(csv_path):
        # The file exists - append new rows without repeating the header
        df_header = False
        df_mode = 'a'
    else:
        # The file doesn't exist - write it and include the header
        df_header = True
        df_mode = 'w'

    df.to_csv(csv_path, index=False, mode=df_mode, header=df_header)
```

Finally, here are all three task declarations as they appear inside the DAG's `with` block:

```python
# 1. Get current datetime
task_get_datetime = BashOperator(
    task_id='get_datetime',
    bash_command='date'
)

# 2. Process current datetime
task_process_datetime = PythonOperator(
    task_id='process_datetime',
    python_callable=process_datetime
)

# 3. Save processed datetime
task_save_datetime = PythonOperator(
    task_id='save_datetime',
    python_callable=save_datetime
)
```
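One thing this excerpt doesn't include is the process_datetime function that the second task calls. Below is a minimal, hypothetical sketch of what such a function could look like - the body is an assumption for illustration, not the article's actual code. It relies on the fact that BashOperator pushes the last line of its stdout to XComs by default:

```python
# Hypothetical sketch - the article's real process_datetime isn't in this
# excerpt. It only needs to pull the first task's output and return a value
# for XComs to carry forward to save_datetime.
def process_datetime(ti) -> str:
    dt = ti.xcom_pull(task_ids=['get_datetime'])
    if not dt:
        raise Exception('No datetime value.')
    # Normalize the raw output of the `date` command
    return str(dt[0]).strip()


# Dependencies are covered later in the article; chaining the three tasks
# with Airflow's bitshift syntax would look like this at the bottom of the
# DAG context:
# task_get_datetime >> task_process_datetime >> task_save_datetime
```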
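To sanity-check the whole pipeline, you could test the third task the same way and then inspect the CSV file it writes. These exact commands are a suggestion following the test pattern shown above, not part of the original excerpt:

```
airflow tasks test first_airflow_dag save_datetime
cat /Users/dradecic/Desktop/datetimes.csv
```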