In this article we will walk through the process of auto-ingesting data into Snowflake from Google Cloud Storage (GCS) using Snowpipe. Along the way we will cover the required concepts and tasks step by step.
You can refer to the official docs here.
Assumptions
- You have an active Google Cloud account with permission to create IAM roles, GCS buckets, Pub/Sub topics.
- You have an active Snowflake account with permission to create Database, Schema, Stage and Integration Objects.
Data Source: Open Data
Snowpipe is Snowflake's event-based data ingestion service, which loads data from files as soon as they arrive at a designated staging location. We will use Google Cloud Storage (GCS) in this example to load data into Snowflake, with Google Pub/Sub notifying the Snowpipe as soon as a file lands in GCS.
As a prerequisite, refer to the Part-I article and create all the objects mentioned there before continuing.
Create a Topic (GCP)
We need to create a topic in Google Pub/Sub. A message will be published to this topic whenever a new file is uploaded into the GCS bucket/folder.
Here we are creating a topic snowpipe_gcs_topic. The gcloud CLI command below will create the topic if it does not already exist; it will also verify that the GCS bucket has permission to publish events to this topic and grant the required permission if necessary. The OBJECT_FINALIZE event is sent whenever a new object is successfully created in the bucket.
gcloud storage buckets notifications create gs://snowflake_gcs-stage-bucket --topic=snowpipe_gcs_topic --event-types=OBJECT_FINALIZE --payload-format=json
gcloud pubsub topics describe snowpipe_gcs_topic
Create a Subscription (GCP)
Next we need to create a subscription to the topic so that we get notified whenever a new message is published to it. Here we are creating a subscription snowpipe_gcs_sub to the topic snowpipe_gcs_topic.
gcloud pubsub subscriptions create snowpipe_gcs_sub --topic=snowpipe_gcs_topic
gcloud pubsub subscriptions describe snowpipe_gcs_sub
Make sure that the subscription state is active.
Service Account Permission (GCP)
A service account is created by Snowflake when the Storage Integration object is created. This service account needs to be granted the Pub/Sub Subscriber role.
Please refer to Part-I for more details on how to create a Storage Integration Object and provide appropriate permissions.
Select the subscription name in the Pub/Sub UI and click View Permissions.
Click Add Principal.
Enter the service account name and assign it the Pub/Sub Subscriber role.
Navigate to the Dashboard page in the Cloud Console, and select your project from the dropdown list.
Click the ADD PEOPLE TO THIS PROJECT button.
Add the service account name. From the Select a role dropdown, select Monitoring Viewer.
Click the Save button. The service account is now granted the Monitoring Viewer role.
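If you prefer the CLI, the same grants can be sketched with gcloud. The project ID and service account below are placeholders; use your own project ID and the Snowflake-created service account shown in your integration's DESC output.

```shell
# Placeholders: substitute your own project ID and the Snowflake-created
# service account from the integration's DESC output.
PROJECT_ID="my-gcp-project"
SA="snowflake-sa@example.iam.gserviceaccount.com"

# Print the two grant commands; run them directly once gcloud is authenticated.
echo gcloud pubsub subscriptions add-iam-policy-binding snowpipe_gcs_sub \
  --member="serviceAccount:${SA}" --role="roles/pubsub.subscriber"
echo gcloud projects add-iam-policy-binding "${PROJECT_ID}" \
  --member="serviceAccount:${SA}" --role="roles/monitoring.viewer"
```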
Create a Notification Integration (Snowflake)
The notification integration references the Pub/Sub subscription in Google Cloud. Snowflake associates the notification integration with a GCS service account created for your Snowflake account; this is the same service account that was created along with the Storage Integration object. Replace GCP-PROJECT-ID in the command below with your own project ID.
CREATE NOTIFICATION INTEGRATION snowflake_gcp_pubsub_int
TYPE = QUEUE
NOTIFICATION_PROVIDER = GCP_PUBSUB
ENABLED = true
GCP_PUBSUB_SUBSCRIPTION_NAME = 'projects/GCP-PROJECT-ID/subscriptions/snowpipe_gcs_sub';
DESC NOTIFICATION INTEGRATION snowflake_gcp_pubsub_int;
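The subscription name must be the fully qualified Pub/Sub resource name. A quick sanity check of the format (the project ID here is a placeholder; `gcloud config get-value project` prints your real one):

```shell
PROJECT_ID="my-gcp-project"     # placeholder; find yours with: gcloud config get-value project
SUBSCRIPTION="snowpipe_gcs_sub"
FULL_NAME="projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION}"
echo "${FULL_NAME}"             # value for GCP_PUBSUB_SUBSCRIPTION_NAME
```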
Create a Snowpipe (Snowflake)
Let's create a Snowpipe that will auto-ingest data into the Snowflake table as files arrive in the GCS location.
CREATE SCHEMA manuf_db.pipes;
CREATE PIPE manuf_db.pipes.snow_gcs_pipe
AUTO_INGEST = true
INTEGRATION = snowflake_gcp_pubsub_int
AS
COPY INTO manuf_db.public.manuf_tbl
FROM @manuf_db.stages.snowflake_gcp_stage
PATTERN = '.*manufacturers.*'
FILE_FORMAT = manuf_db.file_formats.file_format_csv;
DESC PIPE manuf_db.pipes.snow_gcs_pipe;
SHOW PIPES IN manuf_db.pipes;
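Note that PATTERN is a regular expression applied to the staged file path, and only matching files are ingested. A quick illustration with hypothetical file names:

```shell
# Hypothetical file names; only paths matching the pipe's PATTERN regex are loaded.
for f in manufacturers_2020.csv beers.csv data/manufacturers_on.csv; do
  if echo "$f" | grep -Eq '.*manufacturers.*'; then
    echo "$f -> ingested"
  else
    echo "$f -> skipped"
  fi
done
```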
Check the status of the Snowpipe. Its executionState should be RUNNING for ingestion to work correctly. Make sure the service account has been granted the correct permissions in GCP (as described above).
SELECT SYSTEM$PIPE_STATUS('manuf_db.pipes.snow_gcs_pipe');
Upload files in GCS (GCP)
Upload a file to the stage bucket/folder. As soon as the file lands, a message will be published to the topic snowpipe_gcs_topic.
Open the Pub/Sub subscription and check the Metrics tab to view the messages published.
The file should now be loaded into manuf_tbl in Snowflake; ingestion typically takes a minute or two. You can upload multiple files to the GCS bucket and each will be ingested by the Snowpipe.
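Files can also be uploaded from the command line instead of the Cloud Console. A sketch (the file name here is a hypothetical example that matches the pipe's PATTERN):

```shell
BUCKET="snowflake_gcs-stage-bucket"
FILE="manufacturers_2020.csv"   # hypothetical local file; must match the pipe's PATTERN
# Print the upload command; run it directly once gcloud is authenticated.
echo gcloud storage cp "${FILE}" "gs://${BUCKET}/"
```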
Verify Ingested Data (Snowflake)
Run the SQL below to verify that the data has been loaded into the table.
SELECT * FROM manuf_db.public.manuf_tbl;
SELECT count(*) FROM manuf_db.public.manuf_tbl;
The copy history of the files can also be viewed in Snowflake, for example via the INFORMATION_SCHEMA.COPY_HISTORY table function.
Below is the output of the pipe status query. This data can be very handy for debugging: it shows the last file ingested along with its timestamp.
SELECT SYSTEM$PIPE_STATUS('manuf_db.pipes.snow_gcs_pipe');
All objects created in the Snowflake account (Part-I and Part-II)
Fun Fact
Based on the data used, as of 2020-11-01 there are 543 beer manufacturers in Ontario (ON) producing 1,403 different kinds of beer.
Kindly let me know if this article was helpful. Your feedback is highly appreciated.