
Comparing changes

base repository: googleapis/python-bigquery-dataframes
base: v0.7.0
head repository: googleapis/python-bigquery-dataframes
compare: v0.8.0
  • 3 commits
  • 9 files changed
  • 2 contributors

Commits on Oct 11, 2023

  1. fix: create session dataset for remote functions only when needed (#94)

    With this change, BigFrames no longer creates a dataset upfront at session creation; instead, the components that need the dataset create it on demand.
    
    shobsi authored Oct 11, 2023

    Commit 1d385be (verified signature; signing key has since expired)
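
    To make the lazy-creation pattern concrete, here is a minimal sketch of creating a BigQuery dataset only at the point of use, mirroring the change to bigframes/remote_function.py shown further down (the project, dataset, and location values are placeholders):

    ```python
    from google.cloud import bigquery


    def ensure_dataset(client: bigquery.Client, dataset_id: str, location: str) -> bigquery.Dataset:
        """Create the dataset on demand; a no-op if it already exists."""
        dataset = bigquery.Dataset(
            bigquery.DatasetReference.from_string(dataset_id, default_project=client.project)
        )
        dataset.location = location
        # exists_ok=True makes repeated calls safe, so every component that needs
        # the dataset can call this lazily instead of relying on session startup.
        return client.create_dataset(dataset, exists_ok=True)


    client = bigquery.Client()
    ensure_dataset(client, "my-project.bigframes_temp_us", "US")  # hypothetical names
    ```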

Commits on Oct 12, 2023

  1. feat: Support compression in to_parquet (#91)

    * feat: Support compression in to_parquet
    
    This changes the default behavior from no compression to snappy
    compression.
    
    * feat: Support compression in to_parquet
    
    BREAKING CHANGE: The default behavior of to_parquet is changing from no compression to 'snappy' compression.
    
    * fix exception message, add tests for unsupported compression techniques
    shobsi authored Oct 12, 2023

    Commit a8c286f (verified signature; signing key has since expired)
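
    As a usage sketch of the new parameter (the source table and destination bucket below are illustrative; the wildcard in the URI is still required):

    ```python
    import bigframes.pandas as bpd

    df = bpd.read_gbq("bigquery-public-data.usa_names.usa_1910_2013")

    # The default now applies snappy compression (this is the breaking change).
    df.to_parquet("gs://my-bucket/names/out-*.parquet")

    # Opt into gzip, or pass None to restore the previous uncompressed behavior.
    df.to_parquet("gs://my-bucket/names/out-gzip-*.parquet", compression="gzip")
    df.to_parquet("gs://my-bucket/names/out-raw-*.parquet", compression=None)
    ```
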
  2. chore(main): release 0.8.0 (#99)

    🤖 I have created a release *beep* *boop*
    ---
    
    
    ## [0.8.0](https://togithub.com/googleapis/python-bigquery-dataframes/compare/v0.7.0...v0.8.0) (2023-10-12)
    
    
    ### ⚠ BREAKING CHANGES
    
    * The default behavior of `to_parquet` is changing from no compression to `'snappy'` compression.
    
    ### Features
    
    * Support compression in `to_parquet` ([a8c286f](https://togithub.com/googleapis/python-bigquery-dataframes/commit/a8c286f0995cc8cf2a4c44fb51855773ecf71f72))
    
    
    ### Bug Fixes
    
    * Create session dataset for remote functions only when needed ([#94](https://togithub.com/googleapis/python-bigquery-dataframes/issues/94)) ([1d385be](https://togithub.com/googleapis/python-bigquery-dataframes/commit/1d385be1c68342a66ecb9f28c5efc83c18d0e64c))
    
    ---
    This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
    release-please[bot] authored Oct 12, 2023

    Commit 1ff2755 (verified signature; signing key has since expired)
16 changes: 16 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,22 @@

[1]: https://pypi.org/project/bigframes/#history

## [0.8.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.7.0...v0.8.0) (2023-10-12)


### ⚠ BREAKING CHANGES

* The default behavior of `to_parquet` is changing from no compression to `'snappy'` compression.

### Features

* Support compression in `to_parquet` ([a8c286f](https://github.com/googleapis/python-bigquery-dataframes/commit/a8c286f0995cc8cf2a4c44fb51855773ecf71f72))


### Bug Fixes

* Create session dataset for remote functions only when needed ([#94](https://github.com/googleapis/python-bigquery-dataframes/issues/94)) ([1d385be](https://github.com/googleapis/python-bigquery-dataframes/commit/1d385be1c68342a66ecb9f28c5efc83c18d0e64c))

## [0.7.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v0.6.0...v0.7.0) (2023-10-11)


18 changes: 16 additions & 2 deletions bigframes/dataframe.py
@@ -21,6 +21,7 @@
import typing
from typing import (
Callable,
Dict,
Iterable,
List,
Literal,
@@ -2270,7 +2271,13 @@ def to_numpy(

__array__ = to_numpy

def to_parquet(self, path: str, *, index: bool = True) -> None:
def to_parquet(
self,
path: str,
*,
compression: Optional[Literal["snappy", "gzip"]] = "snappy",
index: bool = True,
) -> None:
# TODO(swast): Can we support partition columns argument?
# TODO(chelsealin): Support local file paths.
# TODO(swast): Some warning that wildcard is recommended for large
@@ -2282,14 +2289,21 @@ def to_parquet(self, path: str, *, index: bool = True) -> None:
if "*" not in path:
raise NotImplementedError(ERROR_IO_REQUIRES_WILDCARD)

if compression not in {None, "snappy", "gzip"}:
raise ValueError("'{0}' is not valid for compression".format(compression))

export_options: Dict[str, Union[bool, str]] = {}
if compression:
export_options["compression"] = compression.upper()

result_table = self._run_io_query(
index=index, ordering_id=bigframes.core.io.IO_ORDERING_ID
)
export_data_statement = bigframes.core.io.create_export_data_statement(
f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}",
uri=path,
format="PARQUET",
export_options={},
export_options=export_options,
)
_, query_job = self._block.expr._session._start_query(export_data_statement)
self._set_internal_query_job(query_job)
39 changes: 28 additions & 11 deletions bigframes/remote_function.py
@@ -202,10 +202,22 @@ def create_bq_remote_function(
OPTIONS (
endpoint = "{endpoint}"
)"""

logger.info(f"Creating BQ remote function: {create_function_ddl}")

# Make sure the dataset exists
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO: Use session._start_query() so we get progress bar
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
query_job.result() # Wait for the job to complete.

logger.info(f"Created remote function {query_job.ddl_target_routine}")

def get_cloud_function_fully_qualified_parent(self):
@@ -465,17 +477,22 @@ def get_remote_function_specs(self, remote_function_name):
routines = self._bq_client.list_routines(
f"{self._gcp_project_id}.{self._bq_dataset}"
)
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
try:
for routine in routines:
if routine.reference.routine_id == remote_function_name:
# TODO(shobs): Use first class properties when they are available
# https://github.com/googleapis/python-bigquery/issues/1552
rf_options = routine._properties.get("remoteFunctionOptions")
if rf_options:
http_endpoint = rf_options.get("endpoint")
bq_connection = rf_options.get("connection")
if bq_connection:
bq_connection = os.path.basename(bq_connection)
break
except google.api_core.exceptions.NotFound:
# The dataset might not exist, in which case the http_endpoint doesn't, either.
# Note: list_routines doesn't make an API request until we iterate on the response object.
pass
return (http_endpoint, bq_connection)
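
The try/except wraps the loop rather than the `list_routines` call because the client returns a lazy iterator: the HTTP request, and therefore any `NotFound`, only happens once iteration starts. A standalone sketch of the same pattern (project and dataset names are hypothetical):

```python
import google.api_core.exceptions
from google.cloud import bigquery

client = bigquery.Client()
routines = client.list_routines("my-project.maybe_missing_dataset")  # no API call yet

try:
    for routine in routines:  # the request is issued here, on first iteration
        print(routine.reference.routine_id)
except google.api_core.exceptions.NotFound:
    # The dataset doesn't exist, so there are no routines to report.
    pass
```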


7 changes: 1 addition & 6 deletions bigframes/session.py
@@ -381,17 +381,12 @@ def _create_and_bind_bq_session(self):
]
)

# Dataset for storing BQML models and remote functions, which don't yet
# Dataset for storing remote functions, which don't yet
# support proper session temporary storage yet
self._session_dataset = bigquery.Dataset(
f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
)
self._session_dataset.location = self._location
self._session_dataset.default_table_expiration_ms = 24 * 60 * 60 * 1000

# TODO: handle case when the dataset does not exist and the user does
# not have permission to create one (bigquery.datasets.create IAM)
self.bqclient.create_dataset(self._session_dataset, exists_ok=True)

def close(self):
"""Terminated the BQ session, otherwises the session will be terminated automatically after
2 changes: 1 addition & 1 deletion bigframes/version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.7.0"
__version__ = "0.8.0"
23 changes: 18 additions & 5 deletions tests/system/conftest.py
@@ -134,15 +134,28 @@ def cleanup_datasets(bigquery_client: bigquery.Client) -> None:
)


def get_dataset_id(project_id: str):
"Get a fully qualified dataset id belonging to the given project."
dataset_id = f"{project_id}.{prefixer.create_prefix()}_dataset_id"
return dataset_id


@pytest.fixture(scope="session")
def dataset_id(bigquery_client: bigquery.Client):
"""Create (and cleanup) a temporary dataset."""
project_id = bigquery_client.project
dataset_id = f"{project_id}.{prefixer.create_prefix()}_dataset_id"
dataset = bigquery.Dataset(dataset_id)
bigquery_client.create_dataset(dataset)
dataset_id = get_dataset_id(bigquery_client.project)
bigquery_client.create_dataset(dataset_id)
yield dataset_id
bigquery_client.delete_dataset(dataset_id, delete_contents=True)


@pytest.fixture
def dataset_id_not_created(bigquery_client: bigquery.Client):
"""Return a temporary dataset object without creating it, and clean it up
after it has been used."""
dataset_id = get_dataset_id(bigquery_client.project)
yield dataset_id
bigquery_client.delete_dataset(dataset, delete_contents=True)
bigquery_client.delete_dataset(dataset_id, delete_contents=True)


@pytest.fixture(scope="session")
43 changes: 43 additions & 0 deletions tests/system/large/test_remote_function.py
@@ -408,6 +408,49 @@ def add_one(x):
)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_explicit_dataset_not_created(
session, scalars_dfs, dataset_id_not_created, bq_cf_connection, functions_client
):
try:

@session.remote_function(
[int],
int,
dataset_id_not_created,
bq_cf_connection,
reuse=False,
)
def square(x):
return x * x

scalars_df, scalars_pandas_df = scalars_dfs

bf_int64_col = scalars_df["int64_col"]
bf_int64_col_filter = bf_int64_col.notnull()
bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
bf_result_col = bf_int64_col_filtered.apply(square)
bf_result = (
bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
)

pd_int64_col = scalars_pandas_df["int64_col"]
pd_int64_col_filter = pd_int64_col.notnull()
pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
# TODO(shobs): Figure why pandas .apply() changes the dtype, i.e.
# pd_int64_col_filtered.dtype is Int64Dtype()
# pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
# For this test let's force the pandas dtype to be same as bigframes' dtype.
pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)

assert_pandas_df_equal_ignore_ordering(bf_result, pd_result)
finally:
# clean up the gcp assets created for the remote function
cleanup_remote_function_assets(session.bqclient, functions_client, square)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_udf_referring_outside_var(
session, scalars_dfs, dataset_id, bq_cf_connection, functions_client
90 changes: 84 additions & 6 deletions tests/system/small/test_session.py
@@ -793,7 +793,7 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
# Include wildcard so that multiple files can be written/read if > 1 GB.
# https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
path = gcs_folder + "test_read_parquet_gcs*.parquet"
path = gcs_folder + test_read_parquet_gcs.__name__ + "*.parquet"
df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
# GEOGRAPHY not supported in parquet export.
df_in = df_in.drop(columns="geography_col")
@@ -823,6 +823,89 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder):
pd.testing.assert_frame_equal(pd_df_in, pd_df_out)


@pytest.mark.parametrize(
"compression",
[
None,
"gzip",
"snappy",
],
)
def test_read_parquet_gcs_compressed(
session: bigframes.Session, scalars_dfs, gcs_folder, compression
):
scalars_df, _ = scalars_dfs
# Include wildcard so that multiple files can be written/read if > 1 GB.
# https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
path = (
gcs_folder
+ test_read_parquet_gcs_compressed.__name__
+ (f"_{compression}" if compression else "")
+ "*.parquet"
)
df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
# GEOGRAPHY not supported in parquet export.
df_in = df_in.drop(columns="geography_col")
# Make sure we can also serialize the order.
df_write = df_in.reset_index(drop=False)
df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"
df_write.to_parquet(path, compression=compression, index=True)

df_out = (
session.read_parquet(path)
# Restore order.
.set_index(df_write.index.name).sort_index()
# Restore index.
.set_index(typing.cast(str, df_in.index.name))
)

# DATETIME gets loaded as TIMESTAMP in parquet. See:
# https://cloud.google.com/bigquery/docs/exporting-data#parquet_export_details
df_out = df_out.assign(
datetime_col=df_out["datetime_col"].astype("timestamp[us][pyarrow]")
)

# Make sure we actually have at least some values before comparing.
assert df_out.size != 0
pd_df_in = df_in.to_pandas()
pd_df_out = df_out.to_pandas()
pd.testing.assert_frame_equal(pd_df_in, pd_df_out)


@pytest.mark.parametrize(
"compression",
[
"brotli",
"lz4",
"zstd",
"unknown",
],
)
def test_read_parquet_gcs_compression_not_supported(
session: bigframes.Session, scalars_dfs, gcs_folder, compression
):
scalars_df, _ = scalars_dfs
# Include wildcard so that multiple files can be written/read if > 1 GB.
# https://cloud.google.com/bigquery/docs/exporting-data#exporting_data_into_one_or_more_files
path = (
gcs_folder
+ test_read_parquet_gcs_compression_not_supported.__name__
+ (f"_{compression}" if compression else "")
+ "*.parquet"
)
df_in: bigframes.dataframe.DataFrame = scalars_df.copy()
# GEOGRAPHY not supported in parquet export.
df_in = df_in.drop(columns="geography_col")
# Make sure we can also serialize the order.
df_write = df_in.reset_index(drop=False)
df_write.index.name = f"ordering_id_{random.randrange(1_000_000)}"

with pytest.raises(
ValueError, match=f"'{compression}' is not valid for compression"
):
df_write.to_parquet(path, compression=compression, index=True)


def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
path = gcs_folder + "test_read_json_gcs_bq_engine_w_index*.json"
@@ -894,11 +977,6 @@ def test_session_id(session):
# TODO(chelsealin): Verify the session id can be binded with a load job.


def test_session_dataset_exists_and_configured(session: bigframes.Session):
dataset = session.bqclient.get_dataset(session._session_dataset_id)
assert dataset.default_table_expiration_ms == 24 * 60 * 60 * 1000


@pytest.mark.flaky(retries=2)
def test_to_close_session():
session = bigframes.Session()
5 changes: 5 additions & 0 deletions third_party/bigframes_vendored/pandas/core/frame.py
@@ -129,6 +129,7 @@ def to_parquet(
self,
path: str,
*,
compression: Optional[Literal["snappy", "gzip"]] = "snappy",
index: bool = True,
) -> None:
"""Write a DataFrame to the binary Parquet format.
@@ -143,6 +144,10 @@ def to_parquet(
If the data size is more than 1GB, you must use a wildcard to export
the data into multiple files and the size of the files varies.
compression (str, default 'snappy'):
Name of the compression to use. Use ``None`` for no compression.
Supported options: ``'gzip'``, ``'snappy'``.
index (bool, default True):
If ``True``, include the dataframe's index(es) in the file output.
If ``False``, they will not be written to the file.