Instead of writing a separate ETL for each table, you can drive the ETL dynamically from metadata stored in a database (MySQL, PostgreSQL, SQL Server) and run it with PySpark. For better understanding, I am breaking the code into steps.
Step 1
Create two tables in a database named TEST_DWH (I am using SQL Server):
Table etl_metadata keeps the master data for each ETL job (source and destination information):
-- Master list of ETL jobs: one row per source-to-destination copy.
CREATE TABLE [dbo].[etl_metadata](
    [id] [int] IDENTITY(1,1) NOT NULL,
    -- The ETL script branches on this value ('CSV' or 'sqlserver').
    [source_type] [varchar](max) NULL,
    -- Passed to Spark as the CSV load path or as the JDBC "query" option.
    -- varchar(max) replaces the deprecated text type.
    [source_info] [varchar](max) NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    -- The schedule query joins on id, so declare it as the key.
    CONSTRAINT [etl_metadata_pk] PRIMARY KEY CLUSTERED ([id] ASC)
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Table etl_metadata_schedule tracks the progress of each day's ETL run:
-- Daily run log: the ETL script appends one row per etl_metadata job per day
-- and then updates its status and timestamps as the job progresses.
CREATE TABLE [dbo].[etl_metadata_schedule](
    -- Copied from etl_metadata.id by the ETL script.
    [id] [int] NULL,
    [source_type] [varchar](max) NULL,
    -- varchar(max) replaces the deprecated text type.
    [source_info] [varchar](max) NULL,
    [destination_db] [varchar](max) NULL,
    [destination_schema] [varchar](max) NULL,
    [destination_table] [varchar](max) NULL,
    [etl_type] [varchar](max) NULL,
    -- 'Pending', 'In Progress', 'completed', or an error message.
    [status] [varchar](max) NULL,
    -- The ETL script's schedule query selects a status_description column and
    -- appends it to this table, so the column has to exist here.
    [status_description] [varchar](max) NULL,
    [started_at] [datetime] NULL,
    [completed_at] [datetime] NULL,
    [schedule_date] [datetime] NULL
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
Step 2
Now write the ETL in Python using PySpark:
- Load the ETL metadata into a pandas DataFrame so the ETL process can loop over it
- Use source_type to switch between ways of reading the source (in my case I have taken two cases: CSV and database)
- Write the data to the destination; the destination information is taken from etl_metadata
"""
Created on Thu Mar 17 11:06:28 2022
@author: Administrator
"""
#initiate spark env
# findspark has to run before pyspark is imported: if pyspark is not already
# importable, the import below would fail before findspark could locate it.
import findspark
findspark.init()
findspark.find()
#print(findspark.find())
#SPARK LIBRARIES
from pyspark.sql import SparkSession
import pyodbc
import pandas as pd

# Local Spark session used for every read and write in this script.
spark = (
    SparkSession
    .builder
    .appName("Python ETL script for TEST")
    .master("local[*]")
    .config("spark.driver.memory", '8g')
    # The key must not carry trailing whitespace, otherwise Spark stores it as
    # an unrelated, unknown setting and ANSI mode is never switched on.
    .config("spark.sql.ansi.enabled", True)
    # Raw string: "\D" and "\s" are not valid escape sequences in a normal literal.
    .config("spark.jars", r"C:\Drivers\sqljdbc42.jar")
    .getOrCreate()
)
# Per-job working variables; the ETL loop overwrites them for every metadata row.
source_type = source_info = destination_db = destination_schema = ''
destination_table = etl_type = query_string = ''

# Connection settings for the SQL Server database that holds the ETL metadata.
database = "TEST_DWH"
user = "user"
password = "password"

# Metadata query: every etl_metadata row (stamped with the current time as
# schedule_date) left-joined to today's etl_metadata_schedule row, skipping
# jobs whose schedule row for today is already 'completed'.
# The CONCAT(ISNULL(...), b.status) expression yields exactly 'Pending' only
# when b.status is NULL (no schedule row matched); an existing status comes
# back doubled, so it never equals 'Pending'.
query_string = (
    "SELECT a.*,"
    "CONCAT(ISNULL(b.status,'Pending'),b.status) status,"
    "null status_description ,"
    "null started_at,"
    "null completed_at "
    "FROM (SELECT *,getdate() schedule_date FROM dbo.etl_metadata ) a "
    "LEFT JOIN [dbo].[etl_metadata_schedule] b "
    "ON a.id = b.id and CAST(b.schedule_date AS date)= CAST(getdate() AS date) "
    "where ISNULL(b.status,'A') != 'completed'"
)
# Load the ETL metadata (joined with today's schedule rows) through JDBC.
metadata_jdbc_url = "jdbc:sqlserver://localhost:1433;databaseName={" + database + "};"
sqlserver_jdbc_driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

metadata_reader = spark.read.format("jdbc")
metadata_reader = metadata_reader.option("url", metadata_jdbc_url)
metadata_reader = metadata_reader.option("query", query_string)
metadata_reader = metadata_reader.option("user", user)
metadata_reader = metadata_reader.option("password", password)
metadata_reader = metadata_reader.option("driver", sqlserver_jdbc_driver)
etl_meta_data_staging = metadata_reader.load()

#-------------------CREATE NEW SCHEDULE----------------------------#
# Jobs whose status is exactly 'Pending' are displayed and then appended to
# dbo.etl_metadata_schedule as new schedule rows.
pending_jobs = etl_meta_data_staging.filter("status == 'Pending'")
pending_jobs.show()

schedule_writer = pending_jobs.write.format("jdbc")
schedule_writer = schedule_writer.option("url", metadata_jdbc_url)
schedule_writer = schedule_writer.option("dbtable", "dbo.etl_metadata_schedule")
schedule_writer = schedule_writer.option("user", user)
schedule_writer = schedule_writer.option("password", password)
schedule_writer = schedule_writer.option("driver", sqlserver_jdbc_driver)
schedule_writer.mode("append").save()
#-------------------END CREATE NEW SCHEDULE----------------------------#
#--------------SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
# ODBC connection (separate from Spark) used to update the schedule status.
odbc_settings = [
    "Driver={ODBC Driver 17 for SQL Server}",
    "Server=localhost,1433",
    "Database=" + database,
    "UID=" + user,
    "PWD=" + password,
]
# Each setting is terminated by ';', including the last one.
conn = pyodbc.connect(";".join(odbc_settings) + ";")
cursor = conn.cursor()
#--------------END SQL SERVER CONNECTION TO MAINTAIN ERROR STATE-------------#
#---HELPERS USED BY THE ETL LOOP---#
# Column names cannot be bound as SQL parameters, so the timestamp column that
# gets stamped is restricted to this fixed set before being put into the SQL.
_SCHEDULE_TIMESTAMP_COLUMNS = ("started_at", "completed_at")


def _update_schedule_status(etl_id, status, timestamp_column):
    """Store `status` on today's schedule row of job `etl_id` and commit.

    etl_id           -- job id; converted to a plain int because pandas yields
                        numpy integers, which pyodbc cannot bind.
    status           -- text written to [status]; bound as a parameter so that
                        quotes inside error messages cannot break the statement.
    timestamp_column -- 'started_at' or 'completed_at'; set to CURRENT_TIMESTAMP.

    Raises ValueError for any other timestamp column.
    """
    if timestamp_column not in _SCHEDULE_TIMESTAMP_COLUMNS:
        raise ValueError("unsupported timestamp column: " + repr(timestamp_column))
    # The schedule_date condition keeps earlier days' rows for the same id
    # untouched. NOTE(review): a run that crosses midnight on the database
    # server will no longer match the row created before midnight.
    update_sql = (
        "UPDATE [dbo].[etl_metadata_schedule] "
        "SET [status] = ?, [{0}] = CURRENT_TIMESTAMP "
        "WHERE [id] = ? "
        "AND CAST([schedule_date] AS date) = CAST(GETDATE() AS date);"
    ).format(timestamp_column)
    cursor.execute(update_sql, status, int(etl_id))
    conn.commit()


def _read_source(source_type, source_info):
    """Return a Spark DataFrame for one source.

    'CSV'       -- `source_info` is the path handed to the CSV reader.
    'sqlserver' -- `source_info` is the SQL text used as the JDBC "query" option.

    Raises ValueError for any other source_type, so that a job can never
    silently reuse the DataFrame read for the previous job.
    """
    if source_type == 'CSV':
        csv_frame = (
            spark.read
            .format("csv")
            .option("header", "true")
            .option("quote", "\"")
            .option("escape", "\"")
            .load(source_info)
        )
        csv_frame.show()
        return csv_frame
    if source_type == 'sqlserver':
        return (
            spark.read
            .format("jdbc")
            .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+database+"};")
            .option("query", source_info)
            .option("user", user)
            .option("password", password)
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .load()
        )
    raise ValueError("unsupported source_type: " + repr(source_type))


# Bring the metadata to the driver as pandas so the jobs can be looped over in id order.
df_etl_meta_data_staging = etl_meta_data_staging.toPandas()
df_etl_meta_data_staging = df_etl_meta_data_staging.sort_values('id')

#---LOOP : read FROM SOURCE (extract) and write to destination.---#
try:
    for etl_row in df_etl_meta_data_staging.itertuples(index=False):
        etl_id = etl_row.id
        print("Starting for " + str(etl_id))
        #---------------UPDATE In Progress Status---------------#
        _update_schedule_status(etl_id, 'In Progress', 'started_at')

        # load meta data into variables
        source_type = etl_row.source_type
        source_info = etl_row.source_info
        destination_db = etl_row.destination_db
        destination_schema = etl_row.destination_schema
        destination_table = etl_row.destination_table
        etl_type = etl_row.etl_type  # loaded for completeness; not used below

        # initialize empty status for each run
        status = ''

        # Read data from the source; on failure record the error and move on.
        try:
            print("Reading via ", source_info)
            jdbcDF = _read_source(source_type, source_info)
            status = 'read_successful'
        except Exception as e:
            print("some error in reading from source")
            status = 'error reading source , ' + str(e)
            print(status)
            #---------------UPDATE Error Status---------------#
            _update_schedule_status(etl_id, status, 'completed_at')
            continue

        # Write the extracted data to the destination table. Overwrite mode
        # with truncate=true is requested, as in the original job definition.
        try:
            (
                jdbcDF.write
                .format("jdbc")
                .option("url", "jdbc:sqlserver://localhost:1433;databaseName={"+destination_db+"};")
                .option("dbtable", destination_schema+"."+destination_table)
                .option("user", user)
                .option("password", password)
                .option("truncate", "true")
                .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .mode("overwrite")
                .save()
            )
            status = 'completed'
            print("Write Successful")
            #---------------UPDATE Success Status---------------#
            _update_schedule_status(etl_id, status, 'completed_at')
        except Exception as e:
            print('some error in writing')
            status = 'error in writing to destination db, ' + str(e)
            #---------------UPDATE Error Status---------------#
            _update_schedule_status(etl_id, status, 'completed_at')
finally:
    # Release the status-tracking connection even if a job aborts the loop.
    cursor.close()
    conn.close()
Top comments (2)
Great contribution with easy documentation.
Thanks
This way could save time by reducing the need of writing ETL for each table
Thanks <3