BigQuery Walkthrough
NStack offers Google Cloud BigQuery integration which you use by extending a built-in BigQuery module with your own data types, SQL, and credentials.
This walkthrough shows you how to use this module to upload data, download or stream data, delete tables, and run queries.
Supported Operations
There are five interactions you can have with BigQuery, each exposed as an NStack function:
Function | Description |
---|---|
runQuery | Execute an SQL query on BigQuery |
downloadData | Download all the rows of data from a table as a single batch |
streamData | Stream rows of data from a table with a configurable batch size |
uploadData | Upload rows of data to a table |
dropTable | Delete a table from BigQuery |
How To
Init a new BigQuery Module
BigQuery exists as a Framework Module within NStack. Framework modules contain pre-built functions, but require you to add your own files, configuration, and type signatures. In this case, these are our credentials, SQL files, and the type signatures of the data we are uploading or downloading.
Note
Learn more about Framework Modules
To use it, we run nstack init to create a new module that uses the BigQuery module as its parent:
> mkdir CustomerTable; cd CustomerTable
> nstack init --framework nstack/BigQuery:0.2.0
The --framework parameter to the init command sets the parent framework module to BigQuery, rather than the default Python image.
Add your credentials
To interact with BigQuery, the module must be able to authenticate with the BigQuery servers, so it needs a copy of valid BigQuery credentials.
To add your credentials, generate them from Google Cloud in JSON format.
Note
See https://cloud.google.com/bigquery/authentication for details on how to generate JSON credentials.
Then place them in a file in the module directory, e.g. credentials.json:
{
"type": "service_account",
"project_id": "fake-project-462733",
"private_key_id": "5eb41c28aede90da40529221be4ac85f514134ba",
"private_key": "-----BEGIN PRIVATE KEY-----...private key here...-----END PRIVATE KEY-----\n",
"client_email": "my-user@fake-project-462733.iam.gserviceaccount.com",
"client_id": "981722325615647221019",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/my-user%40fake-project-462733.iam.gserviceaccount.com"
}
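Before building, it can help to sanity-check the key file. The following is a minimal Python sketch, not part of NStack or the BigQuery module: check_credentials is a hypothetical helper, and the set of required keys is an assumption based on the example file above.

```python
import json
from pathlib import Path

# Assumed minimum set of keys for a service-account key file, taken from the
# example above. The BigQuery module itself does not necessarily check these.
REQUIRED_KEYS = {"type", "project_id", "private_key", "client_email", "token_uri"}


def check_credentials(path):
    """Hypothetical pre-build check: load a JSON key file and return its project_id.

    Raises ValueError if keys are missing or the key is not a service account.
    """
    data = json.loads(Path(path).read_text())
    missing = REQUIRED_KEYS - set(data)
    if missing:
        raise ValueError(f"credentials file is missing keys: {sorted(missing)}")
    if data["type"] != "service_account":
        raise ValueError(f"expected a service_account key, got {data['type']!r}")
    return data["project_id"]
```

For the example file above, check_credentials("credentials.json") would return "fake-project-462733", which should match the project you intend to use.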
Then add this file to the files list in the nstack.yaml build file:
files:
- credentials.json
Doing this will include the file in the module. When we use our BigQuery module in a workflow, we tell the module to use this file by specifying it as a configuration parameter (bq_credentials_file).
Add your SQL
Similarly, if we're going to use runQuery to execute SQL, we include our SQL script in the module's files list in the same way.
Note
If you are only using downloadData, streamData, uploadData or dropTable, you do not need to include an SQL file, as you are not executing any SQL.
If your SQL query lives in example_query_job.sql, copy that file into your module directory and add it to the files list (which already includes your credentials):
files:
- credentials.json
- example_query_job.sql
Define your types and declare your function
If you're uploading data from NStack or downloading data into NStack, you declare the type of data your function will take or return. This is either the data you are going to upload, or the data you expect to download from the table. Either way, the type signature should match the table's schema.
E.g. if you have a table with the following columns:
Column | Type |
---|---|
CustomerName | VarChar |
CustomerAddress | VarChar |
CustomerId | Int64 |
CountryId | Int64 |
Then you define a Customer type in your module's module.nml as follows:
type Customer = {
name : Text,
address: Text,
id : Int,
countryId : Int
}
Note
The fields must be in the same order as the columns in the table. The names do not need to match, but if you misorder two or more fields whose types still match, you will get results with values in the wrong fields.
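To see why the order matters, here is a minimal Python sketch that assumes rows are mapped onto the record type purely by position, as the note describes. Customer, MisorderedCustomer and the sample row are illustrative stand-ins, not part of the BigQuery module.

```python
from typing import NamedTuple


class Customer(NamedTuple):
    # Mirrors the Customer type from module.nml; fields are assumed to be
    # filled from table columns by position, not by name.
    name: str
    address: str
    id: int
    country_id: int


class MisorderedCustomer(NamedTuple):
    # Same field types, but name and address are swapped.
    address: str
    name: str
    id: int
    country_id: int


# An illustrative row in column order:
# CustomerName, CustomerAddress, CustomerId, CountryId.
ROW = ("Alice Smith", "1 High Street", 1001, 44)

good = Customer(*ROW)
bad = MisorderedCustomer(*ROW)
```

Because both swapped fields are text, nothing fails: bad.name silently holds the address, which is exactly the kind of error the note warns about.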
Once you have the type declared, you can then declare the BigQuery action you wish to take as an NStack function.
Your module.nml file should begin with the boilerplate line module CustomerTable:0.0.1-SNAPSHOT where.
Next, write a function definition for one or more of the runQuery, downloadData, streamData, uploadData or dropTable functions that exist in the BigQuery parent image.
If downloading or uploading, declare them to take or return a list of the data type you just declared.
For instance, to upload a list of customer records to a table:
uploadData : [Customer] -> ()
Download a table as a list of customer records:
downloadData : () -> [Customer]
Stream the list of customer records with a configurable batch size:
streamData : () -> [Customer]
Execute a single SQL query:
runQuery : () -> ()
Delete a table:
dropTable : () -> ()
Build your module
Once the previous steps have been completed, you can build your module as normal using nstack build.
If you run nstack list functions, you should see your new functions listed there:
nstack/CustomerTable:0.0.1-SNAPSHOT
downloadData :: () -> [Customer]
Configure and Run
Now that your module is registered with the server, you can use the functions in workflows like any other function.
The BigQuery module takes a number of configuration parameters that let you set it up for your particular BigQuery project.
All BigQuery functions need the following configuration parameters supplied:
Configuration | Description |
---|---|
bq_credentials_file | Path to the credentials file used to authenticate with BigQuery. |
bq_project | Name of the BigQuery project to use. |
bq_dataset | Name of the BigQuery dataset in the above project to use. |
The uploadData, downloadData, streamData, and dropTable functions also need the following parameter:
Configuration | Description |
---|---|
bq_table | Name of the table to upload to, download from, stream from, or delete. |
The streamData function also needs the following parameter:
Configuration | Description |
---|---|
bq_batch_size | Batch size when streaming from the table (1000-10000 recommended). |
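To illustrate what a batch size means for streaming, here is a minimal Python sketch. It assumes each batch holds at most bq_batch_size rows and that the final batch may be smaller; stream_batches is a hypothetical name, and this is not the BigQuery module's actual implementation.

```python
from itertools import islice


def stream_batches(rows, batch_size):
    """Yield successive lists of at most batch_size rows (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be positive")
    it = iter(rows)
    while True:
        # Take up to batch_size rows from wherever the previous batch stopped.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

Under this assumption, a 2,500-row table streamed with bq_batch_size = 1000 would arrive as batches of 1000, 1000 and 500 rows.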
The runQuery function needs the following parameters:
Configuration | Description |
---|---|
bq_query_file | Path to the file containing the SQL query to execute. |
bq_query_dest | Table in which to store the results of the SQL query. |
The following parameters may be used with runQuery, but are optional and can be omitted if unneeded:
Configuration | Description |
---|---|
bq_maximum_billing_Tier | Maximum billing tier, if not the default. Must be an integer. |
bq_use_legacy_sql | Boolean flag to use legacy BigQuery SQL rather than standard SQL. Should be “Yes”, “No”, “True” or “False”. |
For instance, to expose a database uploader as an HTTP endpoint, you might do the following:
def upload = CustomerTable.uploadData {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_table = "CustomerTable"
}
def workflow = Sources.http<[Customer]> { http_path = "/addCustomers" } | upload | Sinks.log<()>
Or to run a query on a given schedule:
def query = CustomerTable.runQuery {
  bq_credentials_file = "credentials.json",
  bq_project = "AcmeCorp",
  bq_dataset = "AcmeCorpSales",
  bq_query_file = "SalesQuery.sql",
  bq_query_dest = "SalesAnalysisResults"
}
def workflow = Sources.schedule<()> { cron = "* * * * * *" } | query | Sinks.log<()>
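Because a misspelled parameter name is easy to miss in the DSL, a pre-flight check can be useful. The following Python sketch transcribes the required parameters from the tables above; check_config and its return format are hypothetical and not part of NStack.

```python
# Required parameters per function, transcribed from the tables above.
COMMON = {"bq_credentials_file", "bq_project", "bq_dataset"}
REQUIRED = {
    "runQuery": COMMON | {"bq_query_file", "bq_query_dest"},
    "downloadData": COMMON | {"bq_table"},
    "streamData": COMMON | {"bq_table", "bq_batch_size"},
    "uploadData": COMMON | {"bq_table"},
    "dropTable": COMMON | {"bq_table"},
}
# Values accepted for the optional bq_use_legacy_sql flag, per the table above.
LEGACY_SQL_VALUES = {"Yes", "No", "True", "False"}


def check_config(function, config):
    """Hypothetical pre-flight check; returns a list of problems (empty if none).

    Extra keys are deliberately not flagged, since functions may carry
    additional parameters for use in templates.
    """
    problems = [f"missing {name}" for name in sorted(REQUIRED[function] - set(config))]
    flag = config.get("bq_use_legacy_sql")
    if flag is not None and flag not in LEGACY_SQL_VALUES:
        problems.append(f"bq_use_legacy_sql must be one of {sorted(LEGACY_SQL_VALUES)}")
    return problems
```

For example, a runQuery configuration that spells the destination as bq_query_dst would be reported as ["missing bq_query_dest"].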
Template Configuration
The BigQuery module supports using Jinja2 templates inside of its configuration parameters and in the SQL queries it executes.
This allows you to build more flexible functions that can cover a wider range of behaviors.
Note
For full details on Jinja2 templates, see http://jinja.pocoo.org/docs/2.9/templates/
The syntax you will use most is the standard expression template, which uses double curly braces:
prefix_{{ some.template.expression }}_suffix
Here the expression in curly braces will be evaluated and replaced with its result.
The Jinja2 templates are evaluated in a sandbox for security reasons, so you do not have access to the full python standard library.
However, date and time functionality from the datetime package is exposed through the date, time, datetime and timedelta variables.
E.g. to specify a target table for a query based on today's date, you can use:
runQuery { bq_query_dest = "MyTablePrefix_{{ date.today().strftime('%Y%m%d') }}" }
On the 6th of July 2017, this would write to a table called MyTablePrefix_20170706.
These values are evaluated every time the function processes a message, so if you keep the workflow running and send events to the function over multiple days, you will write to a different table each day.
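The expression inside the braces is ordinary Python datetime code. This plain-Python sketch shows what it evaluates to; target_table is a hypothetical helper that takes the day explicitly instead of calling date.today(), so the result is reproducible.

```python
from datetime import date


def target_table(prefix, day):
    """Python equivalent of "MyTablePrefix_{{ date.today().strftime('%Y%m%d') }}".

    The day is passed in rather than read from date.today() (hypothetical helper).
    """
    return f"{prefix}_{day.strftime('%Y%m%d')}"
```

Calling target_table("MyTablePrefix", date(2017, 7, 6)) gives "MyTablePrefix_20170706", and the next day gives a different table name, which is why a long-running workflow writes to a new table each day.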
Note
For Python datetime formatting help, see: https://docs.python.org/2/library/datetime.html
In the SQL query itself, you have access to the same date and time functionality, including calculating offsets via timedelta.
E.g. to query last week's table:
SELECT * FROM MyTablePrefix_{{ (date.today() - timedelta(days=7)).strftime('%Y%m%d') }} LIMIT 1000
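The offset arithmetic can be checked outside BigQuery. Here is a plain-Python sketch of what the templated query above renders to; last_week_query is a hypothetical helper that receives "today" as an argument rather than calling date.today().

```python
from datetime import date, timedelta


def last_week_query(today):
    """Python equivalent of the templated query above, with today passed in (hypothetical)."""
    # Subtracting a timedelta handles month and year boundaries for us.
    suffix = (today - timedelta(days=7)).strftime("%Y%m%d")
    return f"SELECT * FROM MyTablePrefix_{suffix} LIMIT 1000"
```

On 6 July 2017 this renders to SELECT * FROM MyTablePrefix_20170629 LIMIT 1000; on 3 March 2017 it correctly crosses into February and selects MyTablePrefix_20170224.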
In the SQL, you can also refer to the function configuration parameters (as defined in your workflow DSL) under a config object.
E.g. to access a parameter named source_table, you can write:
SELECT * FROM MyTablePrefix_{{ config.source_table }} LIMIT 1000
and then specify it in the DSL:
runQuery { source_table = "SomeTable" }
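The config lookup maps each config.<name> expression to the value set for that parameter in the DSL. This minimal Python sketch reproduces only that substitution, assuming config values are plain strings. render_config_refs is hypothetical; the real rendering is done by Jinja2 and may behave differently, for example with names that were never set.

```python
import re

# Matches {{ config.<name> }}, allowing optional whitespace inside the braces.
CONFIG_REF = re.compile(r"\{\{\s*config\.(\w+)\s*\}\}")


def render_config_refs(sql, config):
    """Replace each {{ config.<name> }} with config[<name>].

    A deliberately tiny stand-in for Jinja2 that only understands config
    lookups; it raises KeyError for a parameter that was not set.
    """
    return CONFIG_REF.sub(lambda m: str(config[m.group(1)]), sql)
```

With {"source_table": "SomeTable"}, the query above becomes SELECT * FROM MyTablePrefix_SomeTable LIMIT 1000.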
Note
You can add as many config parameters to a function as you like, even ones the function does not normally use.