feat: map "if_exists" value to LoadJobConfig.WriteDisposition #583

Merged: 61 commits, merged on Nov 7, 2022
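In short, to_gbq now translates its if_exists argument into a BigQuery load-job write disposition instead of managing tables by hand. A minimal sketch of the mapping (the dict mirrors the gbq.py diff below; the standalone helper is illustrative only, not the PR's literal code):

# if_exists value -> LoadJobConfig.write_disposition value
IF_EXISTS_TO_DISPOSITION = {
    "fail": "WRITE_EMPTY",        # error if the destination table already has data
    "replace": "WRITE_TRUNCATE",  # overwrite the destination table's data
    "append": "WRITE_APPEND",     # add rows to the destination table
}

def resolve_write_disposition(if_exists: str) -> str:
    # Hypothetical helper; gbq.py inlines this logic in to_gbq().
    if if_exists not in IF_EXISTS_TO_DISPOSITION:
        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
    return IF_EXISTS_TO_DISPOSITION[if_exists]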
Commits (61)
70927c5
feat: add WriteDisposition to to_gbq
aribray Oct 21, 2022
15de9f3
pass test_load unit tests
aribray Oct 25, 2022
c44f1a4
nox -s blacken
aribray Oct 25, 2022
bc8faf5
test write_disposition parameter
aribray Oct 25, 2022
c3ed7a7
refactor tests for WriteDisposition
aribray Oct 26, 2022
936113e
remove unused import
aribray Oct 26, 2022
8613458
update minimum google-auth version to 2.13.0
aribray Oct 26, 2022
bc09929
update constraints-3.7.txt
aribray Oct 26, 2022
f7394bb
bump google-api-core version to 2.10.2 and google-auth-oauthlib to 0.7.0
aribray Oct 26, 2022
0d45665
remove version constraints from google-api-core
aribray Oct 26, 2022
0e4be09
bump google-cloud-bigquery version to 3.3.5, google-cloud-bigquery-st…
aribray Oct 26, 2022
57c0bf9
bump pandas version, bump db-dtypes version
aribray Oct 26, 2022
16411c9
fix pandas version
aribray Oct 26, 2022
cb651d2
resolve dependency conflicts
aribray Oct 26, 2022
81736e2
bump dbtypes version
aribray Oct 26, 2022
a006ac0
bump circleci pandas version
aribray Oct 26, 2022
1fa654e
rename conda requirements file
aribray Oct 26, 2022
0407ae5
reset circleci config pandas version
aribray Oct 26, 2022
3680702
reset pandas version
aribray Oct 26, 2022
846a44d
readjust constraints-3.7 versions
aribray Oct 26, 2022
f44704b
test adding to kokoro
aribray Oct 27, 2022
f3ced37
test removing circleci
aribray Oct 27, 2022
ad45c6d
test removing circleci
aribray Oct 27, 2022
6c771d9
Revert "test removing circleci"
aribray Oct 27, 2022
5f23bd0
add system tests to github workflow
aribray Oct 27, 2022
0a96c98
refactor to_gbq to map 'if_exists' to write_disposition
aribray Oct 27, 2022
8f92fef
fix docstring
aribray Oct 27, 2022
77b2d94
fix docstring
aribray Oct 27, 2022
7b3efc1
fix conda requirements
aribray Oct 27, 2022
3fc748d
remove system tests from github workflow
aribray Oct 27, 2022
0bdef40
adjust circle ci dependencies
aribray Oct 27, 2022
972ce57
drop versions for circleci build
aribray Oct 27, 2022
18b0273
add circleci fixture back to conftest.py
aribray Oct 27, 2022
43731c2
match versions
aribray Oct 27, 2022
4eccfc9
match versions
aribray Oct 27, 2022
4bf2a1c
reset dependency versions
aribray Oct 27, 2022
e664423
reset dependency versions
aribray Oct 27, 2022
02ebb1f
reset dependency versions
aribray Oct 27, 2022
591cb85
reset dependency versions
aribray Oct 27, 2022
78b4687
bump pydata-google-auth version
aribray Oct 27, 2022
86cb721
try pinning dependencies in circleci requirements
aribray Oct 27, 2022
96d2f83
unpin dependencies
aribray Oct 27, 2022
e7c4335
try adding dependency
aribray Oct 27, 2022
fc1e91a
reset requirements.conda
aribray Oct 27, 2022
ea71b8b
add oauthlib dependency
aribray Oct 27, 2022
d66da9a
pin pydata google auth version
aribray Oct 27, 2022
3b006c5
add google-auth dependency to circleci requirements
aribray Oct 28, 2022
ed04c3c
unpin dependency
aribray Oct 28, 2022
bda78e8
use requirements.txt created by pip-compile
aribray Oct 28, 2022
f5dd1e7
pin dependencies in circleci requirements
aribray Oct 28, 2022
6527fd7
unpin dependencies in CircleCI requirements
aribray Oct 30, 2022
330e34d
repin some dependencies
aribray Oct 30, 2022
e5eb48c
remove quiet flag
aribray Oct 30, 2022
bc2253f
unpin dependencies in CircleCI requirements
aribray Oct 30, 2022
c47aa5f
bump pandas version
aribray Oct 30, 2022
3a813a0
drop pandas version, fix package name
aribray Oct 30, 2022
5c26e43
fix linting, add test
aribray Oct 30, 2022
fba8921
rename tests
aribray Oct 30, 2022
ac61ee4
formatting
aribray Oct 30, 2022
6664df6
add newline
aribray Oct 30, 2022
06ee43c
formatting
aribray Oct 30, 2022
15 changes: 9 additions & 6 deletions ci/requirements-3.7-0.24.2.conda
@@ -1,14 +1,17 @@
 codecov
 coverage
-db-dtypes==0.3.1
+db-dtypes
 fastavro
 flake8
 freezegun
-numpy==1.16.6
-google-cloud-bigquery==1.27.2
-google-cloud-bigquery-storage==1.1.0
-pyarrow==3.0.0
+numpy
+google-api-core
+google-auth
+google-cloud-bigquery
+google-cloud-bigquery-storage
+pyarrow
 pydata-google-auth
 pytest
 pytest-cov
-tqdm==4.23.0
+requests-oauthlib
+tqdm
101 changes: 41 additions & 60 deletions pandas_gbq/gbq.py
@@ -20,10 +20,7 @@
 if typing.TYPE_CHECKING:  # pragma: NO COVER
     import pandas

-from pandas_gbq.exceptions import (
-    AccessDenied,
-    GenericGBQException,
-)
+from pandas_gbq.exceptions import AccessDenied, GenericGBQException
 from pandas_gbq.features import FEATURES
 import pandas_gbq.schema
 import pandas_gbq.timestamp
@@ -116,20 +113,12 @@ class InvalidSchema(ValueError):
     table in BigQuery.
     """

-    def __init__(
-        self, message: str, local_schema: Dict[str, Any], remote_schema: Dict[str, Any]
-    ):
-        super().__init__(message)
-        self._local_schema = local_schema
-        self._remote_schema = remote_schema
-
-    @property
-    def local_schema(self) -> Dict[str, Any]:
-        return self._local_schema
+    def __init__(self, message: str):
+        self._message = message

     @property
-    def remote_schema(self) -> Dict[str, Any]:
-        return self._remote_schema
+    def message(self) -> str:
+        return self._message


 class NotFoundException(ValueError):
@@ -155,7 +144,12 @@ class TableCreationError(ValueError):
     Raised when the create table method fails
     """

-    pass
+    def __init__(self, message: str):
+        self._message = message
+
+    @property
+    def message(self) -> str:
+        return self._message


 class Context(object):
@@ -379,11 +373,16 @@ def get_client(self):
     def process_http_error(ex):
         # See `BigQuery Troubleshooting Errors
         # <https://cloud.google.com/bigquery/troubleshooting-errors>`__

         if "cancelled" in ex.message:
             raise QueryTimeout("Reason: {0}".format(ex))
-
-        raise GenericGBQException("Reason: {0}".format(ex))
+        elif "Provided Schema does not match" in ex.message:
+            error_message = ex.errors[0]["message"]
+            raise InvalidSchema(f"Reason: {error_message}")
+        elif "Already Exists: Table" in ex.message:
+            error_message = ex.errors[0]["message"]
+            raise TableCreationError(f"Reason: {error_message}")
+        else:
+            raise GenericGBQException("Reason: {0}".format(ex))

     def download_table(
         self,
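With process_http_error now fanning out to typed exceptions, callers can branch on schema mismatches and duplicate tables instead of parsing GenericGBQException text. A minimal sketch of the caller side (project, dataset, and table names are placeholders, not from this PR):

import pandas
from pandas_gbq import to_gbq
from pandas_gbq.gbq import InvalidSchema, TableCreationError

df = pandas.DataFrame({"a": [1, 2, 3]})
try:
    # if_exists="fail" maps to WRITE_EMPTY, so loading into a non-empty
    # table surfaces as TableCreationError via process_http_error.
    to_gbq(df, "my_dataset.my_table", project_id="my-project", if_exists="fail")
except TableCreationError as ex:
    print("table already exists:", ex.message)
except InvalidSchema as ex:
    print("schema mismatch:", ex.message)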
@@ -577,6 +576,7 @@ def load_data(
         self,
         dataframe,
         destination_table_ref,
+        write_disposition,
         chunksize=None,
         schema=None,
         progress_bar=True,
@@ -586,7 +586,6 @@
         from pandas_gbq import load

         total_rows = len(dataframe)
-
         try:
             chunks = load.load_chunks(
                 self.client,
@@ -596,6 +595,7 @@
                 schema=schema,
                 location=self.location,
                 api_method=api_method,
+                write_disposition=write_disposition,
                 billing_project=billing_project,
             )
             if progress_bar and tqdm:
@@ -609,11 +609,6 @@
         except self.http_error as ex:
             self.process_http_error(ex)

-    def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema):
-        table = _Table(project_id, dataset_id, credentials=self.credentials)
-        table.delete(table_id)
-        table.create(table_id, table_schema)
-

 def _bqschema_to_nullsafe_dtypes(schema_fields):
     """Specify explicit dtypes based on BigQuery schema.
@@ -975,11 +970,9 @@ def to_gbq(
 ):
     """Write a DataFrame to a Google BigQuery table.

-    The main method a user calls to export pandas DataFrame contents to
-    Google BigQuery table.
+    The main method a user calls to export pandas DataFrame contents to Google BigQuery table.

-    This method uses the Google Cloud client library to make requests to
-    Google BigQuery, documented `here
+    This method uses the Google Cloud client library to make requests to Google BigQuery, documented `here
     <https://googleapis.dev/python/bigquery/latest/index.html>`__.

     See the :ref:`How to authenticate with Google BigQuery <authentication>`
@@ -1114,15 +1107,21 @@ def to_gbq(
             stacklevel=2,
         )

-    if if_exists not in ("fail", "replace", "append"):
-        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
-
     if "." not in destination_table:
         raise NotFoundException(
             "Invalid Table Name. Should be of the form 'datasetId.tableId' or "
             "'projectId.datasetId.tableId'"
         )

+    if if_exists not in ("fail", "replace", "append"):
+        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+
+    if_exists_list = ["fail", "replace", "append"]
+    dispositions = ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"]
+    dispositions_dict = dict(zip(if_exists_list, dispositions))
+
+    write_disposition = dispositions_dict[if_exists]
+
     connector = GbqConnector(
         project_id,
         reauth=reauth,
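For reference, the three target strings correspond to the google.cloud.bigquery client's WriteDisposition constants; the comments summarize BigQuery's load-job semantics for each (a sketch, not part of the diff):

from google.cloud import bigquery

# "fail"    -> WRITE_EMPTY:    the load job errors if the table already contains data.
# "replace" -> WRITE_TRUNCATE: the load job overwrites the table's existing data.
# "append"  -> WRITE_APPEND:   the load job appends rows to the existing table.
print(bigquery.WriteDisposition.WRITE_EMPTY)
print(bigquery.WriteDisposition.WRITE_TRUNCATE)
print(bigquery.WriteDisposition.WRITE_APPEND)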
@@ -1142,17 +1141,20 @@
     table_id = destination_table_ref.table_id

     default_schema = _generate_bq_schema(dataframe)
+    # If table_schema isn't provided, we'll create one for you
     if not table_schema:
         table_schema = default_schema
+    # If table_schema is provided, we'll update the default_schema to the provided table_schema
     else:
         table_schema = pandas_gbq.schema.update_schema(
             default_schema, dict(fields=table_schema)
         )

-    # If table exists, check if_exists parameter
     try:
+        # Try to get the table
         table = bqclient.get_table(destination_table_ref)
     except google_exceptions.NotFound:
+        # If the table doesn't already exist, create it
         table_connector = _Table(
             project_id_table,
             dataset_id,
@@ -1161,34 +1163,12 @@
         )
         table_connector.create(table_id, table_schema)
     else:
+        # Convert original schema (the schema that already exists) to pandas-gbq API format
         original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

-        if if_exists == "fail":
-            raise TableCreationError(
-                "Could not create the table because it "
-                "already exists. "
-                "Change the if_exists parameter to "
-                "'append' or 'replace' data."
-            )
-        elif if_exists == "replace":
-            connector.delete_and_recreate_table(
-                project_id_table, dataset_id, table_id, table_schema
-            )
-        else:
-            if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
-                raise InvalidSchema(
-                    "Please verify that the structure and "
-                    "data types in the DataFrame match the "
-                    "schema of the destination table.",
-                    table_schema,
-                    original_schema,
-                )
-
-            # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
-            # matches. See: https://github.com/pydata/pandas-gbq/issues/315
-            table_schema = pandas_gbq.schema.update_schema(
-                table_schema, original_schema
-            )
+        # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
+        # matches. See: https://github.com/pydata/pandas-gbq/issues/315
+        table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema)

     if dataframe.empty:
         # Create the table (if needed), but don't try to run a load job with an
@@ -1198,6 +1178,7 @@
     connector.load_data(
         dataframe,
         destination_table_ref,
+        write_disposition=write_disposition,
         chunksize=chunksize,
         schema=table_schema,
         progress_bar=progress_bar,
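Note what the removal of delete_and_recreate_table means for if_exists="replace": instead of deleting and recreating the table in separate API calls, the load job itself replaces the data via WRITE_TRUNCATE. A sketch of the new single-job flow, assuming a bigquery.Client named client (illustrative, not the PR's literal code):

from google.cloud import bigquery

def replace_table_contents(client: bigquery.Client, dataframe, table_ref) -> None:
    # One load job now does what separate delete() and create() calls used to do.
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = "WRITE_TRUNCATE"  # mapped from if_exists="replace"
    job_config.source_format = "PARQUET"
    client.load_table_from_dataframe(dataframe, table_ref, job_config=job_config).result()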
20 changes: 13 additions & 7 deletions pandas_gbq/load.py
@@ -113,20 +113,19 @@ def load_parquet(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     schema: Optional[Dict[str, Any]],
     billing_project: Optional[str] = None,
 ):
     job_config = bigquery.LoadJobConfig()
-    job_config.write_disposition = "WRITE_APPEND"
-    job_config.create_disposition = "CREATE_NEVER"
+    job_config.write_disposition = write_disposition
     job_config.source_format = "PARQUET"

     if schema is not None:
         schema = pandas_gbq.schema.remove_policy_tags(schema)
         job_config.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
         dataframe = cast_dataframe_for_parquet(dataframe, schema)
-
     try:
         client.load_table_from_dataframe(
             dataframe,
@@ -143,13 +142,14 @@

 def load_csv(
     dataframe: pandas.DataFrame,
+    write_disposition: str,
     chunksize: Optional[int],
     bq_schema: Optional[List[bigquery.SchemaField]],
     load_chunk: Callable,
 ):
     job_config = bigquery.LoadJobConfig()
-    job_config.write_disposition = "WRITE_APPEND"
-    job_config.create_disposition = "CREATE_NEVER"
-
+    job_config.write_disposition = write_disposition
     job_config.source_format = "CSV"
     job_config.allow_quoted_newlines = True

@@ -167,6 +167,7 @@
 def load_csv_from_dataframe(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     chunksize: Optional[int],
     schema: Optional[Dict[str, Any]],
@@ -187,13 +188,14 @@ def load_chunk(chunk, job_config):
             project=billing_project,
         ).result()

-    return load_csv(dataframe, chunksize, bq_schema, load_chunk)
+    return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


 def load_csv_from_file(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     chunksize: Optional[int],
     schema: Optional[Dict[str, Any]],
@@ -223,7 +225,7 @@ def load_chunk(chunk, job_config):
     finally:
         chunk_buffer.close()

-    return load_csv(dataframe, chunksize, bq_schema, load_chunk)
+    return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


@@ -234,13 +236,15 @@ def load_chunks(
     schema=None,
     location=None,
     api_method="load_parquet",
+    write_disposition="WRITE_EMPTY",
     billing_project: Optional[str] = None,
 ):
     if api_method == "load_parquet":
         load_parquet(
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             schema,
             billing_project=billing_project,
@@ -253,6 +257,7 @@
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             chunksize,
             schema,
@@ -263,6 +268,7 @@
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             chunksize,
             schema,
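Taken together, the load.py changes thread write_disposition through every load path (parquet, CSV-from-dataframe, CSV-from-file) rather than hard-coding WRITE_APPEND. A hedged usage sketch of the internal entry point (load_chunks is pandas-gbq internal API; project and table names below are placeholders):

import pandas
from google.cloud import bigquery
from pandas_gbq import load

client = bigquery.Client(project="my-project")
table_ref = bigquery.TableReference.from_string("my-project.my_dataset.my_table")
df = pandas.DataFrame({"a": [1, 2, 3]})

# Default is now WRITE_EMPTY; to_gbq passes the disposition it mapped from if_exists.
for _ in load.load_chunks(client, df, table_ref, write_disposition="WRITE_APPEND"):
    pass  # iterating drives chunked CSV uploads; the parquet path returns a single chunk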
2 changes: 1 addition & 1 deletion requirements.txt
@@ -2,4 +2,4 @@ pandas
 google-auth
 google-auth-oauthlib
 google-cloud-bigquery
-tqdm
\ No newline at end of file
+tqdm
16 changes: 8 additions & 8 deletions setup.py
@@ -23,24 +23,24 @@
 release_status = "Development Status :: 4 - Beta"
 dependencies = [
     "setuptools",
-    "db-dtypes >=0.3.1,<2.0.0",
+    "db-dtypes >=1.0.4,<2.0.0",
     "numpy >=1.16.6",
-    "pandas >=0.24.2",
+    "pandas >=1.1.4",
     "pyarrow >=3.0.0, <10.0dev",
-    "pydata-google-auth",
+    "pydata-google-auth >=1.4.0",
     # Note: google-api-core and google-auth are also included via transitive
     # dependency on google-cloud-bigquery, but this library also uses them
     # directly.
-    "google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0",
-    "google-auth >=1.25.0",
-    "google-auth-oauthlib >=0.0.1",
+    "google-api-core >= 2.10.2, <3.0.0dev",
+    "google-auth >=2.13.0",
+    "google-auth-oauthlib >=0.7.0",
     # Require 1.27.* because it has a fix for out-of-bounds timestamps. See:
     # https://github.com/googleapis/python-bigquery/pull/209 and
     # https://github.com/googleapis/python-bigquery-pandas/issues/365
     # Exclude 2.4.* because it has a bug where waiting for the query can hang
     # indefinitely. https://github.com/pydata/pandas-gbq/issues/343
-    "google-cloud-bigquery >=1.27.2,<4.0.0dev,!=2.4.*",
-    "google-cloud-bigquery-storage >=1.1.0,<3.0.0dev",
+    "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
+    "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev",
 ]
 extras = {
     "tqdm": "tqdm>=4.23.0",
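The raised floors in setup.py can be checked against an installed environment; a small sketch (the package-to-version map mirrors the diff above):

from importlib import metadata

# Minimum versions set by this PR's setup.py changes.
floors = {
    "db-dtypes": "1.0.4",
    "pandas": "1.1.4",
    "pydata-google-auth": "1.4.0",
    "google-api-core": "2.10.2",
    "google-auth": "2.13.0",
    "google-auth-oauthlib": "0.7.0",
    "google-cloud-bigquery": "3.3.5",
    "google-cloud-bigquery-storage": "2.16.2",
}
for package, floor in floors.items():
    print(f"{package}: installed {metadata.version(package)}, requires >={floor}")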