Atsushi Suzuki

Automating Looker Studio Data Updates with S3 CSVs Processed through BigQuery

Introduction

In my company's Customer Success department, we share analytical data using Looker Studio during meetings with our clients. However, this process had several challenges.

Addressing these challenges gave me my first opportunity to work with GCP tools (BigQuery and Cloud Functions), and I've documented the experience in this article.

Traditional Procedure

  1. Export data for the past week in CSV format from the application's management screen.
  2. Manually copy that data into a Google Spreadsheet.
  3. Load the data from the spreadsheet into Looker Studio and conduct the analysis.

Challenges

  • Downloading CSV files and copying them into spreadsheets took time and manual effort.
  • Each client's data had to be maintained by hand and updated continuously, which was inefficient.
  • With increasing data volumes, there was a risk of reaching the row limit in Google Spreadsheets.
  • Downloading large volumes of data could burden the production database (RDS), potentially impacting performance.

Of these, the row limit in Google Spreadsheets was the most urgent issue.

Implementation of the Solution

  • Introduced a mechanism to automatically load CSV files stored in Amazon S3 into GCP's BigQuery.
  • Created views (virtual tables) in BigQuery for each client_id, making only the necessary data accessible according to Google account permissions.

This change automated data management, made it more efficient, and reduced the load on the database.

Reasons for Choosing BigQuery

Redshift and Athena were also candidates as a data source for Looker Studio, but BigQuery was chosen for the following reasons:

  • Wanted to avoid significant costs.
  • Needed to maintain the analysis dashboards in Looker Studio according to each client's customization requirements.
  • Looker Studio does not support data loading from Athena.

Detailed Data Update Procedure

The data update process is as follows. The CSV upload destination is S3 because the application's infrastructure was originally built on AWS; the source of the CSV files is RDS.

(Architecture diagram of the data update flow)

The procedure is as follows:

  1. CSV Upload (AM 9:00): The previous day's summary data is automatically uploaded to the S3 bucket (summary/crawler/*).
  2. Data Transfer to BigQuery (AM 9:15): The BigQuery Data Transfer Service is executed, transferring data from S3 to the summary dataset's summary_all table in BigQuery.
  3. Notification of Successful Data Transfer: Upon successful data transfer, a message is posted to the succeeded topic in Cloud Pub/Sub. The Cloud Function notify-update-summary-all receives this message and sends a notification to Slack.
  4. Automatic View Update: When the summary_all table is updated, the view corresponding to each client is also automatically updated.

Implementation Overview

Transfer of Existing Data

First, it is necessary to transfer several months' worth of daily CSV files stored in S3 to a BigQuery table.

Creation of Original Data Dataset and Views in BigQuery

Create a dataset summary in the project analytics-aws-to-gcp and create the table summary_all with the following query:



CREATE TABLE `analytics-aws-to-gcp.summary.summary_all`
(
  created_at STRING,
  client_id INT64,
  url STRING,
  ...
)


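The dataset itself can be created in the console or, as a minimal sketch with the google-cloud-bigquery client (assuming the asia-northeast1 location that the per-client datasets use later):


from google.cloud import bigquery

# Create the `summary` dataset that will hold the summary_all table
client = bigquery.Client(project="analytics-aws-to-gcp")
dataset = bigquery.Dataset("analytics-aws-to-gcp.summary")
dataset.location = "asia-northeast1"  # assumed region; match your project
client.create_dataset(dataset, exists_ok=True)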

Data Transfer from S3 to BigQuery

Use the BigQuery Data Transfer Service for transferring data from the S3 bucket to BigQuery tables.

Start by creating a new transfer in the BigQuery console.

(Screenshot: creating a transfer in the BigQuery Data Transfer Service)

To load multiple CSVs from the S3 bucket analytics-aws-to-gcp, use a wildcard like s3://analytics-aws-to-gcp/* in the Amazon S3 URI.

Additionally, accessing S3 from BigQuery requires credentials (access key and secret access key). Therefore, it's necessary to create an IAM user with the AmazonS3ReadOnlyAccess policy on the AWS side and issue an access key.

The "Number of errors allowed" setting specifies the maximum number of invalid records that can be ignored during the transfer.
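
The transfer can be set up entirely in the console as shown above; for reference, here is a rough sketch of creating the same configuration with the google-cloud-bigquery-datatransfer Python client. The display name and credential placeholders are illustrative, and the parameter names follow the Amazon S3 transfer options, so verify them against the current documentation:


from google.cloud import bigquery_datatransfer

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# Transfer configuration mirroring the console settings above
transfer_config = bigquery_datatransfer.TransferConfig(
    destination_dataset_id="summary",
    display_name="summary_all from S3",  # illustrative name
    data_source_id="amazon_s3",
    params={
        "data_path": "s3://analytics-aws-to-gcp/*",
        "destination_table_name_template": "summary_all",
        "file_format": "CSV",
        "access_key_id": "YOUR_ACCESS_KEY_ID",          # from the AmazonS3ReadOnlyAccess IAM user
        "secret_access_key": "YOUR_SECRET_ACCESS_KEY",  # keep out of source control
        "max_bad_records": "0",                         # "Number of errors allowed"
    },
)

transfer_config = transfer_client.create_transfer_config(
    parent=transfer_client.common_project_path("analytics-aws-to-gcp"),
    transfer_config=transfer_config,
)
print(f"Created transfer config: {transfer_config.name}")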

Automated Creation of Views for Each Client

After the data transfer to BigQuery's summary_all table is complete, views corresponding to each client are created. They could be created manually, but with many clients it is more efficient to automate the process with a Cloud Function, create-summary-view-by-client-id.



import os
from google.cloud import bigquery
from flask import jsonify

def create_summary_view_by_client_id(_):
    # Initialize BigQuery client
    client = bigquery.Client()

    print("Function execution started.")

    # Execute query to extract list of `client_id`
    client_ids_query = """
    SELECT DISTINCT client_id
    FROM `analytics-aws-to-gcp.summary.summary_all`
    """
    query_job = client.query(client_ids_query)

    # Generate list of client_ids from query results
    client_ids = [row["client_id"] for row in query_job.result()]

    for client_id in client_ids:
        # Create dataset for each client
        dataset_id = f"analytics-aws-to-gcp.summary_{client_id}"
        dataset = bigquery.Dataset(dataset_id)
        dataset.location = "asia-northeast1"
        try:
            client.create_dataset(dataset, exists_ok=True)  # Create dataset if it doesn't exist
            print(f"Created dataset {dataset_id}")
        except Exception as e:
            print(f"Error creating dataset for client_id {client_id}: {e}")
            continue

        # Create view within the dataset
        view_id = f"{dataset_id}.summary_view"
        view_query = f"""
            SELECT *
            FROM `analytics-aws-to-gcp.summary.summary_all`
            WHERE client_id = {client_id}
        """
        view = bigquery.Table(view_id)
        view.view_query = view_query
        try:
            # Create view and get the resulting Table object
            created_view = client.create_table(view, exists_ok=True)
            print(f"Created view at {created_view.full_table_id}")
        except Exception as e:
            print(f"Error creating view for client_id {client_id} in dataset {dataset_id}: {e}")

    print("Function execution completed successfully.")

    return jsonify({"message": "View creation completed successfully"}), 200




The function's requirements.txt:

functions-framework==3.*
google-cloud-bigquery



To run the function, open Cloud Shell from the "Testing" tab and execute the following test command:



curl -m 3610 -X POST https://asia-northeast1-analytics-aws-to-gcp.cloudfunctions.net/create-summary-view-by-client-id \
-H "Authorization: bearer $(gcloud auth print-identity-token)" \
-H "Content-Type: application/json" \
-d '{}'



Granting Permissions to Datasets

After creating the views, fine-grained permissions are applied so that each client can only see their own data. However, since the views reference the summary_all table in the original summary dataset, granting a client access to their client-specific dataset alone is not enough; they would also need access to the summary dataset itself to query the view.

As a solution, each client-specific view is registered as an authorized view within the original summary dataset. This allows each client to safely view data from the summary_all table through their view.
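
This registration can be done from the dataset's sharing settings in the console, or programmatically. Below is a minimal sketch with google-cloud-bigquery, where client_id 1234 is a hypothetical example:


from google.cloud import bigquery

client = bigquery.Client(project="analytics-aws-to-gcp")

# The dataset that owns summary_all, and the client view to authorize (client_id 1234 is hypothetical)
source_dataset = client.get_dataset("analytics-aws-to-gcp.summary")
view = client.get_table("analytics-aws-to-gcp.summary_1234.summary_view")

# Append an authorized-view access entry and update the dataset's ACL
access_entries = list(source_dataset.access_entries)
access_entries.append(
    bigquery.AccessEntry(None, "view", view.reference.to_api_repr())
)
source_dataset.access_entries = access_entries
client.update_dataset(source_dataset, ["access_entries"])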

(Screenshot: registering the client views as authorized views on the summary dataset)

Automated Daily Data Updates

Once the transfer of existing data and initial setup are complete, new data is automatically added on a daily basis.

Data Transfer from S3 to BigQuery

Similar to the transfer of existing data, use the BigQuery Data Transfer Service. The transfer time is specified in UTC.

(Screenshot: transfer schedule settings in the BigQuery Data Transfer Service)

Also, turn on the Pub/Sub notification option so that a message is published to the Pub/Sub topic whenever the transfer succeeds.

(Screenshot: Pub/Sub notification settings for the transfer)
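
If the transfer configuration was created programmatically as sketched earlier, the daily schedule and the notification topic can be set on the same config. A rough sketch, assuming the succeeded topic from the diagram and treating AM 9:15 as JST (00:15 UTC); the config ID placeholder is hypothetical:


from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

# Fetch the existing transfer config (replace <CONFIG_ID> with the real ID)
transfer_config = transfer_client.get_transfer_config(
    name="projects/analytics-aws-to-gcp/transferConfigs/<CONFIG_ID>"
)

# Daily run at 00:15 UTC (= 9:15 JST) and Pub/Sub notification on completion
transfer_config.schedule = "every day 00:15"
transfer_config.notification_pubsub_topic = (
    "projects/analytics-aws-to-gcp/topics/succeeded"
)

transfer_client.update_transfer_config(
    transfer_config=transfer_config,
    update_mask=field_mask_pb2.FieldMask(
        paths=["schedule", "notification_pubsub_topic"]
    ),
)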

Automation of Slack Notifications

Create a Cloud Function, notify-update-summary-all. Based on the Pub/Sub message indicating that the BigQuery Data Transfer Service run succeeded, this function sends a notification to Slack.



import os
import json
import requests
import base64
from flask import jsonify

def notify_update_summary_all(event, context):
    # Slack Webhook URL
    webhook_url = os.environ.get('SLACK_WEBHOOK_URL')

    if 'data' in event:
        message_data = base64.b64decode(event['data']).decode('utf-8')
        message = json.loads(message_data)
    else:
        message = {'message': 'No data found in Pub/Sub message'}

    slack_message = {
        "text": f"Summary data transfer to BigQuery has been completed: {message.get('message')}"
    }

    response = requests.post(webhook_url, json=slack_message)

    print(response.text)

    return jsonify(success=True), 200




The requirements.txt for this function:

functions-framework==3.*
requests



When deploying this function, specify the created Pub/Sub topic as the event trigger and set the Slack Webhook URL as a runtime environment variable. This ensures that the function is executed every time a message is published to the specified topic, sending a notification to the designated Slack channel.

(Screenshot: Cloud Functions trigger and environment variable settings)
