Automating Data Pipelines using Apache Airflow

Agam Kushwaha
7 min read · Feb 25, 2022

Objective

A ride-hailing company generates data at a large scale every day. The company wants a system that loads this data in a structured format and analyzes it through automated data pipelines. Our task is to enable an efficient OLAP (Online Analytical Processing) system on the company's trips and bookings data. We need to design and schedule a data pipeline that generates the following insights:

  • Get the car type(s) with the highest number of trips for each city
  • Get the throughput of trips (number of trips / number of bookings) for each city
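For example, if a city records 10 bookings on a given day and 8 of them result in completed trips, the trip throughput for that city on that day is 8/10 = 80%.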

The data provided is in the form of the following two tables:

  • Booking
  • Trip

The schema for these tables can be seen below:

Bookings:

Trips:

Solution Approach:

  • Bring data from MySQL to HDFS via Sqoop
  • Create necessary directories in HDFS
  • Create Hive tables on the imported data
  • Construct partitions in the Hive table
  • Filter invalid records using Spark
  • Run the analysis to generate aggregated result using Spark
  • View the result

The DAG for this pipeline can be seen below:

Prerequisites:

  • mysql.dump (MySQL queries for table creation and record insertion)
  • etl_dag.py (the code for the DAG)
  • filter_trip.py (Spark application)
  • generate_trip_throughput.py (Spark application)
  • filter_booking.py (Spark application)
  • generate_car_with_most_trips.py (Spark application)

Code:

All the code can be downloaded from here.

Creating Tables and Inserting Records:

We'll use the mysql.dump file, which creates our tables and inserts the records into them. It is available in the same folder.

DAG Construction:

There are multiple steps involved in the DAG. We’ll use the file etl_dag.py for this activity. Let’s describe the steps in two parts.
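Before walking through the individual tasks, here is a minimal sketch of how etl_dag.py could be structured; the imports, default_args values, and schedule shown here are assumptions for a typical Airflow 1.10 setup rather than the exact code from the file:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.hive_operator import HiveOperator
from airflow.contrib.operators.sqoop_operator import SqoopOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

# Default arguments applied to every task in the DAG (values are illustrative)
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 9, 16),
    'retries': 0,
}

# The DAG object that all of the tasks described below attach to
dag = DAG(
    dag_id='etl_dag',
    default_args=default_args,
    schedule_interval=None,  # triggered manually from the Airflow UI
)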

Part 1:

Below is a list describing the tasks in the DAG related to table extraction and location creation; a sketch of representative task definitions follows the list:

  • extract_booking_table: Extracts the booking table from MySQL to HDFS using Sqoop
  • extract_trip_table: Extracts the trip table from MySQL to HDFS using Sqoop
  • create_raw_booking_location: Creates the location /data/raw/booking using a BashOperator; it stores the raw data (the original data as-is) for the booking table
  • create_filtered_booking_location: Creates the location /data/refined/booking using a BashOperator; it stores the refined data for the booking table
  • create_raw_trip_location: Creates the location /data/raw/trip using a BashOperator; it stores the raw data (the original data as-is) for the trip table
  • create_filtered_trip_location: Creates the location /data/refined/trip using a BashOperator; it stores the refined data for the trip table
  • create_result_trip_throughput_location: Creates the location /data/output/trip_throughput using a BashOperator; it stores the trip throughput results
  • create_result_car_with_most_trips_location: Creates the location /data/output/car_with_most_trips using a BashOperator; it stores the results for cars with the most trips
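A sketch of how a couple of these Part 1 tasks might be defined is shown below; the SqoopOperator and BashOperator arguments are assumptions based on the connections and paths described in this article, not the exact code from etl_dag.py:

# Import the booking table from MySQL into HDFS via Sqoop
extract_booking_table = SqoopOperator(
    task_id='extract_booking_table',
    conn_id='sqoop_default',
    cmd_type='import',
    table='booking',
    target_dir='/data/raw/booking',  # assumption: Sqoop lands the data in the raw booking location
    num_mappers=1,
    dag=dag,
)

# Create the HDFS directory that will hold the trip throughput output
create_result_trip_throughput_location = BashOperator(
    task_id='create_result_trip_throughput_location',
    bash_command='hdfs dfs -mkdir -p /data/output/trip_throughput',
    dag=dag,
)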

Part 2:

Below is the list describing the tasks related to Hive database/table creation, partitioning, filtering, and aggregation; a sketch of representative task definitions follows the list:

  • create_hive_database: Creates a database events using the HiveOperator
  • create_booking_raw_table: Creates a table booking_raw using the HiveOperator
  • create_trip_raw_table: Creates a table trip_raw using the HiveOperator
  • add_partitions_for_booking_table: Adds partitions for the booking table on the booking_dt column
  • add_partitions_for_trip_table: Adds partitions for the trip table on the trip_dt column
  • create_filter_booking_table: Creates a table booking for the refined data using the PARQUET format and the location hdfs:///data/refined/booking
  • create_filter_trip_table: Creates a table trip for the refined data using the PARQUET format and the location hdfs:///data/refined/trip
  • filter_booking_table: Filters invalid records from the booking table using the SparkSubmitOperator
  • filter_trip_table: Filters invalid records from the trip table using the SparkSubmitOperator
  • create_car_with_most_trips_table: Creates a table for the KPI using the PARQUET format and the location hdfs:///data/output/car_with_most_trips
  • generate_car_with_most_trips: Generates the result for the cars with the most trips in each city
  • create_trip_throughput_table: Creates a table for the KPI using the PARQUET format and the location hdfs:///data/output/trip_throughput
  • generate_trip_throughput: Generates the trip throughput result for each city
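A sketch of representative Part 2 tasks is shown below; the HQL, column list, and file path are assumptions consistent with the locations described above, not the exact contents of etl_dag.py:

# Create the events database if it does not already exist
create_hive_database = HiveOperator(
    task_id='create_hive_database',
    hql='CREATE DATABASE IF NOT EXISTS events',
    dag=dag,
)

# Create the raw booking table, partitioned by booking_dt
# (the column list is illustrative; the real schema comes from mysql.dump)
create_booking_raw_table = HiveOperator(
    task_id='create_booking_raw_table',
    hql="""
        CREATE EXTERNAL TABLE IF NOT EXISTS events.booking_raw (
            booking_id STRING
        )
        PARTITIONED BY (booking_dt STRING)
        LOCATION 'hdfs:///data/raw/booking'
    """,
    dag=dag,
)

# Run the Spark job that filters invalid booking records
filter_booking_table = SparkSubmitOperator(
    task_id='filter_booking_table',
    conn_id='spark_default',
    application='/home/ec2-user/airflow_codes/ride/filter_booking.py',
    dag=dag,
)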

Spark Applications

Spark applications are used to filter the booking and trip tables and to generate the trip throughput and car-with-most-trips results. We'll use the four files below, available here; a sketch of the kind of logic they contain follows the list:

  • filter_booking.py
  • filter_trip.py
  • generate_trip_throughput.py
  • generate_car_with_most_trips.py
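As an illustration of what these applications might do, here is a PySpark sketch in the spirit of filter_booking.py and generate_trip_throughput.py. Column names such as booking_id, city, booking_dt, and trip_dt are assumptions (the actual schema comes from mysql.dump), and the filtering rule is only an example of dropping invalid records:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Hive support is needed to read the tables created by the DAG
spark = (SparkSession.builder
         .appName('ride_etl_sketch')
         .enableHiveSupport()
         .getOrCreate())

# --- Filtering, in the spirit of filter_booking.py ---
# Read the raw booking table and drop records missing key fields
booking_raw = spark.table('events.booking_raw')
booking = booking_raw.filter(
    F.col('booking_id').isNotNull() & F.col('city').isNotNull()
)
booking.write.mode('overwrite').parquet('hdfs:///data/refined/booking')

# --- Aggregation, in the spirit of generate_trip_throughput.py ---
# Throughput = number of trips / number of bookings, per city and date
booking_ref = spark.table('events.booking')
trip_ref = spark.table('events.trip')

bookings_per_city = booking_ref.groupBy('city', 'booking_dt').agg(
    F.count(F.lit(1)).alias('num_bookings')
)
trips_per_city = trip_ref.groupBy('city', 'trip_dt').agg(
    F.count(F.lit(1)).alias('num_trips')
)

throughput = bookings_per_city.join(
    trips_per_city,
    (bookings_per_city.city == trips_per_city.city) &
    (bookings_per_city.booking_dt == trips_per_city.trip_dt)
).select(
    bookings_per_city.city,
    bookings_per_city.booking_dt.alias('dt'),
    (trips_per_city.num_trips / bookings_per_city.num_bookings).alias('throughput')
)

throughput.write.mode('overwrite').parquet('hdfs:///data/output/trip_throughput')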

Setting Task Dependencies

Now, we will set up the dependencies among the tasks that are defined in our DAG. Below is a glimpse of the dependencies:

The last part of the file etl_dag.py contains all the task dependencies.
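For reference, task dependencies in Airflow are wired with the >> (downstream) operator. The chain below is only a hedged sketch of a plausible ordering for a few of the tasks listed above, not the exact dependency block from etl_dag.py:

# The Hive database must exist before its tables, and a table must exist
# (with partitions added) before the Spark jobs read from or write to it.
create_hive_database >> create_booking_raw_table >> add_partitions_for_booking_table
extract_booking_table >> add_partitions_for_booking_table
[add_partitions_for_booking_table, create_filter_booking_table] >> filter_booking_table
[filter_booking_table, create_trip_throughput_table] >> generate_trip_throughput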

Running DAG

1. Log in to your EC2 instance.

2. Activate the Python virtual environment using the following command:

source /home/ec2-user/airflow/bin/activate

3. Place the mysql.dump file in the /tmp folder on the EC2 machine using the WinSCP tool.

4. Run the following command to execute the SQL statements in /tmp/mysql.dump and create our tables:

mysql -u root -p<password> < /tmp/mysql.dump

5. Next, we need to set up the required connections using the Airflow UI. Create (or edit, if they already exist) the following connections:

Sqoop:

  • Conn Id: sqoop_default
  • Conn Type: Jdbc Connection (select from the drop-down)
  • Connection URL: jdbc:mysql://localhost/events
  • Login: root
  • Password: ***

Hive:

  • Conn Id: hiveserver2_default
  • Conn Type: Hive Server 2 Thrift (select from the drop-down)
  • Host: localhost
  • Schema: default
  • Login: root
  • Password: ***
  • Port: 10000

Spark:

  • Conn Id: spark_default
  • Conn Type: Spark (select from the drop-down)
  • Host: localhost

6. Create a directory called ride inside airflow_codes using the following command:

mkdir /home/ec2-user/airflow_codes/ride 

7. Now place the following files inside the ride directory we just created:

  • filter_trip.py
  • generate_trip_throughput.py
  • filter_booking.py
  • generate_car_with_most_trips.py

8. Now we need to place the etl_dag.py file in the /home/ec2-user/airflow/dags directory. (We can use WinSCP or create a new file and paste the code in that file).

9. To ensure that there are no issues/errors with the file, it is considered good practice to compile the program using the following command:

python etl_dag.py

10. We can also use the following command to list the DAGs in our instance:

airflow list_dags

11. Once we have made sure that the DAG file has no issues, we can go to the Airflow UI, which is hosted at the URL: your_public_ip:8080/admin/

Note: You can find your_public_ip in your AWS EC2 dashboard (IPv4 Public IP).

12. If we are re-running this DAG, we will have to delete the target_dir of the Sqoop tasks to avoid errors. Also, clear the task/DAG before re-running it.

13. Switch ON the DAG (etl_dag) by toggling the button shown in the screenshot below:

Note: The DAG might take a while to show up on the UI. Keep refreshing and wait patiently.

14. Click on etl_dag and go to the Graph View. We will find the tasks running. If they are not showing yet, wait for some time.

15. Keep refreshing the page; eventually, all tasks will have completed successfully.

16. Once the DAG execution is complete, the output will be generated in the tables trip_throughput and car_with_most_trips inside the events database.

17. Finally, we can switch off the DAG if we don't want it to run anymore.

Result

We can view the results in the CLI by following the steps below:

Command for trip_throughput:

select * from events.trip_throughput;

From the above screen, we can make the following observations:

  • On 2020-09-16, the trip throughput was 75% for Mumbai, 80% for Chennai, and 80% for Bangalore.
  • On 2020-09-17, the trip throughput was 75% for Mumbai, 80% for Chennai, and 80% for Bangalore.

Command for car_with_most_trips:

select * from events.car_with_most_trips;

From the above screen, we can make the following observations:

  • On 2020-09-16, Economy had the most trips in Mumbai (2), while Sedan had the most trips in Chennai (3) and Bangalore (3).
  • On 2020-09-17, Sedan and Economy tied with 2 trips each in Chennai, Economy and Sedan tied with 2 trips each in Bangalore, and Sedan had the most trips in Mumbai (2).
