feat: convert StreamedResultSet to Pandas Dataframe #226

Open · wants to merge 20 commits into base: main
1 change: 1 addition & 0 deletions docs/conf.py
@@ -350,6 +350,7 @@
"google.api_core": ("https://googleapis.dev/python/google-api-core/latest/", None,),
"grpc": ("https://grpc.github.io/grpc/python/", None),
"proto-plus": ("https://proto-plus-python.readthedocs.io/en/latest/", None),
"pandas": ("https://pandas.pydata.org/pandas-docs/stable/", None),
}


67 changes: 67 additions & 0 deletions google/cloud/spanner_v1/_pandas_helpers.py
@@ -0,0 +1,67 @@
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helper module for working with the pandas library."""

try:
    import pandas

    pandas_import_error = None
except ImportError as err:
    pandas = None
    pandas_import_error = err


def check_pandas_import():
    if pandas is None:
        raise ImportError(
            "The pandas module is required for this method.\n"
            "Try running 'pip3 install pandas'"
        ) from pandas_import_error


def to_dataframe(result_set):
    """Converts the query results into a pandas DataFrame.

    :type result_set: :class:`~google.cloud.spanner_v1.StreamedResultSet`
    :param result_set: complete response data returned from a read/query

    :rtype: pandas.DataFrame
    :returns: DataFrame containing all rows of the result set, with Spanner
        TIMESTAMP and DATE columns converted to pandas datetime dtypes.
    """
    check_pandas_import()

    # Download all results first, so that the fields property is populated.
    data = list(result_set)

    columns_dict = {}
    column_list = []
    for item in result_set.fields:
        column_list.append(item.name)
        columns_dict[item.name] = item.type_.code

    # Creating dataframe using column headers and list of data rows
    df = pandas.DataFrame(data, columns=column_list)

    # Convert TIMESTAMP and DATE columns to appropriate type. The
    # datetime64[ns, UTC] dtype is null-safe.
    for k, v in columns_dict.items():
        if v.name == "TIMESTAMP" or v.name == "DATE":
            try:
                df[k] = df[k].to_datetime()
            except Exception as e:
Contributor: Can we please use a more specific exception? Exception is way too broad.

Contributor: Also, please add some comments about why we have these two different methods of converting to datetime. I don't understand it.

Contributor: Also also, please add back in the "localize" logic to the TIMESTAMP type. TIMESTAMP is intended to represent UTC datetimes.

Contributor: Also also also, we need tests for the datetime64[ns, UTC] data type. I want to make sure the data type is the one we expect in the test (not just that we got a row and it didn't error).

For example, in pandas-gbq we test that the dataframe we got matches one that we manually created and localized in the test.

https://github.com/pydata/pandas-gbq/blob/46c579ac21879b431c8568b49e68624f4a5ea25e/tests/unit/test_timestamp.py

Author:
> Also also, please add back in the "localize" logic to the TIMESTAMP type. TIMESTAMP is intended to represent UTC datetimes.

added

                df[k] = df[k].astype('datetime64[ns]')
                df[k] = df[k].dt.tz_localize("UTC")

    return df
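
One way the conversion loop above could be reworked to address the review comments (a sketch only, not part of the committed diff; the use of pandas.to_datetime and the localization guard are assumptions):

    for k, v in columns_dict.items():
        if v.name in ("TIMESTAMP", "DATE"):
            # pandas.to_datetime accepts a Series directly and is
            # null-safe: missing values become NaT.
            df[k] = pandas.to_datetime(df[k])
            if v.name == "TIMESTAMP" and df[k].dt.tz is None:
                # Spanner TIMESTAMP represents UTC datetimes, so naive
                # values are localized, giving the datetime64[ns, UTC]
                # dtype the reviewers ask to test for.
                df[k] = df[k].dt.tz_localize("UTC")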
12 changes: 10 additions & 2 deletions google/cloud/spanner_v1/streamed.py
@@ -19,6 +19,7 @@
from google.cloud import exceptions
from google.cloud.spanner_v1 import PartialResultSet
from google.cloud.spanner_v1 import TypeCode
+from google.cloud.spanner_v1 import _pandas_helpers
import six

# pylint: disable=ungrouped-imports
@@ -147,13 +148,20 @@ def __iter__(self):
        except StopIteration:
            return

+    def to_dataframe(self):
+        """Creates a pandas DataFrame of all rows in the result set.
+
+        :rtype: pandas.DataFrame
+        :returns: DataFrame created from the result set
+        """
+        return _pandas_helpers.to_dataframe(self)

    def one(self):
        """Return exactly one result, or raise an exception.

        :raises: :exc:`NotFound`: If there are no results.
        :raises: :exc:`ValueError`: If there are multiple results.
-        :raises: :exc:`RuntimeError`: If consumption has already occurred,
-            in whole or in part.
+        :raises: :exc:`RuntimeError`: If consumption has already occurred, in whole or in part.
        """
        answer = self.one_or_none()
        if answer is None:
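
For orientation, a typical call site for the new method might look like this sketch (the instance, database, and table names are illustrative, borrowed from the system tests below):

    from google.cloud import spanner_v1

    client = spanner_v1.Client()
    instance = client.instance("test-instance")
    database = instance.database("test-database")

    with database.snapshot() as snapshot:
        # StreamedResultSet now exposes to_dataframe().
        results = snapshot.execute_sql("SELECT * FROM testTable")
        df = results.to_dataframe()
        print(df.dtypes)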
4 changes: 2 additions & 2 deletions noxfile.py
@@ -112,7 +112,7 @@ def default(session):
        *session.posargs,
    )

-    session.install("-e", ".[tracing]", "-c", constraints_path)
+    session.install("-e", ".[pandas,tracing]", "-c", constraints_path)

    # Run py.test against the unit tests with OpenTelemetry.
    session.run(
@@ -168,7 +168,7 @@ def system(session):
    # Install all test dependencies, then install this package into the
    # virtualenv's dist-packages.
    session.install("mock", "pytest", "google-cloud-testutils", "-c", constraints_path)
-    session.install("-e", ".[tracing]", "-c", constraints_path)
+    session.install("-e", ".[pandas,tracing]", "-c", constraints_path)

    # Run py.test against the system tests.
    if system_test_exists:
3 changes: 2 additions & 1 deletion setup.py
@@ -36,12 +36,13 @@
    "sqlparse >= 0.3.0",
]
extras = {
-    "libcst": "libcst >= 0.2.5",
+    "pandas": ["pandas >= 0.25.3"],
    "tracing": [
        "opentelemetry-api >= 0.11b0",
        "opentelemetry-sdk >= 0.11b0",
        "opentelemetry-instrumentation >= 0.11b0",
    ],
+    "libcst": "libcst >= 0.2.5",
}


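With the new extra in place, the optional pandas dependency can be installed alongside the library, presumably via pip install google-cloud-spanner[pandas] (the distribution name is assumed here from the repository); this is the same .[pandas,tracing] target the noxfile sessions above now install.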
1 change: 1 addition & 0 deletions testing/constraints-3.6.txt
@@ -9,6 +9,7 @@ google-api-core==1.22.2
google-cloud-core==1.4.1
grpc-google-iam-v1==0.12.3
libcst==0.2.5
+pandas==0.25.3
proto-plus==1.13.0
sqlparse==0.3.0
opentelemetry-api==0.11b0
88 changes: 88 additions & 0 deletions tests/system/test_system_pandasdf.py
@@ -0,0 +1,88 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud import spanner_v1
from google.auth.credentials import AnonymousCredentials
import pytest

# for reference
TABLE_NAME = "testTable"
COLUMNS = ["id", "name"]
VALUES = [[1, "Alice"], [2, "Bob"]]


TABLE_NAME_1 = "Functional_Alltypes"
COLUMNS_1 = ["id", "bool_col", "date", "float_col", "string_col", "timestamp_col"]
VALUES_1 = [
    [1, True, "2016-02-09", 2.2, "David", "2002-02-10T15:30:00.45Z"],
    [2, False, "2016-10-10", 2.5, "Ryan", "2009-02-12T10:06:00.45Z"],
    [10, True, "2019-01-06", None, None, None],
    [12, True, "2018-02-02", 2.6, None, None],
    [None, None, None, None, None, None],
]


@pytest.fixture
def snapshot_obj():
    try:
        spanner_client = spanner_v1.Client(
            project="test-project",
            client_options={"api_endpoint": "0.0.0.0:9010"},
            credentials=AnonymousCredentials(),
        )
        instance_id = "test-instance"
        instance = spanner_client.instance(instance_id)
        database_id = "test-database"
        database = instance.database(database_id)
        with database.snapshot() as snapshot:
            return snapshot

    except Exception:
        pytest.skip("Cloud Spanner Emulator configuration is incorrect")


@pytest.mark.parametrize(("limit"), [(0), (1), (2)])
def test_df(limit, snapshot_obj):
results = snapshot_obj.execute_sql(
"Select * from testTable limit {limit}".format(limit=limit)
)
df = results.to_dataframe()
assert len(df) == limit


@pytest.mark.parametrize(("value"), [2])
def test_rows_with_no_null_values(value, snapshot_obj):
results = snapshot_obj.execute_sql(
"Select * from Functional_Alltypes where id IS NOT NULL AND bool_col IS NOT NULL AND date IS NOT NULL and float_col IS NOT NULL and string_col IS NOT NULL and timestamp_col IS NOT NULL "
)
df = results.to_dataframe()
assert len(df) == value


@pytest.mark.parametrize(("value"), [2])
def test_rows_with_one_or_more_null_values(value, snapshot_obj):
results = snapshot_obj.execute_sql(
"Select * from Functional_Alltypes where id IS NOT NULL AND string_col IS NULL AND timestamp_col IS NULL "
)
df = results.to_dataframe()
assert len(df) == value


@pytest.mark.parametrize(("value"), [1])
def test_rows_with_all_null_values(value, snapshot_obj):
results = snapshot_obj.execute_sql(
"Select * from Functional_Alltypes where id IS NULL AND bool_col IS NULL AND date IS NULL and float_col IS NULL and string_col IS NULL and timestamp_col IS NULL "
)
df = results.to_dataframe()
assert len(df) == value
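
Note: these system tests expect a local Cloud Spanner Emulator listening on 0.0.0.0:9010 (for example, one started with gcloud emulators spanner start) with the tables above already created; the snapshot_obj fixture skips them when the client cannot be set up.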
121 changes: 121 additions & 0 deletions tests/unit/test__pandas_helpers.py
@@ -0,0 +1,121 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest


class TestPandasDataFrame(unittest.TestCase):
    def _getTargetClass(self):
        from google.cloud.spanner_v1.streamed import StreamedResultSet

        return StreamedResultSet

    def _make_one(self, *args, **kwargs):
        return self._getTargetClass()(*args, **kwargs)

    @staticmethod
    def _make_scalar_field(name, type_):
        from google.cloud.spanner_v1 import StructType
        from google.cloud.spanner_v1 import Type

        return StructType.Field(name=name, type_=Type(code=type_))

    @staticmethod
    def _make_value(value):
        from google.cloud.spanner_v1._helpers import _make_value_pb

        return _make_value_pb(value)

    @staticmethod
    def _make_result_set_metadata(fields=(), transaction_id=None):
        from google.cloud.spanner_v1 import ResultSetMetadata
        from google.cloud.spanner_v1 import StructType

        metadata = ResultSetMetadata(row_type=StructType(fields=[]))
        for field in fields:
            metadata.row_type.fields.append(field)
        if transaction_id is not None:
            metadata.transaction.id = transaction_id
        return metadata

    @staticmethod
    def _make_result_set_stats(query_plan=None, **kw):
        from google.cloud.spanner_v1 import ResultSetStats
        from google.protobuf.struct_pb2 import Struct
        from google.cloud.spanner_v1._helpers import _make_value_pb

        query_stats = Struct(
            fields={key: _make_value_pb(value) for key, value in kw.items()}
        )
        return ResultSetStats(query_plan=query_plan, query_stats=query_stats)

    def test_multiple_rows(self):
        from google.cloud.spanner_v1 import TypeCode

        iterator = _MockCancellableIterator()
        streamed = self._make_one(iterator)
        FIELDS = [
            self._make_scalar_field("Name", TypeCode.STRING),
            self._make_scalar_field("Age", TypeCode.INT64),
        ]
        metadata = streamed._metadata = self._make_result_set_metadata(FIELDS)
        stats = streamed._stats = self._make_result_set_stats()
        streamed._rows[:] = [["Alice", 1], ["Bob", 2], ["Adam", 3]]
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 3

    def test_single_rows(self):
        from google.cloud.spanner_v1 import TypeCode

        iterator = _MockCancellableIterator()
        streamed = self._make_one(iterator)
        FIELDS = [
            self._make_scalar_field("Name", TypeCode.STRING),
            self._make_scalar_field("Age", TypeCode.INT64),
        ]
        metadata = streamed._metadata = self._make_result_set_metadata(FIELDS)
        stats = streamed._stats = self._make_result_set_stats()
        streamed._rows[:] = [["Alice", 1]]
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 1

    def test_no_rows(self):
        from google.cloud.spanner_v1 import TypeCode

        iterator = _MockCancellableIterator()
        streamed = self._make_one(iterator)
        FIELDS = [
            self._make_scalar_field("Name", TypeCode.STRING),
            self._make_scalar_field("Age", TypeCode.INT64),
        ]
        metadata = streamed._metadata = self._make_result_set_metadata(FIELDS)
        stats = streamed._stats = self._make_result_set_stats()
        streamed._rows[:] = []
        df_obj = streamed.to_dataframe()
        assert len(df_obj) == 0


class _MockCancellableIterator(object):

    cancel_calls = 0

    def __init__(self, *values):
        self.iter_values = iter(values)

    def next(self):
        return next(self.iter_values)

    def __next__(self):  # pragma: NO COVER Py3k
        return self.next()
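
Responding to the reviewer's request for dtype coverage, a test along these lines might be added to TestPandasDataFrame above (a sketch; the method name is hypothetical, and it assumes TIMESTAMP columns come back with the datetime64[ns, UTC] dtype):

    def test_timestamp_dtype_is_utc(self):
        import datetime

        from google.cloud.spanner_v1 import TypeCode

        iterator = _MockCancellableIterator()
        streamed = self._make_one(iterator)
        FIELDS = [self._make_scalar_field("Created", TypeCode.TIMESTAMP)]
        streamed._metadata = self._make_result_set_metadata(FIELDS)
        streamed._stats = self._make_result_set_stats()
        # One naive datetime and one NULL row exercises null-safety.
        streamed._rows[:] = [[datetime.datetime(2021, 1, 1, 12, 0)], [None]]
        df_obj = streamed.to_dataframe()
        # Assert on the column dtype itself, not just the row count.
        assert str(df_obj["Created"].dtype) == "datetime64[ns, UTC]"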