Avro is a row-oriented remote procedure call and data serialization framework developed within Apache's Hadoop project. It uses JSON for defining data types and protocols, and serializes data in a compact binary format.
Moving data from a source to a destination involves serialization and deserialization. Serialization is the process of encoding data structures from the source into a format suitable for transmission and intermediate storage; deserialization reverses the process at the destination.
Avro provides a data serialization service: it stores both the data definition (schema) and the data together in one message or file.
Avro relies on schemas. When Avro data is read, the schema used when writing it is always present.
# fastavro provides the Avro reader/writer used below (pip install fastavro)
import pandas as pd
from fastavro import writer, parse_schema, reader
Using the Mall Customers dataset from Kaggle, we will read the data into a pandas DataFrame, define the Avro schema, and convert the DataFrame into a list of records, then write the records out in the Avro file format.
Finally, we validate the Avro file by reading it back into a pandas DataFrame.
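The snippets below assume the Kaggle data has already been read into a DataFrame named df with columns that match the Avro schema defined next. A minimal sketch of that step (the file name and original column names are assumptions and may differ in your copy of the dataset):
# Assumed loading step: the CSV name and original column names below are
# guesses based on the public Kaggle "Mall Customers" dataset; adjust them
# to match your copy so the columns line up with the Avro schema fields.
df = pd.read_csv('Mall_Customers.csv')
df = df.rename(columns={
    'Annual Income (k$)': 'Income',
    'Spending Score (1-100)': 'SpendingScore',
})
# cast to float so the values match the 'float' fields in the schema
df[['Income', 'SpendingScore']] = df[['Income', 'SpendingScore']].astype(float)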
# specifying the Avro schema
schema = {
    'doc': 'malldata',
    'name': 'malldata',
    'namespace': 'malldata',
    'type': 'record',
    'fields': [
        {'name': 'CustomerID', 'type': 'int'},
        {'name': 'Gender', 'type': 'string'},
        {'name': 'Age', 'type': 'int'},
        {'name': 'Income', 'type': 'float'},
        {'name': 'SpendingScore', 'type': 'float'}
    ]
}
parsed_schema = parse_schema(schema)
# converting dataframe to records
records = df.to_dict('records')
# writing to avro file format
with open('malldata.avro', 'wb') as out:
    writer(out, parsed_schema, records)
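As an aside, fastavro's writer also takes an optional codec argument, so the records can be block-compressed at write time; a small variation of the write above (the deflate codec here is just one choice):
# same write as above, but with deflate block compression
with open('malldata_deflate.avro', 'wb') as out:
    writer(out, parsed_schema, records, codec='deflate')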
# reading it back into a pandas dataframe
avro_records = []

# Read the Avro file
with open('malldata.avro', 'rb') as fo:
    avro_reader = reader(fo)
    for record in avro_reader:
        avro_records.append(record)

# Convert to pd.DataFrame
df_avro = pd.DataFrame(avro_records)
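To confirm the round trip, we can also print the schema that travels inside the file (fastavro exposes it on the reader as writer_schema) and compare the two DataFrames informally:
# the schema used when writing is stored in the file itself
with open('malldata.avro', 'rb') as fo:
    print(reader(fo).writer_schema)

# the shapes and sample rows should match the original dataframe
print(df.shape, df_avro.shape)
print(df_avro.head())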
Let's upload the Avro file to Google Cloud Storage and create a BigQuery table from it.
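The upload can be done from the Cloud console, with gsutil, or programmatically; here is a minimal sketch using the google-cloud-storage Python client (the bucket name is a placeholder, as in the load script below):
from google.cloud import storage

# upload the local Avro file to the bucket used by the BigQuery load job
storage_client = storage.Client()
bucket = storage_client.bucket("<bucket>")
bucket.blob("malldata.avro").upload_from_filename("malldata.avro")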
- The GCS bucket with the uploaded malldata.avro file
- The Python code snippet for the table data load (save it as avro_file_load.py):
from google.cloud import bigquery
# Construct a BigQuery client object.
client = bigquery.Client()
table_id = "<GCP Project>.avro.demo_avro_tbl"
job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
uri = "gs://<bucket>/malldata.avro"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.
load_job.result() # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
- In Cloud Shell, run the Python file with python3 avro_file_load.py. It prints "Loaded 200 rows." on successful completion.
- In the BigQuery console, we can view the loaded table.
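As a final sanity check, the row count can also be queried programmatically with the same BigQuery client (table_id as defined in the load script):
# count the rows in the newly created table
query = "SELECT COUNT(*) AS row_count FROM `{}`".format(table_id)
for row in client.query(query).result():
    print("Row count:", row.row_count)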