A common use case for the GA360 BigQuery export is to schedule a job that creates a series of flat, aggregate tables. These tables are often easier for users to consume because they are smaller and less complex than the main export table.
This guide walks through one way to schedule a query in Google BigQuery on a regular basis and write its output to a table, using a GCP Virtual Machine. The same approach would also work from a local machine if desired.
Set up the Virtual Machine
During the process of creating the Virtual Machine, ensure that you select the Allow full access to all Cloud APIs option under Access scopes, so the VM can call BigQuery without additional credentials.
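For reference, the same scope can also be set when creating the VM from the command line. This is a sketch only; the instance name, zone, and machine type below are placeholder values, not requirements:

```shell
# Create a VM with full Cloud API access (the cloud-platform scope).
# "project-vm-us", the zone, and the machine type are example values.
gcloud compute instances create project-vm-us \
    --zone=us-central1-a \
    --machine-type=e2-small \
    --scopes=cloud-platform
```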
Set up basic libraries
Install pip, then install the libraries the scripts depend on (assuming a Debian-based image):
username@project-vm-us:~$ sudo apt-get update && sudo apt-get install -y python3-pip
username@project-vm-us:~$ sudo pip3 install --upgrade requests
username@project-vm-us:~$ sudo pip3 install --upgrade httplib2
username@project-vm-us:~$ sudo pip3 install --upgrade google-cloud-bigquery
Create Folders
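The scripts later in this guide read from and write to a schedule-query folder in the home directory. A minimal way to create it (the folder name matches the paths used below):

```shell
# Create the working folder that will hold the scripts and the log file.
mkdir -p "$HOME/schedule-query"
```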
Authorize
- Create an auth file for the APIs
username@project-vm-us:~$ nano auth.py
The contents of the file should be as follows:
auth.py
#!/usr/bin/env python3
'''Handles credentials and authorization.

This module is used by the sample scripts to handle credentials and
generate authorized clients. The module can also be run directly
to print the HTTP authorization header for use in curl commands.

Running:

    python3 auth.py

will print the header to stdout.
'''
import google.auth
from google.auth.transport.requests import Request


def get_creds():
    '''Returns Application Default Credentials with a valid access token.'''
    credentials, project = google.auth.default()
    if not credentials.token:
        # Fetch an access token if the credentials do not have one yet.
        credentials.refresh(Request())
    return credentials


def print_creds(credentials):
    '''Prints the authorization header to use in HTTP requests.'''
    print('Authorization: Bearer %s' % credentials.token)


def main():
    print_creds(get_creds())


if __name__ == '__main__':
    main()
- Run the auth script
username@project-vm-us:~$ python3 auth.py
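The printed header can be passed straight to curl to verify that the VM can reach the BigQuery REST API. A sketch, assuming Python 3 and with your-project-id as a placeholder:

```shell
# List the datasets in a project using the header printed by auth.py.
# "your-project-id" is a placeholder for your GCP project ID.
curl -H "$(python3 auth.py)" \
    "https://bigquery.googleapis.com/bigquery/v2/projects/your-project-id/datasets"
```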
Create Query Script
import time

from google.cloud import bigquery


def wait_for_job(job):
    '''Polls the job until it finishes, raising on error.

    (job.result() would also block until completion; the explicit loop
    is shown here to make the polling behaviour visible.)
    '''
    while True:
        job.reload()  # Refreshes the job state via a GET request.
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.error_result)
            return
        time.sleep(10)


def example_query():
    query_id = str(int(time.time()))
    print('Starting example query with ID: ' + query_id)
    client = bigquery.Client()
    query = """\
    INSERT QUERY HERE
    """
    # Write the results to datasetName.destination_table_name, replacing
    # the table contents on each run (WRITE_TRUNCATE).
    table = bigquery.DatasetReference(client.project, 'datasetName').table(
        'destination_table_name')
    job_config = bigquery.QueryJobConfig()
    job_config.destination = table
    job_config.write_disposition = 'WRITE_TRUNCATE'
    job = client.query(query, job_config=job_config,
                       job_id='your-job-name-' + query_id)
    wait_for_job(job)


if __name__ == '__main__':
    example_query()
Create a script to execute the query (run-query.sh, matching the crontab entry below):
python3 /home/username/schedule-query/run-query.py >> /home/username/schedule-query/log.txt 2>&1
Schedule the query
Here is a sample crontab entry to execute the script at 19:05 each day.
05 19 * * * sh /home/username/schedule-query/run-query.sh
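To install the entry, one non-interactive option is to append it to the current user's crontab. A sketch; `crontab -e` would achieve the same thing interactively:

```shell
# Append the schedule to the current user's crontab without opening an editor.
# Preserves any existing entries; 2>/dev/null handles an initially empty crontab.
(crontab -l 2>/dev/null; echo "05 19 * * * sh /home/username/schedule-query/run-query.sh") | crontab -
# Confirm the entry was saved.
crontab -l
```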