Introduction
This article introduces a method to transfer data managed in HubSpot—specifically 'contacts,' 'companies,' and 'deals'—to BigQuery, which serves as a data source for BI tools like Looker Studio. Previously, we utilized a SaaS called TROCCO to transfer data from HubSpot to a spreadsheet, from which Looker Studio would load data. However, changes to TROCCO's pricing plan made it impossible to update data as frequently as every two hours within the free tier. Additionally, the slow data loading from the spreadsheet presented challenges, prompting the adoption of Google Cloud Functions (GCF) and BigQuery as alternatives.
GCP Configuration
- Project Name: hubspot-to-bigquery
- Google Cloud Functions (GCF): sync-hubspot-to-bigquery
  - Implemented in Python
- Cloud Scheduler: sync-hubspot-to-bigquery-job
  - Executes the job every hour at the 0th minute
- BigQuery:
  - Dataset: hubspot_data
  - Tables: contacts, companies, deals
Data Flow
- Cloud Scheduler triggers the GCF once every hour.
- GCF extracts data from HubSpot and transfers it to BigQuery.
- Note: To update data, all records are deleted before new records are added.
- Looker Studio generates reports by referencing the tables in BigQuery.
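The delete-then-reload flow above can be sketched with plain Python callables. This is only an illustration under stated assumptions: `run_sync`, `fetch`, `delete_all`, and `insert` are hypothetical names standing in for the HubSpot and BigQuery calls, and the in-memory dictionaries replace the real services.

```python
from typing import Callable, Dict, List

TABLES = ["contacts", "companies", "deals"]


def run_sync(
    fetch: Callable[[str], List[dict]],
    delete_all: Callable[[str], None],
    insert: Callable[[str, List[dict]], None],
) -> Dict[str, int]:
    """Delete every row, then reload each table from the source.

    fetch, delete_all, and insert are hypothetical stand-ins for the
    HubSpot and BigQuery calls used by the real function.
    """
    counts = {}
    for table in TABLES:
        delete_all(table)        # full refresh: clear the table first
    for table in TABLES:
        rows = fetch(table)      # pull the current records
        insert(table, rows)      # write them into the now-empty table
        counts[table] = len(rows)
    return counts


# Usage with in-memory fakes instead of real services.
store = {name: [{"id": 0}] for name in TABLES}
source = {"contacts": [{"id": 1}, {"id": 2}], "companies": [{"id": 3}], "deals": []}

result = run_sync(
    fetch=lambda t: source[t],
    delete_all=lambda t: store[t].clear(),
    insert=lambda t, rows: store[t].extend(rows),
)
print(result)  # {'contacts': 2, 'companies': 1, 'deals': 0}
```

Passing the steps in as callables keeps the ordering visible and makes the flow easy to exercise without credentials.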
Implementation
Issuing a HubSpot Access Token
Create a private app from the settings menu in HubSpot and set the required scopes (crm.objects.deals.read, crm.objects.companies.read, crm.objects.contacts.read). After creating the private app, copy the access token for later use.
Creating BigQuery Tables
Create the hubspot_data dataset in BigQuery and set up tables for deals, companies, and contacts. The following SQL query creates the companies table:
CREATE TABLE `hubspot-to-bigquery.hubspot_data.companies` (
  id INT64,
  created_at TIMESTAMP,
  updated_at TIMESTAMP,
  about_us STRING,
  ...
)
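Since all three tables share the id, created_at, and updated_at columns, the DDL could also be generated rather than written by hand. The following is a minimal sketch, not the method used in this article: `build_create_table_sql` and `COMMON_COLUMNS` are hypothetical names, and only the columns shown in the query above are included.

```python
PROJECT = "hubspot-to-bigquery"
DATASET = "hubspot_data"

# Columns that each table in this article starts with.
COMMON_COLUMNS = [
    ("id", "INT64"),
    ("created_at", "TIMESTAMP"),
    ("updated_at", "TIMESTAMP"),
]


def build_create_table_sql(table, extra_columns=()):
    """Return a CREATE TABLE statement for one table in the dataset."""
    columns = list(COMMON_COLUMNS) + list(extra_columns)
    body = ",\n".join(f"  {name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE `{PROJECT}.{DATASET}.{table}` (\n{body}\n)"


# Only about_us is known from the article; other columns are left out.
companies_sql = build_create_table_sql("companies", [("about_us", "STRING")])
print(companies_sql)
```

The remaining columns for each table would need to be appended to `extra_columns` with types that match the HubSpot properties being synced.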
Setting Up Google Cloud Function (GCF)
Below is the complete script for the function:
import os
import logging
from datetime import datetime, timezone, timedelta
from hubspot import HubSpot
from hubspot.crm.contacts import ApiException
from google.cloud import bigquery
from google.api_core.retry import Retry
logging.basicConfig(level=logging.INFO)
client = bigquery.Client()
contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
companies_table_id = "hubspot-to-bigquery.hubspot_data.companies"
companies_table = client.get_table(companies_table_id)
companies_table_schema_keys = {field.name for field in companies_table.schema}
deals_table_id = "hubspot-to-bigquery.hubspot_data.deals"
deals_table = client.get_table(deals_table_id)
deals_table_schema_keys = {field.name for field in deals_table.schema}
# Properties to fetch from HubSpot
contacts_properties = ["id", "created_at", "updated_at", "company_size", "date_of_birth", ...]
companies_properties = ["id", "created_at", "updated_at", "about_us", ...]
deals_properties = ["id", "created_at", "updated_at", "amount_in_home_currency", ...]
def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500
    api_client = HubSpot(access_token=access_token)
    try:
        # Delete all records for fresh update
        delete_table_records(contacts_table_id)
        delete_table_records(companies_table_id)
        delete_table_records(deals_table_id)
        # Fetch data from HubSpot
        contacts_fetched = api_client.crm.contacts.get_all(properties=contacts_properties)
        companies_fetched = api_client.crm.companies.get_all(properties=companies_properties)
        deals_fetched = api_client.crm.deals.get_all(properties=deals_properties)
        # Create rows to insert into BigQuery
        contacts_rows = create_rows_to_insert(contacts_fetched, contacts_table_schema_keys)
        companies_rows = create_rows_to_insert(companies_fetched, companies_table_schema_keys)
        deals_rows = create_rows_to_insert(deals_fetched, deals_table_schema_keys)
        # Insert data into BigQuery tables
        insert_rows_bigquery(contacts_table_id, contacts_rows)
        insert_rows_bigquery(companies_table_id, companies_rows)
        insert_rows_bigquery(deals_table_id, deals_rows)
        success_message = f"Data synchronized successfully: {len(contacts_rows)} contacts, {len(companies_rows)} companies, and {len(deals_rows)} deals updated."
        logging.info(success_message)
        return success_message, 200
    except ApiException as e:
        error_message = f"Exception when requesting: {e}"
        logging.error(error_message)
        return error_message, 500
def delete_table_records(table_id):
    delete_query = f"DELETE FROM `{table_id}` WHERE TRUE"
    try:
        query_job = client.query(delete_query)
        query_job.result()  # Wait for the DELETE job to finish
        logging.info(f"All records have been deleted from {table_id}.")
    except Exception as e:
        logging.error(f"Failed to delete records from {table_id}: {e}")
        raise  # Stop the sync so new rows are not added on top of old ones
def convert_utc_to_jst(timestamp):
    jst_zone = timezone(timedelta(hours=9))
    jst_time = timestamp.astimezone(jst_zone)
    logging.debug(f"Converted {timestamp} to {jst_time}")
    return jst_time.isoformat()
def create_rows_to_insert(fetched_data, table_schema_keys):
    rows_to_insert = []
    for data in fetched_data:
        data_properties = data.properties
        row = {
            "id": data.id,
            "created_at": convert_utc_to_jst(data.created_at) if data.created_at else None,
            "updated_at": convert_utc_to_jst(data.updated_at) if data.updated_at else None
        }
        for key, prop in data_properties.items():
            if key in table_schema_keys:
                value = prop if prop != '' and prop is not None else None
                if isinstance(value, datetime):
                    value = convert_utc_to_jst(value)
                row[key] = value
        rows_to_insert.append(row)
    return rows_to_insert
def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        try:
            errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
            if errors:
                logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
            else:
                logging.info(f"Batch {i // batch_size + 1} inserted successfully into {table_id}.")
        except Exception as e:
            logging.error(f"Error inserting data into {table_id}: {e}")
The requirements for this function are as follows:
functions-framework==3.*
hubspot-api-client
google-cloud-bigquery
Log Configuration and BigQuery Client Initialization
Logging is set to the INFO level, and the BigQuery client is initialized with default project settings. We also retrieve the IDs and schema keys of the BigQuery tables that will be used later for data insertion.
logging.basicConfig(level=logging.INFO)
client = bigquery.Client()
contacts_table_id = "hubspot-to-bigquery.hubspot_data.contacts"
contacts_table = client.get_table(contacts_table_id)
contacts_table_schema_keys = {field.name for field in contacts_table.schema}
# Similarly, settings for the company and deals tables are also configured.
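The schema keys collected here are what the script later uses to drop HubSpot properties that have no matching column. A minimal, standalone sketch of that filtering step follows; `filter_properties` is a hypothetical helper, and the sample property names (including `internal_note`) are made up for illustration.

```python
from typing import Dict, Iterable, Optional


def filter_properties(
    properties: Dict[str, Optional[str]], schema_keys: Iterable[str]
) -> Dict[str, Optional[str]]:
    """Keep only properties with a matching column; turn '' into None."""
    allowed = set(schema_keys)
    return {
        key: (value if value != "" else None)
        for key, value in properties.items()
        if key in allowed
    }


# Hypothetical sample: 'internal_note' has no column, so it is dropped.
schema_keys = {"id", "created_at", "updated_at", "about_us"}
properties = {"about_us": "", "internal_note": "skip me", "id": "42"}
print(filter_properties(properties, schema_keys))  # {'about_us': None, 'id': '42'}
```

Filtering against the table schema means a property added in HubSpot does not break the insert; it is simply ignored until a column is created for it.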
Definition of the Data Synchronization Function
The sync_hubspot_to_bigquery function retrieves the HubSpot access token from environment variables and initializes the API client. Subsequently, it extracts data from HubSpot using specified properties and inserts it into BigQuery.
def sync_hubspot_to_bigquery(_):
    access_token = os.getenv("ACCESS_TOKEN")
    if not access_token:
        logging.error("Access token not found in environment variables")
        return "Access token not found in environment variables", 500
    api_client = HubSpot(access_token=access_token)
    # Detailed steps for data extraction and insertion are described later.
Data Insertion and Retry Policy
The extracted data is inserted into BigQuery in batches of 100 rows. A retry policy automatically retries insert requests that fail with transient errors, which reduces the chance of losing rows to a temporary failure.
def insert_rows_bigquery(table_id, rows_to_insert, batch_size=100):
    custom_retry = Retry(initial=1.0, maximum=10.0, multiplier=2.0, deadline=1200.0)
    for i in range(0, len(rows_to_insert), batch_size):
        batch = rows_to_insert[i:i + batch_size]
        errors = client.insert_rows_json(table_id, batch, retry=custom_retry)
        if errors:
            logging.error(f"Errors occurred in batch {i // batch_size + 1}: {errors}")
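To make the batch size and retry parameters more concrete, here is a small standalone sketch that needs no BigQuery connection. `split_into_batches` and `nominal_backoff` are hypothetical helpers. The backoff list shows only the nominal delays implied by initial, maximum, and multiplier; as far as I know the library also randomizes the actual waits, so treat the numbers as a rough schedule rather than exact timings.

```python
from typing import Iterator, List


def split_into_batches(rows: List[dict], batch_size: int = 100) -> Iterator[List[dict]]:
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def nominal_backoff(initial: float, maximum: float, multiplier: float, attempts: int) -> List[float]:
    """Nominal delay before each retry, capped at maximum (jitter ignored)."""
    delays, delay = [], initial
    for _ in range(attempts):
        delays.append(min(delay, maximum))
        delay *= multiplier
    return delays


rows = [{"id": n} for n in range(250)]
print([len(batch) for batch in split_into_batches(rows)])
# [100, 100, 50]
print(nominal_backoff(initial=1.0, maximum=10.0, multiplier=2.0, attempts=6))
# [1.0, 2.0, 4.0, 8.0, 10.0, 10.0]
```

With these settings the delay doubles from one second and levels off at ten seconds, so a long outage is retried at a steady pace until the overall deadline is reached.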
Notes on BigQuery Data Updates and Time Conversion
- Streaming Buffer Limitation: Rows written with insert_rows_json pass through BigQuery's streaming buffer, and rows still in the buffer cannot be removed with a DELETE statement. Because every run deletes all records before inserting new ones, setting an update interval shorter than one hour could result in errors. We previously updated data every two hours using TROCCO, so this setup already refreshes data more often, but care must be taken if even shorter intervals are required.
- Necessity for Timestamp Conversion: BigQuery stores TIMESTAMP data in UTC, so timestamps must be converted to Japan Standard Time (JST) for reporting. With the appropriate conversion in place, reports in Looker Studio are not skewed by the time zone difference.
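The timestamp conversion can be tried on its own with the standard library. This sketch repeats the conversion logic from the script with a made-up sample value. One point worth checking in your own setup: the +09:00 offset changes how the value is written, not the instant it refers to, so how it is finally displayed still depends on the reporting side.

```python
from datetime import datetime, timedelta, timezone

JST = timezone(timedelta(hours=9))


def convert_utc_to_jst(timestamp: datetime) -> str:
    """Render an aware datetime as an ISO 8601 string with a +09:00 offset."""
    return timestamp.astimezone(JST).isoformat()


# Sample value chosen for illustration only.
utc_time = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
jst_text = convert_utc_to_jst(utc_time)
print(jst_text)  # 2024-01-01T09:00:00+09:00

# Parsing the string back gives the same instant as the UTC original.
print(datetime.fromisoformat(jst_text) == utc_time)  # True
```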
Cloud Scheduler Setup
Set up Cloud Scheduler to automatically execute the GCF every hour at minute zero. This schedule ensures that HubSpot data is regularly updated, keeping the latest information stored in BigQuery. The scheduler's Cron configuration is 0 * * * *, which triggers the job every hour on the hour.
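To see which times the 0 * * * * expression selects, the following sketch computes the next few run times from a given moment. `next_hourly_run` is a hypothetical helper written only for this hourly pattern; it is not a general cron parser and does not call Cloud Scheduler.

```python
from datetime import datetime, timedelta


def next_hourly_run(now: datetime) -> datetime:
    """Return the next time strictly after now that falls on minute 0."""
    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    return top_of_hour + timedelta(hours=1)


# Sample starting point chosen for illustration only.
now = datetime(2024, 1, 1, 9, 30)
runs = [next_hourly_run(now)]
for _ in range(2):
    runs.append(next_hourly_run(runs[-1]))
print([run.strftime("%H:%M") for run in runs])  # ['10:00', '11:00', '12:00']
```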