From 7389cd2a363ebf403b66905ca845ca842a754922 Mon Sep 17 00:00:00 2001 From: aribray <45905583+aribray@users.noreply.github.com> Date: Mon, 7 Nov 2022 11:36:12 -0600 Subject: [PATCH] feat: map "if_exists" value to LoadJobConfig.WriteDisposition (#583) * feat: map "if_exists" value to LoadJobConfig.WriteDisposition This uses LoadJobConfig.WriteDisposition to replace if_exists='fail'/'replace'/'append' behavior in to_gbq() ### Dependency updates - Update the minimum version of `db-dtypes` to 1.0.4 - Update the minimum version of `google-api-core` to 2.10.2 - Update the minimum version of `google-auth` to 2.13.0 - Update the minimum version of `google-auth-oauthlib` to 0.7.0 - Update the minimum version of `google-cloud-bigquery` to 3.3.5 - Update the minimum version of `google-cloud-bigquery-storage` to 2.16.2 - Update the minimum version of `pandas` to 1.1.4 - Update the minimum version of `pydata-google-auth` to 1.4.0 --- ci/requirements-3.7-0.24.2.conda | 15 +++-- pandas_gbq/gbq.py | 99 +++++++++++++------------------- pandas_gbq/load.py | 18 ++++-- requirements.txt | 2 +- setup.py | 16 +++--- testing/constraints-3.7.txt | 16 +++--- tests/system/test_gbq.py | 12 +++- tests/unit/test_load.py | 19 ++++-- tests/unit/test_to_gbq.py | 36 +++--------- 9 files changed, 112 insertions(+), 121 deletions(-) diff --git a/ci/requirements-3.7-0.24.2.conda b/ci/requirements-3.7-0.24.2.conda index 2facfb2c..2d61383e 100644 --- a/ci/requirements-3.7-0.24.2.conda +++ b/ci/requirements-3.7-0.24.2.conda @@ -1,14 +1,17 @@ codecov coverage -db-dtypes==0.3.1 +db-dtypes fastavro flake8 freezegun -numpy==1.16.6 -google-cloud-bigquery==1.27.2 -google-cloud-bigquery-storage==1.1.0 -pyarrow==3.0.0 +numpy +google-api-core +google-auth +google-cloud-bigquery +google-cloud-bigquery-storage +pyarrow pydata-google-auth pytest pytest-cov -tqdm==4.23.0 +requests-oauthlib +tqdm diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 56d6fd70..82099998 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -20,10 +20,7 @@ if typing.TYPE_CHECKING: # pragma: NO COVER import pandas -from pandas_gbq.exceptions import ( - AccessDenied, - GenericGBQException, -) +from pandas_gbq.exceptions import AccessDenied, GenericGBQException from pandas_gbq.features import FEATURES import pandas_gbq.schema import pandas_gbq.timestamp @@ -116,20 +113,12 @@ class InvalidSchema(ValueError): table in BigQuery. 
""" - def __init__( - self, message: str, local_schema: Dict[str, Any], remote_schema: Dict[str, Any] - ): - super().__init__(message) - self._local_schema = local_schema - self._remote_schema = remote_schema - - @property - def local_schema(self) -> Dict[str, Any]: - return self._local_schema + def __init__(self, message: str): + self._message = message @property - def remote_schema(self) -> Dict[str, Any]: - return self._remote_schema + def message(self) -> str: + return self._message class NotFoundException(ValueError): @@ -155,7 +144,12 @@ class TableCreationError(ValueError): Raised when the create table method fails """ - pass + def __init__(self, message: str): + self._message = message + + @property + def message(self) -> str: + return self._message class Context(object): @@ -382,8 +376,14 @@ def process_http_error(ex): if "cancelled" in ex.message: raise QueryTimeout("Reason: {0}".format(ex)) - - raise GenericGBQException("Reason: {0}".format(ex)) + elif "Provided Schema does not match" in ex.message: + error_message = ex.errors[0]["message"] + raise InvalidSchema(f"Reason: {error_message}") + elif "Already Exists: Table" in ex.message: + error_message = ex.errors[0]["message"] + raise TableCreationError(f"Reason: {error_message}") + else: + raise GenericGBQException("Reason: {0}".format(ex)) def download_table( self, @@ -577,6 +577,7 @@ def load_data( self, dataframe, destination_table_ref, + write_disposition, chunksize=None, schema=None, progress_bar=True, @@ -596,6 +597,7 @@ def load_data( schema=schema, location=self.location, api_method=api_method, + write_disposition=write_disposition, billing_project=billing_project, ) if progress_bar and tqdm: @@ -609,11 +611,6 @@ def load_data( except self.http_error as ex: self.process_http_error(ex) - def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema): - table = _Table(project_id, dataset_id, credentials=self.credentials) - table.delete(table_id) - table.create(table_id, table_schema) - def _bqschema_to_nullsafe_dtypes(schema_fields): """Specify explicit dtypes based on BigQuery schema. @@ -975,11 +972,9 @@ def to_gbq( ): """Write a DataFrame to a Google BigQuery table. - The main method a user calls to export pandas DataFrame contents to - Google BigQuery table. + The main method a user calls to export pandas DataFrame contents to Google BigQuery table. - This method uses the Google Cloud client library to make requests to - Google BigQuery, documented `here + This method uses the Google Cloud client library to make requests to Google BigQuery, documented `here `__. See the :ref:`How to authenticate with Google BigQuery ` @@ -1114,15 +1109,21 @@ def to_gbq( stacklevel=2, ) - if if_exists not in ("fail", "replace", "append"): - raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) - if "." not in destination_table: raise NotFoundException( "Invalid Table Name. 
Should be of the form 'datasetId.tableId' or " "'projectId.datasetId.tableId'" ) + if if_exists not in ("fail", "replace", "append"): + raise ValueError("'{0}' is not valid for if_exists".format(if_exists)) + + if_exists_list = ["fail", "replace", "append"] + dispositions = ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"] + dispositions_dict = dict(zip(if_exists_list, dispositions)) + + write_disposition = dispositions_dict[if_exists] + connector = GbqConnector( project_id, reauth=reauth, @@ -1142,17 +1143,20 @@ def to_gbq( table_id = destination_table_ref.table_id default_schema = _generate_bq_schema(dataframe) + # If table_schema isn't provided, we'll create one for you if not table_schema: table_schema = default_schema + # It table_schema is provided, we'll update the default_schema to the provided table_schema else: table_schema = pandas_gbq.schema.update_schema( default_schema, dict(fields=table_schema) ) - # If table exists, check if_exists parameter try: + # Try to get the table table = bqclient.get_table(destination_table_ref) except google_exceptions.NotFound: + # If the table doesn't already exist, create it table_connector = _Table( project_id_table, dataset_id, @@ -1161,34 +1165,12 @@ def to_gbq( ) table_connector.create(table_id, table_schema) else: + # Convert original schema (the schema that already exists) to pandas-gbq API format original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema) - if if_exists == "fail": - raise TableCreationError( - "Could not create the table because it " - "already exists. " - "Change the if_exists parameter to " - "'append' or 'replace' data." - ) - elif if_exists == "replace": - connector.delete_and_recreate_table( - project_id_table, dataset_id, table_id, table_schema - ) - else: - if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema): - raise InvalidSchema( - "Please verify that the structure and " - "data types in the DataFrame match the " - "schema of the destination table.", - table_schema, - original_schema, - ) - - # Update the local `table_schema` so mode (NULLABLE/REQUIRED) - # matches. See: https://github.com/pydata/pandas-gbq/issues/315 - table_schema = pandas_gbq.schema.update_schema( - table_schema, original_schema - ) + # Update the local `table_schema` so mode (NULLABLE/REQUIRED) + # matches. 
See: https://github.com/pydata/pandas-gbq/issues/315 + table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema) if dataframe.empty: # Create the table (if needed), but don't try to run a load job with an @@ -1198,6 +1180,7 @@ def to_gbq( connector.load_data( dataframe, destination_table_ref, + write_disposition=write_disposition, chunksize=chunksize, schema=table_schema, progress_bar=progress_bar, diff --git a/pandas_gbq/load.py b/pandas_gbq/load.py index 10328069..bad99584 100644 --- a/pandas_gbq/load.py +++ b/pandas_gbq/load.py @@ -113,13 +113,13 @@ def load_parquet( client: bigquery.Client, dataframe: pandas.DataFrame, destination_table_ref: bigquery.TableReference, + write_disposition: str, location: Optional[str], schema: Optional[Dict[str, Any]], billing_project: Optional[str] = None, ): job_config = bigquery.LoadJobConfig() - job_config.write_disposition = "WRITE_APPEND" - job_config.create_disposition = "CREATE_NEVER" + job_config.write_disposition = write_disposition job_config.source_format = "PARQUET" if schema is not None: @@ -143,13 +143,13 @@ def load_parquet( def load_csv( dataframe: pandas.DataFrame, + write_disposition: str, chunksize: Optional[int], bq_schema: Optional[List[bigquery.SchemaField]], load_chunk: Callable, ): job_config = bigquery.LoadJobConfig() - job_config.write_disposition = "WRITE_APPEND" - job_config.create_disposition = "CREATE_NEVER" + job_config.write_disposition = write_disposition job_config.source_format = "CSV" job_config.allow_quoted_newlines = True @@ -167,6 +167,7 @@ def load_csv_from_dataframe( client: bigquery.Client, dataframe: pandas.DataFrame, destination_table_ref: bigquery.TableReference, + write_disposition: str, location: Optional[str], chunksize: Optional[int], schema: Optional[Dict[str, Any]], @@ -187,13 +188,14 @@ def load_chunk(chunk, job_config): project=billing_project, ).result() - return load_csv(dataframe, chunksize, bq_schema, load_chunk) + return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk) def load_csv_from_file( client: bigquery.Client, dataframe: pandas.DataFrame, destination_table_ref: bigquery.TableReference, + write_disposition: str, location: Optional[str], chunksize: Optional[int], schema: Optional[Dict[str, Any]], @@ -223,7 +225,7 @@ def load_chunk(chunk, job_config): finally: chunk_buffer.close() - return load_csv(dataframe, chunksize, bq_schema, load_chunk) + return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk) def load_chunks( @@ -234,6 +236,7 @@ def load_chunks( schema=None, location=None, api_method="load_parquet", + write_disposition="WRITE_EMPTY", billing_project: Optional[str] = None, ): if api_method == "load_parquet": @@ -241,6 +244,7 @@ def load_chunks( client, dataframe, destination_table_ref, + write_disposition, location, schema, billing_project=billing_project, @@ -253,6 +257,7 @@ def load_chunks( client, dataframe, destination_table_ref, + write_disposition, location, chunksize, schema, @@ -263,6 +268,7 @@ def load_chunks( client, dataframe, destination_table_ref, + write_disposition, location, chunksize, schema, diff --git a/requirements.txt b/requirements.txt index 7b3ede97..bf23435f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,4 +2,4 @@ pandas google-auth google-auth-oauthlib google-cloud-bigquery -tqdm +tqdm \ No newline at end of file diff --git a/setup.py b/setup.py index 0bf0c7b2..12c8443c 100644 --- a/setup.py +++ b/setup.py @@ -23,24 +23,24 @@ release_status = "Development Status :: 4 - Beta" 
dependencies = [ "setuptools", - "db-dtypes >=0.3.1,<2.0.0", + "db-dtypes >=1.0.4,<2.0.0", "numpy >=1.16.6", - "pandas >=0.24.2", + "pandas >=1.1.4", "pyarrow >=3.0.0, <10.0dev", - "pydata-google-auth", + "pydata-google-auth >=1.4.0", # Note: google-api-core and google-auth are also included via transitive # dependency on google-cloud-bigquery, but this library also uses them # directly. - "google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0", - "google-auth >=1.25.0", - "google-auth-oauthlib >=0.0.1", + "google-api-core >= 2.10.2, <3.0.0dev", + "google-auth >=2.13.0", + "google-auth-oauthlib >=0.7.0", # Require 1.27.* because it has a fix for out-of-bounds timestamps. See: # https://github.com/googleapis/python-bigquery/pull/209 and # https://github.com/googleapis/python-bigquery-pandas/issues/365 # Exclude 2.4.* because it has a bug where waiting for the query can hang # indefinitely. https://github.com/pydata/pandas-gbq/issues/343 - "google-cloud-bigquery >=1.27.2,<4.0.0dev,!=2.4.*", - "google-cloud-bigquery-storage >=1.1.0,<3.0.0dev", + "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*", + "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev", ] extras = { "tqdm": "tqdm>=4.23.0", diff --git a/testing/constraints-3.7.txt b/testing/constraints-3.7.txt index 1d9efed7..569287ad 100644 --- a/testing/constraints-3.7.txt +++ b/testing/constraints-3.7.txt @@ -5,15 +5,15 @@ # # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev", # Then this file should have foo==1.14.0 -db-dtypes==0.3.1 -google-api-core==1.31.5 -google-auth==1.25.0 -google-auth-oauthlib==0.0.1 -google-cloud-bigquery==1.27.2 -google-cloud-bigquery-storage==1.1.0 +db-dtypes==1.0.4 +google-api-core==2.10.2 +google-auth==2.13.0 +google-auth-oauthlib==0.7.0 +google-cloud-bigquery==3.3.5 +google-cloud-bigquery-storage==2.16.2 numpy==1.16.6 -pandas==0.24.2 +pandas==1.1.4 pyarrow==3.0.0 -pydata-google-auth==0.1.2 +pydata-google-auth==1.4.0 tqdm==4.23.0 protobuf==3.19.5 diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py index ee8190b5..5b90e8ba 100644 --- a/tests/system/test_gbq.py +++ b/tests/system/test_gbq.py @@ -673,9 +673,17 @@ def test_upload_data_if_table_exists_fail(self, project_id): test_id = "2" test_size = 10 df = make_mixed_dataframe_v2(test_size) - self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df)) - # Test the default value of if_exists is 'fail' + # Initialize table with sample data + gbq.to_gbq( + df, + self.destination_table + test_id, + project_id, + chunksize=10000, + credentials=self.credentials, + ) + + # Test the default value of if_exists == 'fail' with pytest.raises(gbq.TableCreationError): gbq.to_gbq( df, diff --git a/tests/unit/test_load.py b/tests/unit/test_load.py index f2209bda..1d99d9b4 100644 --- a/tests/unit/test_load.py +++ b/tests/unit/test_load.py @@ -108,7 +108,7 @@ def test_load_csv_from_dataframe_allows_client_to_generate_schema(mock_bigquery_ _ = list( load.load_csv_from_dataframe( - mock_bigquery_client, df, destination, None, None, None + mock_bigquery_client, df, destination, None, None, None, None ) ) @@ -151,7 +151,9 @@ def test_load_csv_from_file_generates_schema(mock_bigquery_client): ) _ = list( - load.load_csv_from_file(mock_bigquery_client, df, destination, None, None, None) + load.load_csv_from_file( + mock_bigquery_client, df, destination, None, None, None, None + ) ) mock_load = mock_bigquery_client.load_table_from_file @@ -222,7 +224,7 @@ def test_load_chunks_omits_policy_tags( def test_load_chunks_with_invalid_api_method(): with 
pytest.raises(ValueError, match="Got unexpected api_method:"): - load.load_chunks(None, None, None, api_method="not_a_thing") + load.load_chunks(None, None, None, None, api_method="not_a_thing") def test_load_parquet_allows_client_to_generate_schema(mock_bigquery_client): @@ -233,7 +235,14 @@ def test_load_parquet_allows_client_to_generate_schema(mock_bigquery_client): "my-project.my_dataset.my_table" ) - load.load_parquet(mock_bigquery_client, df, destination, None, None) + load.load_parquet( + mock_bigquery_client, + df, + destination, + None, + None, + None, + ) mock_load = mock_bigquery_client.load_table_from_dataframe assert mock_load.called @@ -255,7 +264,7 @@ def test_load_parquet_with_bad_conversion(mock_bigquery_client): ) with pytest.raises(exceptions.ConversionError): - load.load_parquet(mock_bigquery_client, df, destination, None, None) + load.load_parquet(mock_bigquery_client, df, destination, None, None, None) @pytest.mark.parametrize( diff --git a/tests/unit/test_to_gbq.py b/tests/unit/test_to_gbq.py index c8b419ed..4456df0e 100644 --- a/tests/unit/test_to_gbq.py +++ b/tests/unit/test_to_gbq.py @@ -94,7 +94,11 @@ def test_to_gbq_with_if_exists_append_mismatch(mock_bigquery_client): "myproj.my_dataset.my_table", schema=(SchemaField("col_a", "INTEGER"), SchemaField("col_b", "STRING")), ) - with pytest.raises(gbq.InvalidSchema) as exception_block: + mock_bigquery_client.side_effect = gbq.InvalidSchema( + message=r"Provided Schema does not match Table *" + ) + + with pytest.raises((gbq.InvalidSchema)) as exception_block: gbq.to_gbq( DataFrame({"col_a": [0.25, 1.5, -1.0]}), "my_dataset.my_table", @@ -103,16 +107,10 @@ def test_to_gbq_with_if_exists_append_mismatch(mock_bigquery_client): ) exc = exception_block.value - assert exc.remote_schema == { - "fields": [ - {"name": "col_a", "type": "INTEGER", "mode": "NULLABLE"}, - {"name": "col_b", "type": "STRING", "mode": "NULLABLE"}, - ] - } - assert exc.local_schema == {"fields": [{"name": "col_a", "type": "FLOAT"}]} + assert exc.message == r"Provided Schema does not match Table *" -def test_to_gbq_with_if_exists_replace(mock_bigquery_client): +def test_to_gbq_with_if_exists_replace(mock_bigquery_client, expected_load_method): mock_bigquery_client.get_table.side_effect = ( # Initial check google.cloud.bigquery.Table("myproj.my_dataset.my_table"), @@ -125,10 +123,7 @@ def test_to_gbq_with_if_exists_replace(mock_bigquery_client): project_id="myproj", if_exists="replace", ) - # TODO: We can avoid these API calls by using write disposition in the load - # job. See: https://github.com/googleapis/python-bigquery-pandas/issues/118 - assert mock_bigquery_client.delete_table.called - assert mock_bigquery_client.create_table.called + expected_load_method.assert_called_once() def test_to_gbq_with_if_exists_replace_cross_project( @@ -146,20 +141,7 @@ def test_to_gbq_with_if_exists_replace_cross_project( project_id="billing-project", if_exists="replace", ) - # TODO: We can avoid these API calls by using write disposition in the load - # job. 
See: https://github.com/googleapis/python-bigquery-pandas/issues/118 - assert mock_bigquery_client.delete_table.called - args, _ = mock_bigquery_client.delete_table.call_args - table_delete: google.cloud.bigquery.TableReference = args[0] - assert table_delete.project == "data-project" - assert table_delete.dataset_id == "my_dataset" - assert table_delete.table_id == "my_table" - assert mock_bigquery_client.create_table.called - args, _ = mock_bigquery_client.create_table.call_args - table_create: google.cloud.bigquery.TableReference = args[0] - assert table_create.project == "data-project" - assert table_create.dataset_id == "my_dataset" - assert table_create.table_id == "my_table" + expected_load_method.assert_called_once() # Check that billing project and destination table is set correctly. expected_load_method.assert_called_once()
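
Illustrative note (not part of the patch): a minimal sketch of how the new behavior is intended to work, assuming the `to_gbq` API as modified above. The mapping mirrors the `dict(zip(...))` added in `to_gbq`; the destination table and project names below are hypothetical placeholders.

    import pandas as pd
    import pandas_gbq

    # Mirrors the mapping introduced in to_gbq(): each if_exists value now
    # selects a LoadJobConfig.WriteDisposition, instead of pandas-gbq manually
    # deleting and recreating the table.
    dispositions = dict(
        zip(
            ["fail", "replace", "append"],
            ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"],
        )
    )
    assert dispositions["replace"] == "WRITE_TRUNCATE"

    df = pd.DataFrame({"col_a": [0.25, 1.5, -1.0]})

    # With if_exists="replace", the load job itself runs with WRITE_TRUNCATE,
    # so the existing table is overwritten by the load job rather than by
    # separate delete_table/create_table API calls.
    pandas_gbq.to_gbq(
        df,
        "my_dataset.my_table",    # hypothetical destination table
        project_id="my-project",  # hypothetical project
        if_exists="replace",
    )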