Comparing changes

base repository: googleapis/python-bigquery-sqlalchemy
base: v0.4.15
head repository: googleapis/python-bigquery-sqlalchemy
compare: v0.5.0

Commits on May 29, 2020

  1. Commit 1638141

Commits on Jun 18, 2020

  1. Merge pull request #57 from romainr/patch-1

    Documenting credentials_info constructor parameter
    mxmzdlv authored Jun 18, 2020 (9926598)

Commits on Jun 20, 2020

  1. ISSUE-58: Remove usage of deprecated Client.dataset

    Use TableReference instead.
    dimkonko committed Jun 20, 2020 (2685218)

Commits on Jul 15, 2020

  1. Merge pull request #59 from dimkonko/fix_dataset_deprecation_warning

    ISSUE-58: Remove usage of deprecated Client.dataset
    tswast authored Jul 15, 2020 (2a41ed7)

Commits on Jul 29, 2020

  1. Commit f2f3752

Commits on Aug 11, 2020

  1. Commit 579ab81
  2. Add dialect fixture

    vinceatbluelabs committed Aug 11, 2020 (5823b69)
  3. Commit 75e6b6b
  4. Refactor

    vinceatbluelabs committed Aug 11, 2020 (a6631e1)
  5. Refactor

    vinceatbluelabs committed Aug 11, 2020 (1319c1a)
  6. Commit 908fc5e
  7. Commit 51b5a53
  8. Commit c372917

Commits on Aug 12, 2020

  1. Commit 545d0eb
  2. Commit 12fb2f3

Commits on Sep 6, 2020

  1. Compile array columns.

    jmcarp committed Sep 6, 2020 (ae2b896)

Commits on Sep 26, 2020

  1. Commit 2e84599
  2. Commit 32acaea

Commits on Oct 1, 2020

  1. Commit 6665817

Commits on Nov 18, 2020

  1. Merge pull request #64 from jmcarp/compile-arrays

    Compile array columns.
    tswast authored Nov 18, 2020 (db971fb)
  2. Commit 549002d
  3. add test cases from issue

    tswast committed Nov 18, 2020 (efd11fe)
  4. Merge pull request #47 from JacobHayes/fix-nested-labels

    Fix visit_label override to handle nested labels
    tswast authored Nov 18, 2020 (9f47d0b)
  5. Commit e7135eb
  6. Merge pull request #62 from dimkonko/ISSUE-60_implement_get_view_names

    ISSUE-60: Implement get_view_names()
    tswast authored Nov 18, 2020 (8c1450f)
  7. Merge pull request #63 from bluelabsio/project_dataset_table_parsing

    Allow use of schema argument for project and dataset
    tswast authored Nov 18, 2020 (72fbe2b)
  8. Commit 26dffb4
  9. Commit a1c60ee
  10. Merge pull request #71 from mvoitko/feature/add-flake8-code-check

    Add Flake8 check plugin to pytest. Fix code style
    tswast authored Nov 18, 2020 (7bbd70d)
  11. Release 0.5.0

    tswast committed Nov 18, 2020 (d37595f)

Showing with 289 additions and 74 deletions.
  1. +9 −6 .gitignore
  2. +18 −0 CHANGELOG.md
  3. +7 −4 README.rst
  4. +1 −0 dev_requirements.txt
  5. +21 −12 pybigquery/parse_url.py
  6. +102 −35 pybigquery/sqlalchemy_bigquery.py
  7. +3 −0 scripts/load_test_data.sh
  8. +8 −1 setup.cfg
  9. +2 −1 setup.py
  10. +1 −1 test/conftest.py
  11. +9 −7 test/test_parse_url.py
  12. +108 −7 test/test_sqlalchemy_bigquery.py
15 changes: 9 additions & 6 deletions .gitignore
@@ -1,11 +1,14 @@
.DS_Store
*.egg
/.env/
/.vscode/
*.pyc
.cache/
*.iml
/pybigquery.egg-info
/dist
/build
/dist
/pybigquery.egg-info
.idea/
.cache/
.pytest_cache/
env
venv/
*.egg
*.iml
*.pyc
18 changes: 18 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,24 @@

[1]: https://pypi.org/project/pybigquery/#history

## 0.5.0

2020-11-18

### Features

- Support the `ARRAY` data type in generated DDL. ([#64](https://github.com/mxmzdlv/pybigquery/pull/64))
- Support project ID and dataset ID in `schema` argument. ([#63](https://github.com/mxmzdlv/pybigquery/pull/63))
- Implement `get_view_names()` method. ([#62](https://github.com/mxmzdlv/pybigquery/pull/62), [#60](https://github.com/mxmzdlv/pybigquery/issues/60))

### Bug Fixes

- Ignore no-op nested labels. ([#47](https://github.com/mxmzdlv/pybigquery/pull/47))

### Development

- Use flake8 for code style checks. ([#71](https://github.com/mxmzdlv/pybigquery/pull/71))

## 0.4.15

2020-04-23
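
The 0.5.0 entries above include the new `get_view_names()` support. Below is a minimal sketch of exercising it through SQLAlchemy's inspection API; the project and dataset names are placeholders, not values taken from this changeset.

```python
# Minimal sketch, assuming a BigQuery project whose dataset contains at
# least one view. Project and dataset names are placeholders.
from sqlalchemy import create_engine, inspect

engine = create_engine("bigquery://my-project/test_pybigquery")
inspector = inspect(engine)

# get_view_names() is new in 0.5.0; get_table_names() keeps its previous
# behavior, and both now share the same listing code path in the dialect.
print(inspector.get_view_names())   # e.g. ['sample_view']
print(inspector.get_table_names())  # e.g. ['sample', 'sample_one_row']
```
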
11 changes: 7 additions & 4 deletions README.rst
@@ -54,11 +54,14 @@ To specify location of your datasets pass ``location`` to ``create_engine()``:
Table names
___________

To query tables from non-default projects, use the following format for the table name: ``project.dataset.table``, e.g.:
To query tables from non-default projects or datasets, use the following format for the SQLAlchemy schema name: ``[project.]dataset``, e.g.:

.. code-block:: python
sample_table = Table('bigquery-public-data.samples.natality')
# If neither dataset nor project are the default
sample_table_1 = Table('natality', schema='bigquery-public-data.samples')
# If just dataset is not the default
sample_table_2 = Table('natality', schema='bigquery-public-data')
Batch size
__________
@@ -85,7 +88,7 @@ When using a default dataset, don't include the dataset name in the table name,
table = Table('table_name')
Note that specyfing a default dataset doesn't restrict execution of queries to that particular dataset when using raw queries, e.g.:
Note that specifying a default dataset doesn't restrict execution of queries to that particular dataset when using raw queries, e.g.:

.. code-block:: python
@@ -101,7 +104,7 @@ ____________________________

There are many situations where you can't call ``create_engine`` directly, such as when using tools like `Flask SQLAlchemy <http://flask-sqlalchemy.pocoo.org/2.3/>`_. For situations like these, or for situations where you want the ``Client`` to have a `default_query_job_config <https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client>`_, you can pass many arguments in the query of the connection string.

The ``credentials_path``, ``location``, and ``arraysize`` parameters are used by this library, and the rest are used to create a `QueryJobConfig <https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig>`_
The ``credentials_path``, ``credentials_info``, ``location``, and ``arraysize`` parameters are used by this library, and the rest are used to create a `QueryJobConfig <https://googlecloudplatform.github.io/google-cloud-python/latest/bigquery/generated/google.cloud.bigquery.job.QueryJobConfig.html#google.cloud.bigquery.job.QueryJobConfig>`_

Note that if you want to use query strings, it will be more reliable if you use three slashes, so ``'bigquery:///?a=b'`` will work reliably, but ``'bigquery://?a=b'`` might be interpreted as having a "database" of ``?a=b``, depending on the system being used to parse the connection string.
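
As a hedged illustration of the paragraph above, the connection string below passes the dialect-level parameters together with one `QueryJobConfig` field; the key file path and parameter values are placeholders and are not part of this changeset.

```python
# Sketch only: the key file path and values are placeholders. Keys consumed
# by the dialect itself: credentials_path, credentials_info, location,
# arraysize. Remaining keys are applied to the default QueryJobConfig.
from sqlalchemy import create_engine

engine = create_engine(
    "bigquery://my-project/my_dataset"
    "?credentials_path=/path/to/keyfile.json"
    "&location=US"
    "&arraysize=1000"
    "&maximum_bytes_billed=1000000000"
)
```
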

1 change: 1 addition & 0 deletions dev_requirements.txt
@@ -3,4 +3,5 @@ google-cloud-bigquery>=1.6.0
future==0.16.0

pytest==3.2.2
pytest-flake8==1.0.6
pytz==2017.2
33 changes: 21 additions & 12 deletions pybigquery/parse_url.py
@@ -1,12 +1,13 @@
import re
GROUP_DELIMITER = re.compile(r'\s*\,\s*')
KEY_VALUE_DELIMITER = re.compile(r'\s*\:\s*')

from google.cloud.bigquery import QueryJobConfig
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.job import CreateDisposition, WriteDisposition, QueryPriority, SchemaUpdateOption

from google.cloud.bigquery.table import EncryptionConfiguration, TableReference
from google.cloud.bigquery.dataset import DatasetReference

GROUP_DELIMITER = re.compile(r'\s*\,\s*')
KEY_VALUE_DELIMITER = re.compile(r'\s*\:\s*')


def parse_boolean(bool_string):
bool_string = bool_string.lower()
@@ -17,17 +18,22 @@ def parse_boolean(bool_string):
else:
raise ValueError()

def parse_url(url):

def parse_url(url): # noqa: C901
query = url.query

# use_legacy_sql (legacy)
if 'use_legacy_sql' in query: raise ValueError("legacy sql is not supported by this dialect")
if 'use_legacy_sql' in query:
raise ValueError("legacy sql is not supported by this dialect")
# allow_large_results (legacy)
if 'allow_large_results' in query: raise ValueError("allow_large_results is only allowed for legacy sql, which is not supported by this dialect")
if 'allow_large_results' in query:
raise ValueError("allow_large_results is only allowed for legacy sql, which is not supported by this dialect")
# flatten_results (legacy)
if 'flatten_results' in query: raise ValueError("flatten_results is only allowed for legacy sql, which is not supported by this dialect")
if 'flatten_results' in query:
raise ValueError("flatten_results is only allowed for legacy sql, which is not supported by this dialect")
# maximum_billing_tier (deprecated)
if 'maximum_billing_tier' in query: raise ValueError("maximum_billing_tier is a deprecated argument")
if 'maximum_billing_tier' in query:
raise ValueError("maximum_billing_tier is a deprecated argument")

project_id = url.host
location = None
@@ -77,7 +83,8 @@ def parse_url(url):

# default_dataset
if 'default_dataset' in query or 'dataset_id' in query or 'project_id' in query:
raise ValueError("don't pass default_dataset, dataset_id, project_id in url query, instead use the url host and database")
raise ValueError(
"don't pass default_dataset, dataset_id, project_id in url query, instead use the url host and database")

# destination
if 'destination' in query:
@@ -88,13 +95,15 @@ def parse_url(url):
try:
dest_project, dest_dataset, dest_table = query['destination'].split('.')
except ValueError:
raise ValueError("url query destination parameter should be fully qualified with project, dataset, and table")
raise ValueError(
"url query destination parameter should be fully qualified with project, dataset, and table")

job_config.destination = TableReference(DatasetReference(dest_project, dest_dataset), dest_table)

# destination_encryption_configuration
if 'destination_encryption_configuration' in query:
job_config.destination_encryption_configuration = EncryptionConfiguration(query['destination_encryption_configuration'])
job_config.destination_encryption_configuration = EncryptionConfiguration(
query['destination_encryption_configuration'])

# dry_run
if 'dry_run' in query:
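
Two behaviors in the hunks above are worth calling out: legacy-SQL options are rejected with `ValueError`, and a `destination` value must be fully qualified as `project.dataset.table`. The sketch below is an assumed usage example built around those two rules; the names are placeholders.

```python
# Sketch under assumptions: my-project / my_dataset / query_results are
# placeholders. The destination key must be project.dataset.table, or
# parse_url raises ValueError when the URL is parsed.
from sqlalchemy import create_engine

engine = create_engine(
    "bigquery://my-project/my_dataset"
    "?destination=my-project.my_dataset.query_results"
)

# Legacy-SQL options are rejected by parse_url with a ValueError, e.g.:
#   bigquery:///?use_legacy_sql=true
#   bigquery:///?allow_large_results=true
```
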
137 changes: 102 additions & 35 deletions pybigquery/sqlalchemy_bigquery.py
@@ -3,12 +3,13 @@
from __future__ import absolute_import
from __future__ import unicode_literals

import operator

from google import auth
from google.cloud import bigquery
from google.cloud.bigquery import dbapi, QueryJobConfig
from google.cloud.bigquery import dbapi
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import EncryptionConfiguration
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.table import TableReference
from google.oauth2 import service_account
from google.api_core.exceptions import NotFound
from sqlalchemy.exc import NoSuchTableError
@@ -91,6 +92,7 @@ def format_label(self, label, name=None):
result = self.quote(name)
return result


_type_map = {
'STRING': types.String,
'BOOLEAN': types.Boolean,
@@ -184,12 +186,19 @@ def visit_column(self, column, add_to_result_map=None,
self.preparer.quote(tablename) + \
"." + name

def visit_label(self, *args, **kwargs):
# Use labels in GROUP BY clause
if len(kwargs) == 0 or len(kwargs) == 1:
def visit_label(self, *args, within_group_by=False, **kwargs):
# Use labels in GROUP BY clause.
#
# Flag set in the group_by_clause method. Works around missing
# equivalent to supports_simple_order_by_label for group by.
if within_group_by:
kwargs['render_label_as_label'] = args[0]
result = super(BigQueryCompiler, self).visit_label(*args, **kwargs)
return result
return super(BigQueryCompiler, self).visit_label(*args, **kwargs)

def group_by_clause(self, select, **kw):
return super(BigQueryCompiler, self).group_by_clause(
select, **kw, within_group_by=True
)


class BigQueryTypeCompiler(GenericTypeCompiler):
@@ -206,6 +215,9 @@ def visit_text(self, type_, **kw):
def visit_string(self, type_, **kw):
return 'STRING'

def visit_ARRAY(self, type_, **kw):
return "ARRAY<{}>".format(self.process(type_.item_type, **kw))

def visit_BINARY(self, type_, **kw):
return 'BYTES'

@@ -284,6 +296,11 @@ def __init__(
def dbapi(cls):
return dbapi

@staticmethod
def _build_formatted_table_id(table):
"""Build '<dataset_id>.<table_id>' string using given table."""
return "{}.{}".format(table.reference.dataset_id, table.table_id)

@staticmethod
def _add_default_dataset_to_job_config(job_config, project_id, dataset_id):
# If dataset_id is set, then we know the job_config isn't None
@@ -294,7 +311,6 @@ def _add_default_dataset_to_job_config(job_config, project_id, dataset_id):

job_config.default_dataset = '{}.{}'.format(project_id, dataset_id)


def _create_client_from_credentials(self, credentials, default_query_job_config, project_id):
if project_id is None:
project_id = credentials.project_id
@@ -350,7 +366,28 @@ def _json_deserializer(self, row):
"""
return row

def _split_table_name(self, full_table_name):
def _get_table_or_view_names(self, connection, table_type, schema=None):
current_schema = schema or self.dataset_id
get_table_name = self._build_formatted_table_id \
if self.dataset_id is None else \
operator.attrgetter("table_id")

client = connection.connection._client
datasets = client.list_datasets()

result = []
for dataset in datasets:
if current_schema is not None and current_schema != dataset.dataset_id:
continue

tables = client.list_tables(dataset.reference)
for table in tables:
if table_type == table.table_type:
result.append(get_table_name(table))
return result

@staticmethod
def _split_table_name(full_table_name):
# Split full_table_name to get project, dataset and table name
dataset = None
table_name = None
@@ -363,26 +400,66 @@ def _split_table_name(self, full_table_name):
dataset, table_name = table_name_split
elif len(table_name_split) == 3:
project, dataset, table_name = table_name_split
else:
raise ValueError("Did not understand table_name: {}".format(full_table_name))

return (project, dataset, table_name)

def _table_reference(self, provided_schema_name, provided_table_name,
client_project):
project_id_from_table, dataset_id_from_table, table_id = self._split_table_name(provided_table_name)
project_id_from_schema = None
dataset_id_from_schema = None
if provided_schema_name is not None:
provided_schema_name_split = provided_schema_name.split('.')
if len(provided_schema_name_split) == 0:
pass
elif len(provided_schema_name_split) == 1:
if dataset_id_from_table:
project_id_from_schema = provided_schema_name_split[0]
else:
dataset_id_from_schema = provided_schema_name_split[0]
elif len(provided_schema_name_split) == 2:
project_id_from_schema = provided_schema_name_split[0]
dataset_id_from_schema = provided_schema_name_split[1]
else:
raise ValueError("Did not understand schema: {}".format(provided_schema_name))
if (dataset_id_from_schema and dataset_id_from_table and
dataset_id_from_schema != dataset_id_from_table):
raise ValueError(
"dataset_id specified in schema and table_name disagree: "
"got {} in schema, and {} in table_name".format(
dataset_id_from_schema, dataset_id_from_table
)
)
if (project_id_from_schema and project_id_from_table and
project_id_from_schema != project_id_from_table):
raise ValueError(
"project_id specified in schema and table_name disagree: "
"got {} in schema, and {} in table_name".format(
project_id_from_schema, project_id_from_table
)
)
project_id = project_id_from_schema or project_id_from_table or client_project
dataset_id = dataset_id_from_schema or dataset_id_from_table or self.dataset_id

table_ref = TableReference.from_string("{}.{}.{}".format(
project_id, dataset_id, table_id
))
return table_ref

def _get_table(self, connection, table_name, schema=None):
if isinstance(connection, Engine):
connection = connection.connect()

project, dataset, table_name_prepared = self._split_table_name(table_name)
if dataset is None:
if schema is not None:
dataset = schema
elif self.dataset_id:
dataset = self.dataset_id
client = connection.connection._client

table = connection.connection._client.dataset(dataset, project=project).table(table_name_prepared)
table_ref = self._table_reference(schema, table_name, client.project)
try:
t = connection.connection._client.get_table(table)
except NotFound as e:
table = client.get_table(table_ref)
except NotFound:
raise NoSuchTableError(table_name)
return t
return table

def has_table(self, connection, table_name, schema=None):
try:
@@ -463,23 +540,13 @@ def get_table_names(self, connection, schema=None, **kw):
if isinstance(connection, Engine):
connection = connection.connect()

datasets = connection.connection._client.list_datasets()
result = []
for d in datasets:
if schema is not None and d.dataset_id != schema:
continue
return self._get_table_or_view_names(connection, "TABLE", schema)

if self.dataset_id is not None and d.dataset_id != self.dataset_id:
continue
def get_view_names(self, connection, schema=None, **kw):
if isinstance(connection, Engine):
connection = connection.connect()

tables = connection.connection._client.list_tables(d.reference)
for t in tables:
if self.dataset_id is None:
table_name = d.dataset_id + '.' + t.table_id
else:
table_name = t.table_id
result.append(table_name)
return result
return self._get_table_or_view_names(connection, "VIEW", schema)

def do_rollback(self, dbapi_connection):
# BigQuery has no support for transactions.
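
Two of the changes in this file are easiest to see from the outside: the `schema` argument can now carry `project.dataset` (resolved by the new `_table_reference` helper, with conflicting values raising `ValueError`), and `visit_ARRAY` renders SQLAlchemy `ARRAY` types as BigQuery `ARRAY<...>`. A rough sketch with placeholder table and column names:

```python
# Rough sketch; table and column names are placeholders. Only the schema
# resolution rules and the ARRAY<...> rendering come from this changeset.
from sqlalchemy import Column, Integer, MetaData, Table, create_engine
from sqlalchemy.schema import CreateTable
from sqlalchemy.types import ARRAY, String

engine = create_engine("bigquery://my-project")
metadata = MetaData()

# The schema argument may now be "dataset" or "project.dataset"; if the
# table name also embeds a project/dataset and they disagree with the
# schema, _table_reference raises ValueError.
natality = Table(
    "natality", metadata,
    Column("weight_pounds", Integer),
    schema="bigquery-public-data.samples",
)

# visit_ARRAY turns SQLAlchemy ARRAY columns into BigQuery ARRAY<...> DDL.
tagged = Table(
    "tagged_items", metadata,
    Column("id", Integer),
    Column("tags", ARRAY(String)),
)
print(CreateTable(tagged).compile(engine))
# roughly: CREATE TABLE `tagged_items` ( `id` INT64, `tags` ARRAY<STRING> )
```
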
3 changes: 3 additions & 0 deletions scripts/load_test_data.sh
@@ -6,6 +6,7 @@ bq rm -f -t test_pybigquery.sample
bq rm -f -t test_pybigquery_alt.sample_alt
bq rm -f -t test_pybigquery.sample_one_row
bq rm -f -t test_pybigquery.sample_dml
bq rm -f -t test_pybigquery.sample_view
bq rm -f -t test_pybigquery_location.sample_one_row

bq mk --table --schema=$(dirname $0)/schema.json --time_partitioning_field timestamp --clustering_fields integer,string test_pybigquery.sample
@@ -17,3 +18,5 @@ bq load --source_format=NEWLINE_DELIMITED_JSON --schema=$(dirname $0)/schema.jso

bq --location=asia-northeast1 load --source_format=NEWLINE_DELIMITED_JSON --schema=$(dirname $0)/schema.json test_pybigquery_location.sample_one_row $(dirname $0)/sample_one_row.json
bq mk --schema=$(dirname $0)/schema.json -t test_pybigquery.sample_dml

bq mk --use_legacy_sql=false --view 'SELECT string FROM test_pybigquery.sample' test_pybigquery.sample_view