Create a service account for using the BigQuery API in advance and download its credentials (JSON): https://cloud.google.com/docs/authentication/getting-started?hl=ja
Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"
[requires]
python_version = "3.8"
migrate_table.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)
For [PATH TO CREDENTIAL], specify the path to the credential JSON file.
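As an alternative, if the GOOGLE_APPLICATION_CREDENTIALS environment variable points at the key file, the client can pick up the credentials by itself. A minimal sketch, assuming the variable is set before the script runs:

import os
from google.cloud import bigquery

# Assumes GOOGLE_APPLICATION_CREDENTIALS was exported beforehand and contains the JSON key path
assert os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
client = bigquery.Client()  # project and credentials are resolved from the key file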
migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# Delete the existing table
client.delete_table(table, not_found_ok=True)

# Create the table
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING'),
]
client.create_table(bigquery.Table(table, schema=schema))
For [DATASET NAME] and [TABLE NAME], specify the dataset name and the name of the table to create.
When deleting the existing table, if the not_found_ok flag is False an error is raised when the table does not exist, so leave it set to True. Think of it as the DROP TABLE IF EXISTS of DDL.
When creating a table, you need to specify the column names and types as the schema definition. Columns are NULLABLE by default, so if you want a NOT NULL column, specify the mode:
bigquery.SchemaField('id', 'INT64', mode='REQUIRED')
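Put into the full create step, that would look like this (a sketch reusing the table object defined above):

# id becomes REQUIRED (NOT NULL); name stays NULLABLE, which is the default mode
schema = [
    bigquery.SchemaField('id', 'INT64', mode='REQUIRED'),
    bigquery.SchemaField('name', 'STRING'),
]
client.create_table(bigquery.Table(table, schema=schema))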
Next, import CSV data as initial data. Prepare a CSV in advance whose values match the types in the schema definition.
import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
# Import initial data
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as source_data:
    job = client.load_table_from_file(source_data, table, job_config=job_config)
    job.result()
If you want to skip the CSV header row, specify the number of rows to skip with skip_leading_rows.
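To check that the load finished as expected, the row count of the destination table can be read back afterwards. A minimal sketch using the client and table objects from above:

# Fetch the table metadata after job.result() has returned
loaded_table = client.get_table(table)
print(f"{loaded_table.full_table_id} now has {loaded_table.num_rows} rows")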
migrate_view.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')
client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)
For a view, the flow is almost the same as for a regular table.
The difference is that for a view you specify the SQL query directly in view_query.
The schema definition is built automatically from the SQL query and does not need to be specified.
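If you want to see the schema that BigQuery derived for the view, it can be read back after creation. A small sketch using the objects defined above:

# The schema is populated automatically once the view exists
created_view = client.get_table(table)
for field in created_view.schema:
    print(field.name, field.field_type)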
This part covers the case where data is refreshed regularly with Scheduled Query and you want to change the executed query and the scheduling settings.
migrate_schedule.py
import json
from google.cloud import bigquery
from google.oauth2 import service_account
from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format
credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)
config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)
update_mask = {"paths": ["display_name", "params", "schedule"]}
response = client.update_transfer_config(
    config, update_mask
)
Pass the schedule settings (config) and the update mask (update_mask) to update_transfer_config.
update_mask specifies which fields of config to update; settings not included in update_mask are not updated.
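For example, to change only the execution interval and leave the query and other settings untouched, it should be enough to send just the schedule field. A sketch that assumes the same pre-2.0 google-cloud-bigquery-datatransfer client used above:

# Only "schedule" is listed in the update mask, so the query and params are not modified
config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "schedule": "every 12 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)
client.update_transfer_config(config, {"paths": ["schedule"]})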
The details of each config setting are as follows.

name
Specify the resource name of the schedule. The resource name is shown in the list of schedules under "Scheduled Query" in the BigQuery console, so you can check it in the configuration information of the corresponding schedule (it can also be looked up via the API; see the sketch after this list).

destination_dataset_id
Specify the dataset in which the results of the scheduled query are saved.

display_name
The display name of the schedule; give it any name you like.

params.query
Specify the query to execute.

params.destination_table_name_template
Specify the name of the table in which the results of the scheduled query are saved.

params.write_disposition
Specifies how the results are written to the table. With WRITE_TRUNCATE, the existing table is replaced with the result of the executed query. With WRITE_APPEND, the result of the executed query is appended to the existing table.

schedule
Specify when the schedule runs, e.g. every 1 hour to run every hour, or every day 00:00 to run at midnight every day.
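If you do not know the resource name, the existing transfer configs can also be listed via the API, as mentioned above. A minimal sketch that assumes the same pre-2.0 client and builds the parent path from the credentials:

# List every transfer config in the project so the resource name can be copied
parent = "projects/" + credentials.project_id
for transfer_config in client.list_transfer_configs(parent):
    print(transfer_config.name, transfer_config.display_name)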
When using BigQuery as a data mart, it is hard to track changes if you create tables from the console, so I recommend putting everything into code like this so it can be managed with git.