Photo by Dennis Kummer on Unsplash.
Over the past few weeks, we have discussed several important topics in the world of data engineering and automation.
We have laid the groundwork for understanding the vocabulary and basic concepts a data engineer uses. Now it is time to start building your first set of batch jobs.
To do so, we will be using the Apache Airflow library to help automate our work.
Also, for our live data source, we will be using the sfgov 311 dataset. You can pull this information at any time and get the most up-to-date data about 311 reports. This usually involves stuff like vandalism, parking violations, etc.
For this pipeline, we will first be extracting the data into a raw CSV and then loading it into a MySQL database.
In this article, we will outline the code required to do so as well as point out some reasons for the various steps we are taking.
Creating a JSON Extract Function
Before getting started, you need to set up an Airflow environment to be able to follow along with each of the steps discussed in this article. If you haven't already done that, we find this article to be one of our personal favorites.
When creating data pipelines, especially those that are more batch-oriented, it is beneficial to extract data into a raw data layer. Doing this allows you to have a backup of the raw data.
When there is an error in the data, having a raw file can help you figure out if the data error is in the source or your overall process.
In our case, we will be pulling data from a JSON dataset that is online. We can do this using a pandas function called [read_json](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_json.html)
. This can either read a file or URL.
We will do this by making a function you can call that will extract JSON-based data from either a URL or file.
We can also have a timestamp to the file name to make sure we know when we pulled the data. This can be used later when trying to track any changes in data that might be more of a snapshot vs. updating data.
This can be done by using the DateTime
object, as shown below:
This filename will be used later with the extract function we are creating below:
# Step 1: Import all necessary packages.
# For scheduling
import datetime as dt
# For function jsonToCsv
import pandas as pd
# For function csvToSql
import csv
import pymysql
# Backwards compatibility of pymysql to mysqldb
pymysql.install_as_MySQLdb()
# Importing MySQLdb now
import MySQLdb
# For Apache Airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# Step 2: Define functions for operators.
# A JSON string reader to .csv writer function.
def jsonToCsv(url, outputcsv):
# Reads the JSON string into a pandas DataFrame object.
data = pd.read_json(url)
# Convert the object to a .csv file.
# It is unnecessary to separate the JSON reading and the .csv writing.
data.to_csv(outputcsv)
return 'Read JSON and written to .csv'
This task still needs to get implemented. However, we will show you that at the end once we have the loading function set up.
Loading Data Into MySQL With Airflow
Once you have an extract, your next step is to load your data into some sort of raw layer in your data warehouse.
The key in this step is to not manipulate your data. The reason being that if your data has some form of data issue or problem from the data source, then it is easier to trace back.
You can do this by putting in data quality checks at each step. In the raw checks, you're usually checking that data types make sense.
For example, are all the date fields dates? Are all the states valid states? Believe it or not, we have had problems here. "WE" is not a state abbreviation.
These are sanity checks to make sure the data you are pulling is correct.
In theory, your application should have double-checked user inputs. However, we never trust the application layer.
All that aside, you can use the code below. What you will notice is we first have a database connection using MySQL and then load the CSV row by row:
def csvToSql():
# Attempt connection to a database
try:
dbconnect = MySQLdb.connect(
host='localhost',
user='root',
passwd='databasepwd',
db='mydb'
)
except:
print('Can\'t connect.')
# Define a cursor iterator object to function and to traverse the database.
cursor = dbconnect.cursor()
# Open and read from the .csv file
with open('./rogoben.csv') as csv_file:
# Assign the .csv data that will be iterated by the cursor.
csv_data = csv.reader(csv_file)
# Insert data using SQL statements and Python
for row in csv_data:
cursor.execute(
'INSERT INTO rogobenDB3(number, docusignid, publicurl, filingtype, \
cityagencyname, cityagencycontactname, \
cityagencycontacttelephone, cityagencycontactemail, \
bidrfpnumber, natureofcontract, datesigned, comments, \
filenumber, originalfilingdate, amendmentdescription, \
additionalnamesrequired, signername, signertitle) ' \
'VALUES("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", \
"%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s")',
row
)
# Commit the changes
dbconnect.commit()
'''
# Print all rows - FOR DEBUGGING ONLY
cursor.execute("SELECT * FROM rogobenDB3")
rows = cursor.fetchall()
print(cursor.rowcount)
for row in rows:
print(row)
'''
# Close the connection
cursor.close()
# Confirm completion
return 'Read .csv and written to the MySQL database'
If you were building a more robust system, then you would probably set up some form of database manager class that just took the connection string that you imported.
However, since we are just building this for a demo, we have put that code all in one function.
With all these functions set up, you can now officially set up your DAG. As I mentioned in previous articles, a DAG acts like a flow chart. It guides which tasks run first and which tasks are dependent on other tasks.
In our case, we have a task that extracts the data and another task that loads said data into a MySQL table. These two basic tasks will help get your pipeline started and it will look like the one below.
Setting Up Your Airflow Pipeline
Now with all these functions, we can set up the pipeline.
Setting up an actual pipeline in Airflow requires that you set up a default set of arguments. This allows you to set the owner, start date, how often the pipeline will retry, and several other parameters:
# Step 3: Define the DAG, i.e. the workflow
# DAG's arguments
default_args = {
'owner': 'rogoben',
'start_date':dt.datetime(2020, 4, 16, 11, 00, 00),
'concurrency': 1,
'retries': 0
}
# DAG's operators, or bones of the workflow
with DAG('parsing_govt_data',
catchup=False, # To skip any intervals we didn't run
default_args=default_args,
schedule_interval='* 1 * * * *', # 's m h d mo y'; set to run every minute.
) as dag:
opr_json_to_csv = PythonOperator(
task_id='json_to_csv',
python_callable=jsonToCsv,
op_kwargs={
'url':'https://data.sfgov.org/resource/pv99-gzft.json',
'outputcsv':'./rogoben.csv'
}
)
opr_csv_to_sql = PythonOperator(
task_id='csv_to_sql',
python_callable=csvToSql
)
# The actual workflow
opr_json_to_csv >> opr_csv_to_sql
In addition to the parameters, you will need to actually set up your specific operators. In this case, we have our two functions: jsonToCSV
and csvToSql
. These will be used in a PythonOperator
. This allows you to create what we call tasks.
In order to make sure the tasks operate in the order that will make sense, you need to define the dependencies.
You can define a dependency by using the bit shift operator. For those unfamiliar with the bit shift operator, it looks like >>
or <<
.
In this case, you would define it like opr_json_to_csv >> opr_csv_to_sql
.
This ensures that the opr_json_to_csv
runs before opr_csv_to_sql
.
Truth be told, you will have duplicate data loading this way.
In order to deal with duplicate data, you can load the raw layer and then check to ensure you're not loading duplicate data in the later staging layer. So we won't worry about that for now.
With that, you have essentially completed your first pipeline.
So now where do you put your pipeline?
In order to have this pipeline run, you will need to save it in your airflow/dags
folder that you have set up. If you still haven't set it up, then you should by using our favorite Airflow setup guide.
airflow # airflow root directory.
├── dags # the dag root folder
│ ├── first_dag.py # where you put your first task
Once this pipeline is saved --- and as long as you have Airflow running in the background --- your DAG will automatically get picked up by Airflow.
You can check this out by going to your localhost:8080, which is where Airflow's dashboard runs by default.
From there, your DAG should appear.
Once it shows up, you can start to look into your DAG to make sure all the various tasks are set up.
It will look like the images below:
Now your pipeline is ready to go.
Finishing Your First Data Pipeline
Congrats on building and automating your first Airflow data pipeline! Now you can take this framework and utilize it across your other ETLs and data pipelines.
This, of course, is just the first layer of a data platform or data warehouse. From here, you will still need to create a production layer, a metrics layer, and some sort of data visualization or data science layer.
Then you can really start making an impact with your data.
If you would like to read more about data science, cloud computing and technology, check out the articles below!
Data Engineering 101: Writing Your First Pipeline
5 Great Libraries To Manage Big Data
What Are The Different Kinds Of Cloud Computing
4 Simple Python Ideas To Automate Your Workflow
4 Must Have Skills For Data Scientists
SQL Best Practices --- Designing An ETL Video
Top comments (0)