
Commit 7389cd2
Authored Nov 7, 2022

feat: map "if_exists" value to LoadJobConfig.WriteDisposition (#583)

This uses LoadJobConfig.WriteDisposition to replace the if_exists='fail'/'replace'/'append' behavior in to_gbq().

### Dependency updates

- Update the minimum version of `db-dtypes` to 1.0.4
- Update the minimum version of `google-api-core` to 2.10.2
- Update the minimum version of `google-auth` to 2.13.0
- Update the minimum version of `google-auth-oauthlib` to 0.7.0
- Update the minimum version of `google-cloud-bigquery` to 3.3.5
- Update the minimum version of `google-cloud-bigquery-storage` to 2.16.2
- Update the minimum version of `pandas` to 1.1.4
- Update the minimum version of `pydata-google-auth` to 1.4.0

1 parent afd6e21
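The change boils down to a fixed mapping from `to_gbq`'s `if_exists` argument onto BigQuery load-job write dispositions (see the `pandas_gbq/gbq.py` hunk below, which builds it with `dict(zip(...))`). A minimal sketch of that mapping:

```python
# Minimal sketch of the mapping introduced in to_gbq().
# "fail" refuses to write into a non-empty table, "replace" truncates it,
# and "append" adds rows -- BigQuery enforces this server-side.
DISPOSITIONS = {
    "fail": "WRITE_EMPTY",
    "replace": "WRITE_TRUNCATE",
    "append": "WRITE_APPEND",
}

write_disposition = DISPOSITIONS["replace"]  # -> "WRITE_TRUNCATE"
```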

File tree: 9 files changed, +112 −121 lines

 

ci/requirements-3.7-0.24.2.conda
+9 −6

@@ -1,14 +1,17 @@
 codecov
 coverage
-db-dtypes==0.3.1
+db-dtypes
 fastavro
 flake8
 freezegun
-numpy==1.16.6
-google-cloud-bigquery==1.27.2
-google-cloud-bigquery-storage==1.1.0
-pyarrow==3.0.0
+numpy
+google-api-core
+google-auth
+google-cloud-bigquery
+google-cloud-bigquery-storage
+pyarrow
 pydata-google-auth
 pytest
 pytest-cov
-tqdm==4.23.0
+requests-oauthlib
+tqdm

pandas_gbq/gbq.py
+41 −58

@@ -20,10 +20,7 @@
 if typing.TYPE_CHECKING:  # pragma: NO COVER
     import pandas

-from pandas_gbq.exceptions import (
-    AccessDenied,
-    GenericGBQException,
-)
+from pandas_gbq.exceptions import AccessDenied, GenericGBQException
 from pandas_gbq.features import FEATURES
 import pandas_gbq.schema
 import pandas_gbq.timestamp
@@ -116,20 +113,12 @@ class InvalidSchema(ValueError):
     table in BigQuery.
     """

-    def __init__(
-        self, message: str, local_schema: Dict[str, Any], remote_schema: Dict[str, Any]
-    ):
-        super().__init__(message)
-        self._local_schema = local_schema
-        self._remote_schema = remote_schema
-
-    @property
-    def local_schema(self) -> Dict[str, Any]:
-        return self._local_schema
+    def __init__(self, message: str):
+        self._message = message

     @property
-    def remote_schema(self) -> Dict[str, Any]:
-        return self._remote_schema
+    def message(self) -> str:
+        return self._message


 class NotFoundException(ValueError):
@@ -155,7 +144,12 @@ class TableCreationError(ValueError):
     Raised when the create table method fails
     """

-    pass
+    def __init__(self, message: str):
+        self._message = message
+
+    @property
+    def message(self) -> str:
+        return self._message


 class Context(object):
@@ -382,8 +376,14 @@ def process_http_error(ex):

         if "cancelled" in ex.message:
             raise QueryTimeout("Reason: {0}".format(ex))
-
-        raise GenericGBQException("Reason: {0}".format(ex))
+        elif "Provided Schema does not match" in ex.message:
+            error_message = ex.errors[0]["message"]
+            raise InvalidSchema(f"Reason: {error_message}")
+        elif "Already Exists: Table" in ex.message:
+            error_message = ex.errors[0]["message"]
+            raise TableCreationError(f"Reason: {error_message}")
+        else:
+            raise GenericGBQException("Reason: {0}".format(ex))

     def download_table(
         self,
@@ -577,6 +577,7 @@ def load_data(
         self,
         dataframe,
         destination_table_ref,
+        write_disposition,
         chunksize=None,
         schema=None,
         progress_bar=True,
@@ -596,6 +597,7 @@
                 schema=schema,
                 location=self.location,
                 api_method=api_method,
+                write_disposition=write_disposition,
                 billing_project=billing_project,
             )
             if progress_bar and tqdm:
@@ -609,11 +611,6 @@
         except self.http_error as ex:
             self.process_http_error(ex)

-    def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema):
-        table = _Table(project_id, dataset_id, credentials=self.credentials)
-        table.delete(table_id)
-        table.create(table_id, table_schema)
-

 def _bqschema_to_nullsafe_dtypes(schema_fields):
     """Specify explicit dtypes based on BigQuery schema.
@@ -975,11 +972,9 @@ def to_gbq(
 ):
     """Write a DataFrame to a Google BigQuery table.

-    The main method a user calls to export pandas DataFrame contents to
-    Google BigQuery table.
+    The main method a user calls to export pandas DataFrame contents to Google BigQuery table.

-    This method uses the Google Cloud client library to make requests to
-    Google BigQuery, documented `here
+    This method uses the Google Cloud client library to make requests to Google BigQuery, documented `here
     <https://googleapis.dev/python/bigquery/latest/index.html>`__.

     See the :ref:`How to authenticate with Google BigQuery <authentication>`
@@ -1114,15 +1109,21 @@ def to_gbq(
             stacklevel=2,
         )

-    if if_exists not in ("fail", "replace", "append"):
-        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
-
     if "." not in destination_table:
         raise NotFoundException(
             "Invalid Table Name. Should be of the form 'datasetId.tableId' or "
             "'projectId.datasetId.tableId'"
         )

+    if if_exists not in ("fail", "replace", "append"):
+        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))
+
+    if_exists_list = ["fail", "replace", "append"]
+    dispositions = ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"]
+    dispositions_dict = dict(zip(if_exists_list, dispositions))
+
+    write_disposition = dispositions_dict[if_exists]
+
     connector = GbqConnector(
         project_id,
         reauth=reauth,
@@ -1142,17 +1143,20 @@
     table_id = destination_table_ref.table_id

     default_schema = _generate_bq_schema(dataframe)
+    # If table_schema isn't provided, we'll create one for you
     if not table_schema:
         table_schema = default_schema
+    # If table_schema is provided, we'll update the default_schema to the provided table_schema
     else:
         table_schema = pandas_gbq.schema.update_schema(
             default_schema, dict(fields=table_schema)
         )

-    # If table exists, check if_exists parameter
     try:
+        # Try to get the table
         table = bqclient.get_table(destination_table_ref)
     except google_exceptions.NotFound:
+        # If the table doesn't already exist, create it
         table_connector = _Table(
             project_id_table,
             dataset_id,
@@ -1161,34 +1165,12 @@
         )
         table_connector.create(table_id, table_schema)
     else:
+        # Convert original schema (the schema that already exists) to pandas-gbq API format
         original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

-        if if_exists == "fail":
-            raise TableCreationError(
-                "Could not create the table because it "
-                "already exists. "
-                "Change the if_exists parameter to "
-                "'append' or 'replace' data."
-            )
-        elif if_exists == "replace":
-            connector.delete_and_recreate_table(
-                project_id_table, dataset_id, table_id, table_schema
-            )
-        else:
-            if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
-                raise InvalidSchema(
-                    "Please verify that the structure and "
-                    "data types in the DataFrame match the "
-                    "schema of the destination table.",
-                    table_schema,
-                    original_schema,
-                )
-
-            # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
-            # matches. See: https://github.com/pydata/pandas-gbq/issues/315
-            table_schema = pandas_gbq.schema.update_schema(
-                table_schema, original_schema
-            )
+        # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
+        # matches. See: https://github.com/pydata/pandas-gbq/issues/315
+        table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema)

     if dataframe.empty:
         # Create the table (if needed), but don't try to run a load job with an
@@ -1198,6 +1180,7 @@ def to_gbq(
     connector.load_data(
         dataframe,
         destination_table_ref,
+        write_disposition=write_disposition,
         chunksize=chunksize,
         schema=table_schema,
         progress_bar=progress_bar,
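Nothing changes for callers of the public API; the same `if_exists` values now drive a single load job instead of explicit `delete_table`/`create_table` calls. A usage sketch (the DataFrame, dataset, and project names are illustrative):

```python
import pandas
import pandas_gbq

df = pandas.DataFrame({"col_a": [0.25, 1.5, -1.0]})

# if_exists="replace" now maps to write_disposition="WRITE_TRUNCATE" on the
# load job, so BigQuery overwrites the table in one request instead of
# pandas-gbq deleting and recreating it first.
pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",    # illustrative destination
    project_id="my-project",  # illustrative billing project
    if_exists="replace",
)
```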

pandas_gbq/load.py
+12 −6

@@ -113,13 +113,13 @@ def load_parquet(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     schema: Optional[Dict[str, Any]],
     billing_project: Optional[str] = None,
 ):
     job_config = bigquery.LoadJobConfig()
-    job_config.write_disposition = "WRITE_APPEND"
-    job_config.create_disposition = "CREATE_NEVER"
+    job_config.write_disposition = write_disposition
     job_config.source_format = "PARQUET"

     if schema is not None:
@@ -143,13 +143,13 @@

 def load_csv(
     dataframe: pandas.DataFrame,
+    write_disposition: str,
     chunksize: Optional[int],
     bq_schema: Optional[List[bigquery.SchemaField]],
     load_chunk: Callable,
 ):
     job_config = bigquery.LoadJobConfig()
-    job_config.write_disposition = "WRITE_APPEND"
-    job_config.create_disposition = "CREATE_NEVER"
+    job_config.write_disposition = write_disposition
     job_config.source_format = "CSV"
     job_config.allow_quoted_newlines = True

@@ -167,6 +167,7 @@ def load_csv_from_dataframe(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     chunksize: Optional[int],
     schema: Optional[Dict[str, Any]],
@@ -187,13 +188,14 @@ def load_chunk(chunk, job_config):
             project=billing_project,
         ).result()

-    return load_csv(dataframe, chunksize, bq_schema, load_chunk)
+    return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


 def load_csv_from_file(
     client: bigquery.Client,
     dataframe: pandas.DataFrame,
     destination_table_ref: bigquery.TableReference,
+    write_disposition: str,
     location: Optional[str],
     chunksize: Optional[int],
     schema: Optional[Dict[str, Any]],
@@ -223,7 +225,7 @@ def load_chunk(chunk, job_config):
     finally:
         chunk_buffer.close()

-    return load_csv(dataframe, chunksize, bq_schema, load_chunk)
+    return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


 def load_chunks(
@@ -234,13 +236,15 @@ def load_chunks(
     schema=None,
     location=None,
     api_method="load_parquet",
+    write_disposition="WRITE_EMPTY",
    billing_project: Optional[str] = None,
 ):
     if api_method == "load_parquet":
         load_parquet(
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             schema,
             billing_project=billing_project,
@@ -253,6 +257,7 @@
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             chunksize,
             schema,
@@ -263,6 +268,7 @@
             client,
             dataframe,
             destination_table_ref,
+            write_disposition,
             location,
             chunksize,
             schema,
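The load helpers now simply forward `write_disposition` into the job config. Stripped of the chunking and schema handling, the underlying call is roughly this sketch against the `google-cloud-bigquery` client (project and table names are illustrative):

```python
import pandas
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # illustrative project
df = pandas.DataFrame({"col_a": [1, 2, 3]})

job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_TRUNCATE"  # e.g. from if_exists="replace"
job_config.source_format = "PARQUET"

# A single load job both applies the disposition and writes the data.
# Note the old create_disposition = "CREATE_NEVER" line is gone, so the
# job may also create the destination table when it does not exist yet.
client.load_table_from_dataframe(
    df, "my-project.my_dataset.my_table", job_config=job_config
).result()
```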

requirements.txt
+1 −1

@@ -2,4 +2,4 @@ pandas
 google-auth
 google-auth-oauthlib
 google-cloud-bigquery
-tqdm
+tqdm

(The tqdm entry's text is unchanged; only the line ending differs, most likely a trailing newline added at end of file.)

setup.py
+8 −8

@@ -23,24 +23,24 @@
 release_status = "Development Status :: 4 - Beta"
 dependencies = [
     "setuptools",
-    "db-dtypes >=0.3.1,<2.0.0",
+    "db-dtypes >=1.0.4,<2.0.0",
     "numpy >=1.16.6",
-    "pandas >=0.24.2",
+    "pandas >=1.1.4",
     "pyarrow >=3.0.0, <10.0dev",
-    "pydata-google-auth",
+    "pydata-google-auth >=1.4.0",
     # Note: google-api-core and google-auth are also included via transitive
     # dependency on google-cloud-bigquery, but this library also uses them
     # directly.
-    "google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0",
-    "google-auth >=1.25.0",
-    "google-auth-oauthlib >=0.0.1",
+    "google-api-core >= 2.10.2, <3.0.0dev",
+    "google-auth >=2.13.0",
+    "google-auth-oauthlib >=0.7.0",
     # Require 1.27.* because it has a fix for out-of-bounds timestamps. See:
     # https://github.com/googleapis/python-bigquery/pull/209 and
     # https://github.com/googleapis/python-bigquery-pandas/issues/365
     # Exclude 2.4.* because it has a bug where waiting for the query can hang
     # indefinitely. https://github.com/pydata/pandas-gbq/issues/343
-    "google-cloud-bigquery >=1.27.2,<4.0.0dev,!=2.4.*",
-    "google-cloud-bigquery-storage >=1.1.0,<3.0.0dev",
+    "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
+    "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev",
 ]
 extras = {
     "tqdm": "tqdm>=4.23.0",

testing/constraints-3.7.txt
+8 −8

@@ -5,15 +5,15 @@
 #
 # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
 # Then this file should have foo==1.14.0
-db-dtypes==0.3.1
-google-api-core==1.31.5
-google-auth==1.25.0
-google-auth-oauthlib==0.0.1
-google-cloud-bigquery==1.27.2
-google-cloud-bigquery-storage==1.1.0
+db-dtypes==1.0.4
+google-api-core==2.10.2
+google-auth==2.13.0
+google-auth-oauthlib==0.7.0
+google-cloud-bigquery==3.3.5
+google-cloud-bigquery-storage==2.16.2
 numpy==1.16.6
-pandas==0.24.2
+pandas==1.1.4
 pyarrow==3.0.0
-pydata-google-auth==0.1.2
+pydata-google-auth==1.4.0
 tqdm==4.23.0
 protobuf==3.19.5

tests/system/test_gbq.py
+10 −2

@@ -673,9 +673,17 @@ def test_upload_data_if_table_exists_fail(self, project_id):
         test_id = "2"
         test_size = 10
         df = make_mixed_dataframe_v2(test_size)
-        self.table.create(TABLE_ID + test_id, gbq._generate_bq_schema(df))

-        # Test the default value of if_exists is 'fail'
+        # Initialize table with sample data
+        gbq.to_gbq(
+            df,
+            self.destination_table + test_id,
+            project_id,
+            chunksize=10000,
+            credentials=self.credentials,
+        )
+
+        # Test the default value of if_exists == 'fail'
         with pytest.raises(gbq.TableCreationError):
             gbq.to_gbq(
                 df,

tests/unit/test_load.py
+14 −5

@@ -108,7 +108,7 @@ def test_load_csv_from_dataframe_allows_client_to_generate_schema(mock_bigquery_client):

     _ = list(
         load.load_csv_from_dataframe(
-            mock_bigquery_client, df, destination, None, None, None
+            mock_bigquery_client, df, destination, None, None, None, None
         )
     )

@@ -151,7 +151,9 @@ def test_load_csv_from_file_generates_schema(mock_bigquery_client):
     )

     _ = list(
-        load.load_csv_from_file(mock_bigquery_client, df, destination, None, None, None)
+        load.load_csv_from_file(
+            mock_bigquery_client, df, destination, None, None, None, None
+        )
     )

     mock_load = mock_bigquery_client.load_table_from_file
@@ -222,7 +224,7 @@ def test_load_chunks_omits_policy_tags(

 def test_load_chunks_with_invalid_api_method():
     with pytest.raises(ValueError, match="Got unexpected api_method:"):
-        load.load_chunks(None, None, None, api_method="not_a_thing")
+        load.load_chunks(None, None, None, None, api_method="not_a_thing")


 def test_load_parquet_allows_client_to_generate_schema(mock_bigquery_client):
@@ -233,7 +235,14 @@ def test_load_parquet_allows_client_to_generate_schema(mock_bigquery_client):
         "my-project.my_dataset.my_table"
     )

-    load.load_parquet(mock_bigquery_client, df, destination, None, None)
+    load.load_parquet(
+        mock_bigquery_client,
+        df,
+        destination,
+        None,
+        None,
+        None,
+    )

     mock_load = mock_bigquery_client.load_table_from_dataframe
     assert mock_load.called
@@ -255,7 +264,7 @@ def test_load_parquet_with_bad_conversion(mock_bigquery_client):
     )

     with pytest.raises(exceptions.ConversionError):
-        load.load_parquet(mock_bigquery_client, df, destination, None, None)
+        load.load_parquet(mock_bigquery_client, df, destination, None, None, None)


 @pytest.mark.parametrize(
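The extra `None` arguments above fill the new positional `write_disposition` parameter. One could also assert that the disposition actually reaches the job config; a hypothetical test in the same style, not part of this commit, assuming the `mock_bigquery_client` fixture used throughout this file:

```python
import google.cloud.bigquery
import pandas

from pandas_gbq import load


def test_load_parquet_forwards_write_disposition(mock_bigquery_client):
    # Hypothetical test sketch, not part of this commit.
    df = pandas.DataFrame({"col_a": [1, 2, 3]})
    destination = google.cloud.bigquery.TableReference.from_string(
        "my-project.my_dataset.my_table"
    )

    load.load_parquet(
        mock_bigquery_client, df, destination, "WRITE_TRUNCATE", None, None
    )

    # The disposition should land on the LoadJobConfig handed to the client.
    _, kwargs = mock_bigquery_client.load_table_from_dataframe.call_args
    assert kwargs["job_config"].write_disposition == "WRITE_TRUNCATE"
```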

tests/unit/test_to_gbq.py
+9 −27

@@ -94,7 +94,11 @@ def test_to_gbq_with_if_exists_append_mismatch(mock_bigquery_client):
         "myproj.my_dataset.my_table",
         schema=(SchemaField("col_a", "INTEGER"), SchemaField("col_b", "STRING")),
     )
-    with pytest.raises(gbq.InvalidSchema) as exception_block:
+    mock_bigquery_client.side_effect = gbq.InvalidSchema(
+        message=r"Provided Schema does not match Table *"
+    )
+
+    with pytest.raises((gbq.InvalidSchema)) as exception_block:
         gbq.to_gbq(
             DataFrame({"col_a": [0.25, 1.5, -1.0]}),
             "my_dataset.my_table",
@@ -103,16 +107,10 @@
     )

     exc = exception_block.value
-    assert exc.remote_schema == {
-        "fields": [
-            {"name": "col_a", "type": "INTEGER", "mode": "NULLABLE"},
-            {"name": "col_b", "type": "STRING", "mode": "NULLABLE"},
-        ]
-    }
-    assert exc.local_schema == {"fields": [{"name": "col_a", "type": "FLOAT"}]}
+    assert exc.message == r"Provided Schema does not match Table *"


-def test_to_gbq_with_if_exists_replace(mock_bigquery_client):
+def test_to_gbq_with_if_exists_replace(mock_bigquery_client, expected_load_method):
     mock_bigquery_client.get_table.side_effect = (
         # Initial check
         google.cloud.bigquery.Table("myproj.my_dataset.my_table"),
@@ -125,10 +123,7 @@
         project_id="myproj",
         if_exists="replace",
     )
-    # TODO: We can avoid these API calls by using write disposition in the load
-    # job. See: https://github.com/googleapis/python-bigquery-pandas/issues/118
-    assert mock_bigquery_client.delete_table.called
-    assert mock_bigquery_client.create_table.called
+    expected_load_method.assert_called_once()


 def test_to_gbq_with_if_exists_replace_cross_project(
@@ -146,20 +141,7 @@
         project_id="billing-project",
         if_exists="replace",
     )
-    # TODO: We can avoid these API calls by using write disposition in the load
-    # job. See: https://github.com/googleapis/python-bigquery-pandas/issues/118
-    assert mock_bigquery_client.delete_table.called
-    args, _ = mock_bigquery_client.delete_table.call_args
-    table_delete: google.cloud.bigquery.TableReference = args[0]
-    assert table_delete.project == "data-project"
-    assert table_delete.dataset_id == "my_dataset"
-    assert table_delete.table_id == "my_table"
-    assert mock_bigquery_client.create_table.called
-    args, _ = mock_bigquery_client.create_table.call_args
-    table_create: google.cloud.bigquery.TableReference = args[0]
-    assert table_create.project == "data-project"
-    assert table_create.dataset_id == "my_dataset"
-    assert table_create.table_id == "my_table"
+    expected_load_method.assert_called_once()

     # Check that billing project and destination table is set correctly.
     expected_load_method.assert_called_once()
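The assertions now target the single `message` string that the reworked exceptions carry, instead of the dropped `local_schema`/`remote_schema` attributes. A minimal sketch of the new exception surface defined in `pandas_gbq/gbq.py` above:

```python
from pandas_gbq import gbq

# Both exceptions now wrap one message string exposed via .message.
exc = gbq.InvalidSchema("Reason: Provided Schema does not match Table my_table")
assert exc.message == "Reason: Provided Schema does not match Table my_table"

exc = gbq.TableCreationError("Reason: Already Exists: Table my_table")
assert exc.message == "Reason: Already Exists: Table my_table"
```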
