Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: convert StreamedResultSet to Pandas Dataframe #226

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 76 additions & 0 deletions google/cloud/spanner_v1/_pandas_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import logging
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
import six

class pandas_df():
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved

def pd_dataframe(self):
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the response in a pandas dataframe of the StreamedResultSet object"""
try :
from pandas import DataFrame
except ImportError:
logging.error("Pandas module not found. It is needed for converting query result to Dataframe.\n Try running 'pip3 install pandas'")
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved

code_to_spanner_dtype_dict = {
1 : 'BOOL',
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
2 : 'INT64',
3 : 'FLOAT64',
4 : 'TIMESTAMP',
5 : 'DATE',
6 : 'STRING',
7 : 'BYTES',
8 : 'ARRAY',
10 : 'NUMERIC'
}
response = six.next(self._response_iterator)
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
if self._metadata is None: # first response
metadata = self._metadata = response.metadata

#Creating dictionary to store column name maping of spanner to pandas dataframe
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
columns_dict={}
try :
for item in metadata.row_type.fields :
columns_dict[item.name]=code_to_spanner_dtype_dict[item.type_.code]
except :
logging.warning("Not able to create spanner to pandas fields mapping")

larkee marked this conversation as resolved.
Show resolved Hide resolved
#Creating list of columns to be mapped with the data
column_list=[k for k,v in columns_dict.items()]

#Creating list of data values to be converted to dataframe
values = list(response.values)
if self._pending_chunk is not None:
values[0] = self._merge_chunk(values[0])
if response.chunked_value:
self._pending_chunk = values.pop()
self._merge_values(values)

width = len(column_list)

# list to store each row as a sub-list
data=[]
while len(values)/width > 0 :
data.append(values[:width])
values=values[width:]

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

#Creating dataframe using column headers and list of data rows
df = DataFrame(data,columns=column_list)

#Mapping dictionary to map every spanner datatype to a pandas compatible datatype
mapping_dict={
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
'INT64':'int64',
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
'STRING':'object',
'BOOL':'bool',
'BYTES':'object',
'ARRAY':'object',
'DATE':'datetime64[ns, UTC]',
'FLOAT64':'float64',
'NUMERIC':'object',
'TIMESTAMP':'datetime64[ns, UTC]'
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
}
for k,v in columns_dict.items() :
try:
df[k]= df[k].astype(mapping_dict[v])
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
except KeyError:
print("Spanner Datatype not present in datatype mapping dictionary")
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved

return df
9 changes: 8 additions & 1 deletion google/cloud/spanner_v1/streamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
from google.cloud import exceptions
from google.cloud.spanner_v1 import TypeCode
import six
import logging
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved

# pylint: disable=ungrouped-imports
from google.cloud.spanner_v1._helpers import _parse_value
from google.cloud.spanner_v1._pandas_helpers import pandas_df

# pylint: enable=ungrouped-imports

Expand Down Expand Up @@ -143,6 +145,11 @@ def __iter__(self):
while iter_rows:
yield iter_rows.pop(0)

def to_dataframe(self):
"""Returns the response in a pandas dataframe of the StreamedResultSet object"""
Hardikr23 marked this conversation as resolved.
Show resolved Hide resolved
df = pandas_df.pd_dataframe(self)
return df

def one(self):
"""Return exactly one result, or raise an exception.

Expand Down Expand Up @@ -302,4 +309,4 @@ def _merge_struct(lhs, rhs, type_):
def _merge_by_type(lhs, rhs, type_):
"""Helper for '_merge_chunk'."""
merger = _MERGE_BY_TYPE[type_.code]
return merger(lhs, rhs, type_)
return merger(lhs, rhs, type_)