Objective of Pipeline
The objective of the data pipeline is to land the data in the data lake so that it can be further processed
by the different teams for effective decision-making and building machine learning products.
Currently, most of the analytical SQL queries run over the schema of data owned by the microservices
within the organization. The previous approach was to run these analytical SQL queries over the
production PostgreSQL databases.
Under heavy SQL workloads, we faced performance problems on the production PostgreSQL database, so we first migrated to separate RDS
instances, one per microservice.
To get the data from the microservice databases to a central
location, we have created a data pipeline.
Data Sources
Database Migration Service
AWS Database Migration Service helps to migrate databases to AWS resources securely. The source
database remains fully operational during the migration, minimizing downtime to applications that rely
on the database. AWS DMS supports both homogeneous and heterogeneous migrations between
different database platforms, and it can continuously replicate data with low latency from
any supported source to any supported target. In our case, we use the PostgreSQL RDS instances as
the source and Amazon S3, a highly available and scalable storage service, as our data lake.
Pre-configurations for AWS DMS
Source Configurations
In our case, the source for AWS DMS is the RDS clusters. Since we want AWS DMS to move the data present in
those database clusters to our target, we need to create a new DB parameter group that enables logical
replication and apply it to the RDS clusters from which we want the data.
Reference link:
https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html
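If you prefer to script this rather than click through the console, the same setup can be sketched with boto3; this is only a minimal sketch, and the group name, instance identifier, region, and max_replication_slots value below are placeholders.

```python
import boto3

rds = boto3.client("rds", region_name="us-east-1")  # assumed region

# Create a DB parameter group for the Postgres 11 family.
rds.create_db_parameter_group(
    DBParameterGroupName="cdc-parameter-group",
    DBParameterGroupFamily="postgres11",
    Description="Enables logical replication for AWS DMS CDC",
)

# rds.logical_replication (and max_replication_slots) are static parameters,
# so they only take effect after a reboot ("pending-reboot").
rds.modify_db_parameter_group(
    DBParameterGroupName="cdc-parameter-group",
    Parameters=[
        {"ParameterName": "rds.logical_replication",
         "ParameterValue": "1",
         "ApplyMethod": "pending-reboot"},
        {"ParameterName": "max_replication_slots",
         "ParameterValue": "10",  # placeholder: tune to the number of slots you need
         "ApplyMethod": "pending-reboot"},
    ],
)

# Attach the parameter group to the source RDS instance and reboot it so the
# new values are actually applied (mirrors the manual restart described below).
rds.modify_db_instance(
    DBInstanceIdentifier="orders-service-db",  # placeholder identifier
    DBParameterGroupName="cdc-parameter-group",
    ApplyImmediately=True,
)
rds.reboot_db_instance(DBInstanceIdentifier="orders-service-db")
```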
Follow these steps in the AWS console to create the new parameter group:
- Go to the AWS console and open the dashboard for RDS.
- From the left navigation menu bar, go to the Parameter Groups.
- Click the Create parameter group button to create a new parameter group.
- Select the DB family, which will be Postgres 11 in our case.
- The type will be DB parameter group.
- Provide an appropriate group name (e.g., cdc-parameter-group).
- Add a proper description for the parameter group and click Create.
- Now edit the created parameter group, search for rds.logical_replication, and set its value to 1. You can also set the max_replication_slots value.
- Save the changes to update the parameter group. Now we have to apply the parameter group to the RDS cluster that is our source of data. To do that, follow these steps:
- From the RDS dashboard navigation bar select Databases.
- Select the appropriate RDS cluster.
- Click the Modify button.
- Scroll to Database options, where you will find the DB parameter group.
- From the drop-down, select the recently created parameter group.
- Click the Continue button. After this, the database will be in the modifying state, but the parameters won't be applied yet; once the DB has been modified, we will have to manually restart the RDS DB for the parameter group to take effect.
Target Configurations
Our target for the DMS is the AWS S3 bucket, which will act as our raw data lake, so we will need to create an S3 bucket to hold the raw data from RDS.
IAM role configuration
An IAM role is required to access the RDS clusters as well as the S3 bucket. This role will be used by AWS DMS to read the data from the RDS instances and offload it to the S3 bucket.
1.3.3. AWS DMS configurations
Replication instance
Use the following guidelines to create the replication instance:
- Click on the replication instances from the AWS DMS console dashboard.
- Click on the create replication instance button
- Provide appropriate names and descriptions for the DMS instance.
- Select the appropriate instance class.
- Select the appropriate engine version (default 3.4.6)
- Choose the allocated storage for the replication instance based on the replication lag you expect to face.
- Select the appropriate VPC where the source and target AWS resources reside.
- Disable the Publicly accessible option.
- Provide appropriate tags for the replication instance.
Endpoints
Before creating any other DMS configurations, we need to configure the source and target in the AWS DMS dashboard. The following guidelines can be used for creating the source endpoint:
- Click on the Endpoints from the left nav bar.
- For the source endpoint, select the source endpoint button. There is a Select RDS DB instance checkbox from which the RDS instance can also be selected.
- In the endpoint configuration, provide an appropriate Endpoint Identifier and description.
- In the Access to endpoint database section, there are two options; for the sake of this documentation, we will provide the access information manually.
- Under Endpoint settings, there is a Use endpoint connection attributes checkbox. Tick it and provide the heartbeat configuration as: heartbeatEnable=true;heartbeatFrequency=1;
- Under the tags section, provide the appropriate tags for the DMS source.
- Under test endpoint connection, choose the appropriate VPC and previously created DMS replication instance.
- Run the test; if the connection test succeeds, create the source endpoint.
Similarly, we need to create an endpoint for the target, which is the S3 bucket. Use the following guidelines to create the target endpoint:
- Click on create endpoint.
- Enter the endpoint identifier which is the name for the target endpoint.
- Select S3 as the target engine.
- Provide the IAM role that was created earlier that has access to the S3 bucket.
- Provide the bucket name and bucket folder.
- Enter appropriate tags for the target endpoint.
- Test the endpoint connection and create the endpoint if the test is successful.
DMS Task
Creating a DMS task requires the following things to be configured (a boto3 sketch follows this list):
- Task identifier (task name)
- Description
- Replication instance.
- Source database endpoint.
- Target database endpoint
- Migration type (migrate existing data and replicate ongoing changes)
- Task settings
- Table mappings
- Migration task start-up configuration.
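Most of these pieces can also be wired up programmatically. The boto3 sketch below is only illustrative: the region, server name, credentials, ARNs, and identifiers are placeholders, and the table mappings are assumed to come from the script described next.

```python
import json
import boto3

dms = boto3.client("dms", region_name="us-east-1")  # assumed region

# Source endpoint: the PostgreSQL RDS instance (connection details are placeholders).
source = dms.create_endpoint(
    EndpointIdentifier="orders-service-source",
    EndpointType="source",
    EngineName="postgres",
    ServerName="orders-db.xxxxxxxxxxxx.us-east-1.rds.amazonaws.com",
    Port=5432,
    DatabaseName="orders",
    Username="dms_user",
    Password="********",
    PostgreSQLSettings={
        # Same heartbeat configuration as in the console walkthrough above.
        "HeartbeatEnable": True,
        "HeartbeatFrequency": 1,
    },
)

# Target endpoint: the S3 raw data lake, accessed through the DMS IAM role.
target = dms.create_endpoint(
    EndpointIdentifier="raw-data-lake-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-s3-access",
        "BucketName": "raw-data-lake",
        "BucketFolder": "orders-service",
    },
)

# The replication task ties the checklist together: identifier, endpoints,
# replication instance, migration type, task settings, and table mappings.
dms.create_replication_task(
    ReplicationTaskIdentifier="orders-full-load-and-cdc",
    SourceEndpointArn=source["Endpoint"]["EndpointArn"],
    TargetEndpointArn=target["Endpoint"]["EndpointArn"],
    ReplicationInstanceArn="arn:aws:dms:us-east-1:123456789012:rep:EXAMPLE",
    MigrationType="full-load-and-cdc",
    TableMappings=json.dumps({"rules": []}),  # generated by the script below
    ReplicationTaskSettings=json.dumps({
        # Stop the task after the full load so the Glue job can bulk-insert
        # the initial snapshot before CDC processing begins.
        "FullLoadSettings": {"StopTaskCachedChangesApplied": True},
    }),
)
```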
While most of this configuration is straightforward, writing the table mappings by hand for large tables, or for all the tables
present in a database, can be quite time-consuming. We therefore create the DMS tasks using a Python
script that queries the database for the existing schema and generates the table-mapping JSON.
Link to the Python script repository:
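The actual generator lives in that repository; the fragment below is only a minimal sketch of the idea — query information_schema for the tables and emit one DMS selection rule per table. The psycopg2 dependency and the connection string are assumptions.

```python
import json
import psycopg2  # assumed driver for querying the source schema


def build_table_mappings(dsn: str, schema: str = "public") -> str:
    """Return a DMS table-mapping JSON string with one selection rule per table."""
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = %s AND table_type = 'BASE TABLE'",
            (schema,),
        )
        tables = [row[0] for row in cur.fetchall()]

    rules = [
        {
            "rule-type": "selection",
            "rule-id": str(i),
            "rule-name": str(i),
            "object-locator": {"schema-name": schema, "table-name": table},
            "rule-action": "include",
        }
        for i, table in enumerate(tables, start=1)
    ]
    return json.dumps({"rules": rules})


# Example usage (placeholder connection string):
# print(build_table_mappings("postgresql://dms_user:***@orders-db:5432/orders"))
```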
Once the DMS task is created, we can start it when it is ready. The task will first perform the full load and then stop. We will run the
Glue jobs on just the full-load data initially; once that is done, we will resume the DMS task to pick up the CDC data.
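Pausing after the full load and later resuming CDC can also be driven from boto3; the task ARN below is a placeholder.

```python
import boto3

dms = boto3.client("dms", region_name="us-east-1")  # assumed region
task_arn = "arn:aws:dms:us-east-1:123456789012:task:EXAMPLE"  # placeholder

# Start the task: the full load runs first while ongoing changes are cached.
dms.start_replication_task(
    ReplicationTaskArn=task_arn,
    StartReplicationTaskType="start-replication",
)

# ... the task stops itself after the full load (see the task settings above)
# and the Glue jobs bulk-insert the initial snapshot ...

# Resume the task so the cached and ongoing CDC changes land in S3.
dms.start_replication_task(
    ReplicationTaskArn=task_arn,
    StartReplicationTaskType="resume-processing",
)
```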
1.4. Processing the data from the raw data lake using AWS Glue along with
Apache Hudi
If you have gone through the Python script that creates the DMS task from the EC2 instance, you will notice that we configure a
task setting to stop the DMS task once the full load is completed. We do this because we want to
process the full-load data before working through the CDC data: when processing the
incoming data with Apache Hudi and then inserting it into the final data lake, the full load can be
written with the bulk insert operation, which is much more efficient. That is why our Glue jobs have separate sections
for bulk inserts, updates, and deletes.
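The full job lives in the repository referenced below; the fragment here is only a minimal sketch of that split, assuming the DMS target writes Parquet and using placeholder paths, table name, and key fields.

```python
from pyspark.sql import DataFrame, SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-cdc-sketch")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Placeholder Hudi settings: table name, record key, and precombine field.
HUDI_BASE_OPTIONS = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "updated_at",
}


def write_hudi(df: DataFrame, operation: str, save_mode: str, target_path: str) -> None:
    """Write a batch to the processed data lake with the given Hudi operation."""
    (
        df.write.format("org.apache.hudi")
        .options(**HUDI_BASE_OPTIONS)
        .option("hoodie.datasource.write.operation", operation)
        .mode(save_mode)
        .save(target_path)
    )


RAW = "s3://raw-data-lake/orders-service/public/orders"  # placeholder paths
PROCESSED = "s3://processed-data-lake/orders"

# Full-load files (named LOAD* by DMS) are written once via bulk_insert.
full_load_df = spark.read.parquet(f"{RAW}/LOAD*")
write_hudi(full_load_df, "bulk_insert", "overwrite", PROCESSED)

# CDC files carry an "Op" column (I/U/D) added by DMS.
cdc_df = spark.read.parquet(f"{RAW}/2*")  # timestamp-named CDC files
write_hudi(cdc_df.filter("Op IN ('I', 'U')"), "upsert", "append", PROCESSED)
write_hudi(cdc_df.filter("Op = 'D'"), "delete", "append", PROCESSED)
```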
Refer to the following repository for the Glue job script:
Before creating the Glue jobs, we will need to download the jar files for Hudi and Spark Avro.
Hudi bundle: https://libraries.io/maven/org.apache.hudi:hudi-utilities-bundle_2.11
Spark Avro: http://www.java2s.com/example/jar/s/spark-avro-2.11-index.html
Once these files are downloaded, create a folder in the S3 bucket and upload the files to that location, as they will be used later by the Glue jobs for handling the upsert CDC data. Use the following guidelines to create the Glue jobs (a scripted equivalent is sketched after the list):
- Go to the AWS console and search for Glue, then go to the Glue dashboard.
- Under the Glue dashboard, go to Jobs (legacy) and then click Add job.
- The first section is about the job properties. Fill in the required data:
a. Name: an appropriate job name
b. IAM role: this IAM role should have access to read the data from the raw data lake and write to the processed data lake
c. Type: Spark
d. Glue version: Spark 2.4, Python 3 (Glue Version 2.0)
e. This job runs: An existing script that you provide
f. S3 path where the script is stored: the S3 path of the script
g. Temporary directory: leave it as it is
h. Advanced properties: enable Job bookmark
i. Monitoring options: Continuous logging
j. Tags: enter the appropriate tags such as Name, Owner, Project, Environment and Module
k. Security configuration, script libraries, and job parameters:
i. Python library path: leave empty
ii. Dependent Jar path: Add the s3 location of the hudi jar and spark avro jar.
iii. Referenced files path: s3 path for the data files that we got from Part 1, which is
the raw data lake.
iv. Worker type: Standard
v. Max no. of workers: 2
vi. Max concurrency: 1
vii. Job timeout (minutes): 2880 minutes
viii. Delay notification threshold: leave empty
ix. Number of retries:
x. Job Parameters: take reference from the table below.
l. Catalogue options: check the Use Glue data catalog as the Hive metastore option.
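For reference, the same job definition (steps a–l above) can be created with boto3; the role ARN, script and jar locations, and tags below are placeholders, and the worker settings simply mirror the console values listed above.

```python
import boto3

glue = boto3.client("glue", region_name="us-east-1")  # assumed region

glue.create_job(
    Name="raw-to-hudi-processed",                               # placeholder name
    Role="arn:aws:iam::123456789012:role/glue-data-lake-role",  # reads raw, writes processed
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://glue-scripts/hudi_cdc_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="2.0",                      # Spark 2.4, Python 3
    WorkerType="Standard",
    NumberOfWorkers=2,
    Timeout=2880,                           # minutes
    ExecutionProperty={"MaxConcurrentRuns": 1},
    DefaultArguments={
        "--job-bookmark-option": "job-bookmark-enable",
        "--enable-continuous-cloudwatch-log": "true",
        "--enable-glue-datacatalog": "true",  # use the Glue Data Catalog as the Hive metastore
        "--extra-jars": ",".join([
            "s3://glue-dependencies/hudi-utilities-bundle_2.11.jar",
            "s3://glue-dependencies/spark-avro_2.11.jar",
        ]),
    },
    Tags={"Name": "raw-to-hudi-processed", "Environment": "dev"},
)
```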