DEV Community

Atsushi Suzuki

Bypassing TROCCO: Direct Data Transfer from HubSpot to BigQuery Using Cloud Functions

Introduction

This article introduces a method to transfer data managed in HubSpot—specifically 'contacts,' 'companies,' and 'deals'—to BigQuery, which serves as a data source for BI tools like Looker Studio. Previously, we utilized a SaaS called TROCCO to transfer data from HubSpot to a spreadsheet, from which Looker Studio would load data. However, changes to TROCCO's pricing plan made it impossible to update data as frequently as every two hours within the free tier. Additionally, the slow data loading from the spreadsheet presented challenges, prompting the adoption of Google Cloud Functions (GCF) and BigQuery as alternatives.

GCP Configuration

  • Project Name: hubspot-to-bigquery
  • Google Cloud Functions (GCF): sync-hubspot-to-bigquery
    • Implemented in Python
  • Cloud Scheduler: sync-hubspot-to-bigquery-job
    • Executes the job every hour at the 0th minute
  • BigQuery:
    • Dataset: hubspot_data
    • Tables: contacts, companies, deals

Data Flow

  1. Cloud Scheduler triggers the GCF once every hour.
  2. GCF extracts data from HubSpot and transfers it to BigQuery.
    • Note: To update data, all records are deleted before new records are added.
  3. Looker Studio generates reports by referencing the tables in BigQuery.

Implementation

Issuing a HubSpot Access Token

Create a private app from the settings menu in HubSpot and grant it the required read scopes (crm.objects.deals.read, crm.objects.companies.read, crm.objects.contacts.read). After creating the private app, copy its access token; the function reads it later from the ACCESS_TOKEN environment variable.

Creating BigQuery Tables

Create the hubspot_data dataset in BigQuery, then create the contacts, companies, and deals tables. The following SQL query creates the companies table; the other two tables are created the same way with their own columns:

CREATE TABLE `hubspot-to-bigquery.hubspot_data.companies` (
  id INT64,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  about_us STRING,
  ...
)
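
Because the three tables share the id, created_at, and updated_at columns, the same DDL could also be rendered from Python rather than typed three times. This is only a sketch: build_create_table_sql and COMMON_COLUMNS are hypothetical names, and the remaining columns of each table are deliberately left out because they depend on your HubSpot properties.

```python
def build_create_table_sql(table_id, columns):
    """Render a CREATE TABLE statement from a {column_name: sql_type} mapping."""
    column_lines = ",\n".join(
        f"  {name} {sql_type}" for name, sql_type in columns.items()
    )
    return f"CREATE TABLE `{table_id}` (\n{column_lines}\n)"


# Columns that appear in all three tables of this article (assumption:
# every other column is specific to one table and must be added by hand).
COMMON_COLUMNS = {
    "id": "INT64",
    "created_at": "TIMESTAMP",
    "updated_at": "TIMESTAMP",
}

for table in ("contacts", "companies", "deals"):
    table_id = f"hubspot-to-bigquery.hubspot_data.{table}"
    print(build_create_table_sql(table_id, COMMON_COLUMNS))
```

The printed statements can be pasted into the BigQuery console after the table-specific columns are added.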

Setting Up Google Cloud Function (GCF)

Below is the complete script for the function:

import os
import logging
from datetime import datetime, timezone, timedelta
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException
from google.cloud import bigquery
from google.api_core.retry import Retry

logging.basicConfig(level=logging.INFO)

client = bigquery.Client()
contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}

companies_table_id = "hubspot-to-bigquery.hubspot_data.companies"
companies_table = client.get_table(companies_table_id)
companies_table_schema_keys = {field.name for field in companies_table.schema}

deals_table_id = "hubspot-to-bigquery.hubspot_data.deals"
deals_table = client.get_table(deals_table_id)
deals_table_schema_keys = {field.name for field in deals_table.schema}

# Properties to fetch from HubSpot
contacts_properties = ["id", "created_at", "updated_at", "company_size", "date_of_birth", ...]
companies_properties = ["id", "created_at", "updated_at", "about_us", ...]
deals_properties = ["id", "created_at", "updated_at", "amount_in_home_currency", ...]

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    try:
        # Delete all records for fresh update
        delete_table_records(contacts_table_id)
        delete_table_records(companies_table_id)
        delete_table_records(deals_table_id)

        # Fetch data from HubSpot
        contacts_fetched = api_client.crm.contacts.get_all(properties=contacts_properties)
        companies_fetched = api_client.crm.companies.get_all(properties=companies_properties)
        deals_fetched = api_client.crm.deals.get_all(properties=deals_properties)

        # Create rows to insert into BigQuery
        contacts_rows = create_rows_to_insert(contacts_fetched, contacts_table_schema_keys)
        companies_rows = create_rows_to_insert(companies_fetched, companies_table_schema_keys)
        deals_rows = create_rows_to_insert(deals_fetched, deals_table_schema_keys)

        # Insert data into BigQuery tables
        insert_rows_bigquery(contacts_table_id, contacts_rows)
        insert_rows_bigquery(companies_table_id, companies_rows)
        insert_rows_bigquery(deals_table_id, deals_rows)

        success_message = f"Data synchronized successfully: {len(contacts_rows)} contacts, {len(companies_rows)} companies, and {len(deals_rows)} deals updated."
        logging.info(success_message)
        return success_message, 200

    except ApiException as e:
        error_message = f"Exception when requesting: {e}"
        logging.error(error_message)
        return error_message, 500

def delete_table_records(table_id):
    delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
    try:
        query_job = client.query(delete_query)
        query_job.result()
        logging.info(f"All records have been deleted from {table_id}.")
    except Exception as e:
        logging.error(f"Failed to delete records from {table_id}: {e}")

def convert_utc_to_jst(timestamp):
    jst_zone = timezone(timedelta(hours=9))
    jst_time = timestamp.astimezone(jst_zone)
    logging.debug(f"Converted {timestamp} to {jst_time}")
    return jst_time.isoformat()

def create_rows_to_insert(fetched_data, table_schema_keys):
    rows_to_insert = []
    for data in fetched_data:
        data_properties = data.properties
        row = {
            "id": data.id,
            "created_at": convert_utc_to_jst(data.created_at) if data.created_at else None,
            "updated_at": convert_utc_to_jst(data.updated_at) if data.updated_at else None
        }
        for key, prop in data_properties.items():
            if key in table_schema_keys:
                value = prop if prop != '' and prop is not None else None
                if isinstance(value, datetime):
                    value = convert_utc_to_jst(value)
                row[key] = value
        rows_to_insert.append(row)
    return rows_to_insert

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        try:
            errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
            if errors:
                logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
            else:
                logging.info(f"Batch {i // batch_size + 1} inserted successfully into {table_id}.")
        except Exception as e:
            logging.error(f"Error inserting data into {table_id}: {e}")

The requirements for this function are as follows:

functions-framework==3.*
hubspot-api-client
google-cloud-bigquery

Log Configuration and BigQuery Client Initialization

Logging is set to the INFO level, and the BigQuery client is initialized with default project settings. We also retrieve the IDs and schema keys of the BigQuery tables that will be used later for data insertion.

logging.basicConfig(level=logging.INFO)
client = bigquery.Client()

contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
# Similarly, settings for the company and deals tables are also configured.
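
The schema keys matter because they act as a filter: only HubSpot properties whose names match a BigQuery column end up in the inserted row. The sketch below reproduces that filtering without touching BigQuery. The SimpleNamespace objects stand in for the entries of table.schema, and filter_properties and unmapped_property are hypothetical names used only for illustration.

```python
from types import SimpleNamespace

# Stand-in for companies_table.schema; real entries also expose a .name attribute.
fake_schema = [
    SimpleNamespace(name="id"),
    SimpleNamespace(name="created_at"),
    SimpleNamespace(name="about_us"),
]
schema_keys = {field.name for field in fake_schema}


def filter_properties(properties, schema_keys):
    """Keep only properties that have a matching column; map '' to None."""
    return {
        key: (value if value != "" else None)
        for key, value in properties.items()
        if key in schema_keys
    }


hubspot_properties = {
    "about_us": "We sell widgets",
    "created_at": "",            # empty strings become NULL in BigQuery
    "unmapped_property": "x",    # no matching column, so it is dropped
}
print(filter_properties(hubspot_properties, schema_keys))
```

Reading the keys from the table at start-up means a column added in BigQuery is picked up on the next deployment without changing the filtering code.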

Definition of the Data Synchronization Function

The sync_hubspot_to_bigquery function retrieves the HubSpot access token from environment variables and initializes the API client. Subsequently, it extracts data from HubSpot using specified properties and inserts it into BigQuery.

def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500

    api_client = HubSpot(access_token=access_token)
    # Detailed steps for data extraction and insertion are described later.

Data Insertion and Retry Policy

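The extracted data is inserted into BigQuery in batches of 100 rows. A retry policy retries transient API errors with exponential backoff (starting at 1 second, doubling up to a 10-second cap, for at most 1200 seconds in total). Row-level errors returned by insert_rows_json are not retried; they are logged so that failed batches can be investigated.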

def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
        if errors:
            logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
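
To make the batching and backoff behaviour easier to reason about, here is a self-contained sketch that runs without BigQuery. It is not the function above: insert_batch is a hypothetical callable standing in for client.insert_rows_json, fake_insert is made-up test data, and backoff_caps only shows the upper bound of each wait (the real Retry object may add random jitter).

```python
def chunked(rows, size):
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_in_batches(rows, insert_batch, batch_size=100):
    """Send rows batch by batch; return [(batch_number, errors), ...] for failures.

    `insert_batch` stands in for
    client.insert_rows_json(table_id, batch, retry=custom_retry).
    """
    failed = []
    for number, batch in enumerate(chunked(rows, batch_size), start=1):
        errors = insert_batch(batch)
        if errors:
            failed.append((number, errors))
    return failed


def backoff_caps(initial, maximum, multiplier, attempts):
    """Upper bound of the wait before each retry attempt, ignoring jitter."""
    caps = []
    delay = initial
    for _ in range(attempts):
        caps.append(min(delay, maximum))
        delay *= multiplier
    return caps


def fake_insert(batch):
    # Pretend BigQuery rejects the row whose id is 150.
    return [
        {"index": i, "errors": ["rejected"]}
        for i, row in enumerate(batch)
        if row["id"] == 150
    ]


rows = [{"id": n} for n in range(250)]
print(insert_in_batches(rows, fake_insert))
print(backoff_caps(1.0, 10.0, 2.0, 6))
```

Returning the failed batches, instead of only logging them, would let the caller report a partial failure in the HTTP response.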

Notes on BigQuery Data Updates and Time Conversion

  • Streaming Buffer Limitation: Rows written with insert_rows_json pass through BigQuery's streaming buffer, and rows that are still in the buffer cannot be removed by DML statements such as DELETE. Because each run deletes all records before inserting new ones, running the job more often than once an hour could result in errors. We previously updated data every two hours using TROCCO, so this setup already refreshes more frequently, but take care if even shorter intervals are required.
  • Timestamp Conversion: BigQuery stores TIMESTAMP values as absolute points in time and displays them in UTC. convert_utc_to_jst rewrites each HubSpot datetime as an ISO 8601 string with a +09:00 offset (Japan Standard Time, JST). The stored instant is unchanged, so reports in Looker Studio still need these timestamps converted or displayed in JST to avoid being nine hours off.
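
A small sketch of what the conversion in convert_utc_to_jst does to a single value. The sample datetime is made up for illustration. The converted value shows a different wall-clock reading, but it still compares equal to the original instant.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))

# Made-up sample: 15:00 UTC on 1 January 2024.
utc_value = datetime(2024, 1, 1, 15, 0, tzinfo=timezone.utc)
jst_value = utc_value.astimezone(JST)

print(utc_value.isoformat())   # 2024-01-01T15:00:00+00:00
print(jst_value.isoformat())   # 2024-01-02T00:00:00+09:00

# Same instant, different representation.
print(utc_value == jst_value)  # True
```

Note that the calendar date changes as well: events late in the UTC day fall on the next day in JST, which matters for daily aggregations.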

Cloud Scheduler Setup

Set up Cloud Scheduler to execute the GCF automatically with the cron expression 0 * * * *, which triggers the job every hour on the hour. With this schedule, the HubSpot data stored in BigQuery is refreshed once an hour.
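
For readers less familiar with cron syntax, the sketch below computes when a 0 * * * * schedule fires next. next_hourly_run is a hypothetical helper written for this illustration; it is not part of Cloud Scheduler and ignores time zone settings.

```python
from datetime import datetime, timedelta


def next_hourly_run(now):
    """Return the next time matching cron '0 * * * *' strictly after `now`."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


print(next_hourly_run(datetime(2024, 1, 1, 10, 17, 30)))  # 2024-01-01 11:00:00
print(next_hourly_run(datetime(2024, 1, 1, 23, 59, 0)))   # 2024-01-02 00:00:00
```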
