Load CSV data into BigQuery via a GCS bucket + Cloud Functions using Cloud Shell

dharmendra mishra
3 min read · Nov 5, 2020


I wanted to try out automatic loading of CSV data into BigQuery, specifically using a Cloud Function that runs automatically whenever a new CSV file is uploaded to a Google Cloud Storage bucket.

Before starting, here are brief definitions of the components used here.

BigQuery: a highly scalable, cost-effective, fully managed enterprise cloud data warehouse.

Cloud Functions: Google Cloud Functions is a serverless execution environment for building and connecting cloud services.
With Cloud Functions, you write simple, single-purpose functions that are attached to events emitted from your cloud infrastructure and services.
Your function is triggered when an event being watched is fired.

Cloud Storage: Google Cloud Storage is an object storage service in which data is stored in buckets, "on the cloud".
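Concretely, a Python background function triggered by Cloud Storage receives two arguments: the event payload and the event context. A minimal sketch of that shape (the name handle_gcs_event is just for illustration; the real loader comes in Step 5):

def handle_gcs_event(data, context):
    # 'data' holds the Cloud Storage object metadata for the event,
    # e.g. data['bucket'] and data['name'] identify the uploaded file
    # 'context' carries event metadata such as context.event_id and context.event_type
    print('Bucket: {}, file: {}'.format(data['bucket'], data['name']))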

I used the Google Cloud Shell to perform this POC:

Step 1:

I already have 500 rows of dummy data in a file named "us-500.csv".

It looks like this:

$ head -4 us-500.csv
first_name,last_name,company_name,address,city,county,state,zip
James,Butt,"Benton, John B Jr",6649 N Blue Gum St,New Orleans,Orleans,LA,70116
Josephine,Darakjy,"Chaney, Jeffrey A Esq",4 B Blue Ridge Blvd,Brighton,Livingston,MI,48116
Art,Venere,"Chemel, James L Cpa",8 W Cerritos Ave #54,Bridgeport,Gloucester,NJ,8014
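Before wiring anything up, it can be worth sanity-checking the file locally. A small sketch (assuming the file sits in the current directory) that prints the header and row count with Python's csv module:

import csv

with open('us-500.csv', newline='') as f:
    reader = csv.reader(f)
    header = next(reader)               # first row holds the column names
    row_count = sum(1 for _ in reader)  # remaining rows are data

print('Columns:', header)
print('Data rows:', row_count)          # expect 500 for us-500.csv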

Step 2: Let’s make a Google Cloud Storage bucket.

$ gsutil mb gs://ustestbucket

Step 3: Let’s make a BigQuery dataset.

$ bq mk --dataset my-first-project:ustestdataset

Step 4: Make a table within that dataset that matches the CSV schema.

$ bq mk --table ustestdataset.uscsvtable first_name:STRING,last_name:STRING,company_name:STRING,address:STRING,city:STRING,county:STRING,state:STRING,zip:INTEGER

Note:

1. Don’t put spaces between the columns when specifying the schema.

2. Go to the BigQuery dataset and verify the table is clean and fresh (a code-based check is sketched below).
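If you prefer to verify this from code rather than the BigQuery console, here is a minimal sketch using the google-cloud-bigquery client from Cloud Shell (assuming the my-first-project project ID used in the bq commands above):

from google.cloud import bigquery

client = bigquery.Client(project='my-first-project')
table = client.get_table('my-first-project.ustestdataset.uscsvtable')

# print the column names/types and confirm the table is still empty
print([(field.name, field.field_type) for field in table.schema])
print('Rows:', table.num_rows)  # expect 0 before any load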

Step 5: Let’s write some Python code and save it in main.py.

import os
from google.cloud import bigquery

def csv_loader(data, context):
    client = bigquery.Client()
    dataset_id = os.environ['DATASET']
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('first_name', 'STRING'),
        bigquery.SchemaField('last_name', 'STRING'),
        bigquery.SchemaField('company_name', 'STRING'),
        bigquery.SchemaField('address', 'STRING'),
        bigquery.SchemaField('city', 'STRING'),
        bigquery.SchemaField('county', 'STRING'),
        bigquery.SchemaField('state', 'STRING'),
        bigquery.SchemaField('zip', 'INTEGER')
    ]
    job_config.skip_leading_rows = 1
    job_config.source_format = bigquery.SourceFormat.CSV

    # get the URI for the uploaded CSV in GCS from 'data'
    uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']

    # start the load job
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table(os.environ['TABLE']),
        job_config=job_config)
    print('Starting job {}'.format(load_job.job_id))
    print('Function=csv_loader, Version=' + os.environ['VERSION'])
    print('File: {}'.format(data['name']))

    load_job.result()  # wait for table load to complete
    print('Job finished.')

    destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
    print('Loaded {} rows.'.format(destination_table.num_rows))
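You can optionally smoke-test the function from Cloud Shell before deploying. This is only a sketch under a few assumptions: your credentials can run BigQuery load jobs, the CSV object has already been copied to the bucket, and it runs a real load job against the real table.

import os
import main  # the main.py above; requires google-cloud-bigquery installed locally

# values mirror environment.yaml; adjust for your own project
os.environ.update({
    'BUCKET': 'ustestbucket',
    'DATASET': 'ustestdataset',
    'TABLE': 'uscsvtable',
    'VERSION': 'local-test',
})

# fake Cloud Storage 'finalize' event payload; the object must already exist in the bucket
fake_event = {'name': 'us-500.csv', 'bucket': 'ustestbucket'}

main.csv_loader(fake_event, None)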

Step 6: We don’t want to hardcode things like bucket, table, and dataset names in the code, so let’s create a YAML file that stores our deployment-specific configuration as environment variables.

environment.yaml
BUCKET: ustestbucket
DATASET: ustestdataset
TABLE: uscsvtable
VERSION: v14

Step 7: Let’s create a requirements.txt file listing the packages our function needs.

requirements.txt
google-cloud
google-cloud-bigquery

Step 8: Now we need a .gcloudignore file so that our CSV and YAML files are not deployed to GCP.

.gcloudignore
*csv
*yaml

Step 9: Our folder now looks like this.

$ ls
environment.yaml main.py requirements.txt us-500.csv

Step 10: Now we are ready to deploy the Cloud Function. We will add a trigger on the Cloud Storage bucket that fires every time a new file is added to the bucket. Here we are creating a Cloud Function named “us_csv_loader”.

$ gcloud beta functions deploy us_csv_loader --runtime=python37 --trigger-resource=gs://ustestbucket --trigger-event=google.storage.object.finalize --entry-point=csv_loader --env-vars-file=environment.yaml

Step 11: Now the function is deployed, yay! Copy the us-500.csv file into the ustestbucket bucket.

$ gsutil cp us-500.csv gs://ustestbucket/
Copying file://us-500.csv [Content-Type=text/csv]...
- [1 files][ 60.4 KiB/ 60.4 KiB]
Operation completed over 1 objects/60.4 KiB.
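If you prefer doing the upload from Python instead of gsutil, here is a sketch using the google-cloud-storage client (an extra dependency, not part of this tutorial’s requirements.txt); uploading the object this way also fires the finalize trigger.

from google.cloud import storage

client = storage.Client(project='my-first-project')
bucket = client.bucket('ustestbucket')
blob = bucket.blob('us-500.csv')
blob.upload_from_filename('us-500.csv')  # triggers the Cloud Function on finalize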

Step 12: Now that we have copied the CSV file to the bucket, the function should fire. Check the logs.

$ gcloud functions logs read

Step 13: Let’s check the BigQuery table again to see whether the row count has changed.

$ bq query --nouse_legacy_sql 'SELECT COUNT(*) FROM `my-first-project.ustestdataset.uscsvtable`'

Looks like the rows are updated.
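The same check can be run from Python with the BigQuery client, mirroring both the bq query above and the num_rows print at the end of csv_loader. A minimal sketch, again assuming the my-first-project project ID:

from google.cloud import bigquery

client = bigquery.Client(project='my-first-project')
query = 'SELECT COUNT(*) AS row_count FROM `my-first-project.ustestdataset.uscsvtable`'
for row in client.query(query).result():
    print('Rows loaded:', row.row_count)  # expect 500 for us-500.csv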

Proof of concept: complete!

Conclusion: Cloud Functions are pretty great.
